A vibe-coded fork of Tangled with added support for the Pijul version control system.
1package knotmirror
2
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"math/rand"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"tangled.org/core/knotmirror/config"
	"tangled.org/core/knotmirror/db"
	"tangled.org/core/knotmirror/models"
	"tangled.org/core/log"
)
22
// Resyncer continuously claims repos that need re-mirroring and fetches them
// from their knots. It runs a pool of workers, applies per-knot rate-limit
// backoff, and schedules retries with exponential delay.
type Resyncer struct {
	logger *slog.Logger
	db     *sql.DB
	gitm   GitMirrorManager
	cfg    *config.Config

	// claimJobMu serializes claimResyncJob so two workers never claim the
	// same repo concurrently.
	claimJobMu sync.Mutex

	// runningJobs maps a repo's AT-URI to the cancel func of its in-flight
	// resync job, so CancelResyncJob can abort it.
	runningJobs   map[syntax.ATURI]context.CancelFunc
	runningJobsMu sync.Mutex

	repoFetchTimeout    time.Duration // fetch timeout for automatic resyncs
	manualResyncTimeout time.Duration // longer fetch timeout for manual resyncs
	parallelism         int           // number of concurrent resync workers

	// knotBackoff records, per knot domain, the time until which fetches are
	// skipped after a rate-limit (HTTP 429) response from that knot.
	knotBackoff   map[string]time.Time
	knotBackoffMu sync.RWMutex
}
41
42func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, cfg *config.Config) *Resyncer {
43 return &Resyncer{
44 logger: log.SubLogger(l, "resyncer"),
45 db: db,
46 gitm: gitm,
47 cfg: cfg,
48
49 runningJobs: make(map[syntax.ATURI]context.CancelFunc),
50
51 repoFetchTimeout: cfg.GitRepoFetchTimeout,
52 manualResyncTimeout: 30 * time.Minute,
53 parallelism: cfg.ResyncParallelism,
54
55 knotBackoff: make(map[string]time.Time),
56 }
57}
58
59func (r *Resyncer) Start(ctx context.Context) {
60 for i := 0; i < r.parallelism; i++ {
61 go r.runResyncWorker(ctx, i)
62 }
63}
64
65func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) {
66 l := r.logger.With("worker", workerID)
67 for {
68 select {
69 case <-ctx.Done():
70 l.Info("resync worker shutting down", "error", ctx.Err())
71 return
72 default:
73 }
74 repoAt, found, err := r.claimResyncJob(ctx)
75 if err != nil {
76 l.Error("failed to claim resync job", "error", err)
77 time.Sleep(time.Second)
78 continue
79 }
80 if !found {
81 time.Sleep(time.Second)
82 continue
83 }
84 l.Info("processing resync", "aturi", repoAt)
85 if err := r.resyncRepo(ctx, repoAt); err != nil {
86 l.Error("resync failed", "aturi", repoAt, "error", err)
87 }
88 }
89}
90
91func (r *Resyncer) registerRunning(repo syntax.ATURI, cancel context.CancelFunc) {
92 r.runningJobsMu.Lock()
93 defer r.runningJobsMu.Unlock()
94
95 if _, exists := r.runningJobs[repo]; exists {
96 return
97 }
98 r.runningJobs[repo] = cancel
99}
100
101func (r *Resyncer) unregisterRunning(repo syntax.ATURI) {
102 r.runningJobsMu.Lock()
103 defer r.runningJobsMu.Unlock()
104
105 delete(r.runningJobs, repo)
106}
107
108func (r *Resyncer) CancelResyncJob(repo syntax.ATURI) {
109 r.runningJobsMu.Lock()
110 defer r.runningJobsMu.Unlock()
111
112 cancel, ok := r.runningJobs[repo]
113 if !ok {
114 return
115 }
116 delete(r.runningJobs, repo)
117 cancel()
118}
119
120// TriggerResyncJob manually triggers the resync job
121func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoAt syntax.ATURI) error {
122 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
123 if err != nil {
124 return fmt.Errorf("failed to get repo: %w", err)
125 }
126 if repo == nil {
127 return fmt.Errorf("repo not found: %s", repoAt)
128 }
129
130 if repo.State == models.RepoStateResyncing {
131 return fmt.Errorf("repo already resyncing")
132 }
133
134 repo.State = models.RepoStatePending
135 repo.RetryAfter = -1 // resyncer will prioritize this
136
137 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
138 return fmt.Errorf("updating repo state to pending %w", err)
139 }
140 return nil
141}
142
// claimResyncJob atomically picks the highest-priority due repo and flips its
// state to resyncing, returning its AT-URI. found is false when no repo is
// eligible. Priority order: manual triggers (retry_after = -1), then repos
// never retried (retry_after = 0), then the oldest due retry (retry_after
// below the current unix time).
func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) {
	// use mutex to prevent duplicated jobs
	r.claimJobMu.Lock()
	defer r.claimJobMu.Unlock()

	var repoAt syntax.ATURI
	now := time.Now().Unix()
	// Select-and-update in a single statement so the pick and the state flip
	// happen atomically in the database.
	if err := r.db.QueryRowContext(ctx,
		`update repos
		set state = $1
		where at_uri = (
			select at_uri from repos
			where state in ($2, $3, $4)
			and (retry_after = -1 or retry_after = 0 or retry_after < $5)
			order by
				(retry_after = -1) desc,
				(retry_after = 0) desc,
				retry_after
			limit 1
		)
		returning at_uri
		`,
		models.RepoStateResyncing,
		models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError,
		now,
	).Scan(&repoAt); err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			// Nothing claimable right now; not an error.
			return "", false, nil
		}
		return "", false, err
	}

	return repoAt, true, nil
}
177
178func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error {
179 // ctx, span := tracer.Start(ctx, "resyncRepo")
180 // span.SetAttributes(attribute.String("aturi", repoAt))
181 // defer span.End()
182
183 resyncsStarted.Inc()
184 startTime := time.Now()
185
186 jobCtx, cancel := context.WithCancel(ctx)
187 r.registerRunning(repoAt, cancel)
188 defer r.unregisterRunning(repoAt)
189
190 success, err := r.doResync(jobCtx, repoAt)
191 if !success {
192 resyncsFailed.Inc()
193 resyncDuration.Observe(time.Since(startTime).Seconds())
194 return r.handleResyncFailure(ctx, repoAt, err)
195 }
196
197 resyncsCompleted.Inc()
198 resyncDuration.Observe(time.Since(startTime).Seconds())
199 return nil
200}
201
// doResync performs a single resync attempt for repoAt. It returns
// (true, nil) on success, (false, err) for failures worth recording, and
// (false, nil) for benign skips (untracked repo, knot in rate-limit backoff)
// — note the caller treats (false, nil) as "desynchronized".
func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) {
	// ctx, span := tracer.Start(ctx, "doResync")
	// span.SetAttributes(attribute.String("aturi", repoAt))
	// defer span.End()

	repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
	if err != nil {
		return false, fmt.Errorf("failed to get repo: %w", err)
	}
	if repo == nil { // untracked repo, skip
		return false, nil
	}

	// Skip while this repo's knot is inside a rate-limit backoff window.
	r.knotBackoffMu.RLock()
	backoffUntil, inBackoff := r.knotBackoff[repo.KnotDomain]
	r.knotBackoffMu.RUnlock()
	if inBackoff && time.Now().Before(backoffUntil) {
		return false, nil
	}

	// HACK: check knot reachability with short timeout before running actual fetch.
	// This is crucial as git-cli doesn't support http connection timeout.
	// `http.lowSpeedTime` is only applied _after_ the connection.
	if err := r.checkKnotReachability(ctx, repo); err != nil {
		if isRateLimitError(err) {
			// Back off the whole knot domain for 10s so other workers don't
			// keep hammering a rate-limiting server.
			r.knotBackoffMu.Lock()
			r.knotBackoff[repo.KnotDomain] = time.Now().Add(10 * time.Second)
			r.knotBackoffMu.Unlock()
			return false, nil
		}
		// TODO: suspend repo on 404. KnotStream updates will change the repo state back online
		return false, fmt.Errorf("knot unreachable: %w", err)
	}

	// Manually triggered resyncs (RetryAfter == -1) get a longer timeout.
	timeout := r.repoFetchTimeout
	if repo.RetryAfter == -1 {
		timeout = r.manualResyncTimeout
	}
	fetchCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	if err := r.gitm.Sync(fetchCtx, repo); err != nil {
		return false, err
	}

	// repo.GitRev = <processed git.refUpdate revision>
	// repo.RepoSha = <sha256 sum of git refs>
	// Mark the repo healthy and reset all retry bookkeeping.
	repo.State = models.RepoStateActive
	repo.ErrorMsg = ""
	repo.RetryCount = 0
	repo.RetryAfter = 0
	if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
		return false, fmt.Errorf("updating repo state to active %w", err)
	}
	return true, nil
}
258
// knotStatusError reports a non-200 HTTP status received from a knot during
// the reachability check.
type knotStatusError struct {
	StatusCode int
}

// Error implements the error interface.
func (ke *knotStatusError) Error() string {
	msg := fmt.Sprintf("request failed with status code (HTTP %d)", ke.StatusCode)
	return msg
}
266
267func isRateLimitError(err error) bool {
268 var knotErr *knotStatusError
269 if errors.As(err, &knotErr) {
270 return knotErr.StatusCode == http.StatusTooManyRequests
271 }
272 return false
273}
274
275// checkKnotReachability checks if Knot is reachable and is valid git remote server
276func (r *Resyncer) checkKnotReachability(ctx context.Context, repo *models.Repo) error {
277 repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), r.cfg.KnotUseSSL)
278 if err != nil {
279 return err
280 }
281
282 repoUrl += "/info/refs?service=git-upload-pack"
283
284 r.logger.Debug("checking knot reachability", "url", repoUrl)
285
286 client := http.Client{
287 Timeout: 30 * time.Second,
288 }
289 req, err := http.NewRequestWithContext(ctx, "GET", repoUrl, nil)
290 if err != nil {
291 return err
292 }
293 req.Header.Set("User-Agent", "git/2.x")
294 req.Header.Set("Accept", "*/*")
295
296 resp, err := client.Do(req)
297 if err != nil {
298 var uerr *url.Error
299 if errors.As(err, &uerr) {
300 return fmt.Errorf("request failed: %w", uerr.Unwrap())
301 }
302 return fmt.Errorf("request failed: %w", err)
303 }
304 defer resp.Body.Close()
305
306 if resp.StatusCode != http.StatusOK {
307 return &knotStatusError{resp.StatusCode}
308 }
309
310 // check if target is git server
311 ct := resp.Header.Get("Content-Type")
312 if !strings.Contains(ct, "application/x-git-upload-pack-advertisement") {
313 return fmt.Errorf("unexpected content-type: %s", ct)
314 }
315
316 return nil
317}
318
319func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error {
320 r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err)
321 var state models.RepoState
322 var errMsg string
323 if err == nil {
324 state = models.RepoStateDesynchronized
325 errMsg = ""
326 } else {
327 state = models.RepoStateError
328 errMsg = err.Error()
329 }
330
331 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
332 if err != nil {
333 return fmt.Errorf("failed to get repo: %w", err)
334 }
335 if repo == nil {
336 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt)
337 }
338
339 // start a 1 min & go up to 1 hr between retries
340 var retryCount = repo.RetryCount + 1
341 var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix()
342
343 // remove null bytes
344 errMsg = strings.ReplaceAll(errMsg, "\x00", "")
345
346 repo.State = state
347 repo.ErrorMsg = errMsg
348 repo.RetryCount = retryCount
349 repo.RetryAfter = retryAfter
350 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
351 return fmt.Errorf("failed to update repo state: %w", err)
352 }
353 return nil
354}
355
356func backoff(retries int, max int) time.Duration {
357 dur := min(1<<retries, max)
358 jitter := time.Millisecond * time.Duration(rand.Intn(1000))
359 return time.Second*time.Duration(dur) + jitter
360}