A vibe-coded tangled fork that supports pijul.

spindle: create pipeline events from spindle

spindle will emit an `sh.tangled.pipeline` event on:
- `sh.tangled.git.refUpdate` events from knot stream
- live create/update events of `sh.tangled.repo.pull` records

Signed-off-by: Seongmin Lee <git@boltless.me>

+280 -75
+1
knotserver/internal.go
··· 176 176 } 177 177 178 178 for _, line := range lines { 179 + // TODO: pass pushOptions to refUpdate 179 180 err := h.insertRefUpdate(line, gitUserDid, repoDid, repoName) 180 181 if err != nil { 181 182 l.Error("failed to insert op", "err", err, "line", line, "did", gitUserDid, "repo", gitRelativeDir)
+14
spindle/db/events.go
··· 70 70 return evts, nil 71 71 } 72 72 73 + func (d *DB) CreatePipelineEvent(rkey string, pipeline tangled.Pipeline, n *notifier.Notifier) error { 74 + eventJson, err := json.Marshal(pipeline) 75 + if err != nil { 76 + return err 77 + } 78 + event := Event{ 79 + Rkey: rkey, 80 + Nsid: tangled.PipelineNSID, 81 + Created: time.Now().UnixNano(), 82 + EventJson: string(eventJson), 83 + } 84 + return d.insertEvent(event, n) 85 + } 86 + 73 87 func (d *DB) createStatusEvent( 74 88 workflowId models.WorkflowId, 75 89 statusKind models.StatusKind,
+176 -73
spindle/server.go
··· 4 4 "context" 5 5 _ "embed" 6 6 "encoding/json" 7 + "errors" 7 8 "fmt" 8 9 "log/slog" 9 10 "maps" ··· 15 16 "github.com/bluesky-social/indigo/atproto/syntax" 16 17 "github.com/bluesky-social/indigo/service/tap" 17 18 "github.com/go-chi/chi/v5" 19 + "github.com/go-git/go-git/v5/plumbing/object" 18 20 "github.com/hashicorp/go-version" 19 21 "tangled.org/core/api/tangled" 20 22 "tangled.org/core/eventconsumer" 21 23 "tangled.org/core/eventconsumer/cursor" 22 24 "tangled.org/core/idresolver" 25 + kgit "tangled.org/core/knotserver/git" 23 26 "tangled.org/core/log" 24 27 "tangled.org/core/notifier" 25 28 "tangled.org/core/rbac2" ··· 33 36 "tangled.org/core/spindle/secrets" 34 37 "tangled.org/core/spindle/xrpc" 35 38 "tangled.org/core/tapc" 39 + "tangled.org/core/tid" 40 + "tangled.org/core/workflow" 36 41 "tangled.org/core/xrpc/serviceauth" 37 42 ) 38 43 ··· 132 137 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 133 138 } 134 139 135 - // for each incoming sh.tangled.pipeline, we execute 136 - // spindle.processPipeline, which in turn enqueues the pipeline 137 - // job in the above registered queue. 
140 + // spindle listen to knot stream for sh.tangled.git.refUpdate 141 + // which will sync the local workflow files in spindle and enqueues the 142 + // pipeline job for on-push workflows 138 143 ccfg := eventconsumer.NewConsumerConfig() 139 144 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 140 145 ccfg.Dev = cfg.Server.Dev 141 - ccfg.ProcessFunc = spindle.processPipeline 146 + ccfg.ProcessFunc = spindle.processKnotStream 142 147 ccfg.CursorStore = cursorStore 143 148 knownKnots, err := d.Knots() 144 149 if err != nil { ··· 227 232 RepoFetchTimeout: 300 * time.Second, 228 233 IdentityCacheSize: 2_000_000, 229 234 EventCacheSize: 100_000, 230 - CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID, tangled.SpindleMemberNSID}, 235 + SignalCollection: tangled.RepoPullNSID, // HACK: listen for repo.pull from non-tangled users 236 + CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID, tangled.RepoPullNSID, tangled.SpindleMemberNSID}, 231 237 RetryTimeout: 3 * time.Second, 232 238 }) 233 239 if err != nil { ··· 318 324 return x.Router() 319 325 } 320 326 321 - func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 327 + func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 322 328 l := log.FromContext(ctx).With("handler", "processKnotStream") 323 329 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 324 330 if msg.Nsid == tangled.PipelineNSID { ··· 356 362 Rkey: msg.Rkey, 357 363 } 358 364 359 - workflows := make(map[models.Engine][]models.Workflow) 360 - 361 - // Build pipeline environment variables once for all workflows 362 - pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 363 - 364 - for _, w := range tpl.Workflows { 365 - if w != nil { 366 - if _, ok := s.engs[w.Engine]; !ok { 367 - err = s.db.StatusFailed(models.WorkflowId{ 368 
- PipelineId: pipelineId, 369 - Name: w.Name, 370 - }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 371 - if err != nil { 372 - return fmt.Errorf("db.StatusFailed: %w", err) 373 - } 374 - 375 - continue 376 - } 377 - 378 - eng := s.engs[w.Engine] 379 - 380 - if _, ok := workflows[eng]; !ok { 381 - workflows[eng] = []models.Workflow{} 382 - } 383 - 384 - ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 385 - if err != nil { 386 - return fmt.Errorf("init workflow: %w", err) 387 - } 388 - 389 - // inject TANGLED_* env vars after InitWorkflow 390 - // This prevents user-defined env vars from overriding them 391 - if ewf.Environment == nil { 392 - ewf.Environment = make(map[string]string) 393 - } 394 - maps.Copy(ewf.Environment, pipelineEnv) 395 - 396 - workflows[eng] = append(workflows[eng], *ewf) 397 - 398 - err = s.db.StatusPending(models.WorkflowId{ 399 - PipelineId: pipelineId, 400 - Name: w.Name, 401 - }, s.n) 402 - if err != nil { 403 - return fmt.Errorf("db.StatusPending: %w", err) 404 - } 405 - } 406 - } 407 - 408 - ok := s.jq.Enqueue(queue.Job{ 409 - Run: func() error { 410 - engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 411 - RepoOwner: tpl.TriggerMetadata.Repo.Did, 412 - RepoName: tpl.TriggerMetadata.Repo.Repo, 413 - Workflows: workflows, 414 - }, pipelineId) 415 - return nil 416 - }, 417 - OnFail: func(jobError error) { 418 - s.l.Error("pipeline run failed", "error", jobError) 419 - }, 420 - }) 421 - if ok { 422 - s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 423 - } else { 424 - s.l.Error("failed to enqueue pipeline: queue is full") 365 + err = s.processPipeline(ctx, tpl, pipelineId) 366 + if err != nil { 367 + return err 425 368 } 426 369 } else if msg.Nsid == tangled.GitRefUpdateNSID { 427 370 event := tangled.GitRefUpdate{} ··· 446 389 } 447 390 l.Info("synced git repo") 448 391 449 - // TODO: plan the pipeline 392 + compiler := workflow.Compiler{ 393 + Trigger: 
tangled.Pipeline_TriggerMetadata{ 394 + Kind: string(workflow.TriggerKindPush), 395 + Push: &tangled.Pipeline_PushTriggerData{ 396 + Ref: event.Ref, 397 + OldSha: event.OldSha, 398 + NewSha: event.NewSha, 399 + }, 400 + Repo: &tangled.Pipeline_TriggerRepo{ 401 + Did: repo.Did.String(), 402 + Knot: repo.Knot, 403 + Repo: repo.Name, 404 + }, 405 + }, 406 + } 407 + 408 + // load workflow definitions from rev (without spindle context) 409 + rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha) 410 + if err != nil { 411 + return fmt.Errorf("loading pipeline: %w", err) 412 + } 413 + if len(rawPipeline) == 0 { 414 + l.Info("no workflow definition find for the repo. skipping the event") 415 + return nil 416 + } 417 + tpl := compiler.Compile(compiler.Parse(rawPipeline)) 418 + // TODO: pass compile error to workflow log 419 + for _, w := range compiler.Diagnostics.Errors { 420 + l.Error(w.String()) 421 + } 422 + for _, w := range compiler.Diagnostics.Warnings { 423 + l.Warn(w.String()) 424 + } 425 + 426 + pipelineId := models.PipelineId{ 427 + Knot: tpl.TriggerMetadata.Repo.Knot, 428 + Rkey: tid.TID(), 429 + } 430 + if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 431 + l.Error("failed to create pipeline event", "err", err) 432 + return nil 433 + } 434 + err = s.processPipeline(ctx, tpl, pipelineId) 435 + if err != nil { 436 + return err 437 + } 438 + } 439 + 440 + return nil 441 + } 442 + 443 + func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) { 444 + if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil { 445 + return nil, fmt.Errorf("syncing git repo: %w", err) 446 + } 447 + gr, err := kgit.Open(repoPath, rev) 448 + if err != nil { 449 + return nil, fmt.Errorf("opening git repo: %w", err) 450 + } 451 + 452 + workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 453 + if errors.Is(err, object.ErrDirectoryNotFound) { 454 + // return empty 
RawPipeline when directory doesn't exist 455 + return nil, nil 456 + } else if err != nil { 457 + return nil, fmt.Errorf("loading file tree: %w", err) 450 458 } 451 459 460 + var rawPipeline workflow.RawPipeline 461 + for _, e := range workflowDir { 462 + if !e.IsFile() { 463 + continue 464 + } 465 + 466 + fpath := filepath.Join(workflow.WorkflowDir, e.Name) 467 + contents, err := gr.RawContent(fpath) 468 + if err != nil { 469 + return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 470 + } 471 + 472 + rawPipeline = append(rawPipeline, workflow.RawWorkflow{ 473 + Name: e.Name, 474 + Contents: contents, 475 + }) 476 + } 477 + 478 + return rawPipeline, nil 479 + } 480 + 481 + func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error { 482 + // Build pipeline environment variables once for all workflows 483 + pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 484 + 485 + // filter & init workflows 486 + workflows := make(map[models.Engine][]models.Workflow) 487 + for _, w := range tpl.Workflows { 488 + if w == nil { 489 + continue 490 + } 491 + if _, ok := s.engs[w.Engine]; !ok { 492 + err := s.db.StatusFailed(models.WorkflowId{ 493 + PipelineId: pipelineId, 494 + Name: w.Name, 495 + }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 496 + if err != nil { 497 + return fmt.Errorf("db.StatusFailed: %w", err) 498 + } 499 + 500 + continue 501 + } 502 + 503 + eng := s.engs[w.Engine] 504 + 505 + if _, ok := workflows[eng]; !ok { 506 + workflows[eng] = []models.Workflow{} 507 + } 508 + 509 + ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 510 + if err != nil { 511 + return fmt.Errorf("init workflow: %w", err) 512 + } 513 + 514 + // inject TANGLED_* env vars after InitWorkflow 515 + // This prevents user-defined env vars from overriding them 516 + if ewf.Environment == nil { 517 + ewf.Environment = make(map[string]string) 518 + } 519 + 
maps.Copy(ewf.Environment, pipelineEnv) 520 + 521 + workflows[eng] = append(workflows[eng], *ewf) 522 + } 523 + 524 + // enqueue pipeline 525 + ok := s.jq.Enqueue(queue.Job{ 526 + Run: func() error { 527 + engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 528 + RepoOwner: tpl.TriggerMetadata.Repo.Did, 529 + RepoName: tpl.TriggerMetadata.Repo.Repo, 530 + Workflows: workflows, 531 + }, pipelineId) 532 + return nil 533 + }, 534 + OnFail: func(jobError error) { 535 + s.l.Error("pipeline run failed", "error", jobError) 536 + }, 537 + }) 538 + if !ok { 539 + return fmt.Errorf("failed to enqueue pipeline: queue is full") 540 + } 541 + s.l.Info("pipeline enqueued successfully", "id", pipelineId) 542 + 543 + // emit StatusPending for all workflows here (after successful enqueue) 544 + for _, ewfs := range workflows { 545 + for _, ewf := range ewfs { 546 + err := s.db.StatusPending(models.WorkflowId{ 547 + PipelineId: pipelineId, 548 + Name: ewf.Name, 549 + }, s.n) 550 + if err != nil { 551 + return fmt.Errorf("db.StatusPending: %w", err) 552 + } 553 + } 554 + } 452 555 return nil 453 556 } 454 557
+89 -2
spindle/tap.go
··· 11 11 "tangled.org/core/eventconsumer" 12 12 "tangled.org/core/spindle/db" 13 13 "tangled.org/core/spindle/git" 14 + "tangled.org/core/spindle/models" 14 15 "tangled.org/core/tapc" 16 + "tangled.org/core/tid" 17 + "tangled.org/core/workflow" 15 18 ) 16 19 17 20 func (s *Spindle) processEvent(ctx context.Context, evt tapc.Event) error { ··· 281 284 282 285 l.Info("processing pull record") 283 286 287 + // only listen to live events 288 + if !evt.Record.Live { 289 + l.Info("skipping backfill event", "event", evt.Record.AtUri()) 290 + return nil 291 + } 292 + 284 293 switch evt.Record.Action { 285 294 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 286 - // TODO 295 + record := tangled.RepoPull{} 296 + if err := json.Unmarshal(evt.Record.Record, &record); err != nil { 297 + l.Error("invalid record", "err", err) 298 + return fmt.Errorf("parsing record: %w", err) 299 + } 300 + 301 + // ignore legacy records 302 + if record.Target == nil { 303 + l.Info("ignoring pull record: target repo is nil") 304 + return nil 305 + } 306 + 307 + // ignore patch-based and fork-based PRs 308 + if record.Source == nil || record.Source.Repo != nil { 309 + l.Info("ignoring pull record: not a branch-based pull request") 310 + return nil 311 + } 312 + 313 + // skip if target repo is unknown 314 + repo, err := s.db.GetRepo(syntax.ATURI(record.Target.Repo)) 315 + if err != nil { 316 + l.Warn("target repo is not ingested yet", "repo", record.Target.Repo, "err", err) 317 + return fmt.Errorf("target repo is unknown") 318 + } 319 + 320 + compiler := workflow.Compiler{ 321 + Trigger: tangled.Pipeline_TriggerMetadata{ 322 + Kind: string(workflow.TriggerKindPullRequest), 323 + PullRequest: &tangled.Pipeline_PullRequestTriggerData{ 324 + Action: "create", 325 + SourceBranch: record.Source.Branch, 326 + SourceSha: record.Source.Sha, 327 + TargetBranch: record.Target.Branch, 328 + }, 329 + Repo: &tangled.Pipeline_TriggerRepo{ 330 + Did: repo.Did.String(), 331 + Knot: repo.Knot, 332 + Repo: 
repo.Name, 333 + }, 334 + }, 335 + } 336 + 337 + repoUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name) 338 + repoPath := s.newRepoPath(repo.Did, repo.Rkey) 339 + 340 + // load workflow definitions from rev (without spindle context) 341 + rawPipeline, err := s.loadPipeline(ctx, repoUri, repoPath, record.Source.Sha) 342 + if err != nil { 343 + // don't retry 344 + l.Error("failed loading pipeline", "err", err) 345 + return nil 346 + } 347 + if len(rawPipeline) == 0 { 348 + l.Info("no workflow definition find for the repo. skipping the event") 349 + return nil 350 + } 351 + tpl := compiler.Compile(compiler.Parse(rawPipeline)) 352 + // TODO: pass compile error to workflow log 353 + for _, w := range compiler.Diagnostics.Errors { 354 + l.Error(w.String()) 355 + } 356 + for _, w := range compiler.Diagnostics.Warnings { 357 + l.Warn(w.String()) 358 + } 359 + 360 + pipelineId := models.PipelineId{ 361 + Knot: tpl.TriggerMetadata.Repo.Knot, 362 + Rkey: tid.TID(), 363 + } 364 + if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 365 + l.Error("failed to create pipeline event", "err", err) 366 + return nil 367 + } 368 + err = s.processPipeline(ctx, tpl, pipelineId) 369 + if err != nil { 370 + // don't retry 371 + l.Error("failed processing pipeline", "err", err) 372 + return nil 373 + } 287 374 case tapc.RecordDeleteAction: 288 - // TODO 375 + // no-op 289 376 } 290 377 return nil 291 378 }