A vibe-coded fork of Tangled that supports Pijul.
at master 360 lines 9.2 kB view raw
1package knotmirror 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "math/rand" 10 "net/http" 11 "net/url" 12 "strings" 13 "sync" 14 "time" 15 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 "tangled.org/core/knotmirror/config" 18 "tangled.org/core/knotmirror/db" 19 "tangled.org/core/knotmirror/models" 20 "tangled.org/core/log" 21) 22 23type Resyncer struct { 24 logger *slog.Logger 25 db *sql.DB 26 gitm GitMirrorManager 27 cfg *config.Config 28 29 claimJobMu sync.Mutex 30 31 runningJobs map[syntax.ATURI]context.CancelFunc 32 runningJobsMu sync.Mutex 33 34 repoFetchTimeout time.Duration 35 manualResyncTimeout time.Duration 36 parallelism int 37 38 knotBackoff map[string]time.Time 39 knotBackoffMu sync.RWMutex 40} 41 42func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, cfg *config.Config) *Resyncer { 43 return &Resyncer{ 44 logger: log.SubLogger(l, "resyncer"), 45 db: db, 46 gitm: gitm, 47 cfg: cfg, 48 49 runningJobs: make(map[syntax.ATURI]context.CancelFunc), 50 51 repoFetchTimeout: cfg.GitRepoFetchTimeout, 52 manualResyncTimeout: 30 * time.Minute, 53 parallelism: cfg.ResyncParallelism, 54 55 knotBackoff: make(map[string]time.Time), 56 } 57} 58 59func (r *Resyncer) Start(ctx context.Context) { 60 for i := 0; i < r.parallelism; i++ { 61 go r.runResyncWorker(ctx, i) 62 } 63} 64 65func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) { 66 l := r.logger.With("worker", workerID) 67 for { 68 select { 69 case <-ctx.Done(): 70 l.Info("resync worker shutting down", "error", ctx.Err()) 71 return 72 default: 73 } 74 repoAt, found, err := r.claimResyncJob(ctx) 75 if err != nil { 76 l.Error("failed to claim resync job", "error", err) 77 time.Sleep(time.Second) 78 continue 79 } 80 if !found { 81 time.Sleep(time.Second) 82 continue 83 } 84 l.Info("processing resync", "aturi", repoAt) 85 if err := r.resyncRepo(ctx, repoAt); err != nil { 86 l.Error("resync failed", "aturi", repoAt, "error", err) 87 } 88 } 89} 90 
91func (r *Resyncer) registerRunning(repo syntax.ATURI, cancel context.CancelFunc) { 92 r.runningJobsMu.Lock() 93 defer r.runningJobsMu.Unlock() 94 95 if _, exists := r.runningJobs[repo]; exists { 96 return 97 } 98 r.runningJobs[repo] = cancel 99} 100 101func (r *Resyncer) unregisterRunning(repo syntax.ATURI) { 102 r.runningJobsMu.Lock() 103 defer r.runningJobsMu.Unlock() 104 105 delete(r.runningJobs, repo) 106} 107 108func (r *Resyncer) CancelResyncJob(repo syntax.ATURI) { 109 r.runningJobsMu.Lock() 110 defer r.runningJobsMu.Unlock() 111 112 cancel, ok := r.runningJobs[repo] 113 if !ok { 114 return 115 } 116 delete(r.runningJobs, repo) 117 cancel() 118} 119 120// TriggerResyncJob manually triggers the resync job 121func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoAt syntax.ATURI) error { 122 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 123 if err != nil { 124 return fmt.Errorf("failed to get repo: %w", err) 125 } 126 if repo == nil { 127 return fmt.Errorf("repo not found: %s", repoAt) 128 } 129 130 if repo.State == models.RepoStateResyncing { 131 return fmt.Errorf("repo already resyncing") 132 } 133 134 repo.State = models.RepoStatePending 135 repo.RetryAfter = -1 // resyncer will prioritize this 136 137 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 138 return fmt.Errorf("updating repo state to pending %w", err) 139 } 140 return nil 141} 142 143func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) { 144 // use mutex to prevent duplicated jobs 145 r.claimJobMu.Lock() 146 defer r.claimJobMu.Unlock() 147 148 var repoAt syntax.ATURI 149 now := time.Now().Unix() 150 if err := r.db.QueryRowContext(ctx, 151 `update repos 152 set state = $1 153 where at_uri = ( 154 select at_uri from repos 155 where state in ($2, $3, $4) 156 and (retry_after = -1 or retry_after = 0 or retry_after < $5) 157 order by 158 (retry_after = -1) desc, 159 (retry_after = 0) desc, 160 retry_after 161 limit 1 162 ) 163 returning at_uri 164 `, 
165 models.RepoStateResyncing, 166 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, 167 now, 168 ).Scan(&repoAt); err != nil { 169 if errors.Is(err, sql.ErrNoRows) { 170 return "", false, nil 171 } 172 return "", false, err 173 } 174 175 return repoAt, true, nil 176} 177 178func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error { 179 // ctx, span := tracer.Start(ctx, "resyncRepo") 180 // span.SetAttributes(attribute.String("aturi", repoAt)) 181 // defer span.End() 182 183 resyncsStarted.Inc() 184 startTime := time.Now() 185 186 jobCtx, cancel := context.WithCancel(ctx) 187 r.registerRunning(repoAt, cancel) 188 defer r.unregisterRunning(repoAt) 189 190 success, err := r.doResync(jobCtx, repoAt) 191 if !success { 192 resyncsFailed.Inc() 193 resyncDuration.Observe(time.Since(startTime).Seconds()) 194 return r.handleResyncFailure(ctx, repoAt, err) 195 } 196 197 resyncsCompleted.Inc() 198 resyncDuration.Observe(time.Since(startTime).Seconds()) 199 return nil 200} 201 202func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) { 203 // ctx, span := tracer.Start(ctx, "doResync") 204 // span.SetAttributes(attribute.String("aturi", repoAt)) 205 // defer span.End() 206 207 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 208 if err != nil { 209 return false, fmt.Errorf("failed to get repo: %w", err) 210 } 211 if repo == nil { // untracked repo, skip 212 return false, nil 213 } 214 215 r.knotBackoffMu.RLock() 216 backoffUntil, inBackoff := r.knotBackoff[repo.KnotDomain] 217 r.knotBackoffMu.RUnlock() 218 if inBackoff && time.Now().Before(backoffUntil) { 219 return false, nil 220 } 221 222 // HACK: check knot reachability with short timeout before running actual fetch. 223 // This is crucial as git-cli doesn't support http connection timeout. 224 // `http.lowSpeedTime` is only applied _after_ the connection. 
225 if err := r.checkKnotReachability(ctx, repo); err != nil { 226 if isRateLimitError(err) { 227 r.knotBackoffMu.Lock() 228 r.knotBackoff[repo.KnotDomain] = time.Now().Add(10 * time.Second) 229 r.knotBackoffMu.Unlock() 230 return false, nil 231 } 232 // TODO: suspend repo on 404. KnotStream updates will change the repo state back online 233 return false, fmt.Errorf("knot unreachable: %w", err) 234 } 235 236 timeout := r.repoFetchTimeout 237 if repo.RetryAfter == -1 { 238 timeout = r.manualResyncTimeout 239 } 240 fetchCtx, cancel := context.WithTimeout(ctx, timeout) 241 defer cancel() 242 243 if err := r.gitm.Sync(fetchCtx, repo); err != nil { 244 return false, err 245 } 246 247 // repo.GitRev = <processed git.refUpdate revision> 248 // repo.RepoSha = <sha256 sum of git refs> 249 repo.State = models.RepoStateActive 250 repo.ErrorMsg = "" 251 repo.RetryCount = 0 252 repo.RetryAfter = 0 253 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 254 return false, fmt.Errorf("updating repo state to active %w", err) 255 } 256 return true, nil 257} 258 259type knotStatusError struct { 260 StatusCode int 261} 262 263func (ke *knotStatusError) Error() string { 264 return fmt.Sprintf("request failed with status code (HTTP %d)", ke.StatusCode) 265} 266 267func isRateLimitError(err error) bool { 268 var knotErr *knotStatusError 269 if errors.As(err, &knotErr) { 270 return knotErr.StatusCode == http.StatusTooManyRequests 271 } 272 return false 273} 274 275// checkKnotReachability checks if Knot is reachable and is valid git remote server 276func (r *Resyncer) checkKnotReachability(ctx context.Context, repo *models.Repo) error { 277 repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), r.cfg.KnotUseSSL) 278 if err != nil { 279 return err 280 } 281 282 repoUrl += "/info/refs?service=git-upload-pack" 283 284 r.logger.Debug("checking knot reachability", "url", repoUrl) 285 286 client := http.Client{ 287 Timeout: 30 * time.Second, 288 } 289 req, err := 
http.NewRequestWithContext(ctx, "GET", repoUrl, nil) 290 if err != nil { 291 return err 292 } 293 req.Header.Set("User-Agent", "git/2.x") 294 req.Header.Set("Accept", "*/*") 295 296 resp, err := client.Do(req) 297 if err != nil { 298 var uerr *url.Error 299 if errors.As(err, &uerr) { 300 return fmt.Errorf("request failed: %w", uerr.Unwrap()) 301 } 302 return fmt.Errorf("request failed: %w", err) 303 } 304 defer resp.Body.Close() 305 306 if resp.StatusCode != http.StatusOK { 307 return &knotStatusError{resp.StatusCode} 308 } 309 310 // check if target is git server 311 ct := resp.Header.Get("Content-Type") 312 if !strings.Contains(ct, "application/x-git-upload-pack-advertisement") { 313 return fmt.Errorf("unexpected content-type: %s", ct) 314 } 315 316 return nil 317} 318 319func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error { 320 r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err) 321 var state models.RepoState 322 var errMsg string 323 if err == nil { 324 state = models.RepoStateDesynchronized 325 errMsg = "" 326 } else { 327 state = models.RepoStateError 328 errMsg = err.Error() 329 } 330 331 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 332 if err != nil { 333 return fmt.Errorf("failed to get repo: %w", err) 334 } 335 if repo == nil { 336 return fmt.Errorf("failed to get repo. 
repo '%s' doesn't exist in db", repoAt) 337 } 338 339 // start a 1 min & go up to 1 hr between retries 340 var retryCount = repo.RetryCount + 1 341 var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 342 343 // remove null bytes 344 errMsg = strings.ReplaceAll(errMsg, "\x00", "") 345 346 repo.State = state 347 repo.ErrorMsg = errMsg 348 repo.RetryCount = retryCount 349 repo.RetryAfter = retryAfter 350 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 351 return fmt.Errorf("failed to update repo state: %w", err) 352 } 353 return nil 354} 355 356func backoff(retries int, max int) time.Duration { 357 dur := min(1<<retries, max) 358 jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 359 return time.Second*time.Duration(dur) + jitter 360}