A vibe coded tangled fork which supports pijul.

knotmirror: move git logic into GitMirrorManager

Signed-off-by: Seongmin Lee <git@boltless.me>

+245 -103
+212 -21
knotmirror/git.go
··· 4 4 "context" 5 5 "errors" 6 6 "fmt" 7 + "net/url" 8 + "os" 7 9 "os/exec" 10 + "path/filepath" 8 11 "regexp" 9 12 "strings" 10 13 11 14 "github.com/go-git/go-git/v5" 12 15 gitconfig "github.com/go-git/go-git/v5/config" 13 16 "github.com/go-git/go-git/v5/plumbing/transport" 17 + "tangled.org/core/knotmirror/models" 14 18 ) 15 19 16 - type GitMirrorClient interface { 17 - Clone(ctx context.Context, path, url string) error 18 - Fetch(ctx context.Context, path, url string) error 20 + type GitMirrorManager interface { 21 + // RemoteSetUrl updates git repository 'origin' remote 22 + RemoteSetUrl(ctx context.Context, repo *models.Repo) error 23 + // Clone clones the repository as a mirror 24 + Clone(ctx context.Context, repo *models.Repo) error 25 + // Fetch fetches the repository 26 + Fetch(ctx context.Context, repo *models.Repo) error 27 + // Sync mirrors the repository. It will clone the repository if repository doesn't exist. 28 + Sync(ctx context.Context, repo *models.Repo) error 29 + } 30 + 31 + type CliGitMirrorManager struct { 32 + repoBasePath string 33 + knotUseSSL bool 34 + } 35 + 36 + func NewCliGitMirrorManager(repoBasePath string, knotUseSSL bool) *CliGitMirrorManager { 37 + return &CliGitMirrorManager{ 38 + repoBasePath, 39 + knotUseSSL, 40 + } 41 + } 42 + 43 + var _ GitMirrorManager = new(CliGitMirrorManager) 44 + 45 + func (c *CliGitMirrorManager) makeRepoPath(repo *models.Repo) string { 46 + return filepath.Join(c.repoBasePath, repo.Did.String(), repo.Rkey.String()) 19 47 } 20 48 21 - type CliGitMirrorClient struct{} 49 + func (c *CliGitMirrorManager) RemoteSetUrl(ctx context.Context, repo *models.Repo) error { 50 + path := c.makeRepoPath(repo) 51 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 52 + if err != nil { 53 + return fmt.Errorf("constructing repo remote url: %w", err) 54 + } 55 + cmd := exec.CommandContext(ctx, "git", "-C", path, "remote", "set-url", "origin", url) 56 + if out, err := cmd.CombinedOutput(); err != nil { 57 + if ctx.Err() != nil { 58 + return ctx.Err() 59 + } 60 + msg := string(out) 61 + return fmt.Errorf("running 'git remote set-url origin %s': %w\n%s", url, err, msg) 62 + } 63 + return nil 64 + } 22 65 23 - var _ GitMirrorClient = new(CliGitMirrorClient) 66 + func (c *CliGitMirrorManager) Clone(ctx context.Context, repo *models.Repo) error { 67 + path := c.makeRepoPath(repo) 68 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 69 + if err != nil { 70 + return fmt.Errorf("constructing repo remote url: %w", err) 71 + } 72 + return c.clone(ctx, path, url) 73 + } 24 74 25 - func (c *CliGitMirrorClient) Clone(ctx context.Context, path, url string) error { 75 + func (c *CliGitMirrorManager) clone(ctx context.Context, path, url string) error { 26 76 cmd := exec.CommandContext(ctx, "git", "clone", "--mirror", url, path) 27 77 if out, err := cmd.CombinedOutput(); err != nil { 28 78 if ctx.Err() != nil { 29 79 return ctx.Err() 30 80 } 31 81 msg := string(out) 32 - if classification := classifyError(msg); classification != nil { 82 + if classification := classifyCliError(msg); classification != nil { 33 83 return classification 34 84 } 35 - return fmt.Errorf("cloning repo: %w\n%s", err, msg) 85 + return fmt.Errorf("running 'git clone --mirror %s': %w\n%s", url, err, msg) 36 86 } 37 87 return nil 38 88 } 39 89 40 - func (c *CliGitMirrorClient) Fetch(ctx context.Context, path, url string) error { 90 + func (c *CliGitMirrorManager) Fetch(ctx context.Context, repo *models.Repo) error { 91 + path := c.makeRepoPath(repo) 92 + return c.fetch(ctx, path) 93 + } 94 + 95 + func (c *CliGitMirrorManager) fetch(ctx context.Context, path string) error { 96 + // TODO: use `repo.Knot` instead of depending on origin 41 97 cmd := exec.CommandContext(ctx, "git", "-C", path, "fetch", "--prune", "origin") 42 98 if out, err := cmd.CombinedOutput(); err != nil { 43 99 if ctx.Err() != nil { 44 100 return ctx.Err() 45 101 } 46 - return fmt.Errorf("fetching repo: %w\n%s", err, string(out)) 102 + return fmt.Errorf("running 'git fetch': %w\n%s", err, string(out)) 103 + } 104 + return nil 105 + } 106 + 107 + func (c *CliGitMirrorManager) Sync(ctx context.Context, repo *models.Repo) error { 108 + path := c.makeRepoPath(repo) 109 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 110 + if err != nil { 111 + return fmt.Errorf("constructing repo remote url: %w", err) 112 + } 113 + 114 + exist, err := isDir(path) 115 + if err != nil { 116 + return fmt.Errorf("checking repo path: %w", err) 117 + } 118 + if !exist { 119 + if err := c.clone(ctx, path, url); err != nil { 120 + return fmt.Errorf("cloning repo: %w", err) 121 + } 122 + } else { 123 + if err := c.fetch(ctx, path); err != nil { 124 + return fmt.Errorf("fetching repo: %w", err) 125 + } 47 126 } 48 127 return nil 49 128 } 50 129 51 130 var ( 52 - ErrDNSFailure = errors.New("git: dns failure (could not resolve host)") 53 - ErrCertExpired = errors.New("git: certificate has expired") 54 - ErrRepoNotFound = errors.New("git: repository not found") 131 + ErrDNSFailure = errors.New("git: knot: dns failure (could not resolve host)") 132 + ErrCertExpired = errors.New("git: knot: certificate has expired") 133 + ErrCertMismatch = errors.New("git: knot: certificate hostname mismatch") 134 + ErrTLSHandshake = errors.New("git: knot: tls handshake failure") 135 + ErrHTTPStatus = errors.New("git: knot: request url returned error") 136 + ErrUnreachable = errors.New("git: knot: could not connect to server") 137 + ErrRepoNotFound = errors.New("git: repo: repository not found") 55 138 ) 56 139 57 140 var ( 58 - reDNS = regexp.MustCompile(`Could not resolve host:`) 141 + reDNSFailure = regexp.MustCompile(`Could not resolve host:`) 59 142 reCertExpired = regexp.MustCompile(`SSL certificate OpenSSL verify result: certificate has expired`) 60 - reRepoNotFound = regexp.MustCompile(`repository '.*' not found`) 143 + reCertMismatch = regexp.MustCompile(`SSL: no alternative certificate subject name matches target hostname`) 144 + reTLSHandshake = regexp.MustCompile(`TLS connect error: (.*)`) 145 + reHTTPStatus = regexp.MustCompile(`The requested URL returned error: (\d\d\d)`) 146 + reUnreachable = regexp.MustCompile(`Could not connect to server`) 147 + reRepoNotFound = regexp.MustCompile(`repository '.*?' not found`) 61 148 ) 62 149 63 - func classifyError(stderr string) error { 150 + // classifyCliError classifies git cli error message. It will return nil for unknown error messages 151 + func classifyCliError(stderr string) error { 64 152 msg := strings.TrimSpace(stderr) 153 + if m := reTLSHandshake.FindStringSubmatch(msg); len(m) > 1 { 154 + return fmt.Errorf("%w: %s", ErrTLSHandshake, m[1]) 155 + } 156 + if m := reHTTPStatus.FindStringSubmatch(msg); len(m) > 1 { 157 + return fmt.Errorf("%w: %s", ErrHTTPStatus, m[1]) 158 + } 65 159 switch { 66 - case reDNS.MatchString(msg): 160 + case reDNSFailure.MatchString(msg): 67 161 return ErrDNSFailure 68 162 case reCertExpired.MatchString(msg): 69 163 return ErrCertExpired 164 + case reCertMismatch.MatchString(msg): 165 + return ErrCertMismatch 166 + case reUnreachable.MatchString(msg): 167 + return ErrUnreachable 70 168 case reRepoNotFound.MatchString(msg): 71 169 return ErrRepoNotFound 72 170 } 73 171 return nil 74 172 } 75 173 76 - type GoGitMirrorClient struct{} 174 + type GoGitMirrorManager struct { 175 + repoBasePath string 176 + knotUseSSL bool 177 + } 77 178 78 - var _ GitMirrorClient = new(GoGitMirrorClient) 179 + func NewGoGitMirrorClient(repoBasePath string, knotUseSSL bool) *GoGitMirrorManager { 180 + return &GoGitMirrorManager{ 181 + repoBasePath, 182 + knotUseSSL, 183 + } 184 + } 79 185 80 - func (c *GoGitMirrorClient) Clone(ctx context.Context, path string, url string) error { 186 + var _ GitMirrorManager = new(GoGitMirrorManager) 187 + 188 + func (c *GoGitMirrorManager) makeRepoPath(repo *models.Repo) string { 189 + return filepath.Join(c.repoBasePath, repo.Did.String(), repo.Rkey.String()) 190 + } 191 + 192 + func (c *GoGitMirrorManager) RemoteSetUrl(ctx context.Context, repo *models.Repo) error { 193 + panic("unimplemented") 194 + } 195 + 196 + func (c *GoGitMirrorManager) Clone(ctx context.Context, repo *models.Repo) error { 197 + path := c.makeRepoPath(repo) 198 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 199 + if err != nil { 200 + return fmt.Errorf("constructing repo remote url: %w", err) 201 + } 202 + return c.clone(ctx, path, url) 203 + } 204 + 205 + func (c *GoGitMirrorManager) clone(ctx context.Context, path, url string) error { 81 206 _, err := git.PlainCloneContext(ctx, path, true, &git.CloneOptions{ 82 207 URL: url, 83 208 Mirror: true, ··· 88 213 return nil 89 214 } 90 215 91 - func (c *GoGitMirrorClient) Fetch(ctx context.Context, path string, url string) error { 216 + func (c *GoGitMirrorManager) Fetch(ctx context.Context, repo *models.Repo) error { 217 + path := c.makeRepoPath(repo) 218 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 219 + if err != nil { 220 + return fmt.Errorf("constructing repo remote url: %w", err) 221 + } 222 + 223 + return c.fetch(ctx, path, url) 224 + } 225 + 226 + func (c *GoGitMirrorManager) fetch(ctx context.Context, path, url string) error { 92 227 gr, err := git.PlainOpen(path) 93 228 if err != nil { 94 229 return fmt.Errorf("opening local repo: %w", err) ··· 103 238 } 104 239 return nil 105 240 } 241 + 242 + func (c *GoGitMirrorManager) Sync(ctx context.Context, repo *models.Repo) error { 243 + path := c.makeRepoPath(repo) 244 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 245 + if err != nil { 246 + return fmt.Errorf("constructing repo remote url: %w", err) 247 + } 248 + 249 + exist, err := isDir(path) 250 + if err != nil { 251 + return fmt.Errorf("checking repo path: %w", err) 252 + } 253 + if !exist { 254 + if err := c.clone(ctx, path, url); err != nil { 255 + return fmt.Errorf("cloning repo: %w", err) 256 + } 257 + } else { 258 + if err := c.fetch(ctx, path, url); err != nil { 259 + return fmt.Errorf("fetching repo: %w", err) 260 + } 261 + } 262 + return nil 263 + } 264 + 265 + func makeRepoRemoteUrl(knot, didSlashRepo string, knotUseSSL bool) (string, error) { 266 + if !strings.Contains(knot, "://") { 267 + if knotUseSSL { 268 + knot = "https://" + knot 269 + } else { 270 + knot = "http://" + knot 271 + } 272 + } 273 + 274 + u, err := url.Parse(knot) 275 + if err != nil { 276 + return "", err 277 + } 278 + 279 + if u.Scheme != "http" && u.Scheme != "https" { 280 + return "", fmt.Errorf("unsupported scheme: %s", u.Scheme) 281 + } 282 + 283 + u = u.JoinPath(didSlashRepo) 284 + return u.String(), nil 285 + } 286 + 287 + func isDir(path string) (bool, error) { 288 + info, err := os.Stat(path) 289 + if err == nil && info.IsDir() { 290 + return true, nil 291 + } 292 + if os.IsNotExist(err) { 293 + return false, nil 294 + } 295 + return false, err 296 + }
+5 -2
knotmirror/knotmirror.go
··· 36 36 return fmt.Errorf("initializing db: %w", err) 37 37 } 38 38 39 + // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive. 40 + gitm := NewCliGitMirrorManager(cfg.GitRepoBasePath, cfg.KnotUseSSL) 41 + 39 42 resolver := idresolver.DefaultResolver(cfg.PlcUrl) 40 43 41 44 res, err := db.ExecContext(ctx, ··· 55 58 xrpc := xrpc.New(logger, cfg, db, resolver) 56 59 knotstream := knotstream.NewKnotStream(logger, db, cfg) 57 60 crawler := NewCrawler(logger, db) 58 - resyncer := NewResyncer(logger, db, cfg) 61 + resyncer := NewResyncer(logger, db, gitm, cfg) 59 62 adminpage := NewAdminServer(db) 60 63 61 64 // maintain repository list with tap 62 65 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events. 63 - tap := NewTapClient(logger, cfg, db, knotstream) 66 + tap := NewTapClient(logger, cfg, db, gitm, knotstream) 64 67 65 68 // start http server 66 69 go func() {
+12 -77
knotmirror/resyncer.go
··· 7 7 "fmt" 8 8 "log/slog" 9 9 "math/rand" 10 - "net/url" 11 - "os" 12 - "path" 13 10 "strings" 14 11 "sync" 15 12 "time" ··· 24 21 type Resyncer struct { 25 22 logger *slog.Logger 26 23 db *sql.DB 24 + gitm GitMirrorManager 27 25 28 26 claimJobMu sync.Mutex 29 27 30 - repoBasePath string 31 28 repoFetchTimeout time.Duration 32 - knotUseSSL bool 33 29 34 30 parallelism int 35 31 } 36 32 37 - func NewResyncer(l *slog.Logger, db *sql.DB, cfg *config.Config) *Resyncer { 33 + func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, cfg *config.Config) *Resyncer { 38 34 return &Resyncer{ 39 - logger: log.SubLogger(l, "resyncer"), 40 - db: db, 41 - repoBasePath: cfg.GitRepoBasePath, 35 + logger: log.SubLogger(l, "resyncer"), 36 + db: db, 37 + gitm: gitm, 38 + 42 39 repoFetchTimeout: cfg.GitRepoFetchTimeout, 43 - knotUseSSL: cfg.KnotUseSSL, 44 40 parallelism: cfg.ResyncParallelism, 45 41 } 46 42 } ··· 141 137 return false, nil 142 138 } 143 139 144 - repoPath := r.repoPath(repo) 145 - l := r.logger.With("repo", repo.DidSlashRepo(), "path", repoPath) 146 - 147 - remoteUrl, err := r.repoRemoteURL(repo) 148 - if err != nil { 149 - return false, fmt.Errorf("parsing knot url: %w", err) 150 - } 151 - l = l.With("url", remoteUrl) 152 - 153 - ctx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout) 154 - defer cancel() 155 - 156 140 // TODO: check if Knot is on backoff list. If so, return (false, nil) 157 - // TODO: use r.repoFetchTimeout on fetch 158 141 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list 159 142 160 - // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive. 161 - gitclient := &CliGitMirrorClient{} 143 + fetchCtx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout) 144 + defer cancel() 162 145 163 - exist, err := isDir(repoPath) 164 - if err != nil { 165 - return false, fmt.Errorf("checking repo path: %w", err) 166 - } 167 - if !exist { 168 - if err := gitclient.Clone(ctx, repoPath, remoteUrl); err != nil { 169 - return false, err 170 - } 171 - } else { 172 - if err := gitclient.Fetch(ctx, repoPath, remoteUrl); err != nil { 173 - return false, err 174 - } 146 + if err := r.gitm.Sync(fetchCtx, repo); err != nil { 147 + return false, err 175 148 } 176 149 177 150 // repo.GitRev = <processed git.refUpdate revision> ··· 206 179 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt) 207 180 } 208 181 182 + // start a 1 min & go up to 1 hr between retries 209 183 var retryCount = repo.RetryCount + 1 210 - var retryAfter int64 211 - if retryCount >= 10 { 212 - state = models.RepoStateSuspended 213 - errMsg = fmt.Sprintf("too many resync fails: %s", errMsg) 214 - retryAfter = 0 215 - } else { 216 - // start a 1 min & go up to 1 hr between retries 217 - retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 218 - } 184 + var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 219 185 220 186 // remove null bytes 221 187 errMsg = strings.ReplaceAll(errMsg, "\x00", "") ··· 230 196 return err 231 197 } 232 198 233 - func (r *Resyncer) repoPath(repo *models.Repo) string { 234 - return path.Join(r.repoBasePath, repo.Did.String(), repo.Rkey.String()) 235 - } 236 - 237 - func (r *Resyncer) repoRemoteURL(repo *models.Repo) (string, error) { 238 - u, err := url.Parse(repo.KnotDomain) 239 - if err != nil { 240 - return "", err 241 - } 242 - if u.Scheme == "" { 243 - if r.knotUseSSL { 244 - u.Scheme = "https" 245 - } else { 246 - u.Scheme = "http" 247 - } 248 - } 249 - u = u.JoinPath(repo.DidSlashRepo()) 250 - return u.String(), nil 251 - } 252 - 253 199 func backoff(retries int, max int) time.Duration { 254 200 dur := min(1<<retries, max) 255 201 jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 256 202 return time.Second*time.Duration(dur) + jitter 257 203 } 258 - 259 - func isDir(path string) (bool, error) { 260 - info, err := os.Stat(path) 261 - if err == nil && info.IsDir() { 262 - return true, nil 263 - } 264 - if os.IsNotExist(err) { 265 - return false, nil 266 - } 267 - return false, err 268 - }
+16 -3
knotmirror/tapclient.go
··· 24 24 cfg *config.Config 25 25 tap tapc.Client 26 26 db *sql.DB 27 + gitm GitMirrorManager 27 28 ks *knotstream.KnotStream 28 29 } 29 30 30 - func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, ks *knotstream.KnotStream) *Tap { 31 + func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, gitm GitMirrorManager, ks *knotstream.KnotStream) *Tap { 31 32 return &Tap{ 32 33 logger: log.SubLogger(l, "tapclient"), 33 34 cfg: cfg, 34 35 tap: tapc.NewClient(cfg.TapUrl, ""), 35 36 db: db, 37 + gitm: gitm, 36 38 ks: ks, 37 39 } 38 40 } ··· 87 89 errMsg = "suspending non-public knot" 88 90 } 89 91 90 - if err := db.UpsertRepo(ctx, t.db, &models.Repo{ 92 + repo := &models.Repo{ 91 93 Did: evt.Did, 92 94 Rkey: evt.Rkey, 93 95 Cid: evt.CID, ··· 95 97 KnotDomain: record.Knot, 96 98 State: status, 97 99 ErrorMsg: errMsg, 98 - }); err != nil { 100 + RetryAfter: 0, // clear retry info 101 + RetryCount: 0, 102 + } 103 + 104 + if evt.Action == tapc.RecordUpdateAction { 105 + // update git repo remote url 106 + if err := t.gitm.RemoteSetUrl(ctx, repo); err != nil { 107 + return fmt.Errorf("updating git repo remote url: %w", err) 108 + } 109 + } 110 + 111 + if err := db.UpsertRepo(ctx, t.db, repo); err != nil { 99 112 return fmt.Errorf("upserting repo to db: %w", err) 100 113 } 101 114