package knotmirror

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log/slog"
	"math/rand"
	"net/url"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"tangled.org/core/knotmirror/config"
	"tangled.org/core/knotmirror/db"
	"tangled.org/core/knotmirror/models"
	"tangled.org/core/log"
)

// Resyncer claims repos that need (re)synchronization from the database and
// mirrors them to local disk by cloning or fetching from their knot.
type Resyncer struct {
	// logger is the "resyncer" sub-logger derived from the logger passed to
	// NewResyncer.
	logger *slog.Logger

	// db is the database holding the repos table.
	db *sql.DB

	// claimJobMu serializes claimResyncJob across workers.
	claimJobMu sync.Mutex

	// repoBasePath is the directory under which repos are stored as
	// <repoBasePath>/<did>/<rkey>.
	repoBasePath string

	// repoFetchTimeout bounds a single doResync call.
	repoFetchTimeout time.Duration

	// knotUseSSL selects https (true) or http (false) when a repo's knot
	// domain carries no URL scheme.
	knotUseSSL bool

	// parallelism is the number of workers launched by Start.
	parallelism int
}

// NewResyncer builds a Resyncer from cfg. The given logger is wrapped in a
// "resyncer" sub-logger.
func NewResyncer(l *slog.Logger, db *sql.DB, cfg *config.Config) *Resyncer {
	return &Resyncer{
		logger:           log.SubLogger(l, "resyncer"),
		db:               db,
		repoBasePath:     cfg.GitRepoBasePath,
		repoFetchTimeout: cfg.GitRepoFetchTimeout,
		knotUseSSL:       cfg.KnotUseSSL,
		parallelism:      cfg.ResyncParallelism,
	}
}

// Start launches r.parallelism resync workers in the background and returns
// immediately. The workers run until ctx is cancelled.
func (r *Resyncer) Start(ctx context.Context) {
	for i := 0; i < r.parallelism; i++ {
		go r.runResyncWorker(ctx, i)
	}
}

// runResyncWorker is the loop of a single worker: it repeatedly claims a
// resync job and processes it, returning once ctx is cancelled.
//
// When there is no job to claim, or claiming fails, the worker waits one
// second before polling again. The wait is abandoned as soon as ctx is
// cancelled so shutdown is not delayed by an idle worker.
func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) {
	l := r.logger.With("worker", workerID)
	for {
		if err := ctx.Err(); err != nil {
			l.Info("resync worker shutting down", "error", err)
			return
		}

		repoAt, found, err := r.claimResyncJob(ctx)
		if err != nil {
			l.Error("failed to claim resync job", "error", err)
		}
		if err != nil || !found {
			// Back off before the next poll; the cancellation check at the
			// top of the loop handles the ctx.Done case.
			select {
			case <-ctx.Done():
			case <-time.After(time.Second):
			}
			continue
		}

		l.Info("processing resync", "aturi", repoAt)
		if err := r.resyncRepo(ctx, repoAt); err != nil {
			l.Error("resync failed", "aturi", repoAt, "error", err)
		}
	}
}

// claimResyncJob picks one repo that is pending, desynchronized or in error
// state and whose retry_after is zero or in the past, marks it as resyncing,
// and returns its AT-URI.
//
// The boolean result is false (with a nil error) when no repo is eligible.
func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) {
	// use mutex to prevent duplicated jobs
	r.claimJobMu.Lock()
	defer r.claimJobMu.Unlock()

	var repoAt syntax.ATURI
	now := time.Now().Unix()
	if err := r.db.QueryRowContext(ctx,
		`update repos set state = $1 where at_uri = ( select at_uri from repos where state in ($2, $3, $4) and (retry_after = 0 or retry_after < $5) limit 1 ) returning at_uri `,
		models.RepoStateResyncing,
		models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError,
		now,
	).Scan(&repoAt); err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return "", false, nil
		}
		return "", false, err
	}

	return repoAt, true, nil
}

// resyncRepo runs one resync attempt for repoAt and records metrics for it.
// On failure the repo's retry bookkeeping is updated via handleResyncFailure
// and that call's result is returned.
func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error {
	// ctx, span := tracer.Start(ctx, "resyncRepo")
	// span.SetAttributes(attribute.String("aturi", repoAt))
	// defer span.End()

	resyncsStarted.Inc()
	startTime := time.Now()

	success, err := r.doResync(ctx, repoAt)
	// Observed before any failure handling so the recorded duration covers
	// only the resync attempt itself.
	resyncDuration.Observe(time.Since(startTime).Seconds())
	if !success {
		resyncsFailed.Inc()
		return r.handleResyncFailure(ctx, repoAt, err)
	}

	resyncsCompleted.Inc()
	return nil
}

// doResync clones the repo identified by repoAt if it is not yet on disk, or
// fetches into the existing directory otherwise, then marks the repo active
// and clears its error and retry fields.
//
// It returns (true, nil) on success. It returns (false, nil) when the repo is
// not present in the database, and (false, err) for any other failure.
func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) {
	// ctx, span := tracer.Start(ctx, "doResync")
	// span.SetAttributes(attribute.String("aturi", repoAt))
	// defer span.End()

	repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
	if err != nil {
		return false, fmt.Errorf("failed to get repo: %w", err)
	}
	if repo == nil {
		// untracked repo, skip
		return false, nil
	}

	repoPath := r.repoPath(repo)
	l := r.logger.With("repo", repo.DidSlashRepo(), "path", repoPath)

	remoteUrl, err := r.repoRemoteURL(repo)
	if err != nil {
		return false, fmt.Errorf("parsing knot url: %w", err)
	}
	l = l.With("url", remoteUrl)

	// Everything below, including the final database update, shares this
	// deadline.
	ctx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout)
	defer cancel()

	// TODO: check if Knot is on backoff list. If so, return (false, nil)
	// TODO: use r.repoFetchTimeout on fetch
	// TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list

	// NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive.
	gitclient := &CliGitMirrorClient{}

	exist, err := isDir(repoPath)
	if err != nil {
		return false, fmt.Errorf("checking repo path: %w", err)
	}
	if !exist {
		if err := gitclient.Clone(ctx, repoPath, remoteUrl); err != nil {
			return false, err
		}
	} else {
		if err := gitclient.Fetch(ctx, repoPath, remoteUrl); err != nil {
			return false, err
		}
	}

	// repo.GitRev =
	// repo.RepoSha =
	repo.State = models.RepoStateActive
	repo.ErrorMsg = ""
	repo.RetryCount = 0
	repo.RetryAfter = 0
	if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
		return false, fmt.Errorf("updating repo state to active %w", err)
	}
	return true, nil
}

// handleResyncFailure records a failed resync attempt for repoAt.
//
// A nil err marks the repo as desynchronized; a non-nil err marks it as
// errored and stores the error text. The retry counter is incremented; once
// it reaches 10 the repo is suspended, otherwise retry_after is pushed into
// the future.
//
// It returns err unchanged when the bookkeeping succeeds (nil for a nil err),
// or an error describing the database failure when the bookkeeping itself
// fails.
func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error {
	r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err)

	var state models.RepoState
	var errMsg string
	if err == nil {
		state = models.RepoStateDesynchronized
		errMsg = ""
	} else {
		state = models.RepoStateError
		errMsg = err.Error()
	}

	// Use a distinct variable for the lookup error: `repo, err := ...` would
	// overwrite the err parameter, which must be returned at the end.
	repo, getErr := db.GetRepoByAtUri(ctx, r.db, repoAt)
	if getErr != nil {
		return fmt.Errorf("failed to get repo: %w", getErr)
	}
	if repo == nil {
		return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt)
	}

	retryCount := repo.RetryCount + 1
	var retryAfter int64
	if retryCount >= 10 {
		state = models.RepoStateSuspended
		errMsg = fmt.Sprintf("too many resync fails: %s", errMsg)
		retryAfter = 0
	} else {
		// start at 1 min & go up to 1 hr between retries
		// NOTE(review): the unit returned by backoff is not visible here;
		// confirm the "* 60" scaling yields the intended range.
		retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix()
	}

	// remove null bytes
	errMsg = strings.ReplaceAll(errMsg, "\x00", "")

	repo.State = state
	repo.ErrorMsg = errMsg
	repo.RetryCount = retryCount
	repo.RetryAfter = retryAfter
	if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil {
		return fmt.Errorf("failed to update repo state: %w", dbErr)
	}
	return err
}

// repoPath returns the on-disk location of repo:
// <repoBasePath>/<did>/<rkey>.
func (r *Resyncer) repoPath(repo *models.Repo) string {
	return path.Join(r.repoBasePath, repo.Did.String(), repo.Rkey.String())
}

// repoRemoteURL builds the remote URL for repo by parsing its knot domain
// and appending the "did/repo" path. When the knot domain has no scheme,
// https or http is chosen according to r.knotUseSSL.
func (r *Resyncer) repoRemoteURL(repo *models.Repo) (string, error) {
	u, err := url.Parse(repo.KnotDomain)
	if err != nil {
		return "", err
	}
	if u.Scheme == "" {
		if r.knotUseSSL {
			u.Scheme = "https"
		} else {
			u.Scheme = "http"
		}
	}
	u = u.JoinPath(repo.DidSlashRepo())
	return u.String(), nil
}

func backoff(retries int, max int) time.Duration {
	dur := min(1<