A vibe-coded fork of Tangled which adds support for Pijul.
at f969b4e60ec01a75d72f24ab68ffdc5e3a207d09 545 lines 15 kB view raw
// Package spindle implements the Spindle CI server: it consumes knot and
// tap event streams, compiles workflow definitions found in repositories,
// and runs the resulting pipelines on the configured engines.
package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"path/filepath"
	"sync"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/go-chi/chi/v5"
	"github.com/go-git/go-git/v5/plumbing/object"
	"github.com/hashicorp/go-version"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	kgit "tangled.org/core/knotserver/git"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/git"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/tap"
	"tangled.org/core/tid"
	"tangled.org/core/workflow"
	"tangled.org/core/xrpc/serviceauth"
)

// defaultMotd is the embedded default message-of-the-day, served at "/"
// until replaced via SetMotdContent.
//
//go:embed motd
var defaultMotd []byte

// Spindle is the top-level server state shared by the HTTP router, the
// event consumers, and the pipeline queue.
type Spindle struct {
	tap    *tap.Client             // tap stream client (owner tracking, event subscription)
	db     *db.DB                  // spindle database
	e      *rbac2.Enforcer        // RBAC enforcer (owner/permission checks)
	l      *slog.Logger           // structured logger
	n      *notifier.Notifier     // notifier for pipeline status events
	engs   map[string]models.Engine // available execution engines, keyed by engine name
	jq     *queue.Queue           // background job queue for pipeline runs
	cfg    *config.Config         // server configuration
	ks     *eventconsumer.Consumer // knot event stream consumer
	res    *idresolver.Resolver   // AT identity resolver
	vault  secrets.Manager        // secrets backend (sqlite or openbao)
	motd   []byte                 // current MOTD content; guarded by motdMu
	motdMu sync.RWMutex           // protects motd
}

// New creates a new Spindle server with the provided configuration and engines.
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
	logger := log.FromContext(ctx)

	// Refuse to start if the installed git is older than RequiredVersion.
	if err := ensureGitVersion(); err != nil {
		return nil, fmt.Errorf("ensuring git version: %w", err)
	}

	d, err := db.Make(ctx, cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup db: %w", err)
	}

	e, err := rbac2.NewEnforcer(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
	}

	n := notifier.New()

	// Select the secrets backend; an empty provider string falls back to
	// the sqlite manager stored alongside the main database.
	var vault secrets.Manager
	switch cfg.Server.Secrets.Provider {
	case "openbao":
		if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
			return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
		}
		vault, err = secrets.NewOpenBaoManager(
			cfg.Server.Secrets.OpenBao.ProxyAddr,
			logger,
			secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
		)
		if err != nil {
			return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
		}
		logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
	case "sqlite", "":
		vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets"))
		if err != nil {
			return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
		}
		logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath())
	default:
		return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
	}

	jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
	logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)

	tap := tap.NewClient(cfg.Server.TapUrl, "")

	resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)

	spindle := &Spindle{
		tap:   &tap,
		e:     e,
		db:    d,
		l:     logger,
		n:     &n,
		engs:  engines,
		jq:    jq,
		cfg:   cfg,
		res:   resolver,
		vault: vault,
		motd:  defaultMotd,
	}

	// Record the configured owner DID in the RBAC store.
	err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
	if err != nil {
		return nil, err
	}
	logger.Info("owner set", "did", cfg.Server.Owner)

	cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
	}

	// spindle listen to knot stream for sh.tangled.git.refUpdate
	// which will sync the local workflow files in spindle and enqueues the
	// pipeline job for on-push workflows
	ccfg := eventconsumer.NewConsumerConfig()
	ccfg.Logger = log.SubLogger(logger, "eventconsumer")
	ccfg.Dev = cfg.Server.Dev
	ccfg.ProcessFunc = spindle.processKnotStream
	ccfg.CursorStore = cursorStore
	knownKnots, err := d.Knots()
	if err != nil {
		return nil, err
	}
	// Re-register every knot already present in the database as an event
	// source so existing subscriptions survive a restart.
	for _, knot := range knownKnots {
		logger.Info("adding source start", "knot", knot)
		ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
	}
	spindle.ks = eventconsumer.NewConsumer(*ccfg)

	return spindle, nil
}

// DB returns the database instance.
func (s *Spindle) DB() *db.DB {
	return s.db
}

// Queue returns the job queue instance.
func (s *Spindle) Queue() *queue.Queue {
	return s.jq
}

// Engines returns the map of available engines.
func (s *Spindle) Engines() map[string]models.Engine {
	return s.engs
}

// Vault returns the secrets manager instance.
func (s *Spindle) Vault() secrets.Manager {
	return s.vault
}

// Notifier returns the notifier instance.
func (s *Spindle) Notifier() *notifier.Notifier {
	return s.n
}

// Enforcer returns the RBAC enforcer instance.
185func (s *Spindle) Enforcer() *rbac2.Enforcer { 186 return s.e 187} 188 189// SetMotdContent sets custom MOTD content, replacing the embedded default. 190func (s *Spindle) SetMotdContent(content []byte) { 191 s.motdMu.Lock() 192 defer s.motdMu.Unlock() 193 s.motd = content 194} 195 196// GetMotdContent returns the current MOTD content. 197func (s *Spindle) GetMotdContent() []byte { 198 s.motdMu.RLock() 199 defer s.motdMu.RUnlock() 200 return s.motd 201} 202 203// Start starts the Spindle server (blocking). 204func (s *Spindle) Start(ctx context.Context) error { 205 // starts a job queue runner in the background 206 s.jq.Start() 207 defer s.jq.Stop() 208 209 // Stop vault token renewal if it implements Stopper 210 if stopper, ok := s.vault.(secrets.Stopper); ok { 211 defer stopper.Stop() 212 } 213 214 go func() { 215 s.l.Info("starting knot event consumer") 216 s.ks.Start(ctx) 217 }() 218 219 // ensure server owner is tracked 220 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 221 return err 222 } 223 224 go func() { 225 s.l.Info("starting tap stream consumer") 226 s.tap.Connect(ctx, &tap.SimpleIndexer{ 227 EventHandler: s.processEvent, 228 }) 229 }() 230 231 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 232 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 233} 234 235func Run(ctx context.Context) error { 236 cfg, err := config.Load(ctx) 237 if err != nil { 238 return fmt.Errorf("failed to load config: %w", err) 239 } 240 241 nixeryEng, err := nixery.New(ctx, cfg) 242 if err != nil { 243 return err 244 } 245 246 s, err := New(ctx, cfg, map[string]models.Engine{ 247 "nixery": nixeryEng, 248 }) 249 if err != nil { 250 return err 251 } 252 253 return s.Start(ctx) 254} 255 256func (s *Spindle) Router() http.Handler { 257 mux := chi.NewRouter() 258 259 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 260 w.Write(s.GetMotdContent()) 261 }) 262 mux.HandleFunc("/events", s.Events) 263 
mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 264 265 mux.Mount("/xrpc", s.XrpcRouter()) 266 return mux 267} 268 269func (s *Spindle) XrpcRouter() http.Handler { 270 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 271 272 l := log.SubLogger(s.l, "xrpc") 273 274 x := xrpc.Xrpc{ 275 Logger: l, 276 Db: s.db, 277 Enforcer: s.e, 278 Engines: s.engs, 279 Config: s.cfg, 280 Resolver: s.res, 281 Vault: s.vault, 282 Notifier: s.Notifier(), 283 ServiceAuth: serviceAuth, 284 } 285 286 return x.Router() 287} 288 289func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 290 l := log.FromContext(ctx).With("handler", "processKnotStream") 291 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 292 if msg.Nsid == tangled.PipelineNSID { 293 return nil 294 tpl := tangled.Pipeline{} 295 err := json.Unmarshal(msg.EventJson, &tpl) 296 if err != nil { 297 fmt.Println("error unmarshalling", err) 298 return err 299 } 300 301 if tpl.TriggerMetadata == nil { 302 return fmt.Errorf("no trigger metadata found") 303 } 304 305 if tpl.TriggerMetadata.Repo == nil { 306 return fmt.Errorf("no repo data found") 307 } 308 309 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 310 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 311 } 312 313 // filter by repos 314 _, err = s.db.GetRepoWithName( 315 syntax.DID(tpl.TriggerMetadata.Repo.Did), 316 tpl.TriggerMetadata.Repo.Repo, 317 ) 318 if err != nil { 319 return fmt.Errorf("failed to get repo: %w", err) 320 } 321 322 pipelineId := models.PipelineId{ 323 Knot: src.Key(), 324 Rkey: msg.Rkey, 325 } 326 327 err = s.processPipeline(ctx, tpl, pipelineId) 328 if err != nil { 329 return err 330 } 331 } else if msg.Nsid == tangled.GitRefUpdateNSID { 332 event := tangled.GitRefUpdate{} 333 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 334 l.Error("error 
unmarshalling", "err", err) 335 return err 336 } 337 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 338 339 // resolve repo name to rkey 340 // TODO: git.refUpdate should respond with rkey instead of repo name 341 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName) 342 if err != nil { 343 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err) 344 } 345 346 // NOTE: we are blindly trusting the knot that it will return only repos it own 347 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 348 repoPath := s.newRepoPath(repo.Did, repo.Rkey) 349 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 350 return fmt.Errorf("sync git repo: %w", err) 351 } 352 l.Info("synced git repo") 353 354 compiler := workflow.Compiler{ 355 Trigger: tangled.Pipeline_TriggerMetadata{ 356 Kind: string(workflow.TriggerKindPush), 357 Push: &tangled.Pipeline_PushTriggerData{ 358 Ref: event.Ref, 359 OldSha: event.OldSha, 360 NewSha: event.NewSha, 361 }, 362 Repo: &tangled.Pipeline_TriggerRepo{ 363 Did: repo.Did.String(), 364 Knot: repo.Knot, 365 Repo: repo.Name, 366 }, 367 }, 368 } 369 370 // load workflow definitions from rev (without spindle context) 371 rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha) 372 if err != nil { 373 return fmt.Errorf("loading pipeline: %w", err) 374 } 375 if len(rawPipeline) == 0 { 376 l.Info("no workflow definition find for the repo. 
skipping the event") 377 return nil 378 } 379 tpl := compiler.Compile(compiler.Parse(rawPipeline)) 380 // TODO: pass compile error to workflow log 381 for _, w := range compiler.Diagnostics.Errors { 382 l.Error(w.String()) 383 } 384 for _, w := range compiler.Diagnostics.Warnings { 385 l.Warn(w.String()) 386 } 387 388 pipelineId := models.PipelineId{ 389 Knot: tpl.TriggerMetadata.Repo.Knot, 390 Rkey: tid.TID(), 391 } 392 if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil { 393 l.Error("failed to create pipeline event", "err", err) 394 return nil 395 } 396 err = s.processPipeline(ctx, tpl, pipelineId) 397 if err != nil { 398 return err 399 } 400 } 401 402 return nil 403} 404 405func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) { 406 if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil { 407 return nil, fmt.Errorf("syncing git repo: %w", err) 408 } 409 gr, err := kgit.Open(repoPath, rev) 410 if err != nil { 411 return nil, fmt.Errorf("opening git repo: %w", err) 412 } 413 414 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 415 if errors.Is(err, object.ErrDirectoryNotFound) { 416 // return empty RawPipeline when directory doesn't exist 417 return nil, nil 418 } else if err != nil { 419 return nil, fmt.Errorf("loading file tree: %w", err) 420 } 421 422 var rawPipeline workflow.RawPipeline 423 for _, e := range workflowDir { 424 if !e.IsFile() { 425 continue 426 } 427 428 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 429 contents, err := gr.RawContent(fpath) 430 if err != nil { 431 return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 432 } 433 434 rawPipeline = append(rawPipeline, workflow.RawWorkflow{ 435 Name: e.Name, 436 Contents: contents, 437 }) 438 } 439 440 return rawPipeline, nil 441} 442 443func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error { 444 // Build 
pipeline environment variables once for all workflows 445 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 446 447 // filter & init workflows 448 workflows := make(map[models.Engine][]models.Workflow) 449 for _, w := range tpl.Workflows { 450 if w == nil { 451 continue 452 } 453 if _, ok := s.engs[w.Engine]; !ok { 454 err := s.db.StatusFailed(models.WorkflowId{ 455 PipelineId: pipelineId, 456 Name: w.Name, 457 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 458 if err != nil { 459 return fmt.Errorf("db.StatusFailed: %w", err) 460 } 461 462 continue 463 } 464 465 eng := s.engs[w.Engine] 466 467 if _, ok := workflows[eng]; !ok { 468 workflows[eng] = []models.Workflow{} 469 } 470 471 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 472 if err != nil { 473 return fmt.Errorf("init workflow: %w", err) 474 } 475 476 // inject TANGLED_* env vars after InitWorkflow 477 // This prevents user-defined env vars from overriding them 478 if ewf.Environment == nil { 479 ewf.Environment = make(map[string]string) 480 } 481 maps.Copy(ewf.Environment, pipelineEnv) 482 483 workflows[eng] = append(workflows[eng], *ewf) 484 } 485 486 // enqueue pipeline 487 ok := s.jq.Enqueue(queue.Job{ 488 Run: func() error { 489 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 490 RepoOwner: tpl.TriggerMetadata.Repo.Did, 491 RepoName: tpl.TriggerMetadata.Repo.Repo, 492 Workflows: workflows, 493 }, pipelineId) 494 return nil 495 }, 496 OnFail: func(jobError error) { 497 s.l.Error("pipeline run failed", "error", jobError) 498 }, 499 }) 500 if !ok { 501 return fmt.Errorf("failed to enqueue pipeline: queue is full") 502 } 503 s.l.Info("pipeline enqueued successfully", "id", pipelineId) 504 505 // emit StatusPending for all workflows here (after successful enqueue) 506 for _, ewfs := range workflows { 507 for _, ewf := range ewfs { 508 err := s.db.StatusPending(models.WorkflowId{ 509 PipelineId: 
pipelineId, 510 Name: ewf.Name, 511 }, s.n) 512 if err != nil { 513 return fmt.Errorf("db.StatusPending: %w", err) 514 } 515 } 516 } 517 return nil 518} 519 520// newRepoPath creates a path to store repository by its did and rkey. 521// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey 522func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string { 523 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String()) 524} 525 526func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 527 scheme := "https://" 528 if s.cfg.Server.Dev { 529 scheme = "http://" 530 } 531 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 532} 533 534const RequiredVersion = "2.49.0" 535 536func ensureGitVersion() error { 537 v, err := git.Version() 538 if err != nil { 539 return fmt.Errorf("fetching git version: %w", err) 540 } 541 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 542 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 543 } 544 return nil 545}