A vibe-coded fork of Tangled that supports Pijul.
1package knotmirror
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "math/rand"
10 "net/http"
11 "net/url"
12 "strings"
13 "sync"
14 "time"
15
16 "github.com/bluesky-social/indigo/atproto/syntax"
17 "tangled.org/core/knotmirror/config"
18 "tangled.org/core/knotmirror/db"
19 "tangled.org/core/knotmirror/models"
20 "tangled.org/core/log"
21)
22
// Resyncer mirrors tracked repos from their knots using a pool of worker goroutines
// (see Start). Workers claim repos in the pending, desynchronized or error state from
// the repos table, sync them through gitm, and record the outcome back in the table.
type Resyncer struct {
	logger *slog.Logger
	db     *sql.DB
	gitm   GitMirrorManager

	// claimJobMu serializes claimResyncJob within this process so two workers of the
	// same Resyncer never claim concurrently. It does not coordinate across processes.
	claimJobMu sync.Mutex

	// runningJobs maps each repo currently being resynced to the cancel func of its job
	// context, so CancelResyncJob can abort it. Guarded by runningJobsMu.
	runningJobs   map[syntax.ATURI]context.CancelFunc
	runningJobsMu sync.Mutex

	// repoFetchTimeout bounds a regular fetch; manualResyncTimeout bounds the fetch of a
	// repo whose RetryAfter is -1 (the marker TriggerResyncJob sets).
	repoFetchTimeout    time.Duration
	manualResyncTimeout time.Duration
	// parallelism is the number of worker goroutines Start launches.
	parallelism int

	// knotBackoff maps a knot domain to the time until which resyncs of repos hosted on
	// that knot are skipped. Guarded by knotBackoffMu.
	knotBackoff   map[string]time.Time
	knotBackoffMu sync.RWMutex
}
40
41func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, cfg *config.Config) *Resyncer {
42 return &Resyncer{
43 logger: log.SubLogger(l, "resyncer"),
44 db: db,
45 gitm: gitm,
46
47 runningJobs: make(map[syntax.ATURI]context.CancelFunc),
48
49 repoFetchTimeout: cfg.GitRepoFetchTimeout,
50 manualResyncTimeout: 30 * time.Minute,
51 parallelism: cfg.ResyncParallelism,
52
53 knotBackoff: make(map[string]time.Time),
54 }
55}
56
57func (r *Resyncer) Start(ctx context.Context) {
58 for i := 0; i < r.parallelism; i++ {
59 go r.runResyncWorker(ctx, i)
60 }
61}
62
63func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) {
64 l := r.logger.With("worker", workerID)
65 for {
66 select {
67 case <-ctx.Done():
68 l.Info("resync worker shutting down", "error", ctx.Err())
69 return
70 default:
71 }
72 repoAt, found, err := r.claimResyncJob(ctx)
73 if err != nil {
74 l.Error("failed to claim resync job", "error", err)
75 time.Sleep(time.Second)
76 continue
77 }
78 if !found {
79 time.Sleep(time.Second)
80 continue
81 }
82 l.Info("processing resync", "aturi", repoAt)
83 if err := r.resyncRepo(ctx, repoAt); err != nil {
84 l.Error("resync failed", "aturi", repoAt, "error", err)
85 }
86 }
87}
88
89func (r *Resyncer) registerRunning(repo syntax.ATURI, cancel context.CancelFunc) {
90 r.runningJobsMu.Lock()
91 defer r.runningJobsMu.Unlock()
92
93 if _, exists := r.runningJobs[repo]; exists {
94 return
95 }
96 r.runningJobs[repo] = cancel
97}
98
99func (r *Resyncer) unregisterRunning(repo syntax.ATURI) {
100 r.runningJobsMu.Lock()
101 defer r.runningJobsMu.Unlock()
102
103 delete(r.runningJobs, repo)
104}
105
106func (r *Resyncer) CancelResyncJob(repo syntax.ATURI) {
107 r.runningJobsMu.Lock()
108 defer r.runningJobsMu.Unlock()
109
110 cancel, ok := r.runningJobs[repo]
111 if !ok {
112 return
113 }
114 delete(r.runningJobs, repo)
115 cancel()
116}
117
118// TriggerResyncJob manually triggers the resync job
119func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoAt syntax.ATURI) error {
120 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
121 if err != nil {
122 return fmt.Errorf("failed to get repo: %w", err)
123 }
124 if repo == nil {
125 return fmt.Errorf("repo not found: %s", repoAt)
126 }
127
128 if repo.State == models.RepoStateResyncing {
129 return fmt.Errorf("repo already resyncing")
130 }
131
132 repo.State = models.RepoStatePending
133 repo.RetryAfter = -1 // resyncer will prioritize this
134
135 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
136 return fmt.Errorf("updating repo state to pending %w", err)
137 }
138 return nil
139}
140
141func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) {
142 // use mutex to prevent duplicated jobs
143 r.claimJobMu.Lock()
144 defer r.claimJobMu.Unlock()
145
146 var repoAt syntax.ATURI
147 now := time.Now().Unix()
148 if err := r.db.QueryRowContext(ctx,
149 `update repos
150 set state = $1
151 where at_uri = (
152 select at_uri from repos
153 where state in ($2, $3, $4)
154 and (retry_after = -1 or retry_after = 0 or retry_after < $5)
155 order by
156 (retry_after = -1) desc,
157 (retry_after = 0) desc,
158 retry_after
159 limit 1
160 )
161 returning at_uri
162 `,
163 models.RepoStateResyncing,
164 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError,
165 now,
166 ).Scan(&repoAt); err != nil {
167 if errors.Is(err, sql.ErrNoRows) {
168 return "", false, nil
169 }
170 return "", false, err
171 }
172
173 return repoAt, true, nil
174}
175
176func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error {
177 // ctx, span := tracer.Start(ctx, "resyncRepo")
178 // span.SetAttributes(attribute.String("aturi", repoAt))
179 // defer span.End()
180
181 resyncsStarted.Inc()
182 startTime := time.Now()
183
184 jobCtx, cancel := context.WithCancel(ctx)
185 r.registerRunning(repoAt, cancel)
186 defer r.unregisterRunning(repoAt)
187
188 success, err := r.doResync(jobCtx, repoAt)
189 if !success {
190 resyncsFailed.Inc()
191 resyncDuration.Observe(time.Since(startTime).Seconds())
192 return r.handleResyncFailure(ctx, repoAt, err)
193 }
194
195 resyncsCompleted.Inc()
196 resyncDuration.Observe(time.Since(startTime).Seconds())
197 return nil
198}
199
200func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) {
201 // ctx, span := tracer.Start(ctx, "doResync")
202 // span.SetAttributes(attribute.String("aturi", repoAt))
203 // defer span.End()
204
205 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
206 if err != nil {
207 return false, fmt.Errorf("failed to get repo: %w", err)
208 }
209 if repo == nil { // untracked repo, skip
210 return false, nil
211 }
212
213 r.knotBackoffMu.RLock()
214 backoffUntil, inBackoff := r.knotBackoff[repo.KnotDomain]
215 r.knotBackoffMu.RUnlock()
216 if inBackoff && time.Now().Before(backoffUntil) {
217 return false, nil
218 }
219
220 // HACK: check knot reachability with short timeout before running actual fetch.
221 // This is crucial as git-cli doesn't support http connection timeout.
222 // `http.lowSpeedTime` is only applied _after_ the connection.
223 if err := r.checkKnotReachability(ctx, repo); err != nil {
224 return false, fmt.Errorf("knot unreachable: %w", err)
225 }
226
227 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list
228 // we can use http statuscode for that.
229
230 timeout := r.repoFetchTimeout
231 if repo.RetryAfter == -1 {
232 timeout = r.manualResyncTimeout
233 }
234 fetchCtx, cancel := context.WithTimeout(ctx, timeout)
235 defer cancel()
236
237 if err := r.gitm.Sync(fetchCtx, repo); err != nil {
238 return false, err
239 }
240
241 // repo.GitRev = <processed git.refUpdate revision>
242 // repo.RepoSha = <sha256 sum of git refs>
243 repo.State = models.RepoStateActive
244 repo.ErrorMsg = ""
245 repo.RetryCount = 0
246 repo.RetryAfter = 0
247 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
248 return false, fmt.Errorf("updating repo state to active %w", err)
249 }
250 return true, nil
251}
252
253// checkKnotReachability checks if Knot is reachable and is valid git remote server
254func (r *Resyncer) checkKnotReachability(ctx context.Context, repo *models.Repo) error {
255 repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), true)
256 if err != nil {
257 return err
258 }
259
260 repoUrl += "/info/refs?service=git-upload-pack"
261
262 client := http.Client{
263 Timeout: 30 * time.Second,
264 }
265 req, err := http.NewRequestWithContext(ctx, "GET", repoUrl, nil)
266 if err != nil {
267 return err
268 }
269 req.Header.Set("User-Agent", "git/2.x")
270 req.Header.Set("Accept", "*/*")
271
272 resp, err := client.Do(req)
273 if err != nil {
274 var uerr *url.Error
275 if errors.As(err, &uerr) {
276 return fmt.Errorf("request failed: %w", uerr.Unwrap())
277 }
278 return fmt.Errorf("request failed: %w", err)
279 }
280 defer resp.Body.Close()
281
282 if resp.StatusCode != http.StatusOK {
283 return fmt.Errorf("unexpected status: %s", resp.Status)
284 }
285
286 // check if target is git server
287 ct := resp.Header.Get("Content-Type")
288 if !strings.Contains(ct, "application/x-git-upload-pack-advertisement") {
289 return fmt.Errorf("unexpected content-type: %s", ct)
290 }
291
292 return nil
293}
294
295func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error {
296 r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err)
297 var state models.RepoState
298 var errMsg string
299 if err == nil {
300 state = models.RepoStateDesynchronized
301 errMsg = ""
302 } else {
303 state = models.RepoStateError
304 errMsg = err.Error()
305 }
306
307 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
308 if err != nil {
309 return fmt.Errorf("failed to get repo: %w", err)
310 }
311 if repo == nil {
312 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt)
313 }
314
315 // start a 1 min & go up to 1 hr between retries
316 var retryCount = repo.RetryCount + 1
317 var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix()
318
319 // remove null bytes
320 errMsg = strings.ReplaceAll(errMsg, "\x00", "")
321
322 repo.State = state
323 repo.ErrorMsg = errMsg
324 repo.RetryCount = retryCount
325 repo.RetryAfter = retryAfter
326 if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil {
327 return fmt.Errorf("failed to update repo state: %w", err)
328 }
329 return err
330}
331
// backoff returns an exponential delay of min(2^retries, maxSeconds) seconds plus a
// random jitter in [0, 1s).
//
// The exponent is computed by doubling until maxSeconds is reached instead of by
// 1<<retries. The shift overflows int once retries reaches the word size (yielding 0
// or a negative value), which would give repos that fail for a long time a zero or
// negative delay. Negative retries are treated as 0 instead of panicking.
func backoff(retries int, maxSeconds int) time.Duration {
	secs := 1
	for i := 0; i < retries && secs < maxSeconds; i++ {
		secs *= 2
	}
	secs = min(secs, maxSeconds)

	jitter := time.Millisecond * time.Duration(rand.Intn(1000))
	return time.Second*time.Duration(secs) + jitter
}