A vibe-coded fork of Tangled that supports Pijul.
1package spindle
2
import (
	"context"
	_ "embed"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"path/filepath"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/bluesky-social/indigo/service/tap"
	"github.com/go-chi/chi/v5"
	"github.com/hashicorp/go-version"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/git"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/tapc"
	"tangled.org/core/xrpc/serviceauth"
)
38
39//go:embed motd
40var defaultMotd []byte
41
42type Spindle struct {
43 tap *tapc.Client
44 db *db.DB
45 e *rbac2.Enforcer
46 l *slog.Logger
47 n *notifier.Notifier
48 engs map[string]models.Engine
49 jq *queue.Queue
50 cfg *config.Config
51 ks *eventconsumer.Consumer
52 res *idresolver.Resolver
53 vault secrets.Manager
54 motd []byte
55 motdMu sync.RWMutex
56}
57
58// New creates a new Spindle server with the provided configuration and engines.
59func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
60 logger := log.FromContext(ctx)
61
62 if err := ensureGitVersion(); err != nil {
63 return nil, fmt.Errorf("ensuring git version: %w", err)
64 }
65
66 d, err := db.Make(ctx, cfg.Server.DBPath())
67 if err != nil {
68 return nil, fmt.Errorf("failed to setup db: %w", err)
69 }
70
71 e, err := rbac2.NewEnforcer(cfg.Server.DBPath())
72 if err != nil {
73 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
74 }
75
76 n := notifier.New()
77
78 var vault secrets.Manager
79 switch cfg.Server.Secrets.Provider {
80 case "openbao":
81 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
82 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
83 }
84 vault, err = secrets.NewOpenBaoManager(
85 cfg.Server.Secrets.OpenBao.ProxyAddr,
86 logger,
87 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
88 )
89 if err != nil {
90 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
91 }
92 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
93 case "sqlite", "":
94 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets"))
95 if err != nil {
96 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
97 }
98 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath())
99 default:
100 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
101 }
102
103 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
104 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
105
106 tap := tapc.NewClient("http://localhost:"+cfg.Server.TapPort, "")
107
108 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
109
110 spindle := &Spindle{
111 tap: &tap,
112 e: e,
113 db: d,
114 l: logger,
115 n: &n,
116 engs: engines,
117 jq: jq,
118 cfg: cfg,
119 res: resolver,
120 vault: vault,
121 motd: defaultMotd,
122 }
123
124 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
125 if err != nil {
126 return nil, err
127 }
128 logger.Info("owner set", "did", cfg.Server.Owner)
129
130 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath())
131 if err != nil {
132 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
133 }
134
135 // for each incoming sh.tangled.pipeline, we execute
136 // spindle.processPipeline, which in turn enqueues the pipeline
137 // job in the above registered queue.
138 ccfg := eventconsumer.NewConsumerConfig()
139 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
140 ccfg.Dev = cfg.Server.Dev
141 ccfg.ProcessFunc = spindle.processPipeline
142 ccfg.CursorStore = cursorStore
143 knownKnots, err := d.Knots()
144 if err != nil {
145 return nil, err
146 }
147 for _, knot := range knownKnots {
148 logger.Info("adding source start", "knot", knot)
149 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
150 }
151 spindle.ks = eventconsumer.NewConsumer(*ccfg)
152
153 return spindle, nil
154}
155
156// DB returns the database instance.
157func (s *Spindle) DB() *db.DB {
158 return s.db
159}
160
161// Queue returns the job queue instance.
162func (s *Spindle) Queue() *queue.Queue {
163 return s.jq
164}
165
166// Engines returns the map of available engines.
167func (s *Spindle) Engines() map[string]models.Engine {
168 return s.engs
169}
170
171// Vault returns the secrets manager instance.
172func (s *Spindle) Vault() secrets.Manager {
173 return s.vault
174}
175
176// Notifier returns the notifier instance.
177func (s *Spindle) Notifier() *notifier.Notifier {
178 return s.n
179}
180
181// Enforcer returns the RBAC enforcer instance.
182func (s *Spindle) Enforcer() *rbac2.Enforcer {
183 return s.e
184}
185
186// SetMotdContent sets custom MOTD content, replacing the embedded default.
187func (s *Spindle) SetMotdContent(content []byte) {
188 s.motdMu.Lock()
189 defer s.motdMu.Unlock()
190 s.motd = content
191}
192
193// GetMotdContent returns the current MOTD content.
194func (s *Spindle) GetMotdContent() []byte {
195 s.motdMu.RLock()
196 defer s.motdMu.RUnlock()
197 return s.motd
198}
199
200// Start starts the Spindle server (blocking).
201func (s *Spindle) Start(ctx context.Context) error {
202 svcErr := make(chan error, 1)
203
204 // starts a job queue runner in the background
205 s.jq.Start()
206 defer s.jq.Stop()
207
208 // Stop vault token renewal if it implements Stopper
209 if stopper, ok := s.vault.(secrets.Stopper); ok {
210 defer stopper.Stop()
211 }
212
213 go func() {
214 s.l.Info("starting knot event consumer")
215 s.ks.Start(ctx)
216 }()
217
218 tap, err := tap.New(tap.Config{
219 DatabaseURL: s.cfg.Server.TapDBUrl,
220 DBMaxConns: 32,
221 PLCURL: s.cfg.Server.PlcUrl,
222 RelayUrl: s.cfg.Server.RelayUrl,
223 FirehoseParallelism: 10,
224 ResyncParallelism: 5,
225 OutboxParallelism: 1,
226 FirehoseCursorSaveInterval: 1 * time.Second,
227 RepoFetchTimeout: 300 * time.Second,
228 IdentityCacheSize: 2_000_000,
229 EventCacheSize: 100_000,
230 CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID, tangled.SpindleMemberNSID},
231 RetryTimeout: 3 * time.Second,
232 })
233 if err != nil {
234 return err
235 }
236 go func() {
237 if err := tap.RunTap(ctx, ":"+s.cfg.Server.TapPort); err != nil {
238 svcErr <- err
239 }
240 }()
241 go func() {
242 s.l.Info("starting embedded tap server")
243
244 s.l.Info("starting tap stream consumer")
245 s.tap.Connect(ctx, &tapc.SimpleIndexer{
246 EventHandler: s.processEvent,
247 })
248 }()
249
250 s.l.Debug("waiting until tap connection")
251 if err := s.tap.Wait(ctx); err != nil {
252 return err
253 }
254
255 // ensure server owner is tracked
256 s.l.Debug("adding server owner to tap")
257 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil {
258 return err
259 }
260
261 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
262 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
263}
264
265func Run(ctx context.Context) error {
266 cfg, err := config.Load(ctx)
267 if err != nil {
268 return fmt.Errorf("failed to load config: %w", err)
269 }
270
271 nixeryEng, err := nixery.New(ctx, cfg)
272 if err != nil {
273 return err
274 }
275
276 s, err := New(ctx, cfg, map[string]models.Engine{
277 "nixery": nixeryEng,
278 })
279 if err != nil {
280 return err
281 }
282
283 return s.Start(ctx)
284}
285
286func (s *Spindle) Router() http.Handler {
287 mux := chi.NewRouter()
288
289 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
290 w.Write(s.GetMotdContent())
291 })
292 mux.HandleFunc("/events", s.Events)
293 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
294
295 mux.Mount("/xrpc", s.XrpcRouter())
296 return mux
297}
298
299func (s *Spindle) XrpcRouter() http.Handler {
300 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
301
302 l := log.SubLogger(s.l, "xrpc")
303
304 x := xrpc.Xrpc{
305 Logger: l,
306 Db: s.db,
307 Enforcer: s.e,
308 Engines: s.engs,
309 Config: s.cfg,
310 Resolver: s.res,
311 Vault: s.vault,
312 Notifier: s.Notifier(),
313 ServiceAuth: serviceAuth,
314 }
315
316 return x.Router()
317}
318
319func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
320 l := log.FromContext(ctx).With("handler", "processKnotStream")
321 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey)
322 if msg.Nsid == tangled.PipelineNSID {
323 return nil
324 tpl := tangled.Pipeline{}
325 err := json.Unmarshal(msg.EventJson, &tpl)
326 if err != nil {
327 fmt.Println("error unmarshalling", err)
328 return err
329 }
330
331 if tpl.TriggerMetadata == nil {
332 return fmt.Errorf("no trigger metadata found")
333 }
334
335 if tpl.TriggerMetadata.Repo == nil {
336 return fmt.Errorf("no repo data found")
337 }
338
339 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
340 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
341 }
342
343 // filter by repos
344 _, err = s.db.GetRepoWithName(
345 syntax.DID(tpl.TriggerMetadata.Repo.Did),
346 tpl.TriggerMetadata.Repo.Repo,
347 )
348 if err != nil {
349 return fmt.Errorf("failed to get repo: %w", err)
350 }
351
352 pipelineId := models.PipelineId{
353 Knot: src.Key(),
354 Rkey: msg.Rkey,
355 }
356
357 workflows := make(map[models.Engine][]models.Workflow)
358
359 // Build pipeline environment variables once for all workflows
360 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
361
362 for _, w := range tpl.Workflows {
363 if w != nil {
364 if _, ok := s.engs[w.Engine]; !ok {
365 err = s.db.StatusFailed(models.WorkflowId{
366 PipelineId: pipelineId,
367 Name: w.Name,
368 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
369 if err != nil {
370 return fmt.Errorf("db.StatusFailed: %w", err)
371 }
372
373 continue
374 }
375
376 eng := s.engs[w.Engine]
377
378 if _, ok := workflows[eng]; !ok {
379 workflows[eng] = []models.Workflow{}
380 }
381
382 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
383 if err != nil {
384 return fmt.Errorf("init workflow: %w", err)
385 }
386
387 // inject TANGLED_* env vars after InitWorkflow
388 // This prevents user-defined env vars from overriding them
389 if ewf.Environment == nil {
390 ewf.Environment = make(map[string]string)
391 }
392 maps.Copy(ewf.Environment, pipelineEnv)
393
394 workflows[eng] = append(workflows[eng], *ewf)
395
396 err = s.db.StatusPending(models.WorkflowId{
397 PipelineId: pipelineId,
398 Name: w.Name,
399 }, s.n)
400 if err != nil {
401 return fmt.Errorf("db.StatusPending: %w", err)
402 }
403 }
404 }
405
406 ok := s.jq.Enqueue(queue.Job{
407 Run: func() error {
408 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
409 RepoOwner: tpl.TriggerMetadata.Repo.Did,
410 RepoName: tpl.TriggerMetadata.Repo.Repo,
411 Workflows: workflows,
412 }, pipelineId)
413 return nil
414 },
415 OnFail: func(jobError error) {
416 s.l.Error("pipeline run failed", "error", jobError)
417 },
418 })
419 if ok {
420 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
421 } else {
422 s.l.Error("failed to enqueue pipeline: queue is full")
423 }
424 } else if msg.Nsid == tangled.GitRefUpdateNSID {
425 event := tangled.GitRefUpdate{}
426 if err := json.Unmarshal(msg.EventJson, &event); err != nil {
427 l.Error("error unmarshalling", "err", err)
428 return err
429 }
430 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName)
431
432 // resolve repo name to rkey
433 // TODO: git.refUpdate should respond with rkey instead of repo name
434 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName)
435 if err != nil {
436 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err)
437 }
438
439 // NOTE: we are blindly trusting the knot that it will return only repos it own
440 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName)
441 repoPath := s.newRepoPath(repo.Did, repo.Rkey)
442 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil {
443 return fmt.Errorf("sync git repo: %w", err)
444 }
445 l.Info("synced git repo")
446
447 // TODO: plan the pipeline
448 }
449
450 return nil
451}
452
453// newRepoPath creates a path to store repository by its did and rkey.
454// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey
455func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string {
456 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String())
457}
458
459func (s *Spindle) newRepoCloneUrl(knot, did, name string) string {
460 scheme := "https://"
461 if s.cfg.Server.Dev {
462 scheme = "http://"
463 }
464 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name)
465}
466
// RequiredVersion is the minimum git version Spindle accepts; ensureGitVersion
// enforces it at startup.
const RequiredVersion = "2.49.0"
468
469func ensureGitVersion() error {
470 v, err := git.Version()
471 if err != nil {
472 return fmt.Errorf("fetching git version: %w", err)
473 }
474 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) {
475 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion)
476 }
477 return nil
478}