A vibe coded tangled fork which supports pijul.
at 7f5a229aff5d673f9b046a235cacd2b489fad58a 478 lines 13 kB view raw
1package spindle 2 3import ( 4 "context" 5 _ "embed" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "maps" 10 "net/http" 11 "path/filepath" 12 "sync" 13 "time" 14 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 "github.com/bluesky-social/indigo/service/tap" 17 "github.com/go-chi/chi/v5" 18 "github.com/hashicorp/go-version" 19 "tangled.org/core/api/tangled" 20 "tangled.org/core/eventconsumer" 21 "tangled.org/core/eventconsumer/cursor" 22 "tangled.org/core/idresolver" 23 "tangled.org/core/log" 24 "tangled.org/core/notifier" 25 "tangled.org/core/rbac2" 26 "tangled.org/core/spindle/config" 27 "tangled.org/core/spindle/db" 28 "tangled.org/core/spindle/engine" 29 "tangled.org/core/spindle/engines/nixery" 30 "tangled.org/core/spindle/git" 31 "tangled.org/core/spindle/models" 32 "tangled.org/core/spindle/queue" 33 "tangled.org/core/spindle/secrets" 34 "tangled.org/core/spindle/xrpc" 35 "tangled.org/core/tapc" 36 "tangled.org/core/xrpc/serviceauth" 37) 38 39//go:embed motd 40var defaultMotd []byte 41 42type Spindle struct { 43 tap *tapc.Client 44 db *db.DB 45 e *rbac2.Enforcer 46 l *slog.Logger 47 n *notifier.Notifier 48 engs map[string]models.Engine 49 jq *queue.Queue 50 cfg *config.Config 51 ks *eventconsumer.Consumer 52 res *idresolver.Resolver 53 vault secrets.Manager 54 motd []byte 55 motdMu sync.RWMutex 56} 57 58// New creates a new Spindle server with the provided configuration and engines. 59func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 60 logger := log.FromContext(ctx) 61 62 if err := ensureGitVersion(); err != nil { 63 return nil, fmt.Errorf("ensuring git version: %w", err) 64 } 65 66 d, err := db.Make(ctx, cfg.Server.DBPath()) 67 if err != nil { 68 return nil, fmt.Errorf("failed to setup db: %w", err) 69 } 70 71 e, err := rbac2.NewEnforcer(cfg.Server.DBPath()) 72 if err != nil { 73 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 74 } 75 76 n := notifier.New() 77 78 var vault secrets.Manager 79 switch cfg.Server.Secrets.Provider { 80 case "openbao": 81 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 82 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 83 } 84 vault, err = secrets.NewOpenBaoManager( 85 cfg.Server.Secrets.OpenBao.ProxyAddr, 86 logger, 87 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 88 ) 89 if err != nil { 90 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 91 } 92 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 93 case "sqlite", "": 94 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets")) 95 if err != nil { 96 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 97 } 98 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath()) 99 default: 100 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 101 } 102 103 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 104 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 105 106 tap := tapc.NewClient("http://localhost:"+cfg.Server.TapPort, "") 107 108 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 109 110 spindle := &Spindle{ 111 tap: &tap, 112 e: e, 113 db: d, 114 l: logger, 115 n: &n, 116 engs: engines, 117 jq: jq, 118 cfg: cfg, 119 res: resolver, 120 vault: vault, 121 motd: defaultMotd, 122 } 123 124 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did()) 125 if err != nil { 126 return nil, err 127 } 128 logger.Info("owner set", "did", cfg.Server.Owner) 129 130 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath()) 131 if err != nil { 132 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 133 } 134 135 // for each incoming sh.tangled.pipeline, we execute 136 // spindle.processPipeline, which in turn enqueues the pipeline 137 // job in the above registered queue. 138 ccfg := eventconsumer.NewConsumerConfig() 139 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 140 ccfg.Dev = cfg.Server.Dev 141 ccfg.ProcessFunc = spindle.processPipeline 142 ccfg.CursorStore = cursorStore 143 knownKnots, err := d.Knots() 144 if err != nil { 145 return nil, err 146 } 147 for _, knot := range knownKnots { 148 logger.Info("adding source start", "knot", knot) 149 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 150 } 151 spindle.ks = eventconsumer.NewConsumer(*ccfg) 152 153 return spindle, nil 154} 155 156// DB returns the database instance. 157func (s *Spindle) DB() *db.DB { 158 return s.db 159} 160 161// Queue returns the job queue instance. 162func (s *Spindle) Queue() *queue.Queue { 163 return s.jq 164} 165 166// Engines returns the map of available engines. 167func (s *Spindle) Engines() map[string]models.Engine { 168 return s.engs 169} 170 171// Vault returns the secrets manager instance. 172func (s *Spindle) Vault() secrets.Manager { 173 return s.vault 174} 175 176// Notifier returns the notifier instance. 177func (s *Spindle) Notifier() *notifier.Notifier { 178 return s.n 179} 180 181// Enforcer returns the RBAC enforcer instance. 182func (s *Spindle) Enforcer() *rbac2.Enforcer { 183 return s.e 184} 185 186// SetMotdContent sets custom MOTD content, replacing the embedded default. 187func (s *Spindle) SetMotdContent(content []byte) { 188 s.motdMu.Lock() 189 defer s.motdMu.Unlock() 190 s.motd = content 191} 192 193// GetMotdContent returns the current MOTD content. 194func (s *Spindle) GetMotdContent() []byte { 195 s.motdMu.RLock() 196 defer s.motdMu.RUnlock() 197 return s.motd 198} 199 200// Start starts the Spindle server (blocking). 201func (s *Spindle) Start(ctx context.Context) error { 202 svcErr := make(chan error, 1) 203 204 // starts a job queue runner in the background 205 s.jq.Start() 206 defer s.jq.Stop() 207 208 // Stop vault token renewal if it implements Stopper 209 if stopper, ok := s.vault.(secrets.Stopper); ok { 210 defer stopper.Stop() 211 } 212 213 go func() { 214 s.l.Info("starting knot event consumer") 215 s.ks.Start(ctx) 216 }() 217 218 tap, err := tap.New(tap.Config{ 219 DatabaseURL: s.cfg.Server.TapDBUrl, 220 DBMaxConns: 32, 221 PLCURL: s.cfg.Server.PlcUrl, 222 RelayUrl: s.cfg.Server.RelayUrl, 223 FirehoseParallelism: 10, 224 ResyncParallelism: 5, 225 OutboxParallelism: 1, 226 FirehoseCursorSaveInterval: 1 * time.Second, 227 RepoFetchTimeout: 300 * time.Second, 228 IdentityCacheSize: 2_000_000, 229 EventCacheSize: 100_000, 230 CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID, tangled.SpindleMemberNSID}, 231 RetryTimeout: 3 * time.Second, 232 }) 233 if err != nil { 234 return err 235 } 236 go func() { 237 if err := tap.RunTap(ctx, ":"+s.cfg.Server.TapPort); err != nil { 238 svcErr <- err 239 } 240 }() 241 go func() { 242 s.l.Info("starting embedded tap server") 243 244 s.l.Info("starting tap stream consumer") 245 s.tap.Connect(ctx, &tapc.SimpleIndexer{ 246 EventHandler: s.processEvent, 247 }) 248 }() 249 250 s.l.Debug("waiting until tap connection") 251 if err := s.tap.Wait(ctx); err != nil { 252 return err 253 } 254 255 // ensure server owner is tracked 256 s.l.Debug("adding server owner to tap") 257 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil { 258 return err 259 } 260 261 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 262 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 263} 264 265func Run(ctx context.Context) error { 266 cfg, err := config.Load(ctx) 267 if err != nil { 268 return fmt.Errorf("failed to load config: %w", err) 269 } 270 271 nixeryEng, err := nixery.New(ctx, cfg) 272 if err != nil { 273 return err 274 } 275 276 s, err := New(ctx, cfg, map[string]models.Engine{ 277 "nixery": nixeryEng, 278 }) 279 if err != nil { 280 return err 281 } 282 283 return s.Start(ctx) 284} 285 286func (s *Spindle) Router() http.Handler { 287 mux := chi.NewRouter() 288 289 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 290 w.Write(s.GetMotdContent()) 291 }) 292 mux.HandleFunc("/events", s.Events) 293 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 294 295 mux.Mount("/xrpc", s.XrpcRouter()) 296 return mux 297} 298 299func (s *Spindle) XrpcRouter() http.Handler { 300 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 301 302 l := log.SubLogger(s.l, "xrpc") 303 304 x := xrpc.Xrpc{ 305 Logger: l, 306 Db: s.db, 307 Enforcer: s.e, 308 Engines: s.engs, 309 Config: s.cfg, 310 Resolver: s.res, 311 Vault: s.vault, 312 Notifier: s.Notifier(), 313 ServiceAuth: serviceAuth, 314 } 315 316 return x.Router() 317} 318 319func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 320 l := log.FromContext(ctx).With("handler", "processKnotStream") 321 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 322 if msg.Nsid == tangled.PipelineNSID { 323 return nil 324 tpl := tangled.Pipeline{} 325 err := json.Unmarshal(msg.EventJson, &tpl) 326 if err != nil { 327 fmt.Println("error unmarshalling", err) 328 return err 329 } 330 331 if tpl.TriggerMetadata == nil { 332 return fmt.Errorf("no trigger metadata found") 333 } 334 335 if tpl.TriggerMetadata.Repo == nil { 336 return fmt.Errorf("no repo data found") 337 } 338 339 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 340 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 341 } 342 343 // filter by repos 344 _, err = s.db.GetRepoWithName( 345 syntax.DID(tpl.TriggerMetadata.Repo.Did), 346 tpl.TriggerMetadata.Repo.Repo, 347 ) 348 if err != nil { 349 return fmt.Errorf("failed to get repo: %w", err) 350 } 351 352 pipelineId := models.PipelineId{ 353 Knot: src.Key(), 354 Rkey: msg.Rkey, 355 } 356 357 workflows := make(map[models.Engine][]models.Workflow) 358 359 // Build pipeline environment variables once for all workflows 360 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev) 361 362 for _, w := range tpl.Workflows { 363 if w != nil { 364 if _, ok := s.engs[w.Engine]; !ok { 365 err = s.db.StatusFailed(models.WorkflowId{ 366 PipelineId: pipelineId, 367 Name: w.Name, 368 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 369 if err != nil { 370 return fmt.Errorf("db.StatusFailed: %w", err) 371 } 372 373 continue 374 } 375 376 eng := s.engs[w.Engine] 377 378 if _, ok := workflows[eng]; !ok { 379 workflows[eng] = []models.Workflow{} 380 } 381 382 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 383 if err != nil { 384 return fmt.Errorf("init workflow: %w", err) 385 } 386 387 // inject TANGLED_* env vars after InitWorkflow 388 // This prevents user-defined env vars from overriding them 389 if ewf.Environment == nil { 390 ewf.Environment = make(map[string]string) 391 } 392 maps.Copy(ewf.Environment, pipelineEnv) 393 394 workflows[eng] = append(workflows[eng], *ewf) 395 396 err = s.db.StatusPending(models.WorkflowId{ 397 PipelineId: pipelineId, 398 Name: w.Name, 399 }, s.n) 400 if err != nil { 401 return fmt.Errorf("db.StatusPending: %w", err) 402 } 403 } 404 } 405 406 ok := s.jq.Enqueue(queue.Job{ 407 Run: func() error { 408 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 409 RepoOwner: tpl.TriggerMetadata.Repo.Did, 410 RepoName: tpl.TriggerMetadata.Repo.Repo, 411 Workflows: workflows, 412 }, pipelineId) 413 return nil 414 }, 415 OnFail: func(jobError error) { 416 s.l.Error("pipeline run failed", "error", jobError) 417 }, 418 }) 419 if ok { 420 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 421 } else { 422 s.l.Error("failed to enqueue pipeline: queue is full") 423 } 424 } else if msg.Nsid == tangled.GitRefUpdateNSID { 425 event := tangled.GitRefUpdate{} 426 if err := json.Unmarshal(msg.EventJson, &event); err != nil { 427 l.Error("error unmarshalling", "err", err) 428 return err 429 } 430 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 431 432 // resolve repo name to rkey 433 // TODO: git.refUpdate should respond with rkey instead of repo name 434 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName) 435 if err != nil { 436 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err) 437 } 438 439 // NOTE: we are blindly trusting the knot that it will return only repos it own 440 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 441 repoPath := s.newRepoPath(repo.Did, repo.Rkey) 442 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 443 return fmt.Errorf("sync git repo: %w", err) 444 } 445 l.Info("synced git repo") 446 447 // TODO: plan the pipeline 448 } 449 450 return nil 451} 452 453// newRepoPath creates a path to store repository by its did and rkey. 454// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey 455func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string { 456 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String()) 457} 458 459func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 460 scheme := "https://" 461 if s.cfg.Server.Dev { 462 scheme = "http://" 463 } 464 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 465} 466 467const RequiredVersion = "2.49.0" 468 469func ensureGitVersion() error { 470 v, err := git.Version() 471 if err != nil { 472 return fmt.Errorf("fetching git version: %w", err) 473 } 474 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 475 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 476 } 477 return nil 478}