A vibe-coded Tangled fork that supports Pijul.
1package knotmirror
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "math/rand"
10 "net/url"
11 "os"
12 "path"
13 "sync"
14 "time"
15
16 "github.com/bluesky-social/indigo/atproto/syntax"
17 "tangled.org/core/knotmirror/config"
18 "tangled.org/core/knotmirror/db"
19 "tangled.org/core/knotmirror/models"
20 "tangled.org/core/log"
21)
22
// Resyncer runs background workers that claim repos needing a resync from the
// database and clone or fetch them into a local mirror directory.
type Resyncer struct {
	logger *slog.Logger
	db     *sql.DB

	// claimJobMu serializes claimResyncJob so that concurrent workers in this
	// process do not claim the same repo.
	claimJobMu sync.Mutex

	// repoBasePath is the root directory of the mirrors; each repo lives at
	// <repoBasePath>/<did>/<rkey>.
	repoBasePath string
	// repoFetchTimeout is the timeout applied to a single resync.
	repoFetchTimeout time.Duration
	// knotUseSSL picks https (true) or http (false) when the knot address
	// carries no scheme of its own.
	knotUseSSL bool

	// parallelism is the number of worker goroutines launched by Start.
	parallelism int
}
35
36func NewResyncer(l *slog.Logger, db *sql.DB, cfg *config.Config) *Resyncer {
37 return &Resyncer{
38 logger: log.SubLogger(l, "resyncer"),
39 db: db,
40 repoBasePath: cfg.GitRepoBasePath,
41 repoFetchTimeout: cfg.GitRepoFetchTimeout,
42 knotUseSSL: cfg.KnotUseSSL,
43 parallelism: cfg.ResyncParallelism,
44 }
45}
46
47func (r *Resyncer) Start(ctx context.Context) {
48 for i := 0; i < r.parallelism; i++ {
49 go r.runResyncWorker(ctx, i)
50 }
51}
52
53func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) {
54 l := r.logger.With("worker", workerID)
55 for {
56 select {
57 case <-ctx.Done():
58 l.Info("resync worker shutting down", "error", ctx.Err())
59 return
60 default:
61 }
62 repoAt, found, err := r.claimResyncJob(ctx)
63 if err != nil {
64 l.Error("failed to claim resync job", "error", err)
65 time.Sleep(time.Second)
66 continue
67 }
68 if !found {
69 time.Sleep(time.Second)
70 continue
71 }
72 l.Info("processing resync", "aturi", repoAt)
73 if err := r.resyncRepo(ctx, repoAt); err != nil {
74 l.Error("resync failed", "aturi", repoAt, "error", err)
75 }
76 }
77}
78
79func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) {
80 // use mutex to prevent duplicated jobs
81 r.claimJobMu.Lock()
82 defer r.claimJobMu.Unlock()
83
84 var repoAt syntax.ATURI
85 now := time.Now().Unix()
86 if err := r.db.QueryRowContext(ctx,
87 `update repos
88 set state = ?
89 where at_uri = (
90 select at_uri from repos
91 where state in (?, ?, ?)
92 and (retry_after = 0 or retry_after < ?)
93 limit 1
94 )
95 returning at_uri
96 `,
97 models.RepoStateResyncing,
98 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError,
99 now,
100 ).Scan(&repoAt); err != nil {
101 if errors.Is(err, sql.ErrNoRows) {
102 return "", false, nil
103 }
104 return "", false, err
105 }
106
107 return repoAt, true, nil
108}
109
110func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error {
111 // ctx, span := tracer.Start(ctx, "resyncRepo")
112 // span.SetAttributes(attribute.String("aturi", repoAt))
113 // defer span.End()
114
115 resyncsStarted.Inc()
116 startTime := time.Now()
117
118 success, err := r.doResync(ctx, repoAt)
119 if !success {
120 resyncsFailed.Inc()
121 resyncDuration.Observe(time.Since(startTime).Seconds())
122 return r.handleResyncFailure(ctx, repoAt, err)
123 }
124
125 resyncsCompleted.Inc()
126 resyncDuration.Observe(time.Since(startTime).Seconds())
127 return nil
128}
129
130func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) {
131 // ctx, span := tracer.Start(ctx, "doResync")
132 // span.SetAttributes(attribute.String("aturi", repoAt))
133 // defer span.End()
134
135 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
136 if err != nil {
137 return false, fmt.Errorf("failed to get repo: %w", err)
138 }
139 if repo == nil { // untracked repo, skip
140 return false, nil
141 }
142
143 repoPath := r.repoPath(repo)
144 l := r.logger.With("repo", repo.DidSlashRepo(), "path", repoPath)
145
146 remoteUrl, err := r.repoRemoteURL(repo)
147 if err != nil {
148 return false, fmt.Errorf("parsing knot url: %w", err)
149 }
150 l = l.With("url", remoteUrl)
151
152 ctx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout)
153 defer cancel()
154
155 // TODO: check if Knot is on backoff list. If so, return (false, nil)
156 // TODO: use r.repoFetchTimeout on fetch
157 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list
158
159 // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive.
160 gitclient := &CliGitMirrorClient{}
161
162 exist, err := isDir(repoPath)
163 if err != nil {
164 return false, fmt.Errorf("checking repo path: %w", err)
165 }
166 if !exist {
167 if err := gitclient.Clone(ctx, repoPath, remoteUrl); err != nil {
168 return false, err
169 }
170 } else {
171 if err := gitclient.Fetch(ctx, repoPath, remoteUrl); err != nil {
172 return false, err
173 }
174 }
175
176 // repo.GitRev = <processed git.refUpdate revision>
177 // repo.RepoSha = <sha256 sum of git refs>
178 repo.State = models.RepoStateActive
179 repo.ErrorMsg = ""
180 repo.RetryCount = 0
181 repo.RetryAfter = 0
182 if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
183 return false, fmt.Errorf("updating repo state to active %w", err)
184 }
185 return true, nil
186}
187
188func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error {
189 r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err)
190 var state models.RepoState
191 var errMsg string
192 if err == nil {
193 state = models.RepoStateDesynchronized
194 errMsg = ""
195 } else {
196 state = models.RepoStateError
197 errMsg = err.Error()
198 }
199
200 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
201 if err != nil {
202 return fmt.Errorf("failed to get repo: %w", err)
203 }
204 if repo == nil {
205 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt)
206 }
207
208 var retryCount = repo.RetryCount + 1
209 var retryAfter int64
210 if retryCount >= 10 {
211 state = models.RepoStateSuspended
212 errMsg = fmt.Sprintf("too many resync fails: %s", errMsg)
213 retryAfter = 0
214 } else {
215 // start a 1 min & go up to 1 hr between retries
216 retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix()
217 }
218
219 repo.State = state
220 repo.ErrorMsg = errMsg
221 repo.RetryCount = retryCount
222 repo.RetryAfter = retryAfter
223 if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil {
224 return dbErr
225 }
226 return err
227}
228
229func (r *Resyncer) repoPath(repo *models.Repo) string {
230 return path.Join(r.repoBasePath, repo.Did.String(), repo.Rkey.String())
231}
232
233func (r *Resyncer) repoRemoteURL(repo *models.Repo) (string, error) {
234 u, err := url.Parse(repo.KnotDomain)
235 if err != nil {
236 return "", err
237 }
238 if u.Scheme == "" {
239 if r.knotUseSSL {
240 u.Scheme = "https"
241 } else {
242 u.Scheme = "http"
243 }
244 }
245 u = u.JoinPath(repo.DidSlashRepo())
246 return u.String(), nil
247}
248
// backoff returns an exponential delay of min(2^retries, max) seconds plus a
// random jitter of less than one second.
//
// A negative retries is treated as 0. Once 2^retries no longer fits an int the
// delay is simply max, instead of the zero or negative value the overflowing
// shift would produce.
func backoff(retries int, max int) time.Duration {
	// largest shift whose result still fits a 32-bit int
	const maxShift = 30

	if retries < 0 {
		retries = 0 // shifting by a negative count panics
	}
	secs := max
	if retries <= maxShift {
		secs = min(1<<retries, max)
	}

	jitter := time.Millisecond * time.Duration(rand.Intn(1000))
	return time.Second*time.Duration(secs) + jitter
}
254
// isDir reports whether path exists and is a directory. A path that does not
// exist, or that exists but is not a directory, yields (false, nil); any other
// error from os.Stat is returned as is.
func isDir(path string) (bool, error) {
	info, err := os.Stat(path)
	switch {
	case err == nil:
		return info.IsDir(), nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return false, err
	}
}
264}