A vibe-coded fork of Tangled that supports Pijul.
1package knotmirror
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "math/rand"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "tangled.org/core/knotmirror/config"
16 "tangled.org/core/knotmirror/db"
17 "tangled.org/core/knotmirror/models"
18 "tangled.org/core/log"
19)
20
21type Resyncer struct {
22 logger *slog.Logger
23 db *sql.DB
24 gitm GitMirrorManager
25
26 claimJobMu sync.Mutex
27
28 repoFetchTimeout time.Duration
29
30 parallelism int
31}
32
33func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, cfg *config.Config) *Resyncer {
34 return &Resyncer{
35 logger: log.SubLogger(l, "resyncer"),
36 db: db,
37 gitm: gitm,
38
39 repoFetchTimeout: cfg.GitRepoFetchTimeout,
40 parallelism: cfg.ResyncParallelism,
41 }
42}
43
44func (r *Resyncer) Start(ctx context.Context) {
45 for i := 0; i < r.parallelism; i++ {
46 go r.runResyncWorker(ctx, i)
47 }
48}
49
50func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) {
51 l := r.logger.With("worker", workerID)
52 for {
53 select {
54 case <-ctx.Done():
55 l.Info("resync worker shutting down", "error", ctx.Err())
56 return
57 default:
58 }
59 repoAt, found, err := r.claimResyncJob(ctx)
60 if err != nil {
61 l.Error("failed to claim resync job", "error", err)
62 time.Sleep(time.Second)
63 continue
64 }
65 if !found {
66 time.Sleep(time.Second)
67 continue
68 }
69 l.Info("processing resync", "aturi", repoAt)
70 if err := r.resyncRepo(ctx, repoAt); err != nil {
71 l.Error("resync failed", "aturi", repoAt, "error", err)
72 }
73 }
74}
75
76func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) {
77 // use mutex to prevent duplicated jobs
78 r.claimJobMu.Lock()
79 defer r.claimJobMu.Unlock()
80
81 var repoAt syntax.ATURI
82 now := time.Now().Unix()
83 if err := r.db.QueryRowContext(ctx,
84 `update repos
85 set state = $1
86 where at_uri = (
87 select at_uri from repos
88 where state in ($2, $3, $4)
89 and (retry_after = 0 or retry_after < $5)
90 limit 1
91 )
92 returning at_uri
93 `,
94 models.RepoStateResyncing,
95 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError,
96 now,
97 ).Scan(&repoAt); err != nil {
98 if errors.Is(err, sql.ErrNoRows) {
99 return "", false, nil
100 }
101 return "", false, err
102 }
103
104 return repoAt, true, nil
105}
106
107func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error {
108 // ctx, span := tracer.Start(ctx, "resyncRepo")
109 // span.SetAttributes(attribute.String("aturi", repoAt))
110 // defer span.End()
111
112 resyncsStarted.Inc()
113 startTime := time.Now()
114
115 success, err := r.doResync(ctx, repoAt)
116 if !success {
117 resyncsFailed.Inc()
118 resyncDuration.Observe(time.Since(startTime).Seconds())
119 return r.handleResyncFailure(ctx, repoAt, err)
120 }
121
122 resyncsCompleted.Inc()
123 resyncDuration.Observe(time.Since(startTime).Seconds())
124 return nil
125}
126
127func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) {
128 // ctx, span := tracer.Start(ctx, "doResync")
129 // span.SetAttributes(attribute.String("aturi", repoAt))
130 // defer span.End()
131
132 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
133 if err != nil {
134 return false, fmt.Errorf("failed to get repo: %w", err)
135 }
136 if repo == nil { // untracked repo, skip
137 return false, nil
138 }
139
140 // TODO: check if Knot is on backoff list. If so, return (false, nil)
141 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list
142
143 fetchCtx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout)
144 defer cancel()
145
146 if err := r.gitm.Sync(fetchCtx, repo); err != nil {
147 return false, err
148 }
149
150 // repo.GitRev = <processed git.refUpdate revision>
151 // repo.RepoSha = <sha256 sum of git refs>
152 repo.State = models.RepoStateActive
153 repo.ErrorMsg = ""
154 repo.RetryCount = 0
155 repo.RetryAfter = 0
156 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
157 return false, fmt.Errorf("updating repo state to active %w", err)
158 }
159 return true, nil
160}
161
162func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error {
163 r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err)
164 var state models.RepoState
165 var errMsg string
166 if err == nil {
167 state = models.RepoStateDesynchronized
168 errMsg = ""
169 } else {
170 state = models.RepoStateError
171 errMsg = err.Error()
172 }
173
174 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
175 if err != nil {
176 return fmt.Errorf("failed to get repo: %w", err)
177 }
178 if repo == nil {
179 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt)
180 }
181
182 // start a 1 min & go up to 1 hr between retries
183 var retryCount = repo.RetryCount + 1
184 var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix()
185
186 // remove null bytes
187 errMsg = strings.ReplaceAll(errMsg, "\x00", "")
188
189 repo.State = state
190 repo.ErrorMsg = errMsg
191 repo.RetryCount = retryCount
192 repo.RetryAfter = retryAfter
193 if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil {
194 return fmt.Errorf("failed to update repo state: %w", err)
195 }
196 return err
197}
198
// backoff returns an exponential retry delay of 2^retries seconds, capped at
// max seconds, plus up to one second of random jitter.
//
// The exponent is grown by doubling, and doubling stops as soon as the value
// reaches max. This avoids `1<<retries`, which wraps to a negative value or
// zero once retries reaches the int width, and which panics for a negative
// shift count. A negative retries is therefore treated as zero.
func backoff(retries int, max int) time.Duration {
	secs := 1
	for i := 0; i < retries && secs < max; i++ {
		secs <<= 1
	}
	secs = min(secs, max)
	jitter := time.Millisecond * time.Duration(rand.Intn(1000))
	return time.Second*time.Duration(secs) + jitter
}