A vibe-coded tangled fork that supports pijul.
at 707a0909a1e5d6d69dc19257a0b9a81dfcc513aa 542 lines 15 kB view raw
// Package spindle implements the Spindle CI server: it consumes knot and
// tap event streams, syncs repositories locally, compiles workflow
// definitions and runs them on pluggable execution engines.
package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"path/filepath"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/bluesky-social/indigo/service/tap"
	"github.com/go-chi/chi/v5"
	"github.com/go-git/go-git/v5/plumbing/object"
	"github.com/hashicorp/go-version"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	kgit "tangled.org/core/knotserver/git"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/git"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/tapc"
	"tangled.org/core/tid"
	"tangled.org/core/workflow"
	"tangled.org/core/xrpc/serviceauth"
)

// defaultMotd is the embedded default message-of-the-day served at "/"
// until SetMotdContent replaces it.
//
//go:embed motd
var defaultMotd []byte

// Spindle holds the top-level server state: configuration, storage, the
// event consumers and the engines that execute pipeline workflows.
type Spindle struct {
	tap   *tapc.Client             // client for the embedded tap event stream
	db    *db.DB                   // spindle database
	e     *rbac2.Enforcer          // RBAC enforcer (owner/member permissions)
	l     *slog.Logger             // structured logger
	n     *notifier.Notifier       // fan-out for pipeline status events
	engs  map[string]models.Engine // available execution engines, keyed by name
	jq    *queue.Queue             // job queue for pipeline runs
	cfg   *config.Config           // server configuration
	ks    *eventconsumer.Consumer  // consumer of knot event streams
	res   *idresolver.Resolver     // identity (DID/handle) resolver
	vault secrets.Manager          // secrets backend (sqlite or openbao)

	// motd is guarded by motdMu; read via GetMotdContent, replaced via
	// SetMotdContent.
	motd   []byte
	motdMu sync.RWMutex
}

// New creates a new Spindle server with the provided configuration and engines.
64func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 65 logger := log.FromContext(ctx) 66 67 if err := ensureGitVersion(); err != nil { 68 return nil, fmt.Errorf("ensuring git version: %w", err) 69 } 70 71 d, err := db.Make(ctx, cfg.Server.DBPath()) 72 if err != nil { 73 return nil, fmt.Errorf("failed to setup db: %w", err) 74 } 75 76 e, err := rbac2.NewEnforcer(cfg.Server.DBPath()) 77 if err != nil { 78 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 79 } 80 81 n := notifier.New() 82 83 var vault secrets.Manager 84 switch cfg.Server.Secrets.Provider { 85 case "openbao": 86 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 87 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 88 } 89 vault, err = secrets.NewOpenBaoManager( 90 cfg.Server.Secrets.OpenBao.ProxyAddr, 91 logger, 92 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 93 ) 94 if err != nil { 95 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 96 } 97 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 98 case "sqlite", "": 99 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets")) 100 if err != nil { 101 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 102 } 103 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath()) 104 default: 105 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 106 } 107 108 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 109 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 110 111 tap := tapc.NewClient("http://localhost:"+cfg.Server.TapPort, "") 112 113 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 114 115 spindle := &Spindle{ 116 tap: &tap, 117 e: e, 
118 db: d, 119 l: logger, 120 n: &n, 121 engs: engines, 122 jq: jq, 123 cfg: cfg, 124 res: resolver, 125 vault: vault, 126 motd: defaultMotd, 127 } 128 129 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did()) 130 if err != nil { 131 return nil, err 132 } 133 logger.Info("owner set", "did", cfg.Server.Owner) 134 135 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath()) 136 if err != nil { 137 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 138 } 139 140 // spindle listen to knot stream for sh.tangled.git.refUpdate 141 // which will sync the local workflow files in spindle and enqueues the 142 // pipeline job for on-push workflows 143 ccfg := eventconsumer.NewConsumerConfig() 144 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 145 ccfg.Dev = cfg.Server.Dev 146 ccfg.ProcessFunc = spindle.processKnotStream 147 ccfg.CursorStore = cursorStore 148 knownKnots, err := d.Knots() 149 if err != nil { 150 return nil, err 151 } 152 for _, knot := range knownKnots { 153 logger.Info("adding source start", "knot", knot) 154 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 155 } 156 spindle.ks = eventconsumer.NewConsumer(*ccfg) 157 158 return spindle, nil 159} 160 161// DB returns the database instance. 162func (s *Spindle) DB() *db.DB { 163 return s.db 164} 165 166// Queue returns the job queue instance. 167func (s *Spindle) Queue() *queue.Queue { 168 return s.jq 169} 170 171// Engines returns the map of available engines. 172func (s *Spindle) Engines() map[string]models.Engine { 173 return s.engs 174} 175 176// Vault returns the secrets manager instance. 177func (s *Spindle) Vault() secrets.Manager { 178 return s.vault 179} 180 181// Notifier returns the notifier instance. 182func (s *Spindle) Notifier() *notifier.Notifier { 183 return s.n 184} 185 186// Enforcer returns the RBAC enforcer instance. 
187func (s *Spindle) Enforcer() *rbac2.Enforcer { 188 return s.e 189} 190 191// SetMotdContent sets custom MOTD content, replacing the embedded default. 192func (s *Spindle) SetMotdContent(content []byte) { 193 s.motdMu.Lock() 194 defer s.motdMu.Unlock() 195 s.motd = content 196} 197 198// GetMotdContent returns the current MOTD content. 199func (s *Spindle) GetMotdContent() []byte { 200 s.motdMu.RLock() 201 defer s.motdMu.RUnlock() 202 return s.motd 203} 204 205// Start starts the Spindle server (blocking). 206func (s *Spindle) Start(ctx context.Context) error { 207 svcErr := make(chan error, 1) 208 209 // starts a job queue runner in the background 210 s.jq.Start() 211 defer s.jq.Stop() 212 213 // Stop vault token renewal if it implements Stopper 214 if stopper, ok := s.vault.(secrets.Stopper); ok { 215 defer stopper.Stop() 216 } 217 218 go func() { 219 s.l.Info("starting knot event consumer") 220 s.ks.Start(ctx) 221 }() 222 223 tap, err := tap.New(tap.Config{ 224 DatabaseURL: s.cfg.Server.TapDBUrl, 225 DBMaxConns: 32, 226 PLCURL: s.cfg.Server.PlcUrl, 227 RelayUrl: s.cfg.Server.RelayUrl, 228 FirehoseParallelism: 10, 229 ResyncParallelism: 5, 230 OutboxParallelism: 1, 231 FirehoseCursorSaveInterval: 1 * time.Second, 232 RepoFetchTimeout: 300 * time.Second, 233 IdentityCacheSize: 2_000_000, 234 EventCacheSize: 100_000, 235 SignalCollection: tangled.RepoPullNSID, // HACK: listen for repo.pull from non-tangled users 236 CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID, tangled.RepoPullNSID, tangled.SpindleMemberNSID}, 237 RetryTimeout: 3 * time.Second, 238 }) 239 if err != nil { 240 return err 241 } 242 go func() { 243 if err := tap.RunTap(ctx, ":"+s.cfg.Server.TapPort); err != nil { 244 svcErr <- err 245 } 246 }() 247 go func() { 248 s.l.Info("starting embedded tap server") 249 250 s.l.Info("starting tap stream consumer") 251 s.tap.Connect(ctx, &tapc.SimpleIndexer{ 252 EventHandler: s.processEvent, 253 }) 254 }() 255 256 s.l.Debug("waiting 
until tap connection") 257 if err := s.tap.Wait(ctx); err != nil { 258 return err 259 } 260 261 // ensure server owner is tracked 262 s.l.Debug("adding server owner to tap") 263 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 264 return err 265 } 266 267 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 268 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 269} 270 271func Run(ctx context.Context) error { 272 cfg, err := config.Load(ctx) 273 if err != nil { 274 return fmt.Errorf("failed to load config: %w", err) 275 } 276 277 nixeryEng, err := nixery.New(ctx, cfg) 278 if err != nil { 279 return err 280 } 281 282 s, err := New(ctx, cfg, map[string]models.Engine{ 283 "nixery": nixeryEng, 284 }) 285 if err != nil { 286 return err 287 } 288 289 return s.Start(ctx) 290} 291 292func (s *Spindle) Router() http.Handler { 293 mux := chi.NewRouter() 294 295 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 296 w.Write(s.GetMotdContent()) 297 }) 298 mux.HandleFunc("/events", s.Events) 299 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 300 301 mux.Mount("/xrpc", s.XrpcRouter()) 302 return mux 303} 304 305func (s *Spindle) XrpcRouter() http.Handler { 306 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 307 308 l := log.SubLogger(s.l, "xrpc") 309 310 x := xrpc.Xrpc{ 311 Logger: l, 312 Db: s.db, 313 Enforcer: s.e, 314 Engines: s.engs, 315 Config: s.cfg, 316 Resolver: s.res, 317 Vault: s.vault, 318 Notifier: s.Notifier(), 319 ServiceAuth: serviceAuth, 320 } 321 322 return x.Router() 323} 324 325func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 326 l := log.FromContext(ctx).With("handler", "processKnotStream") 327 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 328 if msg.Nsid == tangled.GitRefUpdateNSID { 329 event := tangled.GitRefUpdate{} 330 if err := 
json.Unmarshal(msg.EventJson, &event); err != nil { 331 l.Error("error unmarshalling", "err", err) 332 return err 333 } 334 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 335 336 // resolve repo name to rkey 337 // TODO: git.refUpdate should respond with rkey instead of repo name 338 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName) 339 if err != nil { 340 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err) 341 } 342 343 // NOTE: we are blindly trusting the knot that it will return only repos it own 344 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 345 repoPath := s.newRepoPath(repo.Did, repo.Rkey) 346 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 347 return fmt.Errorf("sync git repo: %w", err) 348 } 349 l.Info("synced git repo") 350 351 compiler := workflow.Compiler{ 352 Trigger: tangled.Pipeline_TriggerMetadata{ 353 Kind: string(workflow.TriggerKindPush), 354 Push: &tangled.Pipeline_PushTriggerData{ 355 Ref: event.Ref, 356 OldSha: event.OldSha, 357 NewSha: event.NewSha, 358 }, 359 Repo: &tangled.Pipeline_TriggerRepo{ 360 Did: repo.Did.String(), 361 Knot: repo.Knot, 362 Repo: repo.Name, 363 }, 364 }, 365 } 366 367 // load workflow definitions from rev (without spindle context) 368 rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha) 369 if err != nil { 370 return fmt.Errorf("loading pipeline: %w", err) 371 } 372 if len(rawPipeline) == 0 { 373 l.Info("no workflow definition find for the repo. 
skipping the event") 374 return nil 375 } 376 tpl := compiler.Compile(compiler.Parse(rawPipeline)) 377 // TODO: pass compile error to workflow log 378 for _, w := range compiler.Diagnostics.Errors { 379 l.Error(w.String()) 380 } 381 for _, w := range compiler.Diagnostics.Warnings { 382 l.Warn(w.String()) 383 } 384 385 pipelineId := models.PipelineId{ 386 Knot: tpl.TriggerMetadata.Repo.Knot, 387 Rkey: tid.TID(), 388 } 389 if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 390 l.Error("failed to create pipeline event", "err", err) 391 return nil 392 } 393 err = s.processPipeline(ctx, tpl, pipelineId) 394 if err != nil { 395 return err 396 } 397 } 398 399 return nil 400} 401 402func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) { 403 if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil { 404 return nil, fmt.Errorf("syncing git repo: %w", err) 405 } 406 gr, err := kgit.Open(repoPath, rev) 407 if err != nil { 408 return nil, fmt.Errorf("opening git repo: %w", err) 409 } 410 411 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 412 if errors.Is(err, object.ErrDirectoryNotFound) { 413 // return empty RawPipeline when directory doesn't exist 414 return nil, nil 415 } else if err != nil { 416 return nil, fmt.Errorf("loading file tree: %w", err) 417 } 418 419 var rawPipeline workflow.RawPipeline 420 for _, e := range workflowDir { 421 if !e.IsFile() { 422 continue 423 } 424 425 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 426 contents, err := gr.RawContent(fpath) 427 if err != nil { 428 return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 429 } 430 431 rawPipeline = append(rawPipeline, workflow.RawWorkflow{ 432 Name: e.Name, 433 Contents: contents, 434 }) 435 } 436 437 return rawPipeline, nil 438} 439 440func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error { 441 // Build 
pipeline environment variables once for all workflows 442 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 443 444 // filter & init workflows 445 workflows := make(map[models.Engine][]models.Workflow) 446 for _, w := range tpl.Workflows { 447 if w == nil { 448 continue 449 } 450 if _, ok := s.engs[w.Engine]; !ok { 451 err := s.db.StatusFailed(models.WorkflowId{ 452 PipelineId: pipelineId, 453 Name: w.Name, 454 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 455 if err != nil { 456 return fmt.Errorf("db.StatusFailed: %w", err) 457 } 458 459 continue 460 } 461 462 eng := s.engs[w.Engine] 463 464 if _, ok := workflows[eng]; !ok { 465 workflows[eng] = []models.Workflow{} 466 } 467 468 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 469 if err != nil { 470 return fmt.Errorf("init workflow: %w", err) 471 } 472 473 // inject TANGLED_* env vars after InitWorkflow 474 // This prevents user-defined env vars from overriding them 475 if ewf.Environment == nil { 476 ewf.Environment = make(map[string]string) 477 } 478 maps.Copy(ewf.Environment, pipelineEnv) 479 480 workflows[eng] = append(workflows[eng], *ewf) 481 } 482 483 // enqueue pipeline 484 ok := s.jq.Enqueue(queue.Job{ 485 Run: func() error { 486 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 487 RepoOwner: tpl.TriggerMetadata.Repo.Did, 488 RepoName: tpl.TriggerMetadata.Repo.Repo, 489 Workflows: workflows, 490 }, pipelineId) 491 return nil 492 }, 493 OnFail: func(jobError error) { 494 s.l.Error("pipeline run failed", "error", jobError) 495 }, 496 }) 497 if !ok { 498 return fmt.Errorf("failed to enqueue pipeline: queue is full") 499 } 500 s.l.Info("pipeline enqueued successfully", "id", pipelineId) 501 502 // emit StatusPending for all workflows here (after successful enqueue) 503 for _, ewfs := range workflows { 504 for _, ewf := range ewfs { 505 err := s.db.StatusPending(models.WorkflowId{ 506 PipelineId: 
pipelineId, 507 Name: ewf.Name, 508 }, s.n) 509 if err != nil { 510 return fmt.Errorf("db.StatusPending: %w", err) 511 } 512 } 513 } 514 return nil 515} 516 517// newRepoPath creates a path to store repository by its did and rkey. 518// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey 519func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string { 520 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String()) 521} 522 523func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 524 scheme := "https://" 525 if s.cfg.Server.Dev { 526 scheme = "http://" 527 } 528 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 529} 530 531const RequiredVersion = "2.49.0" 532 533func ensureGitVersion() error { 534 v, err := git.Version() 535 if err != nil { 536 return fmt.Errorf("fetching git version: %w", err) 537 } 538 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 539 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 540 } 541 return nil 542}