A vibe coded tangled fork which supports pijul.
at a811fb9d6845e014b0a55538a7439a7df4173657 419 lines 11 kB view raw
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "maps" 10 "net/http" 11 "sync" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/service/tap" 16 "github.com/go-chi/chi/v5" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/eventconsumer" 19 "tangled.org/core/eventconsumer/cursor" 20 "tangled.org/core/idresolver" 21 "tangled.org/core/log" 22 "tangled.org/core/notifier" 23 "tangled.org/core/rbac2" 24 "tangled.org/core/spindle/config" 25 "tangled.org/core/spindle/db" 26 "tangled.org/core/spindle/engine" 27 "tangled.org/core/spindle/engines/nixery" 28 "tangled.org/core/spindle/models" 29 "tangled.org/core/spindle/queue" 30 "tangled.org/core/spindle/secrets" 31 "tangled.org/core/spindle/xrpc" 32 "tangled.org/core/tapc" 33 "tangled.org/core/xrpc/serviceauth" 34) 35 36//go:embed motd 37var defaultMotd []byte 38 39type Spindle struct { 40 tap *tapc.Client 41 db *db.DB 42 e *rbac2.Enforcer 43 l *slog.Logger 44 n *notifier.Notifier 45 engs map[string]models.Engine 46 jq *queue.Queue 47 cfg *config.Config 48 ks *eventconsumer.Consumer 49 res *idresolver.Resolver 50 vault secrets.Manager 51 motd []byte 52 motdMu sync.RWMutex 53} 54 55// New creates a new Spindle server with the provided configuration and engines. 56func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 57 logger := log.FromContext(ctx) 58 59 d, err := db.Make(ctx, cfg.Server.DBPath) 60 if err != nil { 61 return nil, fmt.Errorf("failed to setup db: %w", err) 62 } 63 64 e, err := rbac2.NewEnforcer(cfg.Server.DBPath) 65 if err != nil { 66 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 67 } 68 69 n := notifier.New() 70 71 var vault secrets.Manager 72 switch cfg.Server.Secrets.Provider { 73 case "openbao": 74 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 75 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 76 } 77 vault, err = secrets.NewOpenBaoManager( 78 cfg.Server.Secrets.OpenBao.ProxyAddr, 79 logger, 80 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 81 ) 82 if err != nil { 83 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 84 } 85 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 86 case "sqlite", "": 87 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 88 if err != nil { 89 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 90 } 91 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 92 default: 93 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 94 } 95 96 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 97 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 98 99 tap := tapc.NewClient("http://localhost:"+cfg.Server.TapPort, "") 100 101 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 102 103 spindle := &Spindle{ 104 tap: &tap, 105 e: e, 106 db: d, 107 l: logger, 108 n: &n, 109 engs: engines, 110 jq: jq, 111 cfg: cfg, 112 res: resolver, 113 vault: vault, 114 motd: defaultMotd, 115 } 116 117 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did()) 118 if err != nil { 119 return nil, err 120 } 121 logger.Info("owner set", "did", cfg.Server.Owner) 122 123 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 124 if err != nil { 125 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 126 } 127 128 // for each incoming sh.tangled.pipeline, we execute 129 // spindle.processPipeline, which in turn enqueues the pipeline 130 // job in the above registered queue. 131 ccfg := eventconsumer.NewConsumerConfig() 132 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 133 ccfg.Dev = cfg.Server.Dev 134 ccfg.ProcessFunc = spindle.processPipeline 135 ccfg.CursorStore = cursorStore 136 knownKnots, err := d.Knots() 137 if err != nil { 138 return nil, err 139 } 140 for _, knot := range knownKnots { 141 logger.Info("adding source start", "knot", knot) 142 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 143 } 144 spindle.ks = eventconsumer.NewConsumer(*ccfg) 145 146 return spindle, nil 147} 148 149// DB returns the database instance. 150func (s *Spindle) DB() *db.DB { 151 return s.db 152} 153 154// Queue returns the job queue instance. 155func (s *Spindle) Queue() *queue.Queue { 156 return s.jq 157} 158 159// Engines returns the map of available engines. 160func (s *Spindle) Engines() map[string]models.Engine { 161 return s.engs 162} 163 164// Vault returns the secrets manager instance. 165func (s *Spindle) Vault() secrets.Manager { 166 return s.vault 167} 168 169// Notifier returns the notifier instance. 170func (s *Spindle) Notifier() *notifier.Notifier { 171 return s.n 172} 173 174// Enforcer returns the RBAC enforcer instance. 175func (s *Spindle) Enforcer() *rbac2.Enforcer { 176 return s.e 177} 178 179// SetMotdContent sets custom MOTD content, replacing the embedded default. 180func (s *Spindle) SetMotdContent(content []byte) { 181 s.motdMu.Lock() 182 defer s.motdMu.Unlock() 183 s.motd = content 184} 185 186// GetMotdContent returns the current MOTD content. 187func (s *Spindle) GetMotdContent() []byte { 188 s.motdMu.RLock() 189 defer s.motdMu.RUnlock() 190 return s.motd 191} 192 193// Start starts the Spindle server (blocking). 194func (s *Spindle) Start(ctx context.Context) error { 195 svcErr := make(chan error, 1) 196 197 // starts a job queue runner in the background 198 s.jq.Start() 199 defer s.jq.Stop() 200 201 // Stop vault token renewal if it implements Stopper 202 if stopper, ok := s.vault.(secrets.Stopper); ok { 203 defer stopper.Stop() 204 } 205 206 go func() { 207 s.l.Info("starting knot event consumer") 208 s.ks.Start(ctx) 209 }() 210 211 tap, err := tap.New(tap.Config{ 212 DatabaseURL: s.cfg.Server.TapDBUrl, 213 DBMaxConns: 32, 214 PLCURL: s.cfg.Server.PlcUrl, 215 RelayUrl: s.cfg.Server.RelayUrl, 216 FirehoseParallelism: 10, 217 ResyncParallelism: 5, 218 OutboxParallelism: 1, 219 FirehoseCursorSaveInterval: 1 * time.Second, 220 RepoFetchTimeout: 300 * time.Second, 221 IdentityCacheSize: 2_000_000, 222 EventCacheSize: 100_000, 223 CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID, tangled.SpindleMemberNSID}, 224 RetryTimeout: 3 * time.Second, 225 }) 226 if err != nil { 227 return err 228 } 229 go func() { 230 if err := tap.RunTap(ctx, ":"+s.cfg.Server.TapPort); err != nil { 231 svcErr <- err 232 } 233 }() 234 go func() { 235 s.l.Info("starting embedded tap server") 236 237 s.l.Info("starting tap stream consumer") 238 s.tap.Connect(ctx, &tapc.SimpleIndexer{ 239 EventHandler: s.processEvent, 240 }) 241 }() 242 243 s.l.Debug("waiting for tap readiness") 244 tapReadyCtx, cancel := context.WithTimeout(ctx, 10*time.Second) 245 defer cancel() 246 if err := s.tap.WaitReady(tapReadyCtx); err != nil { 247 return fmt.Errorf("tap not ready: %w", err) 248 } 249 250 // ensure server owner is tracked 251 s.l.Debug("adding server owner to tap") 252 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 253 return err 254 } 255 256 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 257 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 258} 259 260func Run(ctx context.Context) error { 261 cfg, err := config.Load(ctx) 262 if err != nil { 263 return fmt.Errorf("failed to load config: %w", err) 264 } 265 266 nixeryEng, err := nixery.New(ctx, cfg) 267 if err != nil { 268 return err 269 } 270 271 s, err := New(ctx, cfg, map[string]models.Engine{ 272 "nixery": nixeryEng, 273 }) 274 if err != nil { 275 return err 276 } 277 278 return s.Start(ctx) 279} 280 281func (s *Spindle) Router() http.Handler { 282 mux := chi.NewRouter() 283 284 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 285 w.Write(s.GetMotdContent()) 286 }) 287 mux.HandleFunc("/events", s.Events) 288 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 289 290 mux.Mount("/xrpc", s.XrpcRouter()) 291 return mux 292} 293 294func (s *Spindle) XrpcRouter() http.Handler { 295 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 296 297 l := log.SubLogger(s.l, "xrpc") 298 299 x := xrpc.Xrpc{ 300 Logger: l, 301 Db: s.db, 302 Enforcer: s.e, 303 Engines: s.engs, 304 Config: s.cfg, 305 Resolver: s.res, 306 Vault: s.vault, 307 Notifier: s.Notifier(), 308 ServiceAuth: serviceAuth, 309 } 310 311 return x.Router() 312} 313 314func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 315 if msg.Nsid == tangled.PipelineNSID { 316 tpl := tangled.Pipeline{} 317 err := json.Unmarshal(msg.EventJson, &tpl) 318 if err != nil { 319 fmt.Println("error unmarshalling", err) 320 return err 321 } 322 323 if tpl.TriggerMetadata == nil { 324 return fmt.Errorf("no trigger metadata found") 325 } 326 327 if tpl.TriggerMetadata.Repo == nil { 328 return fmt.Errorf("no repo data found") 329 } 330 331 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 332 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 333 } 334 335 // filter by repos 336 _, err = s.db.GetRepoWithName( 337 syntax.DID(tpl.TriggerMetadata.Repo.Did), 338 tpl.TriggerMetadata.Repo.Repo, 339 ) 340 if err != nil { 341 return fmt.Errorf("failed to get repo: %w", err) 342 } 343 344 pipelineId := models.PipelineId{ 345 Knot: src.Key(), 346 Rkey: msg.Rkey, 347 } 348 349 workflows := make(map[models.Engine][]models.Workflow) 350 351 // Build pipeline environment variables once for all workflows 352 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 353 354 for _, w := range tpl.Workflows { 355 if w != nil { 356 if _, ok := s.engs[w.Engine]; !ok { 357 err = s.db.StatusFailed(models.WorkflowId{ 358 PipelineId: pipelineId, 359 Name: w.Name, 360 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 361 if err != nil { 362 return fmt.Errorf("db.StatusFailed: %w", err) 363 } 364 365 continue 366 } 367 368 eng := s.engs[w.Engine] 369 370 if _, ok := workflows[eng]; !ok { 371 workflows[eng] = []models.Workflow{} 372 } 373 374 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 375 if err != nil { 376 return fmt.Errorf("init workflow: %w", err) 377 } 378 379 // inject TANGLED_* env vars after InitWorkflow 380 // This prevents user-defined env vars from overriding them 381 if ewf.Environment == nil { 382 ewf.Environment = make(map[string]string) 383 } 384 maps.Copy(ewf.Environment, pipelineEnv) 385 386 workflows[eng] = append(workflows[eng], *ewf) 387 388 err = s.db.StatusPending(models.WorkflowId{ 389 PipelineId: pipelineId, 390 Name: w.Name, 391 }, s.n) 392 if err != nil { 393 return fmt.Errorf("db.StatusPending: %w", err) 394 } 395 } 396 } 397 398 ok := s.jq.Enqueue(queue.Job{ 399 Run: func() error { 400 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 401 RepoOwner: tpl.TriggerMetadata.Repo.Did, 402 RepoName: tpl.TriggerMetadata.Repo.Repo, 403 Workflows: workflows, 404 }, pipelineId) 405 return nil 406 }, 407 OnFail: func(jobError error) { 408 s.l.Error("pipeline run failed", "error", jobError) 409 }, 410 }) 411 if ok { 412 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 413 } else { 414 s.l.Error("failed to enqueue pipeline: queue is full") 415 } 416 } 417 418 return nil 419}