A vibe-coded Tangled fork that supports Pijul.
at op/zllonksruqxw 456 lines 13 kB view raw
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "strings" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/atproto/identity" 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 "github.com/bluesky-social/indigo/xrpc" 17 jmodels "github.com/bluesky-social/jetstream/pkg/models" 18 securejoin "github.com/cyphar/filepath-securejoin" 19 "tangled.org/core/api/tangled" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/knotserver/db" 22 "tangled.org/core/knotserver/git" 23 "tangled.org/core/log" 24 "tangled.org/core/rbac" 25 "tangled.org/core/workflow" 26) 27 28func (h *Knot) processPublicKey(ctx context.Context, event *jmodels.Event) error { 29 l := log.FromContext(ctx) 30 raw := json.RawMessage(event.Commit.Record) 31 did := event.Did 32 33 var record tangled.PublicKey 34 if err := json.Unmarshal(raw, &record); err != nil { 35 return fmt.Errorf("failed to unmarshal record: %w", err) 36 } 37 38 pk := db.PublicKey{ 39 Did: did, 40 PublicKey: record, 41 } 42 if err := h.db.AddPublicKey(pk); err != nil { 43 l.Error("failed to add public key", "error", err) 44 return fmt.Errorf("failed to add public key: %w", err) 45 } 46 l.Info("added public key from firehose", "did", did) 47 return nil 48} 49 50func (h *Knot) processKnotMember(ctx context.Context, event *jmodels.Event) error { 51 l := log.FromContext(ctx) 52 raw := json.RawMessage(event.Commit.Record) 53 did := event.Did 54 55 var record tangled.KnotMember 56 if err := json.Unmarshal(raw, &record); err != nil { 57 return fmt.Errorf("failed to unmarshal record: %w", err) 58 } 59 60 if record.Domain != h.c.Server.Hostname { 61 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 62 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 63 } 64 65 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 66 
if err != nil || !ok { 67 l.Error("failed to add member", "did", did) 68 return fmt.Errorf("failed to enforce permissions: %w", err) 69 } 70 71 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 72 l.Error("failed to add member", "error", err) 73 return fmt.Errorf("failed to add member: %w", err) 74 } 75 l.Info("added member from firehose", "member", record.Subject) 76 77 if err := h.db.AddDid(record.Subject); err != nil { 78 l.Error("failed to add did", "error", err) 79 return fmt.Errorf("failed to add did: %w", err) 80 } 81 h.jc.AddDid(record.Subject) 82 83 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil { 84 return fmt.Errorf("failed to fetch and add keys: %w", err) 85 } 86 87 return nil 88} 89 90func (h *Knot) validatePullRecord(record *tangled.RepoPull) error { 91 if record.Target == nil { 92 return fmt.Errorf("ignoring pull record: target repo is nil") 93 } 94 95 if record.Source == nil { 96 return fmt.Errorf("ignoring pull record: not a branch-based pull request") 97 } 98 99 if record.Source.Repo != nil { 100 return fmt.Errorf("ignoring pull record: fork based pull") 101 } 102 103 return nil 104} 105 106func (h *Knot) resolveTargetRepo(ctx context.Context, targetRepoUri string) (*identity.Identity, *tangled.Repo, error) { 107 repoAt, err := syntax.ParseATURI(targetRepoUri) 108 if err != nil { 109 return nil, nil, fmt.Errorf("failed to parse ATURI: %w", err) 110 } 111 112 // resolve the repo owner to extract the repo record 113 repoOwnerIdent, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 114 if err != nil || repoOwnerIdent.Handle.IsInvalidHandle() { 115 return nil, nil, fmt.Errorf("failed to resolve repo owner handle: %w", err) 116 } 117 118 xrpcc := xrpc.Client{ 119 Host: repoOwnerIdent.PDSEndpoint(), 120 } 121 122 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 123 if err != nil { 124 return nil, nil, fmt.Errorf("failed 
to resolve repo: %w", err) 125 } 126 127 repo := resp.Value.Val.(*tangled.Repo) 128 return repoOwnerIdent, repo, nil 129} 130 131func (h *Knot) fetchLatestSubmission(ctx context.Context, did, rkey string, record *tangled.RepoPull) (*models.PullSubmission, error) { 132 // resolve the PR owner's identity to fetch the blob from their PDS 133 prOwnerIdent, err := h.resolver.ResolveIdent(ctx, did) 134 if err != nil || prOwnerIdent.Handle.IsInvalidHandle() { 135 return nil, fmt.Errorf("failed to resolve PR owner handle: %w", err) 136 } 137 138 roundNumber := len(record.Rounds) - 1 139 round := record.Rounds[roundNumber] 140 141 // fetch the blob from the PR owner's PDS 142 prOwnerPds := prOwnerIdent.PDSEndpoint() 143 blobUrl, err := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", prOwnerPds)) 144 if err != nil { 145 return nil, fmt.Errorf("failed to construct blob URL: %w", err) 146 } 147 q := blobUrl.Query() 148 q.Set("cid", round.PatchBlob.Ref.String()) 149 q.Set("did", did) 150 blobUrl.RawQuery = q.Encode() 151 152 req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobUrl.String(), nil) 153 if err != nil { 154 return nil, fmt.Errorf("failed to create blob request: %w", err) 155 } 156 req.Header.Set("Content-Type", "application/json") 157 158 blobResp, err := http.DefaultClient.Do(req) 159 if err != nil { 160 return nil, fmt.Errorf("failed to fetch blob: %w", err) 161 } 162 defer blobResp.Body.Close() 163 164 blob := io.ReadCloser(blobResp.Body) 165 latestSubmission, err := models.PullSubmissionFromRecord(did, rkey, roundNumber, round, &blob) 166 if err != nil { 167 return nil, fmt.Errorf("failed to parse submission: %w", err) 168 } 169 170 return latestSubmission, nil 171} 172 173func (h *Knot) discoverWorkflows(ctx context.Context, repoPath, sha string) (workflow.RawPipeline, error) { 174 gr, err := git.Open(repoPath, sha) 175 if err != nil { 176 return nil, fmt.Errorf("failed to open git repository: %w", err) 177 } 178 179 workflowDir, err := 
gr.FileTree(ctx, workflow.WorkflowDir) 180 if err != nil { 181 return nil, fmt.Errorf("failed to open workflow directory: %w", err) 182 } 183 184 var pipeline workflow.RawPipeline 185 for _, e := range workflowDir { 186 if !e.IsFile() { 187 continue 188 } 189 190 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 191 contents, err := gr.RawContent(fpath) 192 if err != nil { 193 continue 194 } 195 196 pipeline = append(pipeline, workflow.RawWorkflow{ 197 Name: e.Name, 198 Contents: contents, 199 }) 200 } 201 202 return pipeline, nil 203} 204 205func (h *Knot) compilePipeline(ctx context.Context, repoOwner *identity.Identity, repo *tangled.Repo, sourceBranch, sourceSha, targetBranch string, rawPipeline workflow.RawPipeline) tangled.Pipeline { 206 l := log.FromContext(ctx) 207 208 trigger := tangled.Pipeline_PullRequestTriggerData{ 209 Action: "create", 210 SourceBranch: sourceBranch, 211 SourceSha: sourceSha, 212 TargetBranch: targetBranch, 213 } 214 215 compiler := workflow.Compiler{ 216 Trigger: tangled.Pipeline_TriggerMetadata{ 217 Kind: string(workflow.TriggerKindPullRequest), 218 PullRequest: &trigger, 219 Repo: &tangled.Pipeline_TriggerRepo{ 220 Did: repoOwner.DID.String(), 221 Knot: repo.Knot, 222 Repo: repo.Name, 223 }, 224 }, 225 } 226 227 l.Info("raw", "raw", rawPipeline) 228 parsed := compiler.Parse(rawPipeline) 229 l.Info("parsed", "parsed", parsed) 230 compiled := compiler.Compile(parsed) 231 232 l.Info("compiler diagnostics", "diagnostics", compiler.Diagnostics) 233 234 return compiled 235} 236 237func (h *Knot) processPull(ctx context.Context, event *jmodels.Event) error { 238 raw := json.RawMessage(event.Commit.Record) 239 rkey := event.Commit.RKey 240 did := event.Did 241 242 var record tangled.RepoPull 243 if err := json.Unmarshal(raw, &record); err != nil { 244 return fmt.Errorf("failed to unmarshal record: %w", err) 245 } 246 247 l := log.FromContext(ctx) 248 l = l.With("handler", "processPull") 249 l = l.With("did", did) 250 251 
l.Info("validating pull record") 252 if err := h.validatePullRecord(&record); err != nil { 253 return err 254 } 255 256 l = l.With("target_repo", record.Target.Repo) 257 l = l.With("target_branch", record.Target.Branch) 258 259 l.Info("resolving target repo") 260 repoOwnerIdent, repo, err := h.resolveTargetRepo(ctx, record.Target.Repo) 261 if err != nil { 262 return err 263 } 264 265 if repo.Knot != h.c.Server.Hostname { 266 return fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 267 } 268 269 l.Info("fetching latest submission") 270 latestSubmission, err := h.fetchLatestSubmission(ctx, did, rkey, &record) 271 if err != nil { 272 return err 273 } 274 275 sha := latestSubmission.SourceRev 276 if sha == "" { 277 return fmt.Errorf("failed to extract source SHA from pull submission") 278 } 279 l = l.With("sha", sha) 280 281 l.Info("constructing repo path") 282 didSlashRepo, err := securejoin.SecureJoin(repoOwnerIdent.DID.String(), repo.Name) 283 if err != nil { 284 return fmt.Errorf("failed to construct relative repo path: %w", err) 285 } 286 287 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 288 if err != nil { 289 return fmt.Errorf("failed to construct absolute repo path: %w", err) 290 } 291 292 l.Info("discovering workflows", "repo_path", repoPath) 293 pipeline, err := h.discoverWorkflows(ctx, repoPath, sha) 294 if err != nil { 295 return err 296 } 297 298 l.Info("compiling pipeline", "workflow_count", len(pipeline)) 299 cp := h.compilePipeline(ctx, repoOwnerIdent, repo, record.Source.Branch, sha, record.Target.Branch, pipeline) 300 301 // do not run empty pipelines 302 if cp.Workflows == nil { 303 l.Info("skipping empty pipeline") 304 return nil 305 } 306 307 l.Info("marshaling pipeline event") 308 eventJson, err := json.Marshal(cp) 309 if err != nil { 310 return fmt.Errorf("failed to marshal pipeline event: %w", err) 311 } 312 313 ev := db.Event{ 314 Rkey: TID(), 315 Nsid: tangled.PipelineNSID, 
316 EventJson: string(eventJson), 317 } 318 319 l.Info("inserting pipeline event") 320 return h.db.InsertEvent(ev, h.n) 321} 322 323// duplicated from add collaborator 324func (h *Knot) processCollaborator(ctx context.Context, event *jmodels.Event) error { 325 raw := json.RawMessage(event.Commit.Record) 326 did := event.Did 327 328 var record tangled.RepoCollaborator 329 if err := json.Unmarshal(raw, &record); err != nil { 330 return fmt.Errorf("failed to unmarshal record: %w", err) 331 } 332 333 repoAt, err := syntax.ParseATURI(record.Repo) 334 if err != nil { 335 return err 336 } 337 338 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject) 339 if err != nil || subjectId.Handle.IsInvalidHandle() { 340 return err 341 } 342 343 // TODO: fix this for good, we need to fetch the record here unfortunately 344 // resolve this aturi to extract the repo record 345 owner, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 346 if err != nil || owner.Handle.IsInvalidHandle() { 347 return fmt.Errorf("failed to resolve handle: %w", err) 348 } 349 350 xrpcc := xrpc.Client{ 351 Host: owner.PDSEndpoint(), 352 } 353 354 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 355 if err != nil { 356 return err 357 } 358 359 repo := resp.Value.Val.(*tangled.Repo) 360 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 361 362 // check perms for this user 363 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo) 364 if err != nil { 365 return fmt.Errorf("failed to check permissions: %w", err) 366 } 367 if !ok { 368 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", didSlashRepo) 369 } 370 371 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 372 return err 373 } 374 h.jc.AddDid(subjectId.DID.String()) 375 376 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, 
didSlashRepo); err != nil { 377 return err 378 } 379 380 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 381} 382 383func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error { 384 l := log.FromContext(ctx) 385 386 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 387 if err != nil { 388 l.Error("error building endpoint url", "did", did, "error", err.Error()) 389 return fmt.Errorf("error building endpoint url: %w", err) 390 } 391 392 resp, err := http.Get(keysEndpoint) 393 if err != nil { 394 l.Error("error getting keys", "did", did, "error", err) 395 return fmt.Errorf("error getting keys: %w", err) 396 } 397 defer resp.Body.Close() 398 399 if resp.StatusCode == http.StatusNotFound { 400 l.Info("no keys found for did", "did", did) 401 return nil 402 } 403 404 plaintext, err := io.ReadAll(resp.Body) 405 if err != nil { 406 l.Error("error reading response body", "error", err) 407 return fmt.Errorf("error reading response body: %w", err) 408 } 409 410 for key := range strings.SplitSeq(string(plaintext), "\n") { 411 if key == "" { 412 continue 413 } 414 pk := db.PublicKey{ 415 Did: did, 416 } 417 pk.Key = key 418 if err := h.db.AddPublicKey(pk); err != nil { 419 l.Error("failed to add public key", "error", err) 420 return fmt.Errorf("failed to add public key: %w", err) 421 } 422 } 423 return nil 424} 425 426func (h *Knot) processMessages(ctx context.Context, event *jmodels.Event) error { 427 if event.Kind != jmodels.EventKindCommit { 428 return nil 429 } 430 431 var err error 432 defer func() { 433 eventTime := event.TimeUS 434 lastTimeUs := eventTime + 1 435 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 436 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 437 } 438 }() 439 440 switch event.Commit.Collection { 441 case tangled.PublicKeyNSID: 442 err = h.processPublicKey(ctx, event) 443 case tangled.KnotMemberNSID: 444 err = h.processKnotMember(ctx, event) 445 case tangled.RepoPullNSID: 446 err = 
h.processPull(ctx, event) 447 case tangled.RepoCollaboratorNSID: 448 err = h.processCollaborator(ctx, event) 449 } 450 451 if err != nil { 452 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err) 453 } 454 455 return nil 456}