A vibe-coded Tangled fork that supports Pijul.
at ae7adc924f0d591866190fe2420aa7f576c0f4c3 393 lines 10 kB view raw
// Package spindle wires together the pieces of a spindle server: a database,
// an RBAC enforcer, a secrets provider, a job queue, a jetstream client and a
// knot event consumer. Incoming pipeline events are turned into queued jobs
// that run workflows on the configured engines, and an HTTP router exposes the
// MOTD, events, logs and XRPC endpoints.
package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"sync"

	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)

// defaultMotd is the message of the day embedded at build time from the motd
// file. New installs it as the initial MOTD.
//
//go:embed motd
var defaultMotd []byte

// Spindle bundles every long-lived dependency of the server. It contains a
// sync.RWMutex, so it is created once by New and handled through a pointer.
type Spindle struct {
	jc     *jetstream.JetstreamClient // jetstream client created and started in New
	db     *db.DB                     // database opened from cfg.Server.DBPath
	e      *rbac2.Enforcer            // RBAC enforcer
	l      *slog.Logger               // base logger; sub-loggers are derived from it
	n      *notifier.Notifier         // notifier passed along with workflow status updates
	engs   map[string]models.Engine   // workflow engines, looked up by a workflow's Engine value
	jq     *queue.Queue               // job queue that pipeline runs are enqueued on
	cfg    *config.Config             // server configuration
	ks     *eventconsumer.Consumer    // knot event consumer; its ProcessFunc is processPipeline
	res    *idresolver.Resolver       // identity resolver built from cfg.Server.PlcUrl
	vault  secrets.Manager            // secrets provider (openbao or sqlite)
	motd   []byte                     // content served at "/"; guarded by motdMu
	motdMu sync.RWMutex               // guards motd
}

// New creates a new Spindle server with the provided configuration and engines.
53func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 54 logger := log.FromContext(ctx) 55 56 d, err := db.Make(cfg.Server.DBPath) 57 if err != nil { 58 return nil, fmt.Errorf("failed to setup db: %w", err) 59 } 60 61 e, err := rbac2.NewEnforcer(cfg.Server.DBPath) 62 if err != nil { 63 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 64 } 65 66 n := notifier.New() 67 68 var vault secrets.Manager 69 switch cfg.Server.Secrets.Provider { 70 case "openbao": 71 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 72 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 73 } 74 vault, err = secrets.NewOpenBaoManager( 75 cfg.Server.Secrets.OpenBao.ProxyAddr, 76 logger, 77 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 78 ) 79 if err != nil { 80 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 81 } 82 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 83 case "sqlite", "": 84 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 85 if err != nil { 86 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 87 } 88 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 89 default: 90 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 91 } 92 93 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 94 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 95 96 collections := []string{ 97 tangled.SpindleMemberNSID, 98 tangled.RepoNSID, 99 tangled.RepoCollaboratorNSID, 100 } 101 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 102 if err != nil { 103 return nil, fmt.Errorf("failed 
to setup jetstream client: %w", err) 104 } 105 jc.AddDid(cfg.Server.Owner.String()) 106 107 // Check if the spindle knows about any Dids; 108 dids, err := d.GetAllDids() 109 if err != nil { 110 return nil, fmt.Errorf("failed to get all dids: %w", err) 111 } 112 for _, d := range dids { 113 jc.AddDid(d) 114 } 115 116 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 117 118 spindle := &Spindle{ 119 jc: jc, 120 e: e, 121 db: d, 122 l: logger, 123 n: &n, 124 engs: engines, 125 jq: jq, 126 cfg: cfg, 127 res: resolver, 128 vault: vault, 129 motd: defaultMotd, 130 } 131 132 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did()) 133 if err != nil { 134 return nil, err 135 } 136 logger.Info("owner set", "did", cfg.Server.Owner) 137 138 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 139 if err != nil { 140 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 141 } 142 143 err = jc.StartJetstream(ctx, spindle.ingest()) 144 if err != nil { 145 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 146 } 147 148 // for each incoming sh.tangled.pipeline, we execute 149 // spindle.processPipeline, which in turn enqueues the pipeline 150 // job in the above registered queue. 151 ccfg := eventconsumer.NewConsumerConfig() 152 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 153 ccfg.Dev = cfg.Server.Dev 154 ccfg.ProcessFunc = spindle.processPipeline 155 ccfg.CursorStore = cursorStore 156 knownKnots, err := d.Knots() 157 if err != nil { 158 return nil, err 159 } 160 for _, knot := range knownKnots { 161 logger.Info("adding source start", "knot", knot) 162 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 163 } 164 spindle.ks = eventconsumer.NewConsumer(*ccfg) 165 166 return spindle, nil 167} 168 169// DB returns the database instance. 170func (s *Spindle) DB() *db.DB { 171 return s.db 172} 173 174// Queue returns the job queue instance. 
175func (s *Spindle) Queue() *queue.Queue { 176 return s.jq 177} 178 179// Engines returns the map of available engines. 180func (s *Spindle) Engines() map[string]models.Engine { 181 return s.engs 182} 183 184// Vault returns the secrets manager instance. 185func (s *Spindle) Vault() secrets.Manager { 186 return s.vault 187} 188 189// Notifier returns the notifier instance. 190func (s *Spindle) Notifier() *notifier.Notifier { 191 return s.n 192} 193 194// Enforcer returns the RBAC enforcer instance. 195func (s *Spindle) Enforcer() *rbac2.Enforcer { 196 return s.e 197} 198 199// SetMotdContent sets custom MOTD content, replacing the embedded default. 200func (s *Spindle) SetMotdContent(content []byte) { 201 s.motdMu.Lock() 202 defer s.motdMu.Unlock() 203 s.motd = content 204} 205 206// GetMotdContent returns the current MOTD content. 207func (s *Spindle) GetMotdContent() []byte { 208 s.motdMu.RLock() 209 defer s.motdMu.RUnlock() 210 return s.motd 211} 212 213// Start starts the Spindle server (blocking). 
214func (s *Spindle) Start(ctx context.Context) error { 215 // starts a job queue runner in the background 216 s.jq.Start() 217 defer s.jq.Stop() 218 219 // Stop vault token renewal if it implements Stopper 220 if stopper, ok := s.vault.(secrets.Stopper); ok { 221 defer stopper.Stop() 222 } 223 224 go func() { 225 s.l.Info("starting knot event consumer") 226 s.ks.Start(ctx) 227 }() 228 229 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 230 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 231} 232 233func Run(ctx context.Context) error { 234 cfg, err := config.Load(ctx) 235 if err != nil { 236 return fmt.Errorf("failed to load config: %w", err) 237 } 238 239 nixeryEng, err := nixery.New(ctx, cfg) 240 if err != nil { 241 return err 242 } 243 244 s, err := New(ctx, cfg, map[string]models.Engine{ 245 "nixery": nixeryEng, 246 }) 247 if err != nil { 248 return err 249 } 250 251 return s.Start(ctx) 252} 253 254func (s *Spindle) Router() http.Handler { 255 mux := chi.NewRouter() 256 257 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 258 w.Write(s.GetMotdContent()) 259 }) 260 mux.HandleFunc("/events", s.Events) 261 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 262 263 mux.Mount("/xrpc", s.XrpcRouter()) 264 return mux 265} 266 267func (s *Spindle) XrpcRouter() http.Handler { 268 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 269 270 l := log.SubLogger(s.l, "xrpc") 271 272 x := xrpc.Xrpc{ 273 Logger: l, 274 Db: s.db, 275 Enforcer: s.e, 276 Engines: s.engs, 277 Config: s.cfg, 278 Resolver: s.res, 279 Vault: s.vault, 280 Notifier: s.Notifier(), 281 ServiceAuth: serviceAuth, 282 } 283 284 return x.Router() 285} 286 287func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 288 if msg.Nsid == tangled.PipelineNSID { 289 tpl := tangled.Pipeline{} 290 err := json.Unmarshal(msg.EventJson, &tpl) 291 if err != nil { 292 
fmt.Println("error unmarshalling", err) 293 return err 294 } 295 296 if tpl.TriggerMetadata == nil { 297 return fmt.Errorf("no trigger metadata found") 298 } 299 300 if tpl.TriggerMetadata.Repo == nil { 301 return fmt.Errorf("no repo data found") 302 } 303 304 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 305 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 306 } 307 308 // filter by repos 309 _, err = s.db.GetRepo( 310 tpl.TriggerMetadata.Repo.Knot, 311 tpl.TriggerMetadata.Repo.Did, 312 tpl.TriggerMetadata.Repo.Repo, 313 ) 314 if err != nil { 315 return fmt.Errorf("failed to get repo: %w", err) 316 } 317 318 pipelineId := models.PipelineId{ 319 Knot: src.Key(), 320 Rkey: msg.Rkey, 321 } 322 323 workflows := make(map[models.Engine][]models.Workflow) 324 325 // Build pipeline environment variables once for all workflows 326 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 327 328 for _, w := range tpl.Workflows { 329 if w != nil { 330 if _, ok := s.engs[w.Engine]; !ok { 331 err = s.db.StatusFailed(models.WorkflowId{ 332 PipelineId: pipelineId, 333 Name: w.Name, 334 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 335 if err != nil { 336 return fmt.Errorf("db.StatusFailed: %w", err) 337 } 338 339 continue 340 } 341 342 eng := s.engs[w.Engine] 343 344 if _, ok := workflows[eng]; !ok { 345 workflows[eng] = []models.Workflow{} 346 } 347 348 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 349 if err != nil { 350 return fmt.Errorf("init workflow: %w", err) 351 } 352 353 // inject TANGLED_* env vars after InitWorkflow 354 // This prevents user-defined env vars from overriding them 355 if ewf.Environment == nil { 356 ewf.Environment = make(map[string]string) 357 } 358 maps.Copy(ewf.Environment, pipelineEnv) 359 360 workflows[eng] = append(workflows[eng], *ewf) 361 362 err = s.db.StatusPending(models.WorkflowId{ 363 PipelineId: pipelineId, 364 Name: w.Name, 365 
}, s.n) 366 if err != nil { 367 return fmt.Errorf("db.StatusPending: %w", err) 368 } 369 } 370 } 371 372 ok := s.jq.Enqueue(queue.Job{ 373 Run: func() error { 374 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 375 RepoOwner: tpl.TriggerMetadata.Repo.Did, 376 RepoName: tpl.TriggerMetadata.Repo.Repo, 377 Workflows: workflows, 378 }, pipelineId) 379 return nil 380 }, 381 OnFail: func(jobError error) { 382 s.l.Error("pipeline run failed", "error", jobError) 383 }, 384 }) 385 if ok { 386 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 387 } else { 388 s.l.Error("failed to enqueue pipeline: queue is full") 389 } 390 } 391 392 return nil 393}