A vibe coded tangled fork which supports pijul.

knotmirror/proxy: proxy in case of knotmirror err

We ought to harden the knotmirror in case we get errors
from tap or anywhere else, in situations such that the
appview knows about a git repo somewhere but the knot-
mirror doesn't. I would call this good practice in general
so that we have robust infrastructure and other possible
future appviews would also be able to trust that the
knotmirror will serve them repos that in fact exist.

Lewis: May this revision serve well! <lewis@tangled.org>

authored by

Lewis and committed by tangled.org 7b3842c8 78adcf9c

+210 -40
+12 -3
knotmirror/xrpc/git_get_archive.go
··· 42 42 43 43 repoPath, err := x.makeRepoPath(ctx, repo) 44 44 if err != nil { 45 - l.Error("failed to resolve repo at-uri", "err", err) 45 + l.Warn("local mirror failed, trying proxy", "err", err) 46 + if x.proxyToKnot(w, r, repo) { 47 + return 48 + } 46 49 writeJson(w, http.StatusInternalServerError, atclient.ErrorBody{Name: "InternalServerError", Message: "failed to resolve repo"}) 47 50 return 48 51 } 49 52 50 53 gr, err := git.Open(repoPath, ref) 51 54 if err != nil { 52 - l.Error("failed to open git repo", "err", err) 55 + l.Warn("local mirror failed, trying proxy", "err", err) 56 + if x.proxyToKnot(w, r, repo) { 57 + return 58 + } 53 59 writeJson(w, http.StatusInternalServerError, atclient.ErrorBody{Name: "InternalServerError", Message: "failed to open git repo"}) 54 60 return 55 61 } ··· 65 71 return r.Name, nil 66 72 }() 67 73 if err != nil { 68 - l.Error("failed to get repo name", "err", err) 74 + l.Warn("local mirror failed, trying proxy", "err", err) 75 + if x.proxyToKnot(w, r, repo) { 76 + return 77 + } 69 78 writeJson(w, http.StatusInternalServerError, atclient.ErrorBody{Name: "InternalServerError", Message: "failed to retrieve repo name"}) 70 79 return 71 80 }
+4 -2
knotmirror/xrpc/git_get_blob.go
··· 35 35 36 36 file, err := x.getFile(r.Context(), repo, ref, path) 37 37 if err != nil { 38 - // TODO: better error return 39 - l.Error("failed to get blob", "err", err) 38 + l.Warn("local mirror failed, trying proxy", "err", err) 39 + if x.proxyToKnot(w, r, repo) { 40 + return 41 + } 40 42 writeJson(w, http.StatusInternalServerError, atclient.ErrorBody{Name: "InternalServerError", Message: "failed to get blob"}) 41 43 return 42 44 }
+4 -4
knotmirror/xrpc/git_get_branch.go
··· 33 33 } 34 34 branchName, _ := url.PathUnescape(nameQuery) 35 35 36 - l := x.logger.With("repo", repo, "branch", branchName) 37 - 38 36 out, err := x.getBranch(r.Context(), repo, branchName) 39 37 if err != nil { 40 - // TODO: better error return 41 - l.Error("failed to get branch", "err", err) 38 + x.logger.Warn("local mirror failed, trying proxy", "repo", repo, "err", err) 39 + if x.proxyToKnot(w, r, repo) { 40 + return 41 + } 42 42 writeJson(w, http.StatusInternalServerError, atclient.ErrorBody{Name: "InternalServerError", Message: "failed to get branch"}) 43 43 return 44 44 }
+4 -4
knotmirror/xrpc/git_get_tag.go
··· 30 30 return 31 31 } 32 32 33 - l := x.logger.With("repo", repo, "tag", tagName) 34 - 35 33 out, err := x.getTag(r.Context(), repo, tagName) 36 34 if err != nil { 37 - // TODO: better error return 38 - l.Error("failed to get tag", "err", err) 35 + x.logger.Warn("local mirror failed, trying proxy", "repo", repo, "err", err) 36 + if x.proxyToKnot(w, r, repo) { 37 + return 38 + } 39 39 writeJson(w, http.StatusInternalServerError, atclient.ErrorBody{Name: "InternalServerError", Message: "failed to get tag"}) 40 40 return 41 41 }
+4 -4
knotmirror/xrpc/git_get_tree.go
··· 28 28 return 29 29 } 30 30 31 - l := x.logger.With("repo", repo, "ref", ref, "path", path) 32 - 33 31 out, err := x.getTree(r.Context(), repo, ref, path) 34 32 if err != nil { 35 - // TODO: better error return 36 - l.Error("failed to get tree", "err", err) 33 + x.logger.Warn("local mirror failed, trying proxy", "repo", repo, "err", err) 34 + if x.proxyToKnot(w, r, repo) { 35 + return 36 + } 37 37 writeJson(w, http.StatusInternalServerError, atclient.ErrorBody{Name: "InternalServerError", Message: "failed to get tree"}) 38 38 return 39 39 }
+4 -4
knotmirror/xrpc/git_list_branches.go
··· 44 44 } 45 45 } 46 46 47 - l := x.logger.With("repo", repoQuery, "limit", limit, "cursor", cursor) 48 - 49 47 out, err := x.listBranches(r.Context(), repo, limit, cursor) 50 48 if err != nil { 51 - // TODO: better error return 52 - l.Error("failed to list branches", "err", err) 49 + x.logger.Warn("local mirror failed, trying proxy", "repo", repo, "err", err) 50 + if x.proxyToKnot(w, r, repo) { 51 + return 52 + } 53 53 writeJson(w, http.StatusInternalServerError, atclient.ErrorBody{Name: "InternalServerError", Message: "failed to list branches"}) 54 54 return 55 55 }
+4 -4
knotmirror/xrpc/git_list_commits.go
··· 44 44 } 45 45 } 46 46 47 - l := x.logger.With("repo", repo, "ref", ref) 48 - 49 47 out, err := x.listCommits(r.Context(), repo, ref, limit, cursor) 50 48 if err != nil { 51 - // TODO: better error return 52 - l.Error("failed to list commits", "err", err) 49 + x.logger.Warn("local mirror failed, trying proxy", "repo", repo, "err", err) 50 + if x.proxyToKnot(w, r, repo) { 51 + return 52 + } 53 53 writeJson(w, http.StatusInternalServerError, atclient.ErrorBody{Name: "InternalServerError", Message: "failed to list commits"}) 54 54 return 55 55 }
+4 -1
knotmirror/xrpc/git_list_languages.go
··· 29 29 30 30 out, err := x.listLanguages(r.Context(), repo, ref) 31 31 if err != nil { 32 - l.Error("failed to list languages", "err", err) 32 + l.Warn("local mirror failed, trying proxy", "err", err) 33 + if x.proxyToKnot(w, r, repo) { 34 + return 35 + } 33 36 writeErr(w, err) 34 37 return 35 38 }
+4 -4
knotmirror/xrpc/git_list_tags.go
··· 45 45 } 46 46 } 47 47 48 - l := x.logger.With("repo", repo, "limit", limit, "cursor", cursor) 49 - 50 48 out, err := x.listTags(r.Context(), repo, limit, cursor) 51 49 if err != nil { 52 - // TODO: better error return 53 - l.Error("failed to list tags", "err", err) 50 + x.logger.Warn("local mirror failed, trying proxy", "repo", repo, "err", err) 51 + if x.proxyToKnot(w, r, repo) { 52 + return 53 + } 54 54 writeJson(w, http.StatusInternalServerError, atclient.ErrorBody{Name: "InternalServerError", Message: "failed to list tags"}) 55 55 return 56 56 }
+151
knotmirror/xrpc/proxy.go
··· 1 + package xrpc 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "io" 7 + "net/http" 8 + "net/url" 9 + "strings" 10 + 11 + "github.com/bluesky-social/indigo/api/atproto" 12 + "github.com/bluesky-social/indigo/atproto/syntax" 13 + indigoxrpc "github.com/bluesky-social/indigo/xrpc" 14 + "tangled.org/core/api/tangled" 15 + "tangled.org/core/knotmirror/db" 16 + "tangled.org/core/knotmirror/models" 17 + ) 18 + 19 + var mirrorToKnotNSID = map[string]string{ 20 + tangled.GitTempListBranchesNSID: tangled.RepoBranchesNSID, 21 + tangled.GitTempListTagsNSID: tangled.RepoTagsNSID, 22 + tangled.GitTempListCommitsNSID: tangled.RepoLogNSID, 23 + tangled.GitTempGetTreeNSID: tangled.RepoTreeNSID, 24 + tangled.GitTempGetBranchNSID: tangled.RepoBranchNSID, 25 + tangled.GitTempGetBlobNSID: tangled.RepoBlobNSID, 26 + tangled.GitTempGetTagNSID: tangled.RepoTagNSID, 27 + tangled.GitTempGetArchiveNSID: tangled.RepoArchiveNSID, 28 + tangled.GitTempListLanguagesNSID: tangled.RepoLanguagesNSID, 29 + } 30 + 31 + var hopByHopHeaders = map[string]bool{ 32 + "Connection": true, 33 + "Keep-Alive": true, 34 + "Transfer-Encoding": true, 35 + "Te": true, 36 + "Trailer": true, 37 + "Upgrade": true, 38 + "Proxy-Authorization": true, 39 + "Proxy-Authenticate": true, 40 + } 41 + 42 + type knotInfo struct { 43 + baseURL string 44 + didSlashRepo string 45 + } 46 + 47 + func (x *Xrpc) resolveKnot(ctx context.Context, repoAt syntax.ATURI) (*knotInfo, error) { 48 + repo, err := db.GetRepoByAtUri(ctx, x.db, repoAt) 49 + if err == nil && repo != nil { 50 + if repo.State != models.RepoStatePending && repo.State != models.RepoStateResyncing { 51 + go func() { 52 + if err := db.UpdateRepoState(context.Background(), x.db, repo.Did, repo.Rkey, models.RepoStatePending); err != nil { 53 + x.logger.Error("failed to mark repo for resync after proxy", "err", err) 54 + } 55 + }() 56 + } 57 + return &knotInfo{baseURL: repo.KnotDomain, didSlashRepo: repo.DidSlashRepo()}, nil 58 + } 59 + 60 + owner, err := x.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 61 + if err != nil { 62 + return nil, fmt.Errorf("resolving repo owner: %w", err) 63 + } 64 + 65 + xrpcc := indigoxrpc.Client{Host: owner.PDSEndpoint()} 66 + out, err := atproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 67 + if err != nil { 68 + return nil, fmt.Errorf("fetching repo record from PDS: %w", err) 69 + } 70 + 71 + record := out.Value.Val.(*tangled.Repo) 72 + knotURL := record.Knot 73 + if !strings.Contains(knotURL, "://") { 74 + scheme := "http" 75 + if x.cfg.KnotUseSSL { 76 + scheme = "https" 77 + } 78 + knotURL = scheme + "://" + knotURL 79 + } 80 + 81 + go func() { 82 + bgCtx := context.Background() 83 + pending := &models.Repo{ 84 + Did: owner.DID, 85 + Rkey: repoAt.RecordKey(), 86 + Cid: (*syntax.CID)(out.Cid), 87 + Name: record.Name, 88 + KnotDomain: knotURL, 89 + State: models.RepoStatePending, 90 + } 91 + if upsertErr := db.UpsertRepo(bgCtx, x.db, pending); upsertErr != nil { 92 + x.logger.Error("failed to upsert repo after proxy resolution", "err", upsertErr) 93 + } 94 + }() 95 + 96 + return &knotInfo{ 97 + baseURL: knotURL, 98 + didSlashRepo: fmt.Sprintf("%s/%s", owner.DID, record.Name), 99 + }, nil 100 + } 101 + 102 + func (x *Xrpc) proxyToKnot(w http.ResponseWriter, r *http.Request, repoAt syntax.ATURI) bool { 103 + mirrorNSID := strings.TrimPrefix(r.URL.Path, "/xrpc/") 104 + knotNSID, ok := mirrorToKnotNSID[mirrorNSID] 105 + if !ok { 106 + return false 107 + } 108 + 109 + knot, err := x.resolveKnot(r.Context(), repoAt) 110 + if err != nil { 111 + x.logger.Warn("proxy: failed to resolve knot", "repo", repoAt, "err", err) 112 + return false 113 + } 114 + 115 + params := make(url.Values) 116 + for k, v := range r.URL.Query() { 117 + params[k] = v 118 + } 119 + params.Set("repo", knot.didSlashRepo) 120 + 121 + target := fmt.Sprintf("%s/xrpc/%s?%s", knot.baseURL, knotNSID, params.Encode()) 122 + 123 + req, err := http.NewRequestWithContext(r.Context(), http.MethodGet, target, nil) 124 + if err != nil { 125 + x.logger.Warn("proxy: failed to build request", "target", target, "err", err) 126 + return false 127 + } 128 + 129 + resp, err := x.httpClient.Do(req) 130 + if err != nil { 131 + x.logger.Warn("proxy: knot request failed", "target", target, "err", err) 132 + return false 133 + } 134 + defer resp.Body.Close() 135 + 136 + for k, vv := range resp.Header { 137 + if hopByHopHeaders[k] { 138 + continue 139 + } 140 + for _, v := range vv { 141 + w.Header().Add(k, v) 142 + } 143 + } 144 + w.WriteHeader(resp.StatusCode) 145 + if _, err := io.Copy(w, resp.Body); err != nil { 146 + x.logger.Warn("proxy: response copy interrupted", "target", target, "err", err) 147 + } 148 + 149 + x.logger.Info("proxy: served from knot", "repo", repoAt, "knot", knot.baseURL, "status", resp.StatusCode) 150 + return true 151 + }
+15 -10
knotmirror/xrpc/xrpc.go
··· 6 6 "errors" 7 7 "log/slog" 8 8 "net/http" 9 + "time" 9 10 10 11 "github.com/bluesky-social/indigo/atproto/atclient" 11 12 "github.com/go-chi/chi/v5" ··· 17 18 ) 18 19 19 20 type Xrpc struct { 20 - cfg *config.Config 21 - db *sql.DB 22 - resolver *idresolver.Resolver 23 - ks *knotstream.KnotStream 24 - logger *slog.Logger 21 + cfg *config.Config 22 + db *sql.DB 23 + resolver *idresolver.Resolver 24 + ks *knotstream.KnotStream 25 + logger *slog.Logger 26 + httpClient *http.Client 25 27 } 26 28 27 29 func New(logger *slog.Logger, cfg *config.Config, db *sql.DB, resolver *idresolver.Resolver, ks *knotstream.KnotStream) *Xrpc { 28 30 return &Xrpc{ 29 - cfg, 30 - db, 31 - resolver, 32 - ks, 33 - log.SubLogger(logger, "xrpc"), 31 + cfg: cfg, 32 + db: db, 33 + resolver: resolver, 34 + ks: ks, 35 + logger: log.SubLogger(logger, "xrpc"), 36 + httpClient: &http.Client{ 37 + Timeout: 30 * time.Second, 38 + }, 34 39 } 35 40 } 36 41