A vibe coded tangled fork which supports pijul.
at 8a07e7947b3c92552bb6b349dfe4fb63888eadbc 382 lines 9.8 kB view raw
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "maps" 10 "net/http" 11 "sync" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/go-chi/chi/v5" 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/eventconsumer" 17 "tangled.org/core/eventconsumer/cursor" 18 "tangled.org/core/idresolver" 19 "tangled.org/core/log" 20 "tangled.org/core/notifier" 21 "tangled.org/core/rbac2" 22 "tangled.org/core/spindle/config" 23 "tangled.org/core/spindle/db" 24 "tangled.org/core/spindle/engine" 25 "tangled.org/core/spindle/engines/nixery" 26 "tangled.org/core/spindle/models" 27 "tangled.org/core/spindle/queue" 28 "tangled.org/core/spindle/secrets" 29 "tangled.org/core/spindle/xrpc" 30 "tangled.org/core/tap" 31 "tangled.org/core/xrpc/serviceauth" 32) 33 34//go:embed motd 35var defaultMotd []byte 36 37type Spindle struct { 38 tap *tap.Client 39 db *db.DB 40 e *rbac2.Enforcer 41 l *slog.Logger 42 n *notifier.Notifier 43 engs map[string]models.Engine 44 jq *queue.Queue 45 cfg *config.Config 46 ks *eventconsumer.Consumer 47 res *idresolver.Resolver 48 vault secrets.Manager 49 motd []byte 50 motdMu sync.RWMutex 51} 52 53// New creates a new Spindle server with the provided configuration and engines. 54func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 55 logger := log.FromContext(ctx) 56 57 d, err := db.Make(ctx, cfg.Server.DBPath) 58 if err != nil { 59 return nil, fmt.Errorf("failed to setup db: %w", err) 60 } 61 62 e, err := rbac2.NewEnforcer(cfg.Server.DBPath) 63 if err != nil { 64 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 65 } 66 67 n := notifier.New() 68 69 var vault secrets.Manager 70 switch cfg.Server.Secrets.Provider { 71 case "openbao": 72 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 73 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 74 } 75 vault, err = secrets.NewOpenBaoManager( 76 cfg.Server.Secrets.OpenBao.ProxyAddr, 77 logger, 78 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 79 ) 80 if err != nil { 81 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 82 } 83 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 84 case "sqlite", "": 85 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 86 if err != nil { 87 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 88 } 89 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 90 default: 91 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 92 } 93 94 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 95 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 96 97 tap := tap.NewClient(cfg.Server.TapUrl, "") 98 99 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 100 101 spindle := &Spindle{ 102 tap: &tap, 103 e: e, 104 db: d, 105 l: logger, 106 n: &n, 107 engs: engines, 108 jq: jq, 109 cfg: cfg, 110 res: resolver, 111 vault: vault, 112 motd: defaultMotd, 113 } 114 115 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did()) 116 if err != nil { 117 return nil, err 118 } 119 logger.Info("owner set", "did", cfg.Server.Owner) 120 121 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 122 if err != nil { 123 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 124 } 125 126 // for each incoming sh.tangled.pipeline, we execute 127 // spindle.processPipeline, which in turn enqueues the pipeline 128 // job in the above registered queue. 129 ccfg := eventconsumer.NewConsumerConfig() 130 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 131 ccfg.Dev = cfg.Server.Dev 132 ccfg.ProcessFunc = spindle.processPipeline 133 ccfg.CursorStore = cursorStore 134 knownKnots, err := d.Knots() 135 if err != nil { 136 return nil, err 137 } 138 for _, knot := range knownKnots { 139 logger.Info("adding source start", "knot", knot) 140 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 141 } 142 spindle.ks = eventconsumer.NewConsumer(*ccfg) 143 144 return spindle, nil 145} 146 147// DB returns the database instance. 148func (s *Spindle) DB() *db.DB { 149 return s.db 150} 151 152// Queue returns the job queue instance. 153func (s *Spindle) Queue() *queue.Queue { 154 return s.jq 155} 156 157// Engines returns the map of available engines. 158func (s *Spindle) Engines() map[string]models.Engine { 159 return s.engs 160} 161 162// Vault returns the secrets manager instance. 163func (s *Spindle) Vault() secrets.Manager { 164 return s.vault 165} 166 167// Notifier returns the notifier instance. 168func (s *Spindle) Notifier() *notifier.Notifier { 169 return s.n 170} 171 172// Enforcer returns the RBAC enforcer instance. 173func (s *Spindle) Enforcer() *rbac2.Enforcer { 174 return s.e 175} 176 177// SetMotdContent sets custom MOTD content, replacing the embedded default. 178func (s *Spindle) SetMotdContent(content []byte) { 179 s.motdMu.Lock() 180 defer s.motdMu.Unlock() 181 s.motd = content 182} 183 184// GetMotdContent returns the current MOTD content. 185func (s *Spindle) GetMotdContent() []byte { 186 s.motdMu.RLock() 187 defer s.motdMu.RUnlock() 188 return s.motd 189} 190 191// Start starts the Spindle server (blocking). 192func (s *Spindle) Start(ctx context.Context) error { 193 // starts a job queue runner in the background 194 s.jq.Start() 195 defer s.jq.Stop() 196 197 // Stop vault token renewal if it implements Stopper 198 if stopper, ok := s.vault.(secrets.Stopper); ok { 199 defer stopper.Stop() 200 } 201 202 go func() { 203 s.l.Info("starting knot event consumer") 204 s.ks.Start(ctx) 205 }() 206 207 // ensure server owner is tracked 208 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 209 return err 210 } 211 212 go func() { 213 s.l.Info("starting tap stream consumer") 214 s.tap.Connect(ctx, &tap.SimpleIndexer{ 215 EventHandler: s.processEvent, 216 }) 217 }() 218 219 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 220 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 221} 222 223func Run(ctx context.Context) error { 224 cfg, err := config.Load(ctx) 225 if err != nil { 226 return fmt.Errorf("failed to load config: %w", err) 227 } 228 229 nixeryEng, err := nixery.New(ctx, cfg) 230 if err != nil { 231 return err 232 } 233 234 s, err := New(ctx, cfg, map[string]models.Engine{ 235 "nixery": nixeryEng, 236 }) 237 if err != nil { 238 return err 239 } 240 241 return s.Start(ctx) 242} 243 244func (s *Spindle) Router() http.Handler { 245 mux := chi.NewRouter() 246 247 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 248 w.Write(s.GetMotdContent()) 249 }) 250 mux.HandleFunc("/events", s.Events) 251 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 252 253 mux.Mount("/xrpc", s.XrpcRouter()) 254 return mux 255} 256 257func (s *Spindle) XrpcRouter() http.Handler { 258 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 259 260 l := log.SubLogger(s.l, "xrpc") 261 262 x := xrpc.Xrpc{ 263 Logger: l, 264 Db: s.db, 265 Enforcer: s.e, 266 Engines: s.engs, 267 Config: s.cfg, 268 Resolver: s.res, 269 Vault: s.vault, 270 Notifier: s.Notifier(), 271 ServiceAuth: serviceAuth, 272 } 273 274 return x.Router() 275} 276 277func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 278 if msg.Nsid == tangled.PipelineNSID { 279 tpl := tangled.Pipeline{} 280 err := json.Unmarshal(msg.EventJson, &tpl) 281 if err != nil { 282 fmt.Println("error unmarshalling", err) 283 return err 284 } 285 286 if tpl.TriggerMetadata == nil { 287 return fmt.Errorf("no trigger metadata found") 288 } 289 290 if tpl.TriggerMetadata.Repo == nil { 291 return fmt.Errorf("no repo data found") 292 } 293 294 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 295 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 296 } 297 298 // filter by repos 299 _, err = s.db.GetRepoWithName( 300 syntax.DID(tpl.TriggerMetadata.Repo.Did), 301 tpl.TriggerMetadata.Repo.Repo, 302 ) 303 if err != nil { 304 return fmt.Errorf("failed to get repo: %w", err) 305 } 306 307 pipelineId := models.PipelineId{ 308 Knot: src.Key(), 309 Rkey: msg.Rkey, 310 } 311 312 workflows := make(map[models.Engine][]models.Workflow) 313 314 // Build pipeline environment variables once for all workflows 315 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 316 317 for _, w := range tpl.Workflows { 318 if w != nil { 319 if _, ok := s.engs[w.Engine]; !ok { 320 err = s.db.StatusFailed(models.WorkflowId{ 321 PipelineId: pipelineId, 322 Name: w.Name, 323 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 324 if err != nil { 325 return fmt.Errorf("db.StatusFailed: %w", err) 326 } 327 328 continue 329 } 330 331 eng := s.engs[w.Engine] 332 333 if _, ok := workflows[eng]; !ok { 334 workflows[eng] = []models.Workflow{} 335 } 336 337 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 338 if err != nil { 339 return fmt.Errorf("init workflow: %w", err) 340 } 341 342 // inject TANGLED_* env vars after InitWorkflow 343 // This prevents user-defined env vars from overriding them 344 if ewf.Environment == nil { 345 ewf.Environment = make(map[string]string) 346 } 347 maps.Copy(ewf.Environment, pipelineEnv) 348 349 workflows[eng] = append(workflows[eng], *ewf) 350 351 err = s.db.StatusPending(models.WorkflowId{ 352 PipelineId: pipelineId, 353 Name: w.Name, 354 }, s.n) 355 if err != nil { 356 return fmt.Errorf("db.StatusPending: %w", err) 357 } 358 } 359 } 360 361 ok := s.jq.Enqueue(queue.Job{ 362 Run: func() error { 363 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 364 RepoOwner: tpl.TriggerMetadata.Repo.Did, 365 RepoName: tpl.TriggerMetadata.Repo.Repo, 366 Workflows: workflows, 367 }, pipelineId) 368 return nil 369 }, 370 OnFail: func(jobError error) { 371 s.l.Error("pipeline run failed", "error", jobError) 372 }, 373 }) 374 if ok { 375 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 376 } else { 377 s.l.Error("failed to enqueue pipeline: queue is full") 378 } 379 } 380 381 return nil 382}