A vibe-coded fork of Tangled that supports Pijul.
1package knotmirror
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "math/rand"
10 "net/url"
11 "os"
12 "path"
13 "strings"
14 "sync"
15 "time"
16
17 "github.com/bluesky-social/indigo/atproto/syntax"
18 "tangled.org/core/knotmirror/config"
19 "tangled.org/core/knotmirror/db"
20 "tangled.org/core/knotmirror/models"
21 "tangled.org/core/log"
22)
23
// Resyncer keeps local git mirrors of knot-hosted repositories up to date.
// Worker goroutines (launched by Start) repeatedly claim repos that need
// syncing from the database, then clone or fetch them onto local disk.
type Resyncer struct {
	logger *slog.Logger
	db     *sql.DB

	// claimJobMu serializes job claims among the workers of this process only;
	// it is an in-memory lock and does not coordinate with other processes.
	claimJobMu sync.Mutex

	repoBasePath     string        // root directory under which mirrors are stored
	repoFetchTimeout time.Duration // timeout applied around each repo's clone/fetch
	knotUseSSL       bool          // choose https (vs http) when a knot domain has no scheme

	parallelism int // number of worker goroutines Start launches
}
36
37func NewResyncer(l *slog.Logger, db *sql.DB, cfg *config.Config) *Resyncer {
38 return &Resyncer{
39 logger: log.SubLogger(l, "resyncer"),
40 db: db,
41 repoBasePath: cfg.GitRepoBasePath,
42 repoFetchTimeout: cfg.GitRepoFetchTimeout,
43 knotUseSSL: cfg.KnotUseSSL,
44 parallelism: cfg.ResyncParallelism,
45 }
46}
47
48func (r *Resyncer) Start(ctx context.Context) {
49 for i := 0; i < r.parallelism; i++ {
50 go r.runResyncWorker(ctx, i)
51 }
52}
53
54func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) {
55 l := r.logger.With("worker", workerID)
56 for {
57 select {
58 case <-ctx.Done():
59 l.Info("resync worker shutting down", "error", ctx.Err())
60 return
61 default:
62 }
63 repoAt, found, err := r.claimResyncJob(ctx)
64 if err != nil {
65 l.Error("failed to claim resync job", "error", err)
66 time.Sleep(time.Second)
67 continue
68 }
69 if !found {
70 time.Sleep(time.Second)
71 continue
72 }
73 l.Info("processing resync", "aturi", repoAt)
74 if err := r.resyncRepo(ctx, repoAt); err != nil {
75 l.Error("resync failed", "aturi", repoAt, "error", err)
76 }
77 }
78}
79
80func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) {
81 // use mutex to prevent duplicated jobs
82 r.claimJobMu.Lock()
83 defer r.claimJobMu.Unlock()
84
85 var repoAt syntax.ATURI
86 now := time.Now().Unix()
87 if err := r.db.QueryRowContext(ctx,
88 `update repos
89 set state = $1
90 where at_uri = (
91 select at_uri from repos
92 where state in ($2, $3, $4)
93 and (retry_after = 0 or retry_after < $5)
94 limit 1
95 )
96 returning at_uri
97 `,
98 models.RepoStateResyncing,
99 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError,
100 now,
101 ).Scan(&repoAt); err != nil {
102 if errors.Is(err, sql.ErrNoRows) {
103 return "", false, nil
104 }
105 return "", false, err
106 }
107
108 return repoAt, true, nil
109}
110
111func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error {
112 // ctx, span := tracer.Start(ctx, "resyncRepo")
113 // span.SetAttributes(attribute.String("aturi", repoAt))
114 // defer span.End()
115
116 resyncsStarted.Inc()
117 startTime := time.Now()
118
119 success, err := r.doResync(ctx, repoAt)
120 if !success {
121 resyncsFailed.Inc()
122 resyncDuration.Observe(time.Since(startTime).Seconds())
123 return r.handleResyncFailure(ctx, repoAt, err)
124 }
125
126 resyncsCompleted.Inc()
127 resyncDuration.Observe(time.Since(startTime).Seconds())
128 return nil
129}
130
131func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) {
132 // ctx, span := tracer.Start(ctx, "doResync")
133 // span.SetAttributes(attribute.String("aturi", repoAt))
134 // defer span.End()
135
136 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
137 if err != nil {
138 return false, fmt.Errorf("failed to get repo: %w", err)
139 }
140 if repo == nil { // untracked repo, skip
141 return false, nil
142 }
143
144 repoPath := r.repoPath(repo)
145 l := r.logger.With("repo", repo.DidSlashRepo(), "path", repoPath)
146
147 remoteUrl, err := r.repoRemoteURL(repo)
148 if err != nil {
149 return false, fmt.Errorf("parsing knot url: %w", err)
150 }
151 l = l.With("url", remoteUrl)
152
153 ctx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout)
154 defer cancel()
155
156 // TODO: check if Knot is on backoff list. If so, return (false, nil)
157 // TODO: use r.repoFetchTimeout on fetch
158 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list
159
160 // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive.
161 gitclient := &CliGitMirrorClient{}
162
163 exist, err := isDir(repoPath)
164 if err != nil {
165 return false, fmt.Errorf("checking repo path: %w", err)
166 }
167 if !exist {
168 if err := gitclient.Clone(ctx, repoPath, remoteUrl); err != nil {
169 return false, err
170 }
171 } else {
172 if err := gitclient.Fetch(ctx, repoPath, remoteUrl); err != nil {
173 return false, err
174 }
175 }
176
177 // repo.GitRev = <processed git.refUpdate revision>
178 // repo.RepoSha = <sha256 sum of git refs>
179 repo.State = models.RepoStateActive
180 repo.ErrorMsg = ""
181 repo.RetryCount = 0
182 repo.RetryAfter = 0
183 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
184 return false, fmt.Errorf("updating repo state to active %w", err)
185 }
186 return true, nil
187}
188
189func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error {
190 r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err)
191 var state models.RepoState
192 var errMsg string
193 if err == nil {
194 state = models.RepoStateDesynchronized
195 errMsg = ""
196 } else {
197 state = models.RepoStateError
198 errMsg = err.Error()
199 }
200
201 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
202 if err != nil {
203 return fmt.Errorf("failed to get repo: %w", err)
204 }
205 if repo == nil {
206 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt)
207 }
208
209 var retryCount = repo.RetryCount + 1
210 var retryAfter int64
211 if retryCount >= 10 {
212 state = models.RepoStateSuspended
213 errMsg = fmt.Sprintf("too many resync fails: %s", errMsg)
214 retryAfter = 0
215 } else {
216 // start a 1 min & go up to 1 hr between retries
217 retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix()
218 }
219
220 // remove null bytes
221 errMsg = strings.ReplaceAll(errMsg, "\x00", "")
222
223 repo.State = state
224 repo.ErrorMsg = errMsg
225 repo.RetryCount = retryCount
226 repo.RetryAfter = retryAfter
227 if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil {
228 return fmt.Errorf("failed to update repo state: %w", err)
229 }
230 return err
231}
232
233func (r *Resyncer) repoPath(repo *models.Repo) string {
234 return path.Join(r.repoBasePath, repo.Did.String(), repo.Rkey.String())
235}
236
237func (r *Resyncer) repoRemoteURL(repo *models.Repo) (string, error) {
238 u, err := url.Parse(repo.KnotDomain)
239 if err != nil {
240 return "", err
241 }
242 if u.Scheme == "" {
243 if r.knotUseSSL {
244 u.Scheme = "https"
245 } else {
246 u.Scheme = "http"
247 }
248 }
249 u = u.JoinPath(repo.DidSlashRepo())
250 return u.String(), nil
251}
252
// backoff returns an exponential delay of 2^retries seconds, capped at
// maxSeconds, plus a random jitter of 0-999ms.
//
// Negative retries are treated as zero. Large retries saturate at the cap
// instead of misbehaving: the previous 1<<retries form panicked on a negative
// shift count and wrapped to 0 or a negative value once retries reached the
// int width.
func backoff(retries int, maxSeconds int) time.Duration {
	secs := 1
	// Double until the cap is reached. This takes O(log maxSeconds) steps and
	// cannot overflow unless maxSeconds exceeds half the int range.
	for i := 0; i < retries && secs < maxSeconds; i++ {
		secs *= 2
	}
	secs = min(secs, maxSeconds)
	jitter := time.Millisecond * time.Duration(rand.Intn(1000))
	return time.Second*time.Duration(secs) + jitter
}
258
// isDir reports whether path names an existing directory. A missing path and
// an existing non-directory both yield (false, nil); any other stat failure is
// returned as the error.
func isDir(path string) (bool, error) {
	info, statErr := os.Stat(path)
	switch {
	case statErr == nil:
		return info.IsDir(), nil
	case errors.Is(statErr, os.ErrNotExist):
		return false, nil
	default:
		return false, statErr
	}
}