A vibe-coded fork of Tangled that supports Pijul.
at 1237bf9f58e4ba5d13d5437f2f82a2078572e229 271 lines 6.8 kB view raw
1package knotmirror 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "math/rand" 10 "strings" 11 "sync" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "tangled.org/core/knotmirror/config" 16 "tangled.org/core/knotmirror/db" 17 "tangled.org/core/knotmirror/models" 18 "tangled.org/core/log" 19) 20 21type Resyncer struct { 22 logger *slog.Logger 23 db *sql.DB 24 gitm GitMirrorManager 25 26 claimJobMu sync.Mutex 27 28 runningJobs map[syntax.ATURI]context.CancelFunc 29 runningJobsMu sync.Mutex 30 31 repoFetchTimeout time.Duration 32 manualResyncTimeout time.Duration 33 34 parallelism int 35} 36 37func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, cfg *config.Config) *Resyncer { 38 return &Resyncer{ 39 logger: log.SubLogger(l, "resyncer"), 40 db: db, 41 gitm: gitm, 42 43 runningJobs: make(map[syntax.ATURI]context.CancelFunc), 44 45 repoFetchTimeout: cfg.GitRepoFetchTimeout, 46 parallelism: cfg.ResyncParallelism, 47 48 manualResyncTimeout: 30 * time.Minute, 49 } 50} 51 52func (r *Resyncer) Start(ctx context.Context) { 53 for i := 0; i < r.parallelism; i++ { 54 go r.runResyncWorker(ctx, i) 55 } 56} 57 58func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) { 59 l := r.logger.With("worker", workerID) 60 for { 61 select { 62 case <-ctx.Done(): 63 l.Info("resync worker shutting down", "error", ctx.Err()) 64 return 65 default: 66 } 67 repoAt, found, err := r.claimResyncJob(ctx) 68 if err != nil { 69 l.Error("failed to claim resync job", "error", err) 70 time.Sleep(time.Second) 71 continue 72 } 73 if !found { 74 time.Sleep(time.Second) 75 continue 76 } 77 l.Info("processing resync", "aturi", repoAt) 78 if err := r.resyncRepo(ctx, repoAt); err != nil { 79 l.Error("resync failed", "aturi", repoAt, "error", err) 80 } 81 } 82} 83 84func (r *Resyncer) registerRunning(repo syntax.ATURI, cancel context.CancelFunc) { 85 r.runningJobsMu.Lock() 86 defer r.runningJobsMu.Unlock() 87 88 if _, exists := 
r.runningJobs[repo]; exists { 89 return 90 } 91 r.runningJobs[repo] = cancel 92} 93 94func (r *Resyncer) unregisterRunning(repo syntax.ATURI) { 95 r.runningJobsMu.Lock() 96 defer r.runningJobsMu.Unlock() 97 98 delete(r.runningJobs, repo) 99} 100 101func (r *Resyncer) CancelResyncJob(repo syntax.ATURI) { 102 r.runningJobsMu.Lock() 103 defer r.runningJobsMu.Unlock() 104 105 cancel, ok := r.runningJobs[repo] 106 if !ok { 107 return 108 } 109 delete(r.runningJobs, repo) 110 cancel() 111} 112 113// TriggerResyncJob manually triggers the resync job 114func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoAt syntax.ATURI) error { 115 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 116 if err != nil { 117 return fmt.Errorf("failed to get repo: %w", err) 118 } 119 if repo == nil { 120 return fmt.Errorf("repo not found: %s", repoAt) 121 } 122 123 if repo.State == models.RepoStateResyncing { 124 return fmt.Errorf("repo already resyncing") 125 } 126 127 repo.State = models.RepoStatePending 128 repo.RetryAfter = -1 // resyncer will prioritize this 129 130 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 131 return fmt.Errorf("updating repo state to pending %w", err) 132 } 133 return nil 134} 135 136func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) { 137 // use mutex to prevent duplicated jobs 138 r.claimJobMu.Lock() 139 defer r.claimJobMu.Unlock() 140 141 var repoAt syntax.ATURI 142 now := time.Now().Unix() 143 if err := r.db.QueryRowContext(ctx, 144 `update repos 145 set state = $1 146 where at_uri = ( 147 select at_uri from repos 148 where state in ($2, $3, $4) 149 and (retry_after = -1 or retry_after = 0 or retry_after < $5) 150 limit 1 151 ) 152 returning at_uri 153 `, 154 models.RepoStateResyncing, 155 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, 156 now, 157 ).Scan(&repoAt); err != nil { 158 if errors.Is(err, sql.ErrNoRows) { 159 return "", false, nil 160 } 161 return "", false, err 162 } 
163 164 return repoAt, true, nil 165} 166 167func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error { 168 // ctx, span := tracer.Start(ctx, "resyncRepo") 169 // span.SetAttributes(attribute.String("aturi", repoAt)) 170 // defer span.End() 171 172 resyncsStarted.Inc() 173 startTime := time.Now() 174 175 jobCtx, cancel := context.WithCancel(ctx) 176 r.registerRunning(repoAt, cancel) 177 defer r.unregisterRunning(repoAt) 178 179 success, err := r.doResync(jobCtx, repoAt) 180 if !success { 181 resyncsFailed.Inc() 182 resyncDuration.Observe(time.Since(startTime).Seconds()) 183 return r.handleResyncFailure(ctx, repoAt, err) 184 } 185 186 resyncsCompleted.Inc() 187 resyncDuration.Observe(time.Since(startTime).Seconds()) 188 return nil 189} 190 191func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) { 192 // ctx, span := tracer.Start(ctx, "doResync") 193 // span.SetAttributes(attribute.String("aturi", repoAt)) 194 // defer span.End() 195 196 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 197 if err != nil { 198 return false, fmt.Errorf("failed to get repo: %w", err) 199 } 200 if repo == nil { // untracked repo, skip 201 return false, nil 202 } 203 204 // TODO: check if Knot is on backoff list. 
If so, return (false, nil) 205 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list 206 207 timeout := r.repoFetchTimeout 208 if repo.RetryAfter == -1 { 209 timeout = r.manualResyncTimeout 210 } 211 fetchCtx, cancel := context.WithTimeout(ctx, timeout) 212 defer cancel() 213 214 if err := r.gitm.Sync(fetchCtx, repo); err != nil { 215 return false, err 216 } 217 218 // repo.GitRev = <processed git.refUpdate revision> 219 // repo.RepoSha = <sha256 sum of git refs> 220 repo.State = models.RepoStateActive 221 repo.ErrorMsg = "" 222 repo.RetryCount = 0 223 repo.RetryAfter = 0 224 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 225 return false, fmt.Errorf("updating repo state to active %w", err) 226 } 227 return true, nil 228} 229 230func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error { 231 r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err) 232 var state models.RepoState 233 var errMsg string 234 if err == nil { 235 state = models.RepoStateDesynchronized 236 errMsg = "" 237 } else { 238 state = models.RepoStateError 239 errMsg = err.Error() 240 } 241 242 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 243 if err != nil { 244 return fmt.Errorf("failed to get repo: %w", err) 245 } 246 if repo == nil { 247 return fmt.Errorf("failed to get repo. 
repo '%s' doesn't exist in db", repoAt) 248 } 249 250 // start a 1 min & go up to 1 hr between retries 251 var retryCount = repo.RetryCount + 1 252 var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 253 254 // remove null bytes 255 errMsg = strings.ReplaceAll(errMsg, "\x00", "") 256 257 repo.State = state 258 repo.ErrorMsg = errMsg 259 repo.RetryCount = retryCount 260 repo.RetryAfter = retryAfter 261 if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil { 262 return fmt.Errorf("failed to update repo state: %w", err) 263 } 264 return err 265} 266 267func backoff(retries int, max int) time.Duration { 268 dur := min(1<<retries, max) 269 jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 270 return time.Second*time.Duration(dur) + jitter 271}