An experimental ("vibe-coded") fork of Tangled that adds support for the Pijul version-control system.
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log/slog"
10 "maps"
11 "net/http"
12 "path/filepath"
13 "sync"
14 "time"
15
16 "github.com/bluesky-social/indigo/atproto/syntax"
17 "github.com/bluesky-social/indigo/service/tap"
18 "github.com/go-chi/chi/v5"
19 "github.com/go-git/go-git/v5/plumbing/object"
20 "github.com/hashicorp/go-version"
21 "tangled.org/core/api/tangled"
22 "tangled.org/core/eventconsumer"
23 "tangled.org/core/eventconsumer/cursor"
24 "tangled.org/core/idresolver"
25 kgit "tangled.org/core/knotserver/git"
26 "tangled.org/core/log"
27 "tangled.org/core/notifier"
28 "tangled.org/core/rbac2"
29 "tangled.org/core/spindle/config"
30 "tangled.org/core/spindle/db"
31 "tangled.org/core/spindle/engine"
32 "tangled.org/core/spindle/engines/nixery"
33 "tangled.org/core/spindle/git"
34 "tangled.org/core/spindle/models"
35 "tangled.org/core/spindle/queue"
36 "tangled.org/core/spindle/secrets"
37 "tangled.org/core/spindle/xrpc"
38 "tangled.org/core/tapc"
39 "tangled.org/core/tid"
40 "tangled.org/core/workflow"
41 "tangled.org/core/xrpc/serviceauth"
42)
43
// defaultMotd is the built-in message of the day, embedded from the `motd`
// file at build time. It can be replaced at runtime via SetMotdContent.
//
//go:embed motd
var defaultMotd []byte
46
// Spindle is the top-level server state: storage, access control, CI
// engines, the job queue, and the event-stream consumers.
type Spindle struct {
	tap    *tapc.Client            // client for the embedded tap server
	db     *db.DB                  // spindle database
	e      *rbac2.Enforcer         // RBAC enforcer (backed by the same db file)
	l      *slog.Logger            // structured logger
	n      *notifier.Notifier      // broadcasts pipeline status changes
	engs   map[string]models.Engine // available CI engines, keyed by engine name
	jq     *queue.Queue            // pipeline job queue
	cfg    *config.Config          // server configuration
	ks     *eventconsumer.Consumer // consumer of knot event streams
	res    *idresolver.Resolver    // atproto identity resolver
	vault  secrets.Manager         // secrets backend (sqlite or openbao)
	motd   []byte                  // message of the day served at "/"; guarded by motdMu
	motdMu sync.RWMutex            // protects motd
}
62
// New creates a new Spindle server with the provided configuration and engines.
//
// It wires together, in order: the database, the RBAC enforcer, the secrets
// manager (sqlite by default, openbao when configured), the job queue, the
// tap client, the identity resolver, and the knot event-stream consumer
// (re-subscribed to every knot already recorded in the db). The consumer is
// constructed but not started here; Start launches it.
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
	logger := log.FromContext(ctx)

	// refuse to run with a git binary older than RequiredVersion
	if err := ensureGitVersion(); err != nil {
		return nil, fmt.Errorf("ensuring git version: %w", err)
	}

	d, err := db.Make(ctx, cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup db: %w", err)
	}

	// the enforcer shares the same sqlite file as the main database
	e, err := rbac2.NewEnforcer(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
	}

	n := notifier.New()

	// pick a secrets backend; an empty provider falls back to sqlite
	var vault secrets.Manager
	switch cfg.Server.Secrets.Provider {
	case "openbao":
		if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
			return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
		}
		vault, err = secrets.NewOpenBaoManager(
			cfg.Server.Secrets.OpenBao.ProxyAddr,
			logger,
			secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
		)
		if err != nil {
			return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
		}
		logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
	case "sqlite", "":
		vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets"))
		if err != nil {
			return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
		}
		logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath())
	default:
		return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
	}

	jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
	logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)

	// client for the embedded tap server that Start launches on TapPort
	tap := tapc.NewClient("http://localhost:"+cfg.Server.TapPort, "")

	resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)

	spindle := &Spindle{
		tap:   &tap,
		e:     e,
		db:    d,
		l:     logger,
		n:     &n,
		engs:  engines,
		jq:    jq,
		cfg:   cfg,
		res:   resolver,
		vault: vault,
		motd:  defaultMotd, // embedded default until SetMotdContent is called
	}

	err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
	if err != nil {
		return nil, err
	}
	logger.Info("owner set", "did", cfg.Server.Owner)

	cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
	}

	// spindle listens to the knot stream for sh.tangled.git.refUpdate,
	// which syncs the local workflow files in spindle and enqueues the
	// pipeline job for on-push workflows
	ccfg := eventconsumer.NewConsumerConfig()
	ccfg.Logger = log.SubLogger(logger, "eventconsumer")
	ccfg.Dev = cfg.Server.Dev
	ccfg.ProcessFunc = spindle.processKnotStream
	ccfg.CursorStore = cursorStore
	knownKnots, err := d.Knots()
	if err != nil {
		return nil, err
	}
	// resubscribe to every knot already known to this spindle
	for _, knot := range knownKnots {
		logger.Info("adding source start", "knot", knot)
		ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
	}
	spindle.ks = eventconsumer.NewConsumer(*ccfg)

	return spindle, nil
}
160
// DB returns the database instance shared by the server.
func (s *Spindle) DB() *db.DB {
	return s.db
}
165
// Queue returns the job queue used to run pipelines.
func (s *Spindle) Queue() *queue.Queue {
	return s.jq
}
170
// Engines returns the map of available CI engines, keyed by engine name.
func (s *Spindle) Engines() map[string]models.Engine {
	return s.engs
}
175
// Vault returns the secrets manager instance (sqlite or openbao backed).
func (s *Spindle) Vault() secrets.Manager {
	return s.vault
}
180
// Notifier returns the notifier used to broadcast pipeline status changes.
func (s *Spindle) Notifier() *notifier.Notifier {
	return s.n
}
185
// Enforcer returns the RBAC enforcer instance.
func (s *Spindle) Enforcer() *rbac2.Enforcer {
	return s.e
}
190
191// SetMotdContent sets custom MOTD content, replacing the embedded default.
192func (s *Spindle) SetMotdContent(content []byte) {
193 s.motdMu.Lock()
194 defer s.motdMu.Unlock()
195 s.motd = content
196}
197
198// GetMotdContent returns the current MOTD content.
199func (s *Spindle) GetMotdContent() []byte {
200 s.motdMu.RLock()
201 defer s.motdMu.RUnlock()
202 return s.motd
203}
204
205// Start starts the Spindle server (blocking).
206func (s *Spindle) Start(ctx context.Context) error {
207 svcErr := make(chan error, 1)
208
209 // starts a job queue runner in the background
210 s.jq.Start()
211 defer s.jq.Stop()
212
213 // Stop vault token renewal if it implements Stopper
214 if stopper, ok := s.vault.(secrets.Stopper); ok {
215 defer stopper.Stop()
216 }
217
218 go func() {
219 s.l.Info("starting knot event consumer")
220 s.ks.Start(ctx)
221 }()
222
223 tap, err := tap.New(tap.Config{
224 DatabaseURL: s.cfg.Server.TapDBUrl,
225 DBMaxConns: 32,
226 PLCURL: s.cfg.Server.PlcUrl,
227 RelayUrl: s.cfg.Server.RelayUrl,
228 FirehoseParallelism: 10,
229 ResyncParallelism: 5,
230 OutboxParallelism: 1,
231 FirehoseCursorSaveInterval: 1 * time.Second,
232 RepoFetchTimeout: 300 * time.Second,
233 IdentityCacheSize: 2_000_000,
234 EventCacheSize: 100_000,
235 SignalCollection: tangled.RepoPullNSID, // HACK: listen for repo.pull from non-tangled users
236 CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID, tangled.RepoPullNSID, tangled.SpindleMemberNSID},
237 RetryTimeout: 3 * time.Second,
238 })
239 if err != nil {
240 return err
241 }
242 go func() {
243 if err := tap.RunTap(ctx, ":"+s.cfg.Server.TapPort); err != nil {
244 svcErr <- err
245 }
246 }()
247 go func() {
248 s.l.Info("starting embedded tap server")
249
250 s.l.Info("starting tap stream consumer")
251 s.tap.Connect(ctx, &tapc.SimpleIndexer{
252 EventHandler: s.processEvent,
253 })
254 }()
255
256 s.l.Debug("waiting for tap readiness")
257 tapReadyCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
258 defer cancel()
259 if err := s.tap.WaitReady(tapReadyCtx); err != nil {
260 return fmt.Errorf("tap not ready: %w", err)
261 }
262
263 // ensure server owner is tracked
264 s.l.Debug("adding server owner to tap")
265 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil {
266 return err
267 }
268
269 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
270 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
271}
272
273func Run(ctx context.Context) error {
274 cfg, err := config.Load(ctx)
275 if err != nil {
276 return fmt.Errorf("failed to load config: %w", err)
277 }
278
279 nixeryEng, err := nixery.New(ctx, cfg)
280 if err != nil {
281 return err
282 }
283
284 s, err := New(ctx, cfg, map[string]models.Engine{
285 "nixery": nixeryEng,
286 })
287 if err != nil {
288 return err
289 }
290
291 return s.Start(ctx)
292}
293
294func (s *Spindle) Router() http.Handler {
295 mux := chi.NewRouter()
296
297 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
298 w.Write(s.GetMotdContent())
299 })
300 mux.HandleFunc("/events", s.Events)
301 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
302
303 mux.Mount("/xrpc", s.XrpcRouter())
304 return mux
305}
306
307func (s *Spindle) XrpcRouter() http.Handler {
308 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
309
310 l := log.SubLogger(s.l, "xrpc")
311
312 x := xrpc.Xrpc{
313 Logger: l,
314 Db: s.db,
315 Enforcer: s.e,
316 Engines: s.engs,
317 Config: s.cfg,
318 Resolver: s.res,
319 Vault: s.vault,
320 Notifier: s.Notifier(),
321 ServiceAuth: serviceAuth,
322 }
323
324 return x.Router()
325}
326
327func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
328 l := log.FromContext(ctx).With("handler", "processKnotStream")
329 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey)
330 if msg.Nsid == tangled.PipelineNSID {
331 return nil
332 tpl := tangled.Pipeline{}
333 err := json.Unmarshal(msg.EventJson, &tpl)
334 if err != nil {
335 fmt.Println("error unmarshalling", err)
336 return err
337 }
338
339 if tpl.TriggerMetadata == nil {
340 return fmt.Errorf("no trigger metadata found")
341 }
342
343 if tpl.TriggerMetadata.Repo == nil {
344 return fmt.Errorf("no repo data found")
345 }
346
347 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
348 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
349 }
350
351 // filter by repos
352 _, err = s.db.GetRepoWithName(
353 syntax.DID(tpl.TriggerMetadata.Repo.Did),
354 tpl.TriggerMetadata.Repo.Repo,
355 )
356 if err != nil {
357 return fmt.Errorf("failed to get repo: %w", err)
358 }
359
360 pipelineId := models.PipelineId{
361 Knot: src.Key(),
362 Rkey: msg.Rkey,
363 }
364
365 err = s.processPipeline(ctx, tpl, pipelineId)
366 if err != nil {
367 return err
368 }
369 } else if msg.Nsid == tangled.GitRefUpdateNSID {
370 event := tangled.GitRefUpdate{}
371 if err := json.Unmarshal(msg.EventJson, &event); err != nil {
372 l.Error("error unmarshalling", "err", err)
373 return err
374 }
375 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName)
376
377 // resolve repo name to rkey
378 // TODO: git.refUpdate should respond with rkey instead of repo name
379 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName)
380 if err != nil {
381 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err)
382 }
383
384 // NOTE: we are blindly trusting the knot that it will return only repos it own
385 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName)
386 repoPath := s.newRepoPath(repo.Did, repo.Rkey)
387 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil {
388 return fmt.Errorf("sync git repo: %w", err)
389 }
390 l.Info("synced git repo")
391
392 compiler := workflow.Compiler{
393 Trigger: tangled.Pipeline_TriggerMetadata{
394 Kind: string(workflow.TriggerKindPush),
395 Push: &tangled.Pipeline_PushTriggerData{
396 Ref: event.Ref,
397 OldSha: event.OldSha,
398 NewSha: event.NewSha,
399 },
400 Repo: &tangled.Pipeline_TriggerRepo{
401 Did: repo.Did.String(),
402 Knot: repo.Knot,
403 Repo: repo.Name,
404 },
405 },
406 }
407
408 // load workflow definitions from rev (without spindle context)
409 rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha)
410 if err != nil {
411 return fmt.Errorf("loading pipeline: %w", err)
412 }
413 if len(rawPipeline) == 0 {
414 l.Info("no workflow definition find for the repo. skipping the event")
415 return nil
416 }
417 tpl := compiler.Compile(compiler.Parse(rawPipeline))
418 // TODO: pass compile error to workflow log
419 for _, w := range compiler.Diagnostics.Errors {
420 l.Error(w.String())
421 }
422 for _, w := range compiler.Diagnostics.Warnings {
423 l.Warn(w.String())
424 }
425
426 pipelineId := models.PipelineId{
427 Knot: tpl.TriggerMetadata.Repo.Knot,
428 Rkey: tid.TID(),
429 }
430 if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil {
431 l.Error("failed to create pipeline event", "err", err)
432 return nil
433 }
434 err = s.processPipeline(ctx, tpl, pipelineId)
435 if err != nil {
436 return err
437 }
438 }
439
440 return nil
441}
442
443func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) {
444 if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil {
445 return nil, fmt.Errorf("syncing git repo: %w", err)
446 }
447 gr, err := kgit.Open(repoPath, rev)
448 if err != nil {
449 return nil, fmt.Errorf("opening git repo: %w", err)
450 }
451
452 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
453 if errors.Is(err, object.ErrDirectoryNotFound) {
454 // return empty RawPipeline when directory doesn't exist
455 return nil, nil
456 } else if err != nil {
457 return nil, fmt.Errorf("loading file tree: %w", err)
458 }
459
460 var rawPipeline workflow.RawPipeline
461 for _, e := range workflowDir {
462 if !e.IsFile() {
463 continue
464 }
465
466 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
467 contents, err := gr.RawContent(fpath)
468 if err != nil {
469 return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err)
470 }
471
472 rawPipeline = append(rawPipeline, workflow.RawWorkflow{
473 Name: e.Name,
474 Contents: contents,
475 })
476 }
477
478 return rawPipeline, nil
479}
480
481func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error {
482 // Build pipeline environment variables once for all workflows
483 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
484
485 // filter & init workflows
486 workflows := make(map[models.Engine][]models.Workflow)
487 for _, w := range tpl.Workflows {
488 if w == nil {
489 continue
490 }
491 if _, ok := s.engs[w.Engine]; !ok {
492 err := s.db.StatusFailed(models.WorkflowId{
493 PipelineId: pipelineId,
494 Name: w.Name,
495 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
496 if err != nil {
497 return fmt.Errorf("db.StatusFailed: %w", err)
498 }
499
500 continue
501 }
502
503 eng := s.engs[w.Engine]
504
505 if _, ok := workflows[eng]; !ok {
506 workflows[eng] = []models.Workflow{}
507 }
508
509 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
510 if err != nil {
511 return fmt.Errorf("init workflow: %w", err)
512 }
513
514 // inject TANGLED_* env vars after InitWorkflow
515 // This prevents user-defined env vars from overriding them
516 if ewf.Environment == nil {
517 ewf.Environment = make(map[string]string)
518 }
519 maps.Copy(ewf.Environment, pipelineEnv)
520
521 workflows[eng] = append(workflows[eng], *ewf)
522 }
523
524 // enqueue pipeline
525 ok := s.jq.Enqueue(queue.Job{
526 Run: func() error {
527 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
528 RepoOwner: tpl.TriggerMetadata.Repo.Did,
529 RepoName: tpl.TriggerMetadata.Repo.Repo,
530 Workflows: workflows,
531 }, pipelineId)
532 return nil
533 },
534 OnFail: func(jobError error) {
535 s.l.Error("pipeline run failed", "error", jobError)
536 },
537 })
538 if !ok {
539 return fmt.Errorf("failed to enqueue pipeline: queue is full")
540 }
541 s.l.Info("pipeline enqueued successfully", "id", pipelineId)
542
543 // emit StatusPending for all workflows here (after successful enqueue)
544 for _, ewfs := range workflows {
545 for _, ewf := range ewfs {
546 err := s.db.StatusPending(models.WorkflowId{
547 PipelineId: pipelineId,
548 Name: ewf.Name,
549 }, s.n)
550 if err != nil {
551 return fmt.Errorf("db.StatusPending: %w", err)
552 }
553 }
554 }
555 return nil
556}
557
558// newRepoPath creates a path to store repository by its did and rkey.
559// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey
560func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string {
561 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String())
562}
563
564func (s *Spindle) newRepoCloneUrl(knot, did, name string) string {
565 scheme := "https://"
566 if s.cfg.Server.Dev {
567 scheme = "http://"
568 }
569 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name)
570}
571
572const RequiredVersion = "2.49.0"
573
574func ensureGitVersion() error {
575 v, err := git.Version()
576 if err != nil {
577 return fmt.Errorf("fetching git version: %w", err)
578 }
579 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) {
580 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion)
581 }
582 return nil
583}