A vibe-coded fork of Tangled with added support for Pijul.
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log/slog"
10 "maps"
11 "net/http"
12 "path/filepath"
13 "sync"
14
15 "github.com/bluesky-social/indigo/atproto/syntax"
16 "github.com/go-chi/chi/v5"
17 "github.com/go-git/go-git/v5/plumbing/object"
18 "github.com/hashicorp/go-version"
19 "tangled.org/core/api/tangled"
20 "tangled.org/core/eventconsumer"
21 "tangled.org/core/eventconsumer/cursor"
22 "tangled.org/core/idresolver"
23 kgit "tangled.org/core/knotserver/git"
24 "tangled.org/core/log"
25 "tangled.org/core/notifier"
26 "tangled.org/core/rbac2"
27 "tangled.org/core/spindle/config"
28 "tangled.org/core/spindle/db"
29 "tangled.org/core/spindle/engine"
30 "tangled.org/core/spindle/engines/nixery"
31 "tangled.org/core/spindle/git"
32 "tangled.org/core/spindle/models"
33 "tangled.org/core/spindle/queue"
34 "tangled.org/core/spindle/secrets"
35 "tangled.org/core/spindle/xrpc"
36 "tangled.org/core/tap"
37 "tangled.org/core/tid"
38 "tangled.org/core/workflow"
39 "tangled.org/core/xrpc/serviceauth"
40)
41
// defaultMotd holds the embedded contents of the "motd" file. It is served
// at the root endpoint until SetMotdContent replaces it.
//
//go:embed motd
var defaultMotd []byte
44
// Spindle wires together every subsystem of the spindle CI server: event
// consumption, pipeline compilation, and job execution.
type Spindle struct {
	tap    *tap.Client              // tap stream client (repo tracking + event consumption)
	db     *db.DB                   // persistent storage (sqlite; see db.Make in New)
	e      *rbac2.Enforcer          // RBAC enforcer (owner/permission checks)
	l      *slog.Logger             // structured logger
	n      *notifier.Notifier       // notifier for pipeline/workflow status changes
	engs   map[string]models.Engine // available execution engines, keyed by name (e.g. "nixery")
	jq     *queue.Queue             // bounded job queue for pipeline runs
	cfg    *config.Config           // server configuration
	ks     *eventconsumer.Consumer  // knot event-stream consumer (ref updates etc.)
	res    *idresolver.Resolver     // AT identity resolver
	vault  secrets.Manager          // secrets backend (sqlite or openbao, chosen in New)
	motd   []byte                   // current message-of-the-day; guarded by motdMu
	motdMu sync.RWMutex             // guards motd
}
60
// New creates a new Spindle server with the provided configuration and engines.
//
// It wires up, in order: the git version check, the database, the RBAC
// enforcer, the notifier, the secrets backend (selected from config), the job
// queue, the tap client, and the identity resolver; it then records the
// configured owner and builds a knot event consumer seeded with every knot
// already known to the database. The returned server is inert until Start is
// called.
func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
	logger := log.FromContext(ctx)

	// Fail fast if the installed git binary is older than RequiredVersion.
	if err := ensureGitVersion(); err != nil {
		return nil, fmt.Errorf("ensuring git version: %w", err)
	}

	d, err := db.Make(ctx, cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup db: %w", err)
	}

	// The enforcer uses the same sqlite file as the main database.
	e, err := rbac2.NewEnforcer(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
	}

	n := notifier.New()

	// Select the secrets backend. "sqlite" (also the default when the
	// provider is empty) stores secrets alongside the main database;
	// "openbao" talks to an external OpenBao proxy.
	var vault secrets.Manager
	switch cfg.Server.Secrets.Provider {
	case "openbao":
		if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
			return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
		}
		vault, err = secrets.NewOpenBaoManager(
			cfg.Server.Secrets.OpenBao.ProxyAddr,
			logger,
			secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
		)
		if err != nil {
			return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
		}
		logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
	case "sqlite", "":
		vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets"))
		if err != nil {
			return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
		}
		logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath())
	default:
		return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
	}

	jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
	logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)

	tap := tap.NewClient(cfg.Server.TapUrl, "")

	resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)

	spindle := &Spindle{
		tap:   &tap,
		e:     e,
		db:    d,
		l:     logger,
		n:     &n,
		engs:  engines,
		jq:    jq,
		cfg:   cfg,
		res:   resolver,
		vault: vault,
		motd:  defaultMotd,
	}

	// Record the configured owner with the enforcer before serving anything.
	err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
	if err != nil {
		return nil, err
	}
	logger.Info("owner set", "did", cfg.Server.Owner)

	// The cursor store persists per-source stream positions across restarts.
	cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath())
	if err != nil {
		return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
	}

	// spindle listen to knot stream for sh.tangled.git.refUpdate
	// which will sync the local workflow files in spindle and enqueues the
	// pipeline job for on-push workflows
	ccfg := eventconsumer.NewConsumerConfig()
	ccfg.Logger = log.SubLogger(logger, "eventconsumer")
	ccfg.Dev = cfg.Server.Dev
	ccfg.ProcessFunc = spindle.processKnotStream
	ccfg.CursorStore = cursorStore
	knownKnots, err := d.Knots()
	if err != nil {
		return nil, err
	}
	// Subscribe to every knot the database already knows about.
	for _, knot := range knownKnots {
		logger.Info("adding source start", "knot", knot)
		ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
	}
	spindle.ks = eventconsumer.NewConsumer(*ccfg)

	return spindle, nil
}
158
// DB returns the server's database handle.
func (s *Spindle) DB() *db.DB {
	return s.db
}
163
// Queue returns the job queue that pipeline runs are enqueued onto.
func (s *Spindle) Queue() *queue.Queue {
	return s.jq
}
168
// Engines returns the map of available execution engines, keyed by name.
// The map is the live internal one; callers must not mutate it.
func (s *Spindle) Engines() map[string]models.Engine {
	return s.engs
}
173
// Vault returns the configured secrets manager (sqlite or openbao backed).
func (s *Spindle) Vault() secrets.Manager {
	return s.vault
}
178
// Notifier returns the notifier used to publish status changes.
func (s *Spindle) Notifier() *notifier.Notifier {
	return s.n
}
183
// Enforcer returns the RBAC enforcer instance.
func (s *Spindle) Enforcer() *rbac2.Enforcer {
	return s.e
}
188
189// SetMotdContent sets custom MOTD content, replacing the embedded default.
190func (s *Spindle) SetMotdContent(content []byte) {
191 s.motdMu.Lock()
192 defer s.motdMu.Unlock()
193 s.motd = content
194}
195
196// GetMotdContent returns the current MOTD content.
197func (s *Spindle) GetMotdContent() []byte {
198 s.motdMu.RLock()
199 defer s.motdMu.RUnlock()
200 return s.motd
201}
202
// Start starts the Spindle server (blocking).
//
// It runs the job queue, the knot event consumer, and the tap stream
// consumer in the background, then serves HTTP on the configured address.
// NOTE(review): the two goroutines below are never waited on, and
// http.ListenAndServe carries no read/write timeouts — confirm both are
// acceptable (the /events and /logs endpoints look like long-lived streams,
// which would preclude a blanket WriteTimeout).
func (s *Spindle) Start(ctx context.Context) error {
	// starts a job queue runner in the background
	s.jq.Start()
	defer s.jq.Stop()

	// Stop vault token renewal if it implements Stopper
	if stopper, ok := s.vault.(secrets.Stopper); ok {
		defer stopper.Stop()
	}

	// Consume knot event streams in the background; presumably shut down via
	// ctx cancellation — TODO confirm Consumer.Start honors ctx.
	go func() {
		s.l.Info("starting knot event consumer")
		s.ks.Start(ctx)
	}()

	// ensure server owner is tracked
	if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil {
		return err
	}

	// Consume the tap stream in the background; events are dispatched to
	// s.processEvent.
	go func() {
		s.l.Info("starting tap stream consumer")
		s.tap.Connect(ctx, &tap.SimpleIndexer{
			EventHandler: s.processEvent,
		})
	}()

	s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
	return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
}
234
235func Run(ctx context.Context) error {
236 cfg, err := config.Load(ctx)
237 if err != nil {
238 return fmt.Errorf("failed to load config: %w", err)
239 }
240
241 nixeryEng, err := nixery.New(ctx, cfg)
242 if err != nil {
243 return err
244 }
245
246 s, err := New(ctx, cfg, map[string]models.Engine{
247 "nixery": nixeryEng,
248 })
249 if err != nil {
250 return err
251 }
252
253 return s.Start(ctx)
254}
255
256func (s *Spindle) Router() http.Handler {
257 mux := chi.NewRouter()
258
259 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
260 w.Write(s.GetMotdContent())
261 })
262 mux.HandleFunc("/events", s.Events)
263 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
264
265 mux.Mount("/xrpc", s.XrpcRouter())
266 return mux
267}
268
269func (s *Spindle) XrpcRouter() http.Handler {
270 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
271
272 l := log.SubLogger(s.l, "xrpc")
273
274 x := xrpc.Xrpc{
275 Logger: l,
276 Db: s.db,
277 Enforcer: s.e,
278 Engines: s.engs,
279 Config: s.cfg,
280 Resolver: s.res,
281 Vault: s.vault,
282 Notifier: s.Notifier(),
283 ServiceAuth: serviceAuth,
284 }
285
286 return x.Router()
287}
288
289func (s *Spindle) processKnotStream(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
290 l := log.FromContext(ctx).With("handler", "processKnotStream")
291 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey)
292 if msg.Nsid == tangled.PipelineNSID {
293 return nil
294 tpl := tangled.Pipeline{}
295 err := json.Unmarshal(msg.EventJson, &tpl)
296 if err != nil {
297 fmt.Println("error unmarshalling", err)
298 return err
299 }
300
301 if tpl.TriggerMetadata == nil {
302 return fmt.Errorf("no trigger metadata found")
303 }
304
305 if tpl.TriggerMetadata.Repo == nil {
306 return fmt.Errorf("no repo data found")
307 }
308
309 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
310 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
311 }
312
313 // filter by repos
314 _, err = s.db.GetRepoWithName(
315 syntax.DID(tpl.TriggerMetadata.Repo.Did),
316 tpl.TriggerMetadata.Repo.Repo,
317 )
318 if err != nil {
319 return fmt.Errorf("failed to get repo: %w", err)
320 }
321
322 pipelineId := models.PipelineId{
323 Knot: src.Key(),
324 Rkey: msg.Rkey,
325 }
326
327 err = s.processPipeline(ctx, tpl, pipelineId)
328 if err != nil {
329 return err
330 }
331 } else if msg.Nsid == tangled.GitRefUpdateNSID {
332 event := tangled.GitRefUpdate{}
333 if err := json.Unmarshal(msg.EventJson, &event); err != nil {
334 l.Error("error unmarshalling", "err", err)
335 return err
336 }
337 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName)
338
339 // resolve repo name to rkey
340 // TODO: git.refUpdate should respond with rkey instead of repo name
341 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName)
342 if err != nil {
343 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err)
344 }
345
346 // NOTE: we are blindly trusting the knot that it will return only repos it own
347 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName)
348 repoPath := s.newRepoPath(repo.Did, repo.Rkey)
349 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil {
350 return fmt.Errorf("sync git repo: %w", err)
351 }
352 l.Info("synced git repo")
353
354 compiler := workflow.Compiler{
355 Trigger: tangled.Pipeline_TriggerMetadata{
356 Kind: string(workflow.TriggerKindPush),
357 Push: &tangled.Pipeline_PushTriggerData{
358 Ref: event.Ref,
359 OldSha: event.OldSha,
360 NewSha: event.NewSha,
361 },
362 Repo: &tangled.Pipeline_TriggerRepo{
363 Did: repo.Did.String(),
364 Knot: repo.Knot,
365 Repo: repo.Name,
366 },
367 },
368 }
369
370 // load workflow definitions from rev (without spindle context)
371 rawPipeline, err := s.loadPipeline(ctx, repoCloneUri, repoPath, event.NewSha)
372 if err != nil {
373 return fmt.Errorf("loading pipeline: %w", err)
374 }
375 if len(rawPipeline) == 0 {
376 l.Info("no workflow definition find for the repo. skipping the event")
377 return nil
378 }
379 tpl := compiler.Compile(compiler.Parse(rawPipeline))
380 // TODO: pass compile error to workflow log
381 for _, w := range compiler.Diagnostics.Errors {
382 l.Error(w.String())
383 }
384 for _, w := range compiler.Diagnostics.Warnings {
385 l.Warn(w.String())
386 }
387
388 pipelineId := models.PipelineId{
389 Knot: tpl.TriggerMetadata.Repo.Knot,
390 Rkey: tid.TID(),
391 }
392 if err := s.db.CreatePipelineEvent(pipelineId.Rkey, tpl, s.n); err != nil {
393 l.Error("failed to create pipeline event", "err", err)
394 return nil
395 }
396 err = s.processPipeline(ctx, tpl, pipelineId)
397 if err != nil {
398 return err
399 }
400 }
401
402 return nil
403}
404
405func (s *Spindle) loadPipeline(ctx context.Context, repoUri, repoPath, rev string) (workflow.RawPipeline, error) {
406 if err := git.SparseSyncGitRepo(ctx, repoUri, repoPath, rev); err != nil {
407 return nil, fmt.Errorf("syncing git repo: %w", err)
408 }
409 gr, err := kgit.Open(repoPath, rev)
410 if err != nil {
411 return nil, fmt.Errorf("opening git repo: %w", err)
412 }
413
414 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir)
415 if errors.Is(err, object.ErrDirectoryNotFound) {
416 // return empty RawPipeline when directory doesn't exist
417 return nil, nil
418 } else if err != nil {
419 return nil, fmt.Errorf("loading file tree: %w", err)
420 }
421
422 var rawPipeline workflow.RawPipeline
423 for _, e := range workflowDir {
424 if !e.IsFile() {
425 continue
426 }
427
428 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
429 contents, err := gr.RawContent(fpath)
430 if err != nil {
431 return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err)
432 }
433
434 rawPipeline = append(rawPipeline, workflow.RawWorkflow{
435 Name: e.Name,
436 Contents: contents,
437 })
438 }
439
440 return rawPipeline, nil
441}
442
443func (s *Spindle) processPipeline(ctx context.Context, tpl tangled.Pipeline, pipelineId models.PipelineId) error {
444 // Build pipeline environment variables once for all workflows
445 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
446
447 // filter & init workflows
448 workflows := make(map[models.Engine][]models.Workflow)
449 for _, w := range tpl.Workflows {
450 if w == nil {
451 continue
452 }
453 if _, ok := s.engs[w.Engine]; !ok {
454 err := s.db.StatusFailed(models.WorkflowId{
455 PipelineId: pipelineId,
456 Name: w.Name,
457 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
458 if err != nil {
459 return fmt.Errorf("db.StatusFailed: %w", err)
460 }
461
462 continue
463 }
464
465 eng := s.engs[w.Engine]
466
467 if _, ok := workflows[eng]; !ok {
468 workflows[eng] = []models.Workflow{}
469 }
470
471 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
472 if err != nil {
473 return fmt.Errorf("init workflow: %w", err)
474 }
475
476 // inject TANGLED_* env vars after InitWorkflow
477 // This prevents user-defined env vars from overriding them
478 if ewf.Environment == nil {
479 ewf.Environment = make(map[string]string)
480 }
481 maps.Copy(ewf.Environment, pipelineEnv)
482
483 workflows[eng] = append(workflows[eng], *ewf)
484 }
485
486 // enqueue pipeline
487 ok := s.jq.Enqueue(queue.Job{
488 Run: func() error {
489 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
490 RepoOwner: tpl.TriggerMetadata.Repo.Did,
491 RepoName: tpl.TriggerMetadata.Repo.Repo,
492 Workflows: workflows,
493 }, pipelineId)
494 return nil
495 },
496 OnFail: func(jobError error) {
497 s.l.Error("pipeline run failed", "error", jobError)
498 },
499 })
500 if !ok {
501 return fmt.Errorf("failed to enqueue pipeline: queue is full")
502 }
503 s.l.Info("pipeline enqueued successfully", "id", pipelineId)
504
505 // emit StatusPending for all workflows here (after successful enqueue)
506 for _, ewfs := range workflows {
507 for _, ewf := range ewfs {
508 err := s.db.StatusPending(models.WorkflowId{
509 PipelineId: pipelineId,
510 Name: ewf.Name,
511 }, s.n)
512 if err != nil {
513 return fmt.Errorf("db.StatusPending: %w", err)
514 }
515 }
516 }
517 return nil
518}
519
520// newRepoPath creates a path to store repository by its did and rkey.
521// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey
522func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string {
523 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String())
524}
525
526func (s *Spindle) newRepoCloneUrl(knot, did, name string) string {
527 scheme := "https://"
528 if s.cfg.Server.Dev {
529 scheme = "http://"
530 }
531 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name)
532}
533
// RequiredVersion is the minimum git version Spindle needs; it is enforced
// at startup by ensureGitVersion.
const RequiredVersion = "2.49.0"
535
536func ensureGitVersion() error {
537 v, err := git.Version()
538 if err != nil {
539 return fmt.Errorf("fetching git version: %w", err)
540 }
541 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) {
542 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion)
543 }
544 return nil
545}