A vibe coded tangled fork which supports pijul.
at master 151 lines 4.4 kB view raw
1package xrpc 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "net/http" 8 "net/url" 9 "strings" 10 11 "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 indigoxrpc "github.com/bluesky-social/indigo/xrpc" 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/knotmirror/db" 16 "tangled.org/core/knotmirror/models" 17) 18 19var mirrorToKnotNSID = map[string]string{ 20 tangled.GitTempListBranchesNSID: tangled.RepoBranchesNSID, 21 tangled.GitTempListTagsNSID: tangled.RepoTagsNSID, 22 tangled.GitTempListCommitsNSID: tangled.RepoLogNSID, 23 tangled.GitTempGetTreeNSID: tangled.RepoTreeNSID, 24 tangled.GitTempGetBranchNSID: tangled.RepoBranchNSID, 25 tangled.GitTempGetBlobNSID: tangled.RepoBlobNSID, 26 tangled.GitTempGetTagNSID: tangled.RepoTagNSID, 27 tangled.GitTempGetArchiveNSID: tangled.RepoArchiveNSID, 28 tangled.GitTempListLanguagesNSID: tangled.RepoLanguagesNSID, 29} 30 31var hopByHopHeaders = map[string]bool{ 32 "Connection": true, 33 "Keep-Alive": true, 34 "Transfer-Encoding": true, 35 "Te": true, 36 "Trailer": true, 37 "Upgrade": true, 38 "Proxy-Authorization": true, 39 "Proxy-Authenticate": true, 40} 41 42type knotInfo struct { 43 baseURL string 44 didSlashRepo string 45} 46 47func (x *Xrpc) resolveKnot(ctx context.Context, repoAt syntax.ATURI) (*knotInfo, error) { 48 repo, err := db.GetRepoByAtUri(ctx, x.db, repoAt) 49 if err == nil && repo != nil { 50 if repo.State != models.RepoStatePending && repo.State != models.RepoStateResyncing { 51 go func() { 52 if err := db.UpdateRepoState(context.Background(), x.db, repo.Did, repo.Rkey, models.RepoStatePending); err != nil { 53 x.logger.Error("failed to mark repo for resync after proxy", "err", err) 54 } 55 }() 56 } 57 return &knotInfo{baseURL: repo.KnotDomain, didSlashRepo: repo.DidSlashRepo()}, nil 58 } 59 60 owner, err := x.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 61 if err != nil { 62 return nil, fmt.Errorf("resolving repo owner: %w", err) 63 } 64 65 xrpcc := indigoxrpc.Client{Host: owner.PDSEndpoint()} 66 out, err := atproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 67 if err != nil { 68 return nil, fmt.Errorf("fetching repo record from PDS: %w", err) 69 } 70 71 record := out.Value.Val.(*tangled.Repo) 72 knotURL := record.Knot 73 if !strings.Contains(knotURL, "://") { 74 scheme := "http" 75 if x.cfg.KnotUseSSL { 76 scheme = "https" 77 } 78 knotURL = scheme + "://" + knotURL 79 } 80 81 go func() { 82 bgCtx := context.Background() 83 pending := &models.Repo{ 84 Did: owner.DID, 85 Rkey: repoAt.RecordKey(), 86 Cid: (*syntax.CID)(out.Cid), 87 Name: record.Name, 88 KnotDomain: knotURL, 89 State: models.RepoStatePending, 90 } 91 if upsertErr := db.UpsertRepo(bgCtx, x.db, pending); upsertErr != nil { 92 x.logger.Error("failed to upsert repo after proxy resolution", "err", upsertErr) 93 } 94 }() 95 96 return &knotInfo{ 97 baseURL: knotURL, 98 didSlashRepo: fmt.Sprintf("%s/%s", owner.DID, record.Name), 99 }, nil 100} 101 102func (x *Xrpc) proxyToKnot(w http.ResponseWriter, r *http.Request, repoAt syntax.ATURI) bool { 103 mirrorNSID := strings.TrimPrefix(r.URL.Path, "/xrpc/") 104 knotNSID, ok := mirrorToKnotNSID[mirrorNSID] 105 if !ok { 106 return false 107 } 108 109 knot, err := x.resolveKnot(r.Context(), repoAt) 110 if err != nil { 111 x.logger.Warn("proxy: failed to resolve knot", "repo", repoAt, "err", err) 112 return false 113 } 114 115 params := make(url.Values) 116 for k, v := range r.URL.Query() { 117 params[k] = v 118 } 119 params.Set("repo", knot.didSlashRepo) 120 121 target := fmt.Sprintf("%s/xrpc/%s?%s", knot.baseURL, knotNSID, params.Encode()) 122 123 req, err := http.NewRequestWithContext(r.Context(), http.MethodGet, target, nil) 124 if err != nil { 125 x.logger.Warn("proxy: failed to build request", "target", target, "err", err) 126 return false 127 } 128 129 resp, err := x.httpClient.Do(req) 130 if err != nil { 131 x.logger.Warn("proxy: knot request failed", "target", target, "err", err) 132 return false 133 } 134 defer resp.Body.Close() 135 136 for k, vv := range resp.Header { 137 if hopByHopHeaders[k] { 138 continue 139 } 140 for _, v := range vv { 141 w.Header().Add(k, v) 142 } 143 } 144 w.WriteHeader(resp.StatusCode) 145 if _, err := io.Copy(w, resp.Body); err != nil { 146 x.logger.Warn("proxy: response copy interrupted", "target", target, "err", err) 147 } 148 149 x.logger.Info("proxy: served from knot", "repo", repoAt, "knot", knot.baseURL, "status", resp.StatusCode) 150 return true 151}