A vibe coded tangled fork which supports pijul.
1package knotserver
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "net/url"
10 "path/filepath"
11 "strings"
12
13 comatproto "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/atproto/identity"
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 "github.com/bluesky-social/indigo/xrpc"
17 jmodels "github.com/bluesky-social/jetstream/pkg/models"
18 securejoin "github.com/cyphar/filepath-securejoin"
19 "tangled.org/core/api/tangled"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/knotserver/db"
22 "tangled.org/core/knotserver/git"
23 "tangled.org/core/log"
24 "tangled.org/core/rbac"
25 "tangled.org/core/workflow"
26)
27
28func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error {
29 l := log.FromContext(ctx)
30 raw := json.RawMessage(event.Commit.Record)
31 did := event.Did
32
33 var record tangled.PublicKey
34 if err := json.Unmarshal(raw, &record); err != nil {
35 return fmt.Errorf("failed to unmarshal record: %w", err)
36 }
37
38 pk := db.PublicKey{
39 Did: did,
40 PublicKey: record,
41 }
42 if err := h.db.AddPublicKey(pk); err != nil {
43 l.Error("failed to add public key", "error", err)
44 return fmt.Errorf("failed to add public key: %w", err)
45 }
46 l.Info("added public key from firehose", "did", did)
47 return nil
48}
49
50func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error {
51 l := log.FromContext(ctx)
52 raw := json.RawMessage(event.Commit.Record)
53 did := event.Did
54
55 var record tangled.KnotMember
56 if err := json.Unmarshal(raw, &record); err != nil {
57 return fmt.Errorf("failed to unmarshal record: %w", err)
58 }
59
60 if record.Domain != h.c.Server.Hostname {
61 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname)
62 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname)
63 }
64
65 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite")
66 if err != nil || !ok {
67 l.Error("failed to add member", "did", did)
68 return fmt.Errorf("failed to enforce permissions: %w", err)
69 }
70
71 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil {
72 l.Error("failed to add member", "error", err)
73 return fmt.Errorf("failed to add member: %w", err)
74 }
75 l.Info("added member from firehose", "member", record.Subject)
76
77 if err := h.db.AddDid(record.Subject); err != nil {
78 l.Error("failed to add did", "error", err)
79 return fmt.Errorf("failed to add did: %w", err)
80 }
81 h.jc.AddDid(record.Subject)
82
83 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil {
84 return fmt.Errorf("failed to fetch and add keys: %w", err)
85 }
86
87 return nil
88}
89
90func (h *Knot) validatePullRecord(record *tangled.RepoPull) error {
91 if record.Target == nil {
92 return fmt.Errorf("ignoring pull record: target repo is nil")
93 }
94
95 if record.Source == nil {
96 return fmt.Errorf("ignoring pull record: not a branch-based pull request")
97 }
98
99 if record.Source.Repo != nil {
100 return fmt.Errorf("ignoring pull record: fork based pull")
101 }
102
103 return nil
104}
105
106func (h *Knot) resolveTargetRepo(ctx context.Context, targetRepoUri string) (*identity.Identity, *tangled.Repo, error) {
107 repoAt, err := syntax.ParseATURI(targetRepoUri)
108 if err != nil {
109 return nil, nil, fmt.Errorf("failed to parse ATURI: %w", err)
110 }
111
112 // resolve the repo owner to extract the repo record
113 repoOwnerIdent, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
114 if err != nil || repoOwnerIdent.Handle.IsInvalidHandle() {
115 return nil, nil, fmt.Errorf("failed to resolve repo owner handle: %w", err)
116 }
117
118 xrpcc := xrpc.Client{
119 Host: repoOwnerIdent.PDSEndpoint(),
120 }
121
122 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
123 if err != nil {
124 return nil, nil, fmt.Errorf("failed to resolve repo: %w", err)
125 }
126
127 repo := resp.Value.Val.(*tangled.Repo)
128 return repoOwnerIdent, repo, nil
129}
130
131func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) {
132 // resolve the PR owner's identity to fetch the blob from their PDS
133 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did)
134 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() {
135 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err)
136 }
137
138 roundNumber := len(record.Rounds) - 1
139 round := record.Rounds[roundNumber]
140
141 // fetch the blob from the PR owner's PDS
142 prOwnerPds := prOwnerIdent.PDSEndpoint()
143 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds))
144 if err != nil {
145 return nil, fmt.Errorf("failed to construct blob URL: %w", err)
146 }
147 q := blobUrl.Query()
148 q.Set("cid", round.PatchBlob.Ref.String())
149 q.Set("did", did)
150 blobUrl.RawQuery = q.Encode()
151
152 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil)
153 if err != nil {
154 return nil, fmt.Errorf("failed to create blob request: %w", err)
155 }
156 req.Header.Set("Content-Type", "application/json")
157
158 blobResp, err := http.DefaultClient.Do(req)
159 if err != nil {
160 return nil, fmt.Errorf("failed to fetch blob: %w", err)
161 }
162 defer blobResp.Body.Close()
163
164 blob := io.ReadCloser(blobResp.Body)
165 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob)
166 if err != nil {
167 return nil, fmt.Errorf("failed to parse submission: %w", err)
168 }
169
170 return latestSubmission, nil
171}
172
173func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) {
174 gr, err := git.Open(repoPath, sha)
175 if err != nil {
176 return nil, fmt.Errorf("failed to open git repository: %w", err)
177 }
178
179 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
180 if err != nil {
181 return nil, fmt.Errorf("failed to open workflow directory: %w", err)
182 }
183
184 var pipeline workflow.RawPipeline
185 for _, e := range workflowDir {
186 if !e.IsFile() {
187 continue
188 }
189
190 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
191 contents, err := gr.RawContent(fpath)
192 if err != nil {
193 continue
194 }
195
196 pipeline = append(pipeline, workflow.RawWorkflow{
197 Name: e.Name,
198 Contents: contents,
199 })
200 }
201
202 return pipeline, nil
203}
204
205func (h *Knot) compilePipeline(ctx context.Context, repoOwner *identity.Identity, repo *tangled.Repo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline {
206 l := log.FromContext(ctx)
207
208 trigger := tangled.Pipeline_PullRequestTriggerData{
209 Action: "create",
210 SourceBranch: sourceBranch,
211 SourceSha: sourceSha,
212 TargetBranch: targetBranch,
213 }
214
215 compiler := workflow.Compiler{
216 Trigger: tangled.Pipeline_TriggerMetadata{
217 Kind: string(workflow.TriggerKindPullRequest),
218 PullRequest: &trigger,
219 Repo: &tangled.Pipeline_TriggerRepo{
220 Did: repoOwner.DID.String(),
221 Knot: repo.Knot,
222 Repo: repo.Name,
223 },
224 },
225 }
226
227 l.Info("raw", "raw", rawPipeline)
228 parsed := compiler.Parse(rawPipeline)
229 l.Info("parsed", "parsed", parsed)
230 compiled := compiler.Compile(parsed)
231
232 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics)
233
234 return compiled
235}
236
237func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error {
238 raw := json.RawMessage(event.Commit.Record)
239 rkey := event.Commit.RKey
240 did := event.Did
241
242 var record tangled.RepoPull
243 if err := json.Unmarshal(raw, &record); err != nil {
244 return fmt.Errorf("failed to unmarshal record: %w", err)
245 }
246
247 l := log.FromContext(ctx)
248 l = l.With("handler", "processPull")
249 l = l.With("did", did)
250
251 l.Info("validating pull record")
252 if err := h.validatePullRecord(&record); err != nil {
253 return err
254 }
255
256 l = l.With("target_repo", record.Target.Repo)
257 l = l.With("target_branch", record.Target.Branch)
258
259 l.Info("resolving target repo")
260 repoOwnerIdent, repo, err := h.resolveTargetRepo(ctx, record.Target.Repo)
261 if err != nil {
262 return err
263 }
264
265 if repo.Knot != h.c.Server.Hostname {
266 return fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname)
267 }
268
269 l.Info("fetching latest submission")
270 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record)
271 if err != nil {
272 return err
273 }
274
275 sha := latestSubmission.SourceRev
276 if sha == "" {
277 return fmt.Errorf("failed to extract source SHA from pull submission")
278 }
279 l = l.With("sha", sha)
280
281 l.Info("constructing repo path")
282 didSlashRepo, err := securejoin.SecureJoin(repoOwnerIdent.DID.String(), repo.Name)
283 if err != nil {
284 return fmt.Errorf("failed to construct relative repo path: %w", err)
285 }
286
287 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo)
288 if err != nil {
289 return fmt.Errorf("failed to construct absolute repo path: %w", err)
290 }
291
292 l.Info("discovering workflows", "repo_path", repoPath)
293 pipeline, err := h.discoverWorkflows(ctx, repoPath, sha)
294 if err != nil {
295 return err
296 }
297
298 l.Info("compiling pipeline", "workflow_count", len(pipeline))
299 cp := h.compilePipeline(ctx, repoOwnerIdent, repo, record.Source.Branch, sha, record.Target.Branch, pipeline)
300
301 // do not run empty pipelines
302 if cp.Workflows == nil {
303 l.Info("skipping empty pipeline")
304 return nil
305 }
306
307 l.Info("marshaling pipeline event")
308 eventJson, err := json.Marshal(cp)
309 if err != nil {
310 return fmt.Errorf("failed to marshal pipeline event: %w", err)
311 }
312
313 ev := db.Event{
314 Rkey: TID(),
315 Nsid: tangled.PipelineNSID,
316 EventJson: string(eventJson),
317 }
318
319 l.Info("inserting pipeline event")
320 return h.db.InsertEvent(ev, h.n)
321}
322
323// duplicated from add collaborator
324func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error {
325 raw := json.RawMessage(event.Commit.Record)
326 did := event.Did
327
328 var record tangled.RepoCollaborator
329 if err := json.Unmarshal(raw, &record); err != nil {
330 return fmt.Errorf("failed to unmarshal record: %w", err)
331 }
332
333 repoAt, err := syntax.ParseATURI(record.Repo)
334 if err != nil {
335 return err
336 }
337
338 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject)
339 if err != nil || subjectId.Handle.IsInvalidHandle() {
340 return err
341 }
342
343 // TODO: fix this for good, we need to fetch the record here unfortunately
344 // resolve this aturi to extract the repo record
345 owner, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String())
346 if err != nil || owner.Handle.IsInvalidHandle() {
347 return fmt.Errorf("failed to resolve handle: %w", err)
348 }
349
350 xrpcc := xrpc.Client{
351 Host: owner.PDSEndpoint(),
352 }
353
354 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String())
355 if err != nil {
356 return err
357 }
358
359 repo := resp.Value.Val.(*tangled.Repo)
360 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name)
361
362 // check perms for this user
363 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo)
364 if err != nil {
365 return fmt.Errorf("failed to check permissions: %w", err)
366 }
367 if !ok {
368 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", didSlashRepo)
369 }
370
371 if err := h.db.AddDid(subjectId.DID.String()); err != nil {
372 return err
373 }
374 h.jc.AddDid(subjectId.DID.String())
375
376 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil {
377 return err
378 }
379
380 return h.fetchAndAddKeys(ctx, subjectId.DID.String())
381}
382
383func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error {
384 l := log.FromContext(ctx)
385
386 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did)
387 if err != nil {
388 l.Error("error building endpoint url", "did", did, "error", err.Error())
389 return fmt.Errorf("error building endpoint url: %w", err)
390 }
391
392 resp, err := http.Get(keysEndpoint)
393 if err != nil {
394 l.Error("error getting keys", "did", did, "error", err)
395 return fmt.Errorf("error getting keys: %w", err)
396 }
397 defer resp.Body.Close()
398
399 if resp.StatusCode == http.StatusNotFound {
400 l.Info("no keys found for did", "did", did)
401 return nil
402 }
403
404 plaintext, err := io.ReadAll(resp.Body)
405 if err != nil {
406 l.Error("error reading response body", "error", err)
407 return fmt.Errorf("error reading response body: %w", err)
408 }
409
410 for key := range strings.SplitSeq(string(plaintext), "\n") {
411 if key == "" {
412 continue
413 }
414 pk := db.PublicKey{
415 Did: did,
416 }
417 pk.Key = key
418 if err := h.db.AddPublicKey(pk); err != nil {
419 l.Error("failed to add public key", "error", err)
420 return fmt.Errorf("failed to add public key: %w", err)
421 }
422 }
423 return nil
424}
425
426func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error {
427 if event.Kind != jmodels.EventKindCommit {
428 return nil
429 }
430
431 var err error
432 defer func() {
433 eventTime := event.TimeUS
434 lastTimeUs := eventTime + 1
435 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
436 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
437 }
438 }()
439
440 switch event.Commit.Collection {
441 case tangled.PublicKeyNSID:
442 err = h.processPublicKey(ctx, event)
443 case tangled.KnotMemberNSID:
444 err = h.processKnotMember(ctx, event)
445 case tangled.RepoPullNSID:
446 err = h.processPull(ctx, event)
447 case tangled.RepoCollaboratorNSID:
448 err = h.processCollaborator(ctx, event)
449 }
450
451 if err != nil {
452 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err)
453 }
454
455 return nil
456}