A vibe-coded Tangled fork that supports Pijul.
at dda7e5c4760ecd9809464b15499aa4c2bfc41d72 268 lines 6.7 kB view raw
1package knotmirror 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "math/rand" 10 "net/url" 11 "os" 12 "path" 13 "strings" 14 "sync" 15 "time" 16 17 "github.com/bluesky-social/indigo/atproto/syntax" 18 "tangled.org/core/knotmirror/config" 19 "tangled.org/core/knotmirror/db" 20 "tangled.org/core/knotmirror/models" 21 "tangled.org/core/log" 22) 23 24type Resyncer struct { 25 logger *slog.Logger 26 db *sql.DB 27 28 claimJobMu sync.Mutex 29 30 repoBasePath string 31 repoFetchTimeout time.Duration 32 knotUseSSL bool 33 34 parallelism int 35} 36 37func NewResyncer(l *slog.Logger, db *sql.DB, cfg *config.Config) *Resyncer { 38 return &Resyncer{ 39 logger: log.SubLogger(l, "resyncer"), 40 db: db, 41 repoBasePath: cfg.GitRepoBasePath, 42 repoFetchTimeout: cfg.GitRepoFetchTimeout, 43 knotUseSSL: cfg.KnotUseSSL, 44 parallelism: cfg.ResyncParallelism, 45 } 46} 47 48func (r *Resyncer) Start(ctx context.Context) { 49 for i := 0; i < r.parallelism; i++ { 50 go r.runResyncWorker(ctx, i) 51 } 52} 53 54func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) { 55 l := r.logger.With("worker", workerID) 56 for { 57 select { 58 case <-ctx.Done(): 59 l.Info("resync worker shutting down", "error", ctx.Err()) 60 return 61 default: 62 } 63 repoAt, found, err := r.claimResyncJob(ctx) 64 if err != nil { 65 l.Error("failed to claim resync job", "error", err) 66 time.Sleep(time.Second) 67 continue 68 } 69 if !found { 70 time.Sleep(time.Second) 71 continue 72 } 73 l.Info("processing resync", "aturi", repoAt) 74 if err := r.resyncRepo(ctx, repoAt); err != nil { 75 l.Error("resync failed", "aturi", repoAt, "error", err) 76 } 77 } 78} 79 80func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) { 81 // use mutex to prevent duplicated jobs 82 r.claimJobMu.Lock() 83 defer r.claimJobMu.Unlock() 84 85 var repoAt syntax.ATURI 86 now := time.Now().Unix() 87 if err := r.db.QueryRowContext(ctx, 88 `update repos 89 set state = $1 90 where 
at_uri = ( 91 select at_uri from repos 92 where state in ($2, $3, $4) 93 and (retry_after = 0 or retry_after < $5) 94 limit 1 95 ) 96 returning at_uri 97 `, 98 models.RepoStateResyncing, 99 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, 100 now, 101 ).Scan(&repoAt); err != nil { 102 if errors.Is(err, sql.ErrNoRows) { 103 return "", false, nil 104 } 105 return "", false, err 106 } 107 108 return repoAt, true, nil 109} 110 111func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error { 112 // ctx, span := tracer.Start(ctx, "resyncRepo") 113 // span.SetAttributes(attribute.String("aturi", repoAt)) 114 // defer span.End() 115 116 resyncsStarted.Inc() 117 startTime := time.Now() 118 119 success, err := r.doResync(ctx, repoAt) 120 if !success { 121 resyncsFailed.Inc() 122 resyncDuration.Observe(time.Since(startTime).Seconds()) 123 return r.handleResyncFailure(ctx, repoAt, err) 124 } 125 126 resyncsCompleted.Inc() 127 resyncDuration.Observe(time.Since(startTime).Seconds()) 128 return nil 129} 130 131func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) { 132 // ctx, span := tracer.Start(ctx, "doResync") 133 // span.SetAttributes(attribute.String("aturi", repoAt)) 134 // defer span.End() 135 136 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 137 if err != nil { 138 return false, fmt.Errorf("failed to get repo: %w", err) 139 } 140 if repo == nil { // untracked repo, skip 141 return false, nil 142 } 143 144 repoPath := r.repoPath(repo) 145 l := r.logger.With("repo", repo.DidSlashRepo(), "path", repoPath) 146 147 remoteUrl, err := r.repoRemoteURL(repo) 148 if err != nil { 149 return false, fmt.Errorf("parsing knot url: %w", err) 150 } 151 l = l.With("url", remoteUrl) 152 153 ctx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout) 154 defer cancel() 155 156 // TODO: check if Knot is on backoff list. 
If so, return (false, nil) 157 // TODO: use r.repoFetchTimeout on fetch 158 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list 159 160 // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive. 161 gitclient := &CliGitMirrorClient{} 162 163 exist, err := isDir(repoPath) 164 if err != nil { 165 return false, fmt.Errorf("checking repo path: %w", err) 166 } 167 if !exist { 168 if err := gitclient.Clone(ctx, repoPath, remoteUrl); err != nil { 169 return false, err 170 } 171 } else { 172 if err := gitclient.Fetch(ctx, repoPath, remoteUrl); err != nil { 173 return false, err 174 } 175 } 176 177 // repo.GitRev = <processed git.refUpdate revision> 178 // repo.RepoSha = <sha256 sum of git refs> 179 repo.State = models.RepoStateActive 180 repo.ErrorMsg = "" 181 repo.RetryCount = 0 182 repo.RetryAfter = 0 183 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 184 return false, fmt.Errorf("updating repo state to active %w", err) 185 } 186 return true, nil 187} 188 189func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error { 190 r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err) 191 var state models.RepoState 192 var errMsg string 193 if err == nil { 194 state = models.RepoStateDesynchronized 195 errMsg = "" 196 } else { 197 state = models.RepoStateError 198 errMsg = err.Error() 199 } 200 201 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 202 if err != nil { 203 return fmt.Errorf("failed to get repo: %w", err) 204 } 205 if repo == nil { 206 return fmt.Errorf("failed to get repo. 
repo '%s' doesn't exist in db", repoAt) 207 } 208 209 var retryCount = repo.RetryCount + 1 210 var retryAfter int64 211 if retryCount >= 10 { 212 state = models.RepoStateSuspended 213 errMsg = fmt.Sprintf("too many resync fails: %s", errMsg) 214 retryAfter = 0 215 } else { 216 // start a 1 min & go up to 1 hr between retries 217 retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 218 } 219 220 // remove null bytes 221 errMsg = strings.ReplaceAll(errMsg, "\x00", "") 222 223 repo.State = state 224 repo.ErrorMsg = errMsg 225 repo.RetryCount = retryCount 226 repo.RetryAfter = retryAfter 227 if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil { 228 return fmt.Errorf("failed to update repo state: %w", err) 229 } 230 return err 231} 232 233func (r *Resyncer) repoPath(repo *models.Repo) string { 234 return path.Join(r.repoBasePath, repo.Did.String(), repo.Rkey.String()) 235} 236 237func (r *Resyncer) repoRemoteURL(repo *models.Repo) (string, error) { 238 u, err := url.Parse(repo.KnotDomain) 239 if err != nil { 240 return "", err 241 } 242 if u.Scheme == "" { 243 if r.knotUseSSL { 244 u.Scheme = "https" 245 } else { 246 u.Scheme = "http" 247 } 248 } 249 u = u.JoinPath(repo.DidSlashRepo()) 250 return u.String(), nil 251} 252 253func backoff(retries int, max int) time.Duration { 254 dur := min(1<<retries, max) 255 jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 256 return time.Second*time.Duration(dur) + jitter 257} 258 259func isDir(path string) (bool, error) { 260 info, err := os.Stat(path) 261 if err == nil && info.IsDir() { 262 return true, nil 263 } 264 if os.IsNotExist(err) { 265 return false, nil 266 } 267 return false, err 268}