A vibe-coded fork of Tangled that supports Pijul.
at e620e86a00c541e18e9e8097f97e51962631c5cd 264 lines 6.5 kB view raw
1package knotmirror 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "math/rand" 10 "net/url" 11 "os" 12 "path" 13 "sync" 14 "time" 15 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 "tangled.org/core/knotmirror/config" 18 "tangled.org/core/knotmirror/db" 19 "tangled.org/core/knotmirror/models" 20 "tangled.org/core/log" 21) 22 23type Resyncer struct { 24 logger *slog.Logger 25 db *sql.DB 26 27 claimJobMu sync.Mutex 28 29 repoBasePath string 30 repoFetchTimeout time.Duration 31 knotUseSSL bool 32 33 parallelism int 34} 35 36func NewResyncer(l *slog.Logger, db *sql.DB, cfg *config.Config) *Resyncer { 37 return &Resyncer{ 38 logger: log.SubLogger(l, "resyncer"), 39 db: db, 40 repoBasePath: cfg.GitRepoBasePath, 41 repoFetchTimeout: cfg.GitRepoFetchTimeout, 42 knotUseSSL: cfg.KnotUseSSL, 43 parallelism: cfg.ResyncParallelism, 44 } 45} 46 47func (r *Resyncer) Start(ctx context.Context) { 48 for i := 0; i < r.parallelism; i++ { 49 go r.runResyncWorker(ctx, i) 50 } 51} 52 53func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) { 54 l := r.logger.With("worker", workerID) 55 for { 56 select { 57 case <-ctx.Done(): 58 l.Info("resync worker shutting down", "error", ctx.Err()) 59 return 60 default: 61 } 62 repoAt, found, err := r.claimResyncJob(ctx) 63 if err != nil { 64 l.Error("failed to claim resync job", "error", err) 65 time.Sleep(time.Second) 66 continue 67 } 68 if !found { 69 time.Sleep(time.Second) 70 continue 71 } 72 l.Info("processing resync", "aturi", repoAt) 73 if err := r.resyncRepo(ctx, repoAt); err != nil { 74 l.Error("resync failed", "aturi", repoAt, "error", err) 75 } 76 } 77} 78 79func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) { 80 // use mutex to prevent duplicated jobs 81 r.claimJobMu.Lock() 82 defer r.claimJobMu.Unlock() 83 84 var repoAt syntax.ATURI 85 now := time.Now().Unix() 86 if err := r.db.QueryRowContext(ctx, 87 `update repos 88 set state = ? 
89 where at_uri = ( 90 select at_uri from repos 91 where state in (?, ?, ?) 92 and (retry_after = 0 or retry_after < ?) 93 limit 1 94 ) 95 returning at_uri 96 `, 97 models.RepoStateResyncing, 98 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, 99 now, 100 ).Scan(&repoAt); err != nil { 101 if errors.Is(err, sql.ErrNoRows) { 102 return "", false, nil 103 } 104 return "", false, err 105 } 106 107 return repoAt, true, nil 108} 109 110func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error { 111 // ctx, span := tracer.Start(ctx, "resyncRepo") 112 // span.SetAttributes(attribute.String("aturi", repoAt)) 113 // defer span.End() 114 115 resyncsStarted.Inc() 116 startTime := time.Now() 117 118 success, err := r.doResync(ctx, repoAt) 119 if !success { 120 resyncsFailed.Inc() 121 resyncDuration.Observe(time.Since(startTime).Seconds()) 122 return r.handleResyncFailure(ctx, repoAt, err) 123 } 124 125 resyncsCompleted.Inc() 126 resyncDuration.Observe(time.Since(startTime).Seconds()) 127 return nil 128} 129 130func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) { 131 // ctx, span := tracer.Start(ctx, "doResync") 132 // span.SetAttributes(attribute.String("aturi", repoAt)) 133 // defer span.End() 134 135 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 136 if err != nil { 137 return false, fmt.Errorf("failed to get repo: %w", err) 138 } 139 if repo == nil { // untracked repo, skip 140 return false, nil 141 } 142 143 repoPath := r.repoPath(repo) 144 l := r.logger.With("repo", repo.DidSlashRepo(), "path", repoPath) 145 146 remoteUrl, err := r.repoRemoteURL(repo) 147 if err != nil { 148 return false, fmt.Errorf("parsing knot url: %w", err) 149 } 150 l = l.With("url", remoteUrl) 151 152 ctx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout) 153 defer cancel() 154 155 // TODO: check if Knot is on backoff list. 
If so, return (false, nil) 156 // TODO: use r.repoFetchTimeout on fetch 157 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list 158 159 // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive. 160 gitclient := &CliGitMirrorClient{} 161 162 exist, err := isDir(repoPath) 163 if err != nil { 164 return false, fmt.Errorf("checking repo path: %w", err) 165 } 166 if !exist { 167 if err := gitclient.Clone(ctx, repoPath, remoteUrl); err != nil { 168 return false, err 169 } 170 } else { 171 if err := gitclient.Fetch(ctx, repoPath, remoteUrl); err != nil { 172 return false, err 173 } 174 } 175 176 // repo.GitRev = <processed git.refUpdate revision> 177 // repo.RepoSha = <sha256 sum of git refs> 178 repo.State = models.RepoStateActive 179 repo.ErrorMsg = "" 180 repo.RetryCount = 0 181 repo.RetryAfter = 0 182 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 183 return false, fmt.Errorf("updating repo state to active %w", err) 184 } 185 return true, nil 186} 187 188func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error { 189 r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err) 190 var state models.RepoState 191 var errMsg string 192 if err == nil { 193 state = models.RepoStateDesynchronized 194 errMsg = "" 195 } else { 196 state = models.RepoStateError 197 errMsg = err.Error() 198 } 199 200 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 201 if err != nil { 202 return fmt.Errorf("failed to get repo: %w", err) 203 } 204 if repo == nil { 205 return fmt.Errorf("failed to get repo. 
repo '%s' doesn't exist in db", repoAt) 206 } 207 208 var retryCount = repo.RetryCount + 1 209 var retryAfter int64 210 if retryCount >= 10 { 211 state = models.RepoStateSuspended 212 errMsg = fmt.Sprintf("too many resync fails: %s", errMsg) 213 retryAfter = 0 214 } else { 215 // start a 1 min & go up to 1 hr between retries 216 retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 217 } 218 219 repo.State = state 220 repo.ErrorMsg = errMsg 221 repo.RetryCount = retryCount 222 repo.RetryAfter = retryAfter 223 if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil { 224 return dbErr 225 } 226 return err 227} 228 229func (r *Resyncer) repoPath(repo *models.Repo) string { 230 return path.Join(r.repoBasePath, repo.Did.String(), repo.Rkey.String()) 231} 232 233func (r *Resyncer) repoRemoteURL(repo *models.Repo) (string, error) { 234 u, err := url.Parse(repo.KnotDomain) 235 if err != nil { 236 return "", err 237 } 238 if u.Scheme == "" { 239 if r.knotUseSSL { 240 u.Scheme = "https" 241 } else { 242 u.Scheme = "http" 243 } 244 } 245 u = u.JoinPath(repo.DidSlashRepo()) 246 return u.String(), nil 247} 248 249func backoff(retries int, max int) time.Duration { 250 dur := min(1<<retries, max) 251 jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 252 return time.Second*time.Duration(dur) + jitter 253} 254 255func isDir(path string) (bool, error) { 256 info, err := os.Stat(path) 257 if err == nil && info.IsDir() { 258 return true, nil 259 } 260 if os.IsNotExist(err) { 261 return false, nil 262 } 263 return false, err 264}