A vibe-coded fork of Tangled that adds support for Pijul.
1package knotmirror
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "math/rand"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "tangled.org/core/knotmirror/config"
16 "tangled.org/core/knotmirror/db"
17 "tangled.org/core/knotmirror/models"
18 "tangled.org/core/log"
19)
20
// Resyncer claims repos that are due for resynchronization from the
// database and mirrors them via the GitMirrorManager, running a
// configurable number of worker goroutines in parallel.
type Resyncer struct {
	logger *slog.Logger
	db     *sql.DB
	gitm   GitMirrorManager

	// claimJobMu serializes claimResyncJob so two workers cannot
	// claim the same repo concurrently.
	claimJobMu sync.Mutex

	// runningJobs maps a repo's AT-URI to the cancel func of its
	// in-flight resync job; guarded by runningJobsMu.
	runningJobs   map[syntax.ATURI]context.CancelFunc
	runningJobsMu sync.Mutex

	// repoFetchTimeout bounds a regular background fetch;
	// manualResyncTimeout bounds a manually triggered resync
	// (identified by RetryAfter == -1).
	repoFetchTimeout    time.Duration
	manualResyncTimeout time.Duration

	// parallelism is the number of worker goroutines Start launches.
	parallelism int
}
36
37func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, cfg *config.Config) *Resyncer {
38 return &Resyncer{
39 logger: log.SubLogger(l, "resyncer"),
40 db: db,
41 gitm: gitm,
42
43 runningJobs: make(map[syntax.ATURI]context.CancelFunc),
44
45 repoFetchTimeout: cfg.GitRepoFetchTimeout,
46 parallelism: cfg.ResyncParallelism,
47
48 manualResyncTimeout: 30 * time.Minute,
49 }
50}
51
52func (r *Resyncer) Start(ctx context.Context) {
53 for i := 0; i < r.parallelism; i++ {
54 go r.runResyncWorker(ctx, i)
55 }
56}
57
58func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) {
59 l := r.logger.With("worker", workerID)
60 for {
61 select {
62 case <-ctx.Done():
63 l.Info("resync worker shutting down", "error", ctx.Err())
64 return
65 default:
66 }
67 repoAt, found, err := r.claimResyncJob(ctx)
68 if err != nil {
69 l.Error("failed to claim resync job", "error", err)
70 time.Sleep(time.Second)
71 continue
72 }
73 if !found {
74 time.Sleep(time.Second)
75 continue
76 }
77 l.Info("processing resync", "aturi", repoAt)
78 if err := r.resyncRepo(ctx, repoAt); err != nil {
79 l.Error("resync failed", "aturi", repoAt, "error", err)
80 }
81 }
82}
83
84func (r *Resyncer) registerRunning(repo syntax.ATURI, cancel context.CancelFunc) {
85 r.runningJobsMu.Lock()
86 defer r.runningJobsMu.Unlock()
87
88 if _, exists := r.runningJobs[repo]; exists {
89 return
90 }
91 r.runningJobs[repo] = cancel
92}
93
94func (r *Resyncer) unregisterRunning(repo syntax.ATURI) {
95 r.runningJobsMu.Lock()
96 defer r.runningJobsMu.Unlock()
97
98 delete(r.runningJobs, repo)
99}
100
101func (r *Resyncer) CancelResyncJob(repo syntax.ATURI) {
102 r.runningJobsMu.Lock()
103 defer r.runningJobsMu.Unlock()
104
105 cancel, ok := r.runningJobs[repo]
106 if !ok {
107 return
108 }
109 delete(r.runningJobs, repo)
110 cancel()
111}
112
113// TriggerResyncJob manually triggers the resync job
114func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoAt syntax.ATURI) error {
115 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
116 if err != nil {
117 return fmt.Errorf("failed to get repo: %w", err)
118 }
119 if repo == nil {
120 return fmt.Errorf("repo not found: %s", repoAt)
121 }
122
123 if repo.State == models.RepoStateResyncing {
124 return fmt.Errorf("repo already resyncing")
125 }
126
127 repo.State = models.RepoStatePending
128 repo.RetryAfter = -1 // resyncer will prioritize this
129
130 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
131 return fmt.Errorf("updating repo state to pending %w", err)
132 }
133 return nil
134}
135
136func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) {
137 // use mutex to prevent duplicated jobs
138 r.claimJobMu.Lock()
139 defer r.claimJobMu.Unlock()
140
141 var repoAt syntax.ATURI
142 now := time.Now().Unix()
143 if err := r.db.QueryRowContext(ctx,
144 `update repos
145 set state = $1
146 where at_uri = (
147 select at_uri from repos
148 where state in ($2, $3, $4)
149 and (retry_after = -1 or retry_after = 0 or retry_after < $5)
150 limit 1
151 )
152 returning at_uri
153 `,
154 models.RepoStateResyncing,
155 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError,
156 now,
157 ).Scan(&repoAt); err != nil {
158 if errors.Is(err, sql.ErrNoRows) {
159 return "", false, nil
160 }
161 return "", false, err
162 }
163
164 return repoAt, true, nil
165}
166
167func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error {
168 // ctx, span := tracer.Start(ctx, "resyncRepo")
169 // span.SetAttributes(attribute.String("aturi", repoAt))
170 // defer span.End()
171
172 resyncsStarted.Inc()
173 startTime := time.Now()
174
175 jobCtx, cancel := context.WithCancel(ctx)
176 r.registerRunning(repoAt, cancel)
177 defer r.unregisterRunning(repoAt)
178
179 success, err := r.doResync(jobCtx, repoAt)
180 if !success {
181 resyncsFailed.Inc()
182 resyncDuration.Observe(time.Since(startTime).Seconds())
183 return r.handleResyncFailure(ctx, repoAt, err)
184 }
185
186 resyncsCompleted.Inc()
187 resyncDuration.Observe(time.Since(startTime).Seconds())
188 return nil
189}
190
191func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) {
192 // ctx, span := tracer.Start(ctx, "doResync")
193 // span.SetAttributes(attribute.String("aturi", repoAt))
194 // defer span.End()
195
196 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
197 if err != nil {
198 return false, fmt.Errorf("failed to get repo: %w", err)
199 }
200 if repo == nil { // untracked repo, skip
201 return false, nil
202 }
203
204 // TODO: check if Knot is on backoff list. If so, return (false, nil)
205 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list
206
207 timeout := r.repoFetchTimeout
208 if repo.RetryAfter == -1 {
209 timeout = r.manualResyncTimeout
210 }
211 fetchCtx, cancel := context.WithTimeout(ctx, timeout)
212 defer cancel()
213
214 if err := r.gitm.Sync(fetchCtx, repo); err != nil {
215 return false, err
216 }
217
218 // repo.GitRev = <processed git.refUpdate revision>
219 // repo.RepoSha = <sha256 sum of git refs>
220 repo.State = models.RepoStateActive
221 repo.ErrorMsg = ""
222 repo.RetryCount = 0
223 repo.RetryAfter = 0
224 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
225 return false, fmt.Errorf("updating repo state to active %w", err)
226 }
227 return true, nil
228}
229
230func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error {
231 r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err)
232 var state models.RepoState
233 var errMsg string
234 if err == nil {
235 state = models.RepoStateDesynchronized
236 errMsg = ""
237 } else {
238 state = models.RepoStateError
239 errMsg = err.Error()
240 }
241
242 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
243 if err != nil {
244 return fmt.Errorf("failed to get repo: %w", err)
245 }
246 if repo == nil {
247 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt)
248 }
249
250 // start a 1 min & go up to 1 hr between retries
251 var retryCount = repo.RetryCount + 1
252 var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix()
253
254 // remove null bytes
255 errMsg = strings.ReplaceAll(errMsg, "\x00", "")
256
257 repo.State = state
258 repo.ErrorMsg = errMsg
259 repo.RetryCount = retryCount
260 repo.RetryAfter = retryAfter
261 if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil {
262 return fmt.Errorf("failed to update repo state: %w", err)
263 }
264 return err
265}
266
// backoff returns an exponential backoff duration for the given retry
// attempt: 2^retries seconds capped at maxSeconds, plus up to one
// second of random jitter to spread retries out.
//
// The exponent is clamped so large retry counts cannot overflow the
// shift and produce a negative duration (previously 1<<retries
// overflowed for retries >= 63, and RetryCount grows without bound
// across repeated failures).
func backoff(retries int, maxSeconds int) time.Duration {
	if retries < 0 {
		retries = 0
	}
	// beyond 30 doublings the cap dominates for any sane maxSeconds
	if retries > 30 {
		retries = 30
	}
	secs := min(1<<retries, maxSeconds)
	jitter := time.Duration(rand.Intn(1000)) * time.Millisecond
	return time.Duration(secs)*time.Second + jitter
}