A vibe-coded fork of tangled which adds support for the Pijul version-control system.
at a30765c1df1e36a4797dc36a12daff09dbee675c 581 lines 16 kB view raw
// Package spindle implements the spindle CI server: it consumes repository
// events (from the embedded tap server and from knot event streams), compiles
// workflow definitions found in repositories, and runs the resulting
// pipelines on pluggable execution engines.
package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"path/filepath"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/bluesky-social/indigo/service/tap"
	"github.com/go-chi/chi/v5"
	"github.com/go-git/go-git/v5/plumbing/object"
	"github.com/hashicorp/go-version"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	kgit "tangled.org/core/knotserver/git"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/git"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/tapc"
	"tangled.org/core/tid"
	"tangled.org/core/workflow"
	"tangled.org/core/xrpc/serviceauth"
)

// defaultMotd is the embedded message-of-the-day served at "/" until it is
// replaced at runtime via SetMotdContent.
//
//go:embed motd
var defaultMotd []byte

// Spindle holds the state of a running spindle server. Construct with New;
// run with Start.
type Spindle struct {
	tap    *tapc.Client             // client for the embedded tap event stream
	db     *db.DB                   // spindle's own sqlite-backed storage
	e      *rbac2.Enforcer          // role-based access control enforcer
	l      *slog.Logger             // structured logger
	n      *notifier.Notifier       // fan-out for pipeline status notifications
	engs   map[string]models.Engine // available execution engines, keyed by engine name
	jq     *queue.Queue             // bounded job queue for pipeline runs
	cfg    *config.Config           // server configuration
	ks     *eventconsumer.Consumer  // consumer of knot event streams
	res    *idresolver.Resolver     // AT identity resolver
	vault  secrets.Manager          // secrets backend (sqlite or openbao)
	motd   []byte                   // current MOTD content; guarded by motdMu
	motdMu sync.RWMutex             // protects motd
}

// New creates a new Spindle server with the provided configuration and engines.
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
	logger := log.FromContext(ctx)

	// Refuse to start on a git installation older than RequiredVersion.
	if err := ensureGitVersion(); err != nil {
		return nil, fmt.Errorf("ensuring git version: %w", err)
	}

	d, err := db.Make(ctx, cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup db: %w", err)
	}

	// The RBAC enforcer shares the same sqlite database file as the main db.
	e, err := rbac2.NewEnforcer(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
	}

	n := notifier.New()

	// Select the secrets backend from config; an empty provider defaults to
	// sqlite so existing configs keep working.
	var vault secrets.Manager
	switch cfg.Server.Secrets.Provider {
	case "openbao":
		if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
			return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
		}
		vault, err = secrets.NewOpenBaoManager(
			cfg.Server.Secrets.OpenBao.ProxyAddr,
			logger,
			secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
		)
		if err != nil {
			return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
		}
		logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
	case "sqlite", "":
		vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets"))
		if err != nil {
			return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
		}
		logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath())
	default:
		return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
	}

	jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
	logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)

	// Client for the tap server that Start later runs on the same port.
	tap := tapc.NewClient("http://localhost:"+cfg.Server.TapPort, "")

	resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)

	spindle := &Spindle{
		tap:   &tap,
		e:     e,
		db:    d,
		l:     logger,
		n:     &n,
		engs:  engines,
		jq:    jq,
		cfg:   cfg,
		res:   resolver,
		vault: vault,
		motd:  defaultMotd,
	}

	err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
	if err != nil {
		return nil, err
	}
	logger.Info("owner set", "did", cfg.Server.Owner)

	cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
	}

	// Spindle listens to each known knot's event stream for
	// sh.tangled.git.refUpdate, which syncs the local workflow files and
	// enqueues the pipeline job for on-push workflows.
	ccfg := eventconsumer.NewConsumerConfig()
	ccfg.Logger = log.SubLogger(logger, "eventconsumer")
	ccfg.Dev = cfg.Server.Dev
	ccfg.ProcessFunc = spindle.processKnotStream
	ccfg.CursorStore = cursorStore
	knownKnots, err := d.Knots()
	if err != nil {
		return nil, err
	}
	for _, knot := range knownKnots {
		logger.Info("adding source start", "knot", knot)
		ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
	}
	spindle.ks = eventconsumer.NewConsumer(*ccfg)

	return spindle, nil
}

// DB returns the database instance.
func (s *Spindle) DB() *db.DB {
	return s.db
}

// Queue returns the job queue instance.
func (s *Spindle) Queue() *queue.Queue {
	return s.jq
}

// Engines returns the map of available engines.
func (s *Spindle) Engines() map[string]models.Engine {
	return s.engs
}

// Vault returns the secrets manager instance.
func (s *Spindle) Vault() secrets.Manager {
	return s.vault
}

// Notifier returns the notifier instance.
func (s *Spindle) Notifier() *notifier.Notifier {
	return s.n
}

// Enforcer returns the RBAC enforcer instance.
187func (s *Spindle) Enforcer() *rbac2.Enforcer { 188 return s.e 189} 190 191// SetMotdContent sets custom MOTD content, replacing the embedded default. 192func (s *Spindle) SetMotdContent(content []byte) { 193 s.motdMu.Lock() 194 defer s.motdMu.Unlock() 195 s.motd = content 196} 197 198// GetMotdContent returns the current MOTD content. 199func (s *Spindle) GetMotdContent() []byte { 200 s.motdMu.RLock() 201 defer s.motdMu.RUnlock() 202 return s.motd 203} 204 205// Start starts the Spindle server (blocking). 206func (s *Spindle) Start(ctx context.Context) error { 207 svcErr := make(chan error, 1) 208 209 // starts a job queue runner in the background 210 s.jq.Start() 211 defer s.jq.Stop() 212 213 // Stop vault token renewal if it implements Stopper 214 if stopper, ok := s.vault.(secrets.Stopper); ok { 215 defer stopper.Stop() 216 } 217 218 go func() { 219 s.l.Info("starting knot event consumer") 220 s.ks.Start(ctx) 221 }() 222 223 tap, err := tap.New(tap.Config{ 224 DatabaseURL: s.cfg.Server.TapDBUrl, 225 DBMaxConns: 32, 226 PLCURL: s.cfg.Server.PlcUrl, 227 RelayUrl: s.cfg.Server.RelayUrl, 228 FirehoseParallelism: 10, 229 ResyncParallelism: 5, 230 OutboxParallelism: 1, 231 FirehoseCursorSaveInterval: 1 * time.Second, 232 RepoFetchTimeout: 300 * time.Second, 233 IdentityCacheSize: 2_000_000, 234 EventCacheSize: 100_000, 235 SignalCollection: tangled.RepoPullNSID, // HACK: listen for repo.pull from non-tangled users 236 CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID, tangled.RepoPullNSID, tangled.SpindleMemberNSID}, 237 RetryTimeout: 3 * time.Second, 238 }) 239 if err != nil { 240 return err 241 } 242 go func() { 243 if err := tap.RunTap(ctx, ":"+s.cfg.Server.TapPort); err != nil { 244 svcErr <- err 245 } 246 }() 247 go func() { 248 s.l.Info("starting embedded tap server") 249 250 s.l.Info("starting tap stream consumer") 251 s.tap.Connect(ctx, &tapc.SimpleIndexer{ 252 EventHandler: s.processEvent, 253 }) 254 }() 255 256 s.l.Debug("waiting 
until tap connection") 257 if err := s.tap.Wait(ctx); err != nil { 258 return err 259 } 260 261 // ensure server owner is tracked 262 s.l.Debug("adding server owner to tap") 263 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 264 return err 265 } 266 267 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 268 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 269} 270 271func Run(ctx context.Context) error { 272 cfg, err := config.Load(ctx) 273 if err != nil { 274 return fmt.Errorf("failed to load config: %w", err) 275 } 276 277 nixeryEng, err := nixery.New(ctx, cfg) 278 if err != nil { 279 return err 280 } 281 282 s, err := New(ctx, cfg, map[string]models.Engine{ 283 "nixery": nixeryEng, 284 }) 285 if err != nil { 286 return err 287 } 288 289 return s.Start(ctx) 290} 291 292func (s *Spindle) Router() http.Handler { 293 mux := chi.NewRouter() 294 295 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 296 w.Write(s.GetMotdContent()) 297 }) 298 mux.HandleFunc("/events", s.Events) 299 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 300 301 mux.Mount("/xrpc", s.XrpcRouter()) 302 return mux 303} 304 305func (s *Spindle) XrpcRouter() http.Handler { 306 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 307 308 l := log.SubLogger(s.l, "xrpc") 309 310 x := xrpc.Xrpc{ 311 Logger: l, 312 Db: s.db, 313 Enforcer: s.e, 314 Engines: s.engs, 315 Config: s.cfg, 316 Resolver: s.res, 317 Vault: s.vault, 318 Notifier: s.Notifier(), 319 ServiceAuth: serviceAuth, 320 } 321 322 return x.Router() 323} 324 325func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 326 l := log.FromContext(ctx).With("handler", "processKnotStream") 327 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 328 if msg.Nsid == tangled.PipelineNSID { 329 return nil 330 tpl := tangled.Pipeline{} 331 err := 
json.Unmarshal(msg.EventJson, &tpl) 332 if err != nil { 333 fmt.Println("error unmarshalling", err) 334 return err 335 } 336 337 if tpl.TriggerMetadata == nil { 338 return fmt.Errorf("no trigger metadata found") 339 } 340 341 if tpl.TriggerMetadata.Repo == nil { 342 return fmt.Errorf("no repo data found") 343 } 344 345 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 346 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 347 } 348 349 // filter by repos 350 _, err = s.db.GetRepoWithName( 351 syntax.DID(tpl.TriggerMetadata.Repo.Did), 352 tpl.TriggerMetadata.Repo.Repo, 353 ) 354 if err != nil { 355 return fmt.Errorf("failed to get repo: %w", err) 356 } 357 358 pipelineId := models.PipelineId{ 359 Knot: src.Key(), 360 Rkey: msg.Rkey, 361 } 362 363 err = s.processPipeline(ctx, tpl, pipelineId) 364 if err != nil { 365 return err 366 } 367 } else if msg.Nsid == tangled.GitRefUpdateNSID { 368 event := tangled.GitRefUpdate{} 369 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 370 l.Error("error unmarshalling", "err", err) 371 return err 372 } 373 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 374 375 // resolve repo name to rkey 376 // TODO: git.refUpdate should respond with rkey instead of repo name 377 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName) 378 if err != nil { 379 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err) 380 } 381 382 // NOTE: we are blindly trusting the knot that it will return only repos it own 383 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 384 repoPath := s.newRepoPath(repo.Did, repo.Rkey) 385 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 386 return fmt.Errorf("sync git repo: %w", err) 387 } 388 l.Info("synced git repo") 389 390 compiler := workflow.Compiler{ 391 Trigger: tangled.Pipeline_TriggerMetadata{ 392 
Kind: string(workflow.TriggerKindPush), 393 Push: &tangled.Pipeline_PushTriggerData{ 394 Ref: event.Ref, 395 OldSha: event.OldSha, 396 NewSha: event.NewSha, 397 }, 398 Repo: &tangled.Pipeline_TriggerRepo{ 399 Did: repo.Did.String(), 400 Knot: repo.Knot, 401 Repo: repo.Name, 402 }, 403 }, 404 } 405 406 // load workflow definitions from rev (without spindle context) 407 rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha) 408 if err != nil { 409 return fmt.Errorf("loading pipeline: %w", err) 410 } 411 if len(rawPipeline) == 0 { 412 l.Info("no workflow definition find for the repo. skipping the event") 413 return nil 414 } 415 tpl := compiler.Compile(compiler.Parse(rawPipeline)) 416 // TODO: pass compile error to workflow log 417 for _, w := range compiler.Diagnostics.Errors { 418 l.Error(w.String()) 419 } 420 for _, w := range compiler.Diagnostics.Warnings { 421 l.Warn(w.String()) 422 } 423 424 pipelineId := models.PipelineId{ 425 Knot: tpl.TriggerMetadata.Repo.Knot, 426 Rkey: tid.TID(), 427 } 428 if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 429 l.Error("failed to create pipeline event", "err", err) 430 return nil 431 } 432 err = s.processPipeline(ctx, tpl, pipelineId) 433 if err != nil { 434 return err 435 } 436 } 437 438 return nil 439} 440 441func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) { 442 if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil { 443 return nil, fmt.Errorf("syncing git repo: %w", err) 444 } 445 gr, err := kgit.Open(repoPath, rev) 446 if err != nil { 447 return nil, fmt.Errorf("opening git repo: %w", err) 448 } 449 450 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 451 if errors.Is(err, object.ErrDirectoryNotFound) { 452 // return empty RawPipeline when directory doesn't exist 453 return nil, nil 454 } else if err != nil { 455 return nil, fmt.Errorf("loading file tree: %w", err) 456 } 457 458 
var rawPipeline workflow.RawPipeline 459 for _, e := range workflowDir { 460 if !e.IsFile() { 461 continue 462 } 463 464 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 465 contents, err := gr.RawContent(fpath) 466 if err != nil { 467 return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 468 } 469 470 rawPipeline = append(rawPipeline, workflow.RawWorkflow{ 471 Name: e.Name, 472 Contents: contents, 473 }) 474 } 475 476 return rawPipeline, nil 477} 478 479func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error { 480 // Build pipeline environment variables once for all workflows 481 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 482 483 // filter & init workflows 484 workflows := make(map[models.Engine][]models.Workflow) 485 for _, w := range tpl.Workflows { 486 if w == nil { 487 continue 488 } 489 if _, ok := s.engs[w.Engine]; !ok { 490 err := s.db.StatusFailed(models.WorkflowId{ 491 PipelineId: pipelineId, 492 Name: w.Name, 493 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 494 if err != nil { 495 return fmt.Errorf("db.StatusFailed: %w", err) 496 } 497 498 continue 499 } 500 501 eng := s.engs[w.Engine] 502 503 if _, ok := workflows[eng]; !ok { 504 workflows[eng] = []models.Workflow{} 505 } 506 507 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 508 if err != nil { 509 return fmt.Errorf("init workflow: %w", err) 510 } 511 512 // inject TANGLED_* env vars after InitWorkflow 513 // This prevents user-defined env vars from overriding them 514 if ewf.Environment == nil { 515 ewf.Environment = make(map[string]string) 516 } 517 maps.Copy(ewf.Environment, pipelineEnv) 518 519 workflows[eng] = append(workflows[eng], *ewf) 520 } 521 522 // enqueue pipeline 523 ok := s.jq.Enqueue(queue.Job{ 524 Run: func() error { 525 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 526 RepoOwner: 
tpl.TriggerMetadata.Repo.Did, 527 RepoName: tpl.TriggerMetadata.Repo.Repo, 528 Workflows: workflows, 529 }, pipelineId) 530 return nil 531 }, 532 OnFail: func(jobError error) { 533 s.l.Error("pipeline run failed", "error", jobError) 534 }, 535 }) 536 if !ok { 537 return fmt.Errorf("failed to enqueue pipeline: queue is full") 538 } 539 s.l.Info("pipeline enqueued successfully", "id", pipelineId) 540 541 // emit StatusPending for all workflows here (after successful enqueue) 542 for _, ewfs := range workflows { 543 for _, ewf := range ewfs { 544 err := s.db.StatusPending(models.WorkflowId{ 545 PipelineId: pipelineId, 546 Name: ewf.Name, 547 }, s.n) 548 if err != nil { 549 return fmt.Errorf("db.StatusPending: %w", err) 550 } 551 } 552 } 553 return nil 554} 555 556// newRepoPath creates a path to store repository by its did and rkey. 557// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey 558func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string { 559 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String()) 560} 561 562func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 563 scheme := "https://" 564 if s.cfg.Server.Dev { 565 scheme = "http://" 566 } 567 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 568} 569 570const RequiredVersion = "2.49.0" 571 572func ensureGitVersion() error { 573 v, err := git.Version() 574 if err != nil { 575 return fmt.Errorf("fetching git version: %w", err) 576 } 577 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 578 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 579 } 580 return nil 581}