A vibe-coded Tangled fork that supports Pijul.
at c4ec0546f40f7f9601d35aec71d2881d37ad0011 443 lines 12 kB view raw
package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"path/filepath"
	"sync"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/go-chi/chi/v5"
	"github.com/hashicorp/go-version"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/git"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/tap"
	"tangled.org/core/xrpc/serviceauth"
)

// defaultMotd is the message of the day embedded from the "motd" file at build
// time. It is the initial value of Spindle.motd.
//
//go:embed motd
var defaultMotd []byte

// Spindle bundles the dependencies of a spindle server: storage, access
// control, the job queue, the event consumers and the HTTP-facing state.
type Spindle struct {
	// tap is the client used to register tracked repos and to consume the tap stream.
	tap *tap.Client
	// db is the spindle database handle.
	db *db.DB
	// e is the RBAC enforcer.
	e *rbac2.Enforcer
	// l is the base logger; sub-loggers are derived from it.
	l *slog.Logger
	// n is the notifier handed to database status updates and the XRPC layer.
	n *notifier.Notifier
	// engs maps an engine name (as referenced by a workflow) to its implementation.
	engs map[string]models.Engine
	// jq is the job queue that pipeline runs are enqueued on.
	jq *queue.Queue
	// cfg is the loaded server configuration.
	cfg *config.Config
	// ks is the knot event consumer.
	ks *eventconsumer.Consumer
	// res is the identity resolver used by the XRPC layer.
	res *idresolver.Resolver
	// vault is the secrets backend.
	vault secrets.Manager
	// motd is the content served at "/"; guarded by motdMu.
	motd []byte
	// motdMu guards motd.
	motdMu sync.RWMutex
}

// New creates a new Spindle server with the provided configuration and engines.
57func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 58 logger := log.FromContext(ctx) 59 60 if err := ensureGitVersion(); err != nil { 61 return nil, fmt.Errorf("ensuring git version: %w", err) 62 } 63 64 d, err := db.Make(ctx, cfg.Server.DBPath()) 65 if err != nil { 66 return nil, fmt.Errorf("failed to setup db: %w", err) 67 } 68 69 e, err := rbac2.NewEnforcer(cfg.Server.DBPath()) 70 if err != nil { 71 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 72 } 73 74 n := notifier.New() 75 76 var vault secrets.Manager 77 switch cfg.Server.Secrets.Provider { 78 case "openbao": 79 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 80 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 81 } 82 vault, err = secrets.NewOpenBaoManager( 83 cfg.Server.Secrets.OpenBao.ProxyAddr, 84 logger, 85 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 86 ) 87 if err != nil { 88 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 89 } 90 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 91 case "sqlite", "": 92 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets")) 93 if err != nil { 94 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 95 } 96 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath()) 97 default: 98 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 99 } 100 101 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 102 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 103 104 tap := tap.NewClient(cfg.Server.TapUrl, "") 105 106 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 107 108 spindle := &Spindle{ 109 tap: &tap, 110 e: e, 111 db: d, 112 l: logger, 
113 n: &n, 114 engs: engines, 115 jq: jq, 116 cfg: cfg, 117 res: resolver, 118 vault: vault, 119 motd: defaultMotd, 120 } 121 122 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did()) 123 if err != nil { 124 return nil, err 125 } 126 logger.Info("owner set", "did", cfg.Server.Owner) 127 128 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath()) 129 if err != nil { 130 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 131 } 132 133 // for each incoming sh.tangled.pipeline, we execute 134 // spindle.processPipeline, which in turn enqueues the pipeline 135 // job in the above registered queue. 136 ccfg := eventconsumer.NewConsumerConfig() 137 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 138 ccfg.Dev = cfg.Server.Dev 139 ccfg.ProcessFunc = spindle.processPipeline 140 ccfg.CursorStore = cursorStore 141 knownKnots, err := d.Knots() 142 if err != nil { 143 return nil, err 144 } 145 for _, knot := range knownKnots { 146 logger.Info("adding source start", "knot", knot) 147 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 148 } 149 spindle.ks = eventconsumer.NewConsumer(*ccfg) 150 151 return spindle, nil 152} 153 154// DB returns the database instance. 155func (s *Spindle) DB() *db.DB { 156 return s.db 157} 158 159// Queue returns the job queue instance. 160func (s *Spindle) Queue() *queue.Queue { 161 return s.jq 162} 163 164// Engines returns the map of available engines. 165func (s *Spindle) Engines() map[string]models.Engine { 166 return s.engs 167} 168 169// Vault returns the secrets manager instance. 170func (s *Spindle) Vault() secrets.Manager { 171 return s.vault 172} 173 174// Notifier returns the notifier instance. 175func (s *Spindle) Notifier() *notifier.Notifier { 176 return s.n 177} 178 179// Enforcer returns the RBAC enforcer instance. 180func (s *Spindle) Enforcer() *rbac2.Enforcer { 181 return s.e 182} 183 184// SetMotdContent sets custom MOTD content, replacing the embedded default. 
185func (s *Spindle) SetMotdContent(content []byte) { 186 s.motdMu.Lock() 187 defer s.motdMu.Unlock() 188 s.motd = content 189} 190 191// GetMotdContent returns the current MOTD content. 192func (s *Spindle) GetMotdContent() []byte { 193 s.motdMu.RLock() 194 defer s.motdMu.RUnlock() 195 return s.motd 196} 197 198// Start starts the Spindle server (blocking). 199func (s *Spindle) Start(ctx context.Context) error { 200 // starts a job queue runner in the background 201 s.jq.Start() 202 defer s.jq.Stop() 203 204 // Stop vault token renewal if it implements Stopper 205 if stopper, ok := s.vault.(secrets.Stopper); ok { 206 defer stopper.Stop() 207 } 208 209 go func() { 210 s.l.Info("starting knot event consumer") 211 s.ks.Start(ctx) 212 }() 213 214 // ensure server owner is tracked 215 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 216 return err 217 } 218 219 go func() { 220 s.l.Info("starting tap stream consumer") 221 s.tap.Connect(ctx, &tap.SimpleIndexer{ 222 EventHandler: s.processEvent, 223 }) 224 }() 225 226 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 227 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 228} 229 230func Run(ctx context.Context) error { 231 cfg, err := config.Load(ctx) 232 if err != nil { 233 return fmt.Errorf("failed to load config: %w", err) 234 } 235 236 nixeryEng, err := nixery.New(ctx, cfg) 237 if err != nil { 238 return err 239 } 240 241 s, err := New(ctx, cfg, map[string]models.Engine{ 242 "nixery": nixeryEng, 243 }) 244 if err != nil { 245 return err 246 } 247 248 return s.Start(ctx) 249} 250 251func (s *Spindle) Router() http.Handler { 252 mux := chi.NewRouter() 253 254 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 255 w.Write(s.GetMotdContent()) 256 }) 257 mux.HandleFunc("/events", s.Events) 258 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 259 260 mux.Mount("/xrpc", s.XrpcRouter()) 261 return mux 262} 263 264func (s *Spindle) 
XrpcRouter() http.Handler { 265 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 266 267 l := log.SubLogger(s.l, "xrpc") 268 269 x := xrpc.Xrpc{ 270 Logger: l, 271 Db: s.db, 272 Enforcer: s.e, 273 Engines: s.engs, 274 Config: s.cfg, 275 Resolver: s.res, 276 Vault: s.vault, 277 Notifier: s.Notifier(), 278 ServiceAuth: serviceAuth, 279 } 280 281 return x.Router() 282} 283 284func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 285 l := log.FromContext(ctx).With("handler", "processKnotStream") 286 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 287 if msg.Nsid == tangled.PipelineNSID { 288 return nil 289 tpl := tangled.Pipeline{} 290 err := json.Unmarshal(msg.EventJson, &tpl) 291 if err != nil { 292 fmt.Println("error unmarshalling", err) 293 return err 294 } 295 296 if tpl.TriggerMetadata == nil { 297 return fmt.Errorf("no trigger metadata found") 298 } 299 300 if tpl.TriggerMetadata.Repo == nil { 301 return fmt.Errorf("no repo data found") 302 } 303 304 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 305 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 306 } 307 308 // filter by repos 309 _, err = s.db.GetRepoWithName( 310 syntax.DID(tpl.TriggerMetadata.Repo.Did), 311 tpl.TriggerMetadata.Repo.Repo, 312 ) 313 if err != nil { 314 return fmt.Errorf("failed to get repo: %w", err) 315 } 316 317 pipelineId := models.PipelineId{ 318 Knot: src.Key(), 319 Rkey: msg.Rkey, 320 } 321 322 workflows := make(map[models.Engine][]models.Workflow) 323 324 // Build pipeline environment variables once for all workflows 325 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 326 327 for _, w := range tpl.Workflows { 328 if w != nil { 329 if _, ok := s.engs[w.Engine]; !ok { 330 err = s.db.StatusFailed(models.WorkflowId{ 331 PipelineId: pipelineId, 332 Name: w.Name, 333 
}, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 334 if err != nil { 335 return fmt.Errorf("db.StatusFailed: %w", err) 336 } 337 338 continue 339 } 340 341 eng := s.engs[w.Engine] 342 343 if _, ok := workflows[eng]; !ok { 344 workflows[eng] = []models.Workflow{} 345 } 346 347 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 348 if err != nil { 349 return fmt.Errorf("init workflow: %w", err) 350 } 351 352 // inject TANGLED_* env vars after InitWorkflow 353 // This prevents user-defined env vars from overriding them 354 if ewf.Environment == nil { 355 ewf.Environment = make(map[string]string) 356 } 357 maps.Copy(ewf.Environment, pipelineEnv) 358 359 workflows[eng] = append(workflows[eng], *ewf) 360 361 err = s.db.StatusPending(models.WorkflowId{ 362 PipelineId: pipelineId, 363 Name: w.Name, 364 }, s.n) 365 if err != nil { 366 return fmt.Errorf("db.StatusPending: %w", err) 367 } 368 } 369 } 370 371 ok := s.jq.Enqueue(queue.Job{ 372 Run: func() error { 373 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 374 RepoOwner: tpl.TriggerMetadata.Repo.Did, 375 RepoName: tpl.TriggerMetadata.Repo.Repo, 376 Workflows: workflows, 377 }, pipelineId) 378 return nil 379 }, 380 OnFail: func(jobError error) { 381 s.l.Error("pipeline run failed", "error", jobError) 382 }, 383 }) 384 if ok { 385 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 386 } else { 387 s.l.Error("failed to enqueue pipeline: queue is full") 388 } 389 } else if msg.Nsid == tangled.GitRefUpdateNSID { 390 event := tangled.GitRefUpdate{} 391 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 392 l.Error("error unmarshalling", "err", err) 393 return err 394 } 395 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 396 397 // resolve repo name to rkey 398 // TODO: git.refUpdate should respond with rkey instead of repo name 399 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName) 400 if err != nil { 
401 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err) 402 } 403 404 // NOTE: we are blindly trusting the knot that it will return only repos it own 405 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 406 repoPath := s.newRepoPath(repo.Did, repo.Rkey) 407 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 408 return fmt.Errorf("sync git repo: %w", err) 409 } 410 l.Info("synced git repo") 411 412 // TODO: plan the pipeline 413 } 414 415 return nil 416} 417 418// newRepoPath creates a path to store repository by its did and rkey. 419// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey 420func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string { 421 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String()) 422} 423 424func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 425 scheme := "https://" 426 if s.cfg.Server.Dev { 427 scheme = "http://" 428 } 429 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 430} 431 432const RequiredVersion = "2.49.0" 433 434func ensureGitVersion() error { 435 v, err := git.Version() 436 if err != nil { 437 return fmt.Errorf("fetching git version: %w", err) 438 } 439 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 440 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 441 } 442 return nil 443}