A vibe-coded fork of Tangled that supports Pijul.
at sl/knotmirror 336 lines 8.6 kB view raw
1package knotmirror 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "math/rand" 10 "net/http" 11 "net/url" 12 "strings" 13 "sync" 14 "time" 15 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 "tangled.org/core/knotmirror/config" 18 "tangled.org/core/knotmirror/db" 19 "tangled.org/core/knotmirror/models" 20 "tangled.org/core/log" 21) 22 23type Resyncer struct { 24 logger *slog.Logger 25 db *sql.DB 26 gitm GitMirrorManager 27 28 claimJobMu sync.Mutex 29 30 runningJobs map[syntax.ATURI]context.CancelFunc 31 runningJobsMu sync.Mutex 32 33 repoFetchTimeout time.Duration 34 manualResyncTimeout time.Duration 35 parallelism int 36 37 knotBackoff map[string]time.Time 38 knotBackoffMu sync.RWMutex 39} 40 41func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, cfg *config.Config) *Resyncer { 42 return &Resyncer{ 43 logger: log.SubLogger(l, "resyncer"), 44 db: db, 45 gitm: gitm, 46 47 runningJobs: make(map[syntax.ATURI]context.CancelFunc), 48 49 repoFetchTimeout: cfg.GitRepoFetchTimeout, 50 manualResyncTimeout: 30 * time.Minute, 51 parallelism: cfg.ResyncParallelism, 52 53 knotBackoff: make(map[string]time.Time), 54 } 55} 56 57func (r *Resyncer) Start(ctx context.Context) { 58 for i := 0; i < r.parallelism; i++ { 59 go r.runResyncWorker(ctx, i) 60 } 61} 62 63func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) { 64 l := r.logger.With("worker", workerID) 65 for { 66 select { 67 case <-ctx.Done(): 68 l.Info("resync worker shutting down", "error", ctx.Err()) 69 return 70 default: 71 } 72 repoAt, found, err := r.claimResyncJob(ctx) 73 if err != nil { 74 l.Error("failed to claim resync job", "error", err) 75 time.Sleep(time.Second) 76 continue 77 } 78 if !found { 79 time.Sleep(time.Second) 80 continue 81 } 82 l.Info("processing resync", "aturi", repoAt) 83 if err := r.resyncRepo(ctx, repoAt); err != nil { 84 l.Error("resync failed", "aturi", repoAt, "error", err) 85 } 86 } 87} 88 89func (r *Resyncer) 
registerRunning(repo syntax.ATURI, cancel context.CancelFunc) { 90 r.runningJobsMu.Lock() 91 defer r.runningJobsMu.Unlock() 92 93 if _, exists := r.runningJobs[repo]; exists { 94 return 95 } 96 r.runningJobs[repo] = cancel 97} 98 99func (r *Resyncer) unregisterRunning(repo syntax.ATURI) { 100 r.runningJobsMu.Lock() 101 defer r.runningJobsMu.Unlock() 102 103 delete(r.runningJobs, repo) 104} 105 106func (r *Resyncer) CancelResyncJob(repo syntax.ATURI) { 107 r.runningJobsMu.Lock() 108 defer r.runningJobsMu.Unlock() 109 110 cancel, ok := r.runningJobs[repo] 111 if !ok { 112 return 113 } 114 delete(r.runningJobs, repo) 115 cancel() 116} 117 118// TriggerResyncJob manually triggers the resync job 119func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoAt syntax.ATURI) error { 120 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 121 if err != nil { 122 return fmt.Errorf("failed to get repo: %w", err) 123 } 124 if repo == nil { 125 return fmt.Errorf("repo not found: %s", repoAt) 126 } 127 128 if repo.State == models.RepoStateResyncing { 129 return fmt.Errorf("repo already resyncing") 130 } 131 132 repo.State = models.RepoStatePending 133 repo.RetryAfter = -1 // resyncer will prioritize this 134 135 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 136 return fmt.Errorf("updating repo state to pending %w", err) 137 } 138 return nil 139} 140 141func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) { 142 // use mutex to prevent duplicated jobs 143 r.claimJobMu.Lock() 144 defer r.claimJobMu.Unlock() 145 146 var repoAt syntax.ATURI 147 now := time.Now().Unix() 148 if err := r.db.QueryRowContext(ctx, 149 `update repos 150 set state = $1 151 where at_uri = ( 152 select at_uri from repos 153 where state in ($2, $3, $4) 154 and (retry_after = -1 or retry_after = 0 or retry_after < $5) 155 order by 156 (retry_after = -1) desc, 157 (retry_after = 0) desc, 158 retry_after 159 limit 1 160 ) 161 returning at_uri 162 `, 163 
models.RepoStateResyncing, 164 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, 165 now, 166 ).Scan(&repoAt); err != nil { 167 if errors.Is(err, sql.ErrNoRows) { 168 return "", false, nil 169 } 170 return "", false, err 171 } 172 173 return repoAt, true, nil 174} 175 176func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error { 177 // ctx, span := tracer.Start(ctx, "resyncRepo") 178 // span.SetAttributes(attribute.String("aturi", repoAt)) 179 // defer span.End() 180 181 resyncsStarted.Inc() 182 startTime := time.Now() 183 184 jobCtx, cancel := context.WithCancel(ctx) 185 r.registerRunning(repoAt, cancel) 186 defer r.unregisterRunning(repoAt) 187 188 success, err := r.doResync(jobCtx, repoAt) 189 if !success { 190 resyncsFailed.Inc() 191 resyncDuration.Observe(time.Since(startTime).Seconds()) 192 return r.handleResyncFailure(ctx, repoAt, err) 193 } 194 195 resyncsCompleted.Inc() 196 resyncDuration.Observe(time.Since(startTime).Seconds()) 197 return nil 198} 199 200func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) { 201 // ctx, span := tracer.Start(ctx, "doResync") 202 // span.SetAttributes(attribute.String("aturi", repoAt)) 203 // defer span.End() 204 205 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 206 if err != nil { 207 return false, fmt.Errorf("failed to get repo: %w", err) 208 } 209 if repo == nil { // untracked repo, skip 210 return false, nil 211 } 212 213 r.knotBackoffMu.RLock() 214 backoffUntil, inBackoff := r.knotBackoff[repo.KnotDomain] 215 r.knotBackoffMu.RUnlock() 216 if inBackoff && time.Now().Before(backoffUntil) { 217 return false, nil 218 } 219 220 // HACK: check knot reachability with short timeout before running actual fetch. 221 // This is crucial as git-cli doesn't support http connection timeout. 222 // `http.lowSpeedTime` is only applied _after_ the connection. 
223 if err := r.checkKnotReachability(ctx, repo); err != nil { 224 return false, fmt.Errorf("knot unreachable: %w", err) 225 } 226 227 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list 228 // we can use http statuscode for that. 229 230 timeout := r.repoFetchTimeout 231 if repo.RetryAfter == -1 { 232 timeout = r.manualResyncTimeout 233 } 234 fetchCtx, cancel := context.WithTimeout(ctx, timeout) 235 defer cancel() 236 237 if err := r.gitm.Sync(fetchCtx, repo); err != nil { 238 return false, err 239 } 240 241 // repo.GitRev = <processed git.refUpdate revision> 242 // repo.RepoSha = <sha256 sum of git refs> 243 repo.State = models.RepoStateActive 244 repo.ErrorMsg = "" 245 repo.RetryCount = 0 246 repo.RetryAfter = 0 247 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 248 return false, fmt.Errorf("updating repo state to active %w", err) 249 } 250 return true, nil 251} 252 253// checkKnotReachability checks if Knot is reachable and is valid git remote server 254func (r *Resyncer) checkKnotReachability(ctx context.Context, repo *models.Repo) error { 255 repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), true) 256 if err != nil { 257 return err 258 } 259 260 repoUrl += "/info/refs?service=git-upload-pack" 261 262 client := http.Client{ 263 Timeout: 30 * time.Second, 264 } 265 req, err := http.NewRequestWithContext(ctx, "GET", repoUrl, nil) 266 if err != nil { 267 return err 268 } 269 req.Header.Set("User-Agent", "git/2.x") 270 req.Header.Set("Accept", "*/*") 271 272 resp, err := client.Do(req) 273 if err != nil { 274 var uerr *url.Error 275 if errors.As(err, &uerr) { 276 return fmt.Errorf("request failed: %w", uerr.Unwrap()) 277 } 278 return fmt.Errorf("request failed: %w", err) 279 } 280 defer resp.Body.Close() 281 282 if resp.StatusCode != http.StatusOK { 283 return fmt.Errorf("unexpected status: %s", resp.Status) 284 } 285 286 // check if target is git server 287 ct := resp.Header.Get("Content-Type") 
288 if !strings.Contains(ct, "application/x-git-upload-pack-advertisement") { 289 return fmt.Errorf("unexpected content-type: %s", ct) 290 } 291 292 return nil 293} 294 295func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error { 296 r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err) 297 var state models.RepoState 298 var errMsg string 299 if err == nil { 300 state = models.RepoStateDesynchronized 301 errMsg = "" 302 } else { 303 state = models.RepoStateError 304 errMsg = err.Error() 305 } 306 307 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 308 if err != nil { 309 return fmt.Errorf("failed to get repo: %w", err) 310 } 311 if repo == nil { 312 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt) 313 } 314 315 // start a 1 min & go up to 1 hr between retries 316 var retryCount = repo.RetryCount + 1 317 var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 318 319 // remove null bytes 320 errMsg = strings.ReplaceAll(errMsg, "\x00", "") 321 322 repo.State = state 323 repo.ErrorMsg = errMsg 324 repo.RetryCount = retryCount 325 repo.RetryAfter = retryAfter 326 if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil { 327 return fmt.Errorf("failed to update repo state: %w", err) 328 } 329 return err 330} 331 332func backoff(retries int, max int) time.Duration { 333 dur := min(1<<retries, max) 334 jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 335 return time.Second*time.Duration(dur) + jitter 336}