A vibe-coded Tangled fork that supports Pijul.
at icy/pwvyvo 440 lines 11 kB view raw
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "maps" 10 "net/http" 11 "sync" 12 13 "github.com/go-chi/chi/v5" 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/eventconsumer" 16 "tangled.org/core/eventconsumer/cursor" 17 "tangled.org/core/idresolver" 18 "tangled.org/core/jetstream" 19 "tangled.org/core/log" 20 "tangled.org/core/notifier" 21 "tangled.org/core/rbac" 22 "tangled.org/core/spindle/config" 23 "tangled.org/core/spindle/db" 24 "tangled.org/core/spindle/engine" 25 "tangled.org/core/spindle/engines/docker" 26 "tangled.org/core/spindle/engines/nixery" 27 "tangled.org/core/spindle/models" 28 "tangled.org/core/spindle/queue" 29 "tangled.org/core/spindle/secrets" 30 "tangled.org/core/spindle/xrpc" 31 "tangled.org/core/xrpc/serviceauth" 32) 33 34//go:embed motd 35var defaultMotd []byte 36 37const ( 38 rbacDomain = "thisserver" 39) 40 41type Spindle struct { 42 jc *jetstream.JetstreamClient 43 db *db.DB 44 e *rbac.Enforcer 45 l *slog.Logger 46 n *notifier.Notifier 47 engs map[string]models.Engine 48 jq *queue.Queue 49 cfg *config.Config 50 ks *eventconsumer.Consumer 51 res *idresolver.Resolver 52 vault secrets.Manager 53 motd []byte 54 motdMu sync.RWMutex 55} 56 57// New creates a new Spindle server with the provided configuration and engines. 
58func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 59 logger := log.FromContext(ctx) 60 61 d, err := db.Make(cfg.Server.DBPath) 62 if err != nil { 63 return nil, fmt.Errorf("failed to setup db: %w", err) 64 } 65 66 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 67 if err != nil { 68 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 69 } 70 e.E.EnableAutoSave(true) 71 72 n := notifier.New() 73 74 var vault secrets.Manager 75 switch cfg.Server.Secrets.Provider { 76 case "openbao": 77 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 78 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 79 } 80 vault, err = secrets.NewOpenBaoManager( 81 cfg.Server.Secrets.OpenBao.ProxyAddr, 82 logger, 83 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 84 ) 85 if err != nil { 86 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 87 } 88 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 89 case "sqlite", "": 90 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 91 if err != nil { 92 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 93 } 94 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 95 default: 96 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 97 } 98 99 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 100 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 101 102 collections := []string{ 103 tangled.SpindleMemberNSID, 104 tangled.RepoNSID, 105 tangled.RepoCollaboratorNSID, 106 } 107 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 108 if err != nil { 
109 return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 110 } 111 jc.AddDid(cfg.Server.Owner) 112 113 // Check if the spindle knows about any Dids; 114 dids, err := d.GetAllDids() 115 if err != nil { 116 return nil, fmt.Errorf("failed to get all dids: %w", err) 117 } 118 for _, d := range dids { 119 jc.AddDid(d) 120 } 121 122 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 123 124 spindle := &Spindle{ 125 jc: jc, 126 e: e, 127 db: d, 128 l: logger, 129 n: &n, 130 engs: engines, 131 jq: jq, 132 cfg: cfg, 133 res: resolver, 134 vault: vault, 135 motd: defaultMotd, 136 } 137 138 err = e.AddSpindle(rbacDomain) 139 if err != nil { 140 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 141 } 142 err = spindle.configureOwner() 143 if err != nil { 144 return nil, err 145 } 146 logger.Info("owner set", "did", cfg.Server.Owner) 147 148 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 149 if err != nil { 150 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 151 } 152 153 err = jc.StartJetstream(ctx, spindle.ingest()) 154 if err != nil { 155 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 156 } 157 158 // for each incoming sh.tangled.pipeline, we execute 159 // spindle.processPipeline, which in turn enqueues the pipeline 160 // job in the above registered queue. 161 ccfg := eventconsumer.NewConsumerConfig() 162 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 163 ccfg.Dev = cfg.Server.Dev 164 ccfg.ProcessFunc = spindle.processPipeline 165 ccfg.CursorStore = cursorStore 166 knownKnots, err := d.Knots() 167 if err != nil { 168 return nil, err 169 } 170 for _, knot := range knownKnots { 171 logger.Info("adding source start", "knot", knot) 172 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 173 } 174 spindle.ks = eventconsumer.NewConsumer(*ccfg) 175 176 return spindle, nil 177} 178 179// DB returns the database instance. 
180func (s *Spindle) DB() *db.DB { 181 return s.db 182} 183 184// Queue returns the job queue instance. 185func (s *Spindle) Queue() *queue.Queue { 186 return s.jq 187} 188 189// Engines returns the map of available engines. 190func (s *Spindle) Engines() map[string]models.Engine { 191 return s.engs 192} 193 194// Vault returns the secrets manager instance. 195func (s *Spindle) Vault() secrets.Manager { 196 return s.vault 197} 198 199// Notifier returns the notifier instance. 200func (s *Spindle) Notifier() *notifier.Notifier { 201 return s.n 202} 203 204// Enforcer returns the RBAC enforcer instance. 205func (s *Spindle) Enforcer() *rbac.Enforcer { 206 return s.e 207} 208 209// SetMotdContent sets custom MOTD content, replacing the embedded default. 210func (s *Spindle) SetMotdContent(content []byte) { 211 s.motdMu.Lock() 212 defer s.motdMu.Unlock() 213 s.motd = content 214} 215 216// GetMotdContent returns the current MOTD content. 217func (s *Spindle) GetMotdContent() []byte { 218 s.motdMu.RLock() 219 defer s.motdMu.RUnlock() 220 return s.motd 221} 222 223// Start starts the Spindle server (blocking). 
224func (s *Spindle) Start(ctx context.Context) error { 225 // starts a job queue runner in the background 226 s.jq.Start() 227 defer s.jq.Stop() 228 229 // Stop vault token renewal if it implements Stopper 230 if stopper, ok := s.vault.(secrets.Stopper); ok { 231 defer stopper.Stop() 232 } 233 234 go func() { 235 s.l.Info("starting knot event consumer") 236 s.ks.Start(ctx) 237 }() 238 239 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 240 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 241} 242 243func Run(ctx context.Context) error { 244 cfg, err := config.Load(ctx) 245 if err != nil { 246 return fmt.Errorf("failed to load config: %w", err) 247 } 248 249 nixeryEng, err := nixery.New(ctx, cfg) 250 if err != nil { 251 return err 252 } 253 254 dockerEng, err := docker.New(ctx, cfg) 255 if err != nil { 256 return err 257 } 258 259 s, err := New(ctx, cfg, map[string]models.Engine{ 260 "nixery": nixeryEng, 261 "docker": dockerEng, 262 }) 263 if err != nil { 264 return err 265 } 266 267 return s.Start(ctx) 268} 269 270func (s *Spindle) Router() http.Handler { 271 mux := chi.NewRouter() 272 273 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 274 w.Write(s.GetMotdContent()) 275 }) 276 mux.HandleFunc("/events", s.Events) 277 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 278 279 mux.Mount("/xrpc", s.XrpcRouter()) 280 return mux 281} 282 283func (s *Spindle) XrpcRouter() http.Handler { 284 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 285 286 l := log.SubLogger(s.l, "xrpc") 287 288 x := xrpc.Xrpc{ 289 Logger: l, 290 Db: s.db, 291 Enforcer: s.e, 292 Engines: s.engs, 293 Config: s.cfg, 294 Resolver: s.res, 295 Vault: s.vault, 296 ServiceAuth: serviceAuth, 297 } 298 299 return x.Router() 300} 301 302func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 303 if msg.Nsid == tangled.PipelineNSID { 304 tpl := 
tangled.Pipeline{} 305 err := json.Unmarshal(msg.EventJson, &tpl) 306 if err != nil { 307 fmt.Println("error unmarshalling", err) 308 return err 309 } 310 311 if tpl.TriggerMetadata == nil { 312 return fmt.Errorf("no trigger metadata found") 313 } 314 315 if tpl.TriggerMetadata.Repo == nil { 316 return fmt.Errorf("no repo data found") 317 } 318 319 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 320 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 321 } 322 323 // filter by repos 324 _, err = s.db.GetRepo( 325 tpl.TriggerMetadata.Repo.Knot, 326 tpl.TriggerMetadata.Repo.Did, 327 tpl.TriggerMetadata.Repo.Repo, 328 ) 329 if err != nil { 330 return err 331 } 332 333 pipelineId := models.PipelineId{ 334 Knot: src.Key(), 335 Rkey: msg.Rkey, 336 } 337 338 workflows := make(map[models.Engine][]models.Workflow) 339 340 // Build pipeline environment variables once for all workflows 341 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 342 343 for _, w := range tpl.Workflows { 344 if w != nil { 345 if _, ok := s.engs[w.Engine]; !ok { 346 err = s.db.StatusFailed(models.WorkflowId{ 347 PipelineId: pipelineId, 348 Name: w.Name, 349 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 350 if err != nil { 351 return err 352 } 353 354 continue 355 } 356 357 eng := s.engs[w.Engine] 358 359 if _, ok := workflows[eng]; !ok { 360 workflows[eng] = []models.Workflow{} 361 } 362 363 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 364 if err != nil { 365 return err 366 } 367 368 // inject TANGLED_* env vars after InitWorkflow 369 // This prevents user-defined env vars from overriding them 370 if ewf.Environment == nil { 371 ewf.Environment = make(map[string]string) 372 } 373 maps.Copy(ewf.Environment, pipelineEnv) 374 375 workflows[eng] = append(workflows[eng], *ewf) 376 377 err = s.db.StatusPending(models.WorkflowId{ 378 PipelineId: pipelineId, 379 Name: w.Name, 380 }, s.n) 381 if 
err != nil { 382 return err 383 } 384 } 385 } 386 387 ok := s.jq.Enqueue(queue.Job{ 388 Run: func() error { 389 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 390 RepoOwner: tpl.TriggerMetadata.Repo.Did, 391 RepoName: tpl.TriggerMetadata.Repo.Repo, 392 Workflows: workflows, 393 }, pipelineId) 394 return nil 395 }, 396 OnFail: func(jobError error) { 397 s.l.Error("pipeline run failed", "error", jobError) 398 }, 399 }) 400 if ok { 401 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 402 } else { 403 s.l.Error("failed to enqueue pipeline: queue is full") 404 } 405 } 406 407 return nil 408} 409 410func (s *Spindle) configureOwner() error { 411 cfgOwner := s.cfg.Server.Owner 412 413 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 414 if err != nil { 415 return err 416 } 417 418 switch len(existing) { 419 case 0: 420 // no owner configured, continue 421 case 1: 422 // find existing owner 423 existingOwner := existing[0] 424 425 // no ownership change, this is okay 426 if existingOwner == s.cfg.Server.Owner { 427 break 428 } 429 430 // remove existing owner 431 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 432 if err != nil { 433 return nil 434 } 435 default: 436 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 437 } 438 439 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 440}