A vibe coded tangled fork which supports pijul.
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "strings"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/xrpc"
16 "github.com/bluesky-social/jetstream/pkg/models"
17 securejoin "github.com/cyphar/filepath-securejoin"
18 "tangled.org/core/api/tangled"
19 "tangled.org/core/knotserver/db"
20 "tangled.org/core/knotserver/git"
21
22 "tangled.org/core/log"
23 "tangled.org/core/rbac"
24 "tangled.org/core/workflow"
25)
26
27func (h *Knot) processPublicKey(ctx context.Context, event *models.Event) error {
28 l := log.FromContext(ctx)
29 raw := json.RawMessage(event.Commit.Record)
30 did := event.Did
31
32 var record tangled.PublicKey
33 if err := json.Unmarshal(raw, &record); err != nil {
34 return fmt.Errorf("failed to unmarshal record: %w", err)
35 }
36
37 pk := db.PublicKey{
38 Did: did,
39 PublicKey: record,
40 }
41 if err := h.db.AddPublicKey(pk); err != nil {
42 l.Error("failed to add public key", "error", err)
43 return fmt.Errorf("failed to add public key: %w", err)
44 }
45 l.Info("added public key from firehose", "did", did)
46 return nil
47}
48
49func (h *Knot) processKnotMember(ctx context.Context, event *models.Event) error {
50 l := log.FromContext(ctx)
51 raw := json.RawMessage(event.Commit.Record)
52 did := event.Did
53
54 var record tangled.KnotMember
55 if err := json.Unmarshal(raw, &record); err != nil {
56 return fmt.Errorf("failed to unmarshal record: %w", err)
57 }
58
59 if record.Domain != h.c.Server.Hostname {
60 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
61 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
62 }
63
64 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
65 if err != nil || !ok {
66 l.Error("failed to add member", "did", did)
67 return fmt.Errorf("failed to enforce permissions: %w", err)
68 }
69
70 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
71 l.Error("failed to add member", "error", err)
72 return fmt.Errorf("failed to add member: %w", err)
73 }
74 l.Info("added member from firehose", "member", record.Subject)
75
76 if err := h.db.AddDid(record.Subject); err != nil {
77 l.Error("failed to add did", "error", err)
78 return fmt.Errorf("failed to add did: %w", err)
79 }
80 h.jc.AddDid(record.Subject)
81
82 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil {
83 return fmt.Errorf("failed to fetch and add keys: %w", err)
84 }
85
86 return nil
87}
88
89func (h *Knot) processPull(ctx context.Context, event *models.Event) error {
90 raw := json.RawMessage(event.Commit.Record)
91 did := event.Did
92
93 var record tangled.RepoPull
94 if err := json.Unmarshal(raw, &record); err != nil {
95 return fmt.Errorf("failed to unmarshal record: %w", err)
96 }
97
98 l := log.FromContext(ctx)
99 l = l.With("handler", "processPull")
100 l = l.With("did", did)
101
102 if record.Target == nil {
103 return fmt.Errorf("ignoring pull record: target repo is nil")
104 }
105
106 l = l.With("target_repo", record.Target.Repo)
107 l = l.With("target_branch", record.Target.Branch)
108
109 if record.Source == nil {
110 return fmt.Errorf("ignoring pull record: not a branch-based pull request")
111 }
112
113 if record.Source.Repo != nil {
114 return fmt.Errorf("ignoring pull record: fork based pull")
115 }
116
117 repoAt, err := syntax.ParseATURI(record.Target.Repo)
118 if err != nil {
119 return fmt.Errorf("failed to parse ATURI: %w", err)
120 }
121
122 // resolve this aturi to extract the repo record
123 ident, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
124 if err != nil || ident.Handle.IsInvalidHandle() {
125 return fmt.Errorf("failed to resolve handle: %w", err)
126 }
127
128 xrpcc := xrpc.Client{
129 Host: ident.PDSEndpoint(),
130 }
131
132 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
133 if err != nil {
134 return fmt.Errorf("failed to resolver repo: %w", err)
135 }
136
137 repo := resp.Value.Val.(*tangled.Repo)
138
139 if repo.Knot != h.c.Server.Hostname {
140 return fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname)
141 }
142
143 didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name)
144 if err != nil {
145 return fmt.Errorf("failed to construct relative repo path: %w", err)
146 }
147
148 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
149 if err != nil {
150 return fmt.Errorf("failed to construct absolute repo path: %w", err)
151 }
152
153 gr, err := git.Open(repoPath, record.Source.Sha)
154 if err != nil {
155 return fmt.Errorf("failed to open git repository: %w", err)
156 }
157
158 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
159 if err != nil {
160 return fmt.Errorf("failed to open workflow directory: %w", err)
161 }
162
163 var pipeline workflow.RawPipeline
164 for _, e := range workflowDir {
165 if !e.IsFile() {
166 continue
167 }
168
169 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
170 contents, err := gr.RawContent(fpath)
171 if err != nil {
172 continue
173 }
174
175 pipeline = append(pipeline, workflow.RawWorkflow{
176 Name: e.Name,
177 Contents: contents,
178 })
179 }
180
181 trigger := tangled.Pipeline_PullRequestTriggerData{
182 Action: "create",
183 SourceBranch: record.Source.Branch,
184 SourceSha: record.Source.Sha,
185 TargetBranch: record.Target.Branch,
186 }
187
188 compiler := workflow.Compiler{
189 Trigger: tangled.Pipeline_TriggerMetadata{
190 Kind: string(workflow.TriggerKindPullRequest),
191 PullRequest: &trigger,
192 Repo: &tangled.Pipeline_TriggerRepo{
193 Did: ident.DID.String(),
194 Knot: repo.Knot,
195 Repo: repo.Name,
196 },
197 },
198 }
199
200 cp := compiler.Compile(compiler.Parse(pipeline))
201 eventJson, err := json.Marshal(cp)
202 if err != nil {
203 return fmt.Errorf("failed to marshal pipeline event: %w", err)
204 }
205
206 // do not run empty pipelines
207 if cp.Workflows == nil {
208 return nil
209 }
210
211 ev := db.Event{
212 Rkey: TID(),
213 Nsid: tangled.PipelineNSID,
214 EventJson: string(eventJson),
215 }
216
217 return h.db.InsertEvent(ev, h.n)
218}
219
220// duplicated from add collaborator
221func (h *Knot) processCollaborator(ctx context.Context, event *models.Event) error {
222 raw := json.RawMessage(event.Commit.Record)
223 did := event.Did
224
225 var record tangled.RepoCollaborator
226 if err := json.Unmarshal(raw, &record); err != nil {
227 return fmt.Errorf("failed to unmarshal record: %w", err)
228 }
229
230 repoAt, err := syntax.ParseATURI(record.Repo)
231 if err != nil {
232 return err
233 }
234
235 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject)
236 if err != nil || subjectId.Handle.IsInvalidHandle() {
237 return err
238 }
239
240 // TODO: fix this for good, we need to fetch the record here unfortunately
241 // resolve this aturi to extract the repo record
242 owner, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
243 if err != nil || owner.Handle.IsInvalidHandle() {
244 return fmt.Errorf("failed to resolve handle: %w", err)
245 }
246
247 xrpcc := xrpc.Client{
248 Host: owner.PDSEndpoint(),
249 }
250
251 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
252 if err != nil {
253 return err
254 }
255
256 repo := resp.Value.Val.(*tangled.Repo)
257 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
258
259 // check perms for this user
260 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo)
261 if err != nil {
262 return fmt.Errorf("failed to check permissions: %w", err)
263 }
264 if !ok {
265 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", didSlashRepo)
266 }
267
268 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
269 return err
270 }
271 h.jc.AddDid(subjectId.DID.String())
272
273 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil {
274 return err
275 }
276
277 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
278}
279
280func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error {
281 l := log.FromContext(ctx)
282
283 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
284 if err != nil {
285 l.Error("error building endpoint url", "did", did, "error", err.Error())
286 return fmt.Errorf("error building endpoint url: %w", err)
287 }
288
289 resp, err := http.Get(keysEndpoint)
290 if err != nil {
291 l.Error("error getting keys", "did", did, "error", err)
292 return fmt.Errorf("error getting keys: %w", err)
293 }
294 defer resp.Body.Close()
295
296 if resp.StatusCode == http.StatusNotFound {
297 l.Info("no keys found for did", "did", did)
298 return nil
299 }
300
301 plaintext, err := io.ReadAll(resp.Body)
302 if err != nil {
303 l.Error("error reading response body", "error", err)
304 return fmt.Errorf("error reading response body: %w", err)
305 }
306
307 for key := range strings.SplitSeq(string(plaintext), "\n") {
308 if key == "" {
309 continue
310 }
311 pk := db.PublicKey{
312 Did: did,
313 }
314 pk.Key = key
315 if err := h.db.AddPublicKey(pk); err != nil {
316 l.Error("failed to add public key", "error", err)
317 return fmt.Errorf("failed to add public key: %w", err)
318 }
319 }
320 return nil
321}
322
323func (h *Knot) processMessages(ctx context.Context, event *models.Event) error {
324 if event.Kind != models.EventKindCommit {
325 return nil
326 }
327
328 var err error
329 defer func() {
330 eventTime := event.TimeUS
331 lastTimeUs := eventTime + 1
332 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
333 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
334 }
335 }()
336
337 switch event.Commit.Collection {
338 case tangled.PublicKeyNSID:
339 err = h.processPublicKey(ctx, event)
340 case tangled.KnotMemberNSID:
341 err = h.processKnotMember(ctx, event)
342 case tangled.RepoPullNSID:
343 err = h.processPull(ctx, event)
344 case tangled.RepoCollaboratorNSID:
345 err = h.processCollaborator(ctx, event)
346 }
347
348 if err != nil {
349 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err)
350 }
351
352 return nil
353}