A vibe-coded fork of Tangled which adds support for Pijul.
at 51b391b858fd669de4b33a34ee2d9c10a4c1b339 583 lines 16 kB view raw
// Package spindle implements the Spindle CI server: it consumes repo events
// from knots and the embedded tap firehose, compiles workflow definitions
// into pipelines, and runs them on the configured execution engines.
package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"path/filepath"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/bluesky-social/indigo/service/tap"
	"github.com/go-chi/chi/v5"
	"github.com/go-git/go-git/v5/plumbing/object"
	"github.com/hashicorp/go-version"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	kgit "tangled.org/core/knotserver/git"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/git"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/tapc"
	"tangled.org/core/tid"
	"tangled.org/core/workflow"
	"tangled.org/core/xrpc/serviceauth"
)

// defaultMotd is the embedded fallback message-of-the-day, served on "/"
// until SetMotdContent replaces it.
//
//go:embed motd
var defaultMotd []byte

// Spindle bundles every long-lived dependency of the server. Construct it
// with New; all fields are private and exposed through accessor methods.
type Spindle struct {
	tap    *tapc.Client             // client for the embedded tap service
	db     *db.DB                   // spindle database
	e      *rbac2.Enforcer          // RBAC enforcer
	l      *slog.Logger             // structured logger
	n      *notifier.Notifier       // event notifier
	engs   map[string]models.Engine // available execution engines, keyed by engine name
	jq     *queue.Queue             // background job queue for pipeline runs
	cfg    *config.Config           // server configuration
	ks     *eventconsumer.Consumer  // knot event stream consumer
	res    *idresolver.Resolver     // identity (DID) resolver
	vault  secrets.Manager          // secrets backend (sqlite or openbao)
	motd   []byte                   // current MOTD content; guarded by motdMu
	motdMu sync.RWMutex             // protects motd
}

// New creates a new Spindle server with the provided configuration and engines.
// It wires up the database, RBAC enforcer, secrets backend, job queue, tap
// client, identity resolver and knot event consumer, but does not start any
// of them; call Start for that.
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
	logger := log.FromContext(ctx)

	// refuse to start on an unsupported git installation
	if err := ensureGitVersion(); err != nil {
		return nil, fmt.Errorf("ensuring git version: %w", err)
	}

	d, err := db.Make(ctx, cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup db: %w", err)
	}

	// the enforcer shares the same sqlite file as the main db
	e, err := rbac2.NewEnforcer(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
	}

	n := notifier.New()

	// pick the secrets backend; sqlite is the default when the provider
	// field is empty
	var vault secrets.Manager
	switch cfg.Server.Secrets.Provider {
	case "openbao":
		if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
			return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
		}
		vault, err = secrets.NewOpenBaoManager(
			cfg.Server.Secrets.OpenBao.ProxyAddr,
			logger,
			secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
		)
		if err != nil {
			return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
		}
		logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
	case "sqlite", "":
		vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets"))
		if err != nil {
			return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
		}
		logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath())
	default:
		return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
	}

	jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
	logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)

	// the tap service is embedded and started in Start on this same port
	tap := tapc.NewClient("http://localhost:"+cfg.Server.TapPort, "")

	resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)

	spindle := &Spindle{
		tap:   &tap,
		e:     e,
		db:    d,
		l:     logger,
		n:     &n,
		engs:  engines,
		jq:    jq,
		cfg:   cfg,
		res:   resolver,
		vault: vault,
		motd:  defaultMotd,
	}

	err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
	if err != nil {
		return nil, err
	}
	logger.Info("owner set", "did", cfg.Server.Owner)

	cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
	}

	// spindle listens to the knot stream for sh.tangled.git.refUpdate,
	// which syncs the local workflow files in spindle and enqueues the
	// pipeline job for on-push workflows
	ccfg := eventconsumer.NewConsumerConfig()
	ccfg.Logger = log.SubLogger(logger, "eventconsumer")
	ccfg.Dev = cfg.Server.Dev
	ccfg.ProcessFunc = spindle.processKnotStream
	ccfg.CursorStore = cursorStore
	// resume consuming from every knot already known to the db
	knownKnots, err := d.Knots()
	if err != nil {
		return nil, err
	}
	for _, knot := range knownKnots {
		logger.Info("adding source start", "knot", knot)
		ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
	}
	spindle.ks = eventconsumer.NewConsumer(*ccfg)

	return spindle, nil
}

// DB returns the database instance.
func (s *Spindle) DB() *db.DB {
	return s.db
}

// Queue returns the job queue instance.
func (s *Spindle) Queue() *queue.Queue {
	return s.jq
}

// Engines returns the map of available engines.
func (s *Spindle) Engines() map[string]models.Engine {
	return s.engs
}

// Vault returns the secrets manager instance.
func (s *Spindle) Vault() secrets.Manager {
	return s.vault
}

// Notifier returns the notifier instance.
func (s *Spindle) Notifier() *notifier.Notifier {
	return s.n
}

// Enforcer returns the RBAC enforcer instance.
func (s *Spindle) Enforcer() *rbac2.Enforcer {
	return s.e
}

// SetMotdContent sets custom MOTD content, replacing the embedded default.
func (s *Spindle) SetMotdContent(content []byte) {
	s.motdMu.Lock()
	defer s.motdMu.Unlock()
	s.motd = content
}

// GetMotdContent returns the current MOTD content.
func (s *Spindle) GetMotdContent() []byte {
	s.motdMu.RLock()
	defer s.motdMu.RUnlock()
	return s.motd
}

// Start starts the Spindle server (blocking). It launches the job queue,
// the knot event consumer, the embedded tap service and its stream
// consumer, waits for tap readiness, then serves HTTP on the configured
// listen address.
func (s *Spindle) Start(ctx context.Context) error {
	// NOTE(review): svcErr is written by the tap goroutine below but never
	// read anywhere in this function, so a tap failure is silently dropped
	// — consider selecting on it alongside the HTTP server.
	svcErr := make(chan error, 1)

	// starts a job queue runner in the background
	s.jq.Start()
	defer s.jq.Stop()

	// Stop vault token renewal if it implements Stopper
	if stopper, ok := s.vault.(secrets.Stopper); ok {
		defer stopper.Stop()
	}

	go func() {
		s.l.Info("starting knot event consumer")
		s.ks.Start(ctx)
	}()

	tap, err := tap.New(tap.Config{
		DatabaseURL:                s.cfg.Server.TapDBUrl,
		DBMaxConns:                 32,
		PLCURL:                     s.cfg.Server.PlcUrl,
		RelayUrl:                   s.cfg.Server.RelayUrl,
		FirehoseParallelism:        10,
		ResyncParallelism:          5,
		OutboxParallelism:          1,
		FirehoseCursorSaveInterval: 1 * time.Second,
		RepoFetchTimeout:           300 * time.Second,
		IdentityCacheSize:          2_000_000,
		EventCacheSize:             100_000,
		SignalCollection:           tangled.RepoPullNSID, // HACK: listen for repo.pull from non-tangled users
		CollectionFilters:          []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID, tangled.RepoPullNSID, tangled.SpindleMemberNSID},
		RetryTimeout:               3 * time.Second,
	})
	if err != nil {
		return err
	}
	// run the embedded tap service; its errors land in svcErr (see NOTE above)
	go func() {
		if err := tap.RunTap(ctx, ":"+s.cfg.Server.TapPort); err != nil {
			svcErr <- err
		}
	}()
	go func() {
		s.l.Info("starting embedded tap server")

		s.l.Info("starting tap stream consumer")
		s.tap.Connect(ctx, &tapc.SimpleIndexer{
			EventHandler: s.processEvent,
		})
	}()

	// block (up to 10s) until the embedded tap answers readiness probes
	s.l.Debug("waiting for tap readiness")
	tapReadyCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	if err := s.tap.WaitReady(tapReadyCtx); err != nil {
		return fmt.Errorf("tap not ready: %w", err)
	}

	// ensure server owner is tracked
	s.l.Debug("adding server owner to tap")
	if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil {
		return err
	}

	// NOTE(review): http.ListenAndServe has no read/write timeouts;
	// consider an explicit http.Server if this is internet-facing.
	s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
	return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
}

// Run loads the configuration, builds the default engine set (currently
// only nixery), constructs the server and starts it. It blocks until the
// server exits.
func Run(ctx context.Context) error {
	cfg, err := config.Load(ctx)
	if err != nil {
		return fmt.Errorf("failed to load config: %w", err)
	}

	nixeryEng, err := nixery.New(ctx, cfg)
	if err != nil {
		return err
	}

	s, err := New(ctx, cfg, map[string]models.Engine{
		"nixery": nixeryEng,
	})
	if err != nil {
		return err
	}

	return s.Start(ctx)
}

// Router builds the top-level HTTP handler: the MOTD on "/", the event
// and log endpoints, and the XRPC sub-router mounted at /xrpc.
func (s *Spindle) Router() http.Handler {
	mux := chi.NewRouter()

	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.Write(s.GetMotdContent())
	})
	mux.HandleFunc("/events", s.Events)
	mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)

	mux.Mount("/xrpc", s.XrpcRouter())
	return mux
}

// XrpcRouter builds the XRPC handler with service auth wired in.
func (s *Spindle) XrpcRouter() http.Handler {
	serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())

	l := log.SubLogger(s.l, "xrpc")

	x := xrpc.Xrpc{
		Logger:      l,
		Db:          s.db,
		Enforcer:    s.e,
		Engines:     s.engs,
		Config:      s.cfg,
		Resolver:    s.res,
		Vault:       s.vault,
		Notifier:    s.Notifier(),
		ServiceAuth: serviceAuth,
	}

	return x.Router()
}

// processKnotStream handles a single message from a knot's event stream:
// sh.tangled.pipeline records and git ref updates (the latter trigger a
// repo sync, workflow compilation and pipeline enqueueing).
func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
	l := log.FromContext(ctx).With("handler", "processKnotStream")
	l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid,
"msg.Rkey", msg.Rkey) 330 if msg.Nsid == tangled.PipelineNSID { 331 return nil 332 tpl := tangled.Pipeline{} 333 err := json.Unmarshal(msg.EventJson, &tpl) 334 if err != nil { 335 fmt.Println("error unmarshalling", err) 336 return err 337 } 338 339 if tpl.TriggerMetadata == nil { 340 return fmt.Errorf("no trigger metadata found") 341 } 342 343 if tpl.TriggerMetadata.Repo == nil { 344 return fmt.Errorf("no repo data found") 345 } 346 347 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 348 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 349 } 350 351 // filter by repos 352 _, err = s.db.GetRepoWithName( 353 syntax.DID(tpl.TriggerMetadata.Repo.Did), 354 tpl.TriggerMetadata.Repo.Repo, 355 ) 356 if err != nil { 357 return fmt.Errorf("failed to get repo: %w", err) 358 } 359 360 pipelineId := models.PipelineId{ 361 Knot: src.Key(), 362 Rkey: msg.Rkey, 363 } 364 365 err = s.processPipeline(ctx, tpl, pipelineId) 366 if err != nil { 367 return err 368 } 369 } else if msg.Nsid == tangled.GitRefUpdateNSID { 370 event := tangled.GitRefUpdate{} 371 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 372 l.Error("error unmarshalling", "err", err) 373 return err 374 } 375 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 376 377 // resolve repo name to rkey 378 // TODO: git.refUpdate should respond with rkey instead of repo name 379 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName) 380 if err != nil { 381 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err) 382 } 383 384 // NOTE: we are blindly trusting the knot that it will return only repos it own 385 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 386 repoPath := s.newRepoPath(repo.Did, repo.Rkey) 387 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 388 return fmt.Errorf("sync git repo: %w", err) 389 } 390 
l.Info("synced git repo") 391 392 compiler := workflow.Compiler{ 393 Trigger: tangled.Pipeline_TriggerMetadata{ 394 Kind: string(workflow.TriggerKindPush), 395 Push: &tangled.Pipeline_PushTriggerData{ 396 Ref: event.Ref, 397 OldSha: event.OldSha, 398 NewSha: event.NewSha, 399 }, 400 Repo: &tangled.Pipeline_TriggerRepo{ 401 Did: repo.Did.String(), 402 Knot: repo.Knot, 403 Repo: repo.Name, 404 }, 405 }, 406 } 407 408 // load workflow definitions from rev (without spindle context) 409 rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha) 410 if err != nil { 411 return fmt.Errorf("loading pipeline: %w", err) 412 } 413 if len(rawPipeline) == 0 { 414 l.Info("no workflow definition find for the repo. skipping the event") 415 return nil 416 } 417 tpl := compiler.Compile(compiler.Parse(rawPipeline)) 418 // TODO: pass compile error to workflow log 419 for _, w := range compiler.Diagnostics.Errors { 420 l.Error(w.String()) 421 } 422 for _, w := range compiler.Diagnostics.Warnings { 423 l.Warn(w.String()) 424 } 425 426 pipelineId := models.PipelineId{ 427 Knot: tpl.TriggerMetadata.Repo.Knot, 428 Rkey: tid.TID(), 429 } 430 if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 431 l.Error("failed to create pipeline event", "err", err) 432 return nil 433 } 434 err = s.processPipeline(ctx, tpl, pipelineId) 435 if err != nil { 436 return err 437 } 438 } 439 440 return nil 441} 442 443func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) { 444 if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil { 445 return nil, fmt.Errorf("syncing git repo: %w", err) 446 } 447 gr, err := kgit.Open(repoPath, rev) 448 if err != nil { 449 return nil, fmt.Errorf("opening git repo: %w", err) 450 } 451 452 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 453 if errors.Is(err, object.ErrDirectoryNotFound) { 454 // return empty RawPipeline when directory doesn't exist 455 
return nil, nil 456 } else if err != nil { 457 return nil, fmt.Errorf("loading file tree: %w", err) 458 } 459 460 var rawPipeline workflow.RawPipeline 461 for _, e := range workflowDir { 462 if !e.IsFile() { 463 continue 464 } 465 466 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 467 contents, err := gr.RawContent(fpath) 468 if err != nil { 469 return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 470 } 471 472 rawPipeline = append(rawPipeline, workflow.RawWorkflow{ 473 Name: e.Name, 474 Contents: contents, 475 }) 476 } 477 478 return rawPipeline, nil 479} 480 481func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error { 482 // Build pipeline environment variables once for all workflows 483 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 484 485 // filter & init workflows 486 workflows := make(map[models.Engine][]models.Workflow) 487 for _, w := range tpl.Workflows { 488 if w == nil { 489 continue 490 } 491 if _, ok := s.engs[w.Engine]; !ok { 492 err := s.db.StatusFailed(models.WorkflowId{ 493 PipelineId: pipelineId, 494 Name: w.Name, 495 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 496 if err != nil { 497 return fmt.Errorf("db.StatusFailed: %w", err) 498 } 499 500 continue 501 } 502 503 eng := s.engs[w.Engine] 504 505 if _, ok := workflows[eng]; !ok { 506 workflows[eng] = []models.Workflow{} 507 } 508 509 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 510 if err != nil { 511 return fmt.Errorf("init workflow: %w", err) 512 } 513 514 // inject TANGLED_* env vars after InitWorkflow 515 // This prevents user-defined env vars from overriding them 516 if ewf.Environment == nil { 517 ewf.Environment = make(map[string]string) 518 } 519 maps.Copy(ewf.Environment, pipelineEnv) 520 521 workflows[eng] = append(workflows[eng], *ewf) 522 } 523 524 // enqueue pipeline 525 ok := s.jq.Enqueue(queue.Job{ 526 Run: func() error { 527 
engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 528 RepoOwner: tpl.TriggerMetadata.Repo.Did, 529 RepoName: tpl.TriggerMetadata.Repo.Repo, 530 Workflows: workflows, 531 }, pipelineId) 532 return nil 533 }, 534 OnFail: func(jobError error) { 535 s.l.Error("pipeline run failed", "error", jobError) 536 }, 537 }) 538 if !ok { 539 return fmt.Errorf("failed to enqueue pipeline: queue is full") 540 } 541 s.l.Info("pipeline enqueued successfully", "id", pipelineId) 542 543 // emit StatusPending for all workflows here (after successful enqueue) 544 for _, ewfs := range workflows { 545 for _, ewf := range ewfs { 546 err := s.db.StatusPending(models.WorkflowId{ 547 PipelineId: pipelineId, 548 Name: ewf.Name, 549 }, s.n) 550 if err != nil { 551 return fmt.Errorf("db.StatusPending: %w", err) 552 } 553 } 554 } 555 return nil 556} 557 558// newRepoPath creates a path to store repository by its did and rkey. 559// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey 560func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string { 561 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String()) 562} 563 564func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 565 scheme := "https://" 566 if s.cfg.Server.Dev { 567 scheme = "http://" 568 } 569 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 570} 571 572const RequiredVersion = "2.49.0" 573 574func ensureGitVersion() error { 575 v, err := git.Version() 576 if err != nil { 577 return fmt.Errorf("fetching git version: %w", err) 578 } 579 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 580 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 581 } 582 return nil 583}