A vibe-coded fork of Tangled that supports Pijul.
At commit 7a94f57f787deaec9b6c50d5672a34f077cee116 · 203 lines · 5.1 kB · view raw
1package knotmirror 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "math/rand" 10 "strings" 11 "sync" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "tangled.org/core/knotmirror/config" 16 "tangled.org/core/knotmirror/db" 17 "tangled.org/core/knotmirror/models" 18 "tangled.org/core/log" 19) 20 21type Resyncer struct { 22 logger *slog.Logger 23 db *sql.DB 24 gitm GitMirrorManager 25 26 claimJobMu sync.Mutex 27 28 repoFetchTimeout time.Duration 29 30 parallelism int 31} 32 33func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, cfg *config.Config) *Resyncer { 34 return &Resyncer{ 35 logger: log.SubLogger(l, "resyncer"), 36 db: db, 37 gitm: gitm, 38 39 repoFetchTimeout: cfg.GitRepoFetchTimeout, 40 parallelism: cfg.ResyncParallelism, 41 } 42} 43 44func (r *Resyncer) Start(ctx context.Context) { 45 for i := 0; i < r.parallelism; i++ { 46 go r.runResyncWorker(ctx, i) 47 } 48} 49 50func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) { 51 l := r.logger.With("worker", workerID) 52 for { 53 select { 54 case <-ctx.Done(): 55 l.Info("resync worker shutting down", "error", ctx.Err()) 56 return 57 default: 58 } 59 repoAt, found, err := r.claimResyncJob(ctx) 60 if err != nil { 61 l.Error("failed to claim resync job", "error", err) 62 time.Sleep(time.Second) 63 continue 64 } 65 if !found { 66 time.Sleep(time.Second) 67 continue 68 } 69 l.Info("processing resync", "aturi", repoAt) 70 if err := r.resyncRepo(ctx, repoAt); err != nil { 71 l.Error("resync failed", "aturi", repoAt, "error", err) 72 } 73 } 74} 75 76func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) { 77 // use mutex to prevent duplicated jobs 78 r.claimJobMu.Lock() 79 defer r.claimJobMu.Unlock() 80 81 var repoAt syntax.ATURI 82 now := time.Now().Unix() 83 if err := r.db.QueryRowContext(ctx, 84 `update repos 85 set state = $1 86 where at_uri = ( 87 select at_uri from repos 88 where state in ($2, $3, $4) 89 and 
(retry_after = 0 or retry_after < $5) 90 limit 1 91 ) 92 returning at_uri 93 `, 94 models.RepoStateResyncing, 95 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, 96 now, 97 ).Scan(&repoAt); err != nil { 98 if errors.Is(err, sql.ErrNoRows) { 99 return "", false, nil 100 } 101 return "", false, err 102 } 103 104 return repoAt, true, nil 105} 106 107func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error { 108 // ctx, span := tracer.Start(ctx, "resyncRepo") 109 // span.SetAttributes(attribute.String("aturi", repoAt)) 110 // defer span.End() 111 112 resyncsStarted.Inc() 113 startTime := time.Now() 114 115 success, err := r.doResync(ctx, repoAt) 116 if !success { 117 resyncsFailed.Inc() 118 resyncDuration.Observe(time.Since(startTime).Seconds()) 119 return r.handleResyncFailure(ctx, repoAt, err) 120 } 121 122 resyncsCompleted.Inc() 123 resyncDuration.Observe(time.Since(startTime).Seconds()) 124 return nil 125} 126 127func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) { 128 // ctx, span := tracer.Start(ctx, "doResync") 129 // span.SetAttributes(attribute.String("aturi", repoAt)) 130 // defer span.End() 131 132 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 133 if err != nil { 134 return false, fmt.Errorf("failed to get repo: %w", err) 135 } 136 if repo == nil { // untracked repo, skip 137 return false, nil 138 } 139 140 // TODO: check if Knot is on backoff list. 
If so, return (false, nil) 141 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list 142 143 fetchCtx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout) 144 defer cancel() 145 146 if err := r.gitm.Sync(fetchCtx, repo); err != nil { 147 return false, err 148 } 149 150 // repo.GitRev = <processed git.refUpdate revision> 151 // repo.RepoSha = <sha256 sum of git refs> 152 repo.State = models.RepoStateActive 153 repo.ErrorMsg = "" 154 repo.RetryCount = 0 155 repo.RetryAfter = 0 156 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 157 return false, fmt.Errorf("updating repo state to active %w", err) 158 } 159 return true, nil 160} 161 162func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error { 163 r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err) 164 var state models.RepoState 165 var errMsg string 166 if err == nil { 167 state = models.RepoStateDesynchronized 168 errMsg = "" 169 } else { 170 state = models.RepoStateError 171 errMsg = err.Error() 172 } 173 174 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 175 if err != nil { 176 return fmt.Errorf("failed to get repo: %w", err) 177 } 178 if repo == nil { 179 return fmt.Errorf("failed to get repo. 
repo '%s' doesn't exist in db", repoAt) 180 } 181 182 // start a 1 min & go up to 1 hr between retries 183 var retryCount = repo.RetryCount + 1 184 var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 185 186 // remove null bytes 187 errMsg = strings.ReplaceAll(errMsg, "\x00", "") 188 189 repo.State = state 190 repo.ErrorMsg = errMsg 191 repo.RetryCount = retryCount 192 repo.RetryAfter = retryAfter 193 if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil { 194 return fmt.Errorf("failed to update repo state: %w", err) 195 } 196 return err 197} 198 199func backoff(retries int, max int) time.Duration { 200 dur := min(1<<retries, max) 201 jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 202 return time.Second*time.Duration(dur) + jitter 203}