A vibe-coded Tangled fork that supports Pijul.
1package spindle
2
import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"path/filepath"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/go-chi/chi/v5"
	"github.com/hashicorp/go-version"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac2"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/git"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/tap"
	"tangled.org/core/xrpc/serviceauth"
)
36
37//go:embed motd
38var defaultMotd []byte
39
40type Spindle struct {
41 tap *tap.Client
42 db *db.DB
43 e *rbac2.Enforcer
44 l *slog.Logger
45 n *notifier.Notifier
46 engs map[string]models.Engine
47 jq *queue.Queue
48 cfg *config.Config
49 ks *eventconsumer.Consumer
50 res *idresolver.Resolver
51 vault secrets.Manager
52 motd []byte
53 motdMu sync.RWMutex
54}
55
56// New creates a new Spindle server with the provided configuration and engines.
57func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
58 logger := log.FromContext(ctx)
59
60 if err := ensureGitVersion(); err != nil {
61 return nil, fmt.Errorf("ensuring git version: %w", err)
62 }
63
64 d, err := db.Make(ctx, cfg.Server.DBPath())
65 if err != nil {
66 return nil, fmt.Errorf("failed to setup db: %w", err)
67 }
68
69 e, err := rbac2.NewEnforcer(cfg.Server.DBPath())
70 if err != nil {
71 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
72 }
73
74 n := notifier.New()
75
76 var vault secrets.Manager
77 switch cfg.Server.Secrets.Provider {
78 case "openbao":
79 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
80 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
81 }
82 vault, err = secrets.NewOpenBaoManager(
83 cfg.Server.Secrets.OpenBao.ProxyAddr,
84 logger,
85 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
86 )
87 if err != nil {
88 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
89 }
90 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
91 case "sqlite", "":
92 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets"))
93 if err != nil {
94 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
95 }
96 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath())
97 default:
98 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
99 }
100
101 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
102 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
103
104 tap := tap.NewClient(cfg.Server.TapUrl, "")
105
106 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
107
108 spindle := &Spindle{
109 tap: &tap,
110 e: e,
111 db: d,
112 l: logger,
113 n: &n,
114 engs: engines,
115 jq: jq,
116 cfg: cfg,
117 res: resolver,
118 vault: vault,
119 motd: defaultMotd,
120 }
121
122 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
123 if err != nil {
124 return nil, err
125 }
126 logger.Info("owner set", "did", cfg.Server.Owner)
127
128 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath())
129 if err != nil {
130 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
131 }
132
133 // for each incoming sh.tangled.pipeline, we execute
134 // spindle.processPipeline, which in turn enqueues the pipeline
135 // job in the above registered queue.
136 ccfg := eventconsumer.NewConsumerConfig()
137 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
138 ccfg.Dev = cfg.Server.Dev
139 ccfg.ProcessFunc = spindle.processPipeline
140 ccfg.CursorStore = cursorStore
141 knownKnots, err := d.Knots()
142 if err != nil {
143 return nil, err
144 }
145 for _, knot := range knownKnots {
146 logger.Info("adding source start", "knot", knot)
147 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
148 }
149 spindle.ks = eventconsumer.NewConsumer(*ccfg)
150
151 return spindle, nil
152}
153
154// DB returns the database instance.
155func (s *Spindle) DB() *db.DB {
156 return s.db
157}
158
159// Queue returns the job queue instance.
160func (s *Spindle) Queue() *queue.Queue {
161 return s.jq
162}
163
164// Engines returns the map of available engines.
165func (s *Spindle) Engines() map[string]models.Engine {
166 return s.engs
167}
168
169// Vault returns the secrets manager instance.
170func (s *Spindle) Vault() secrets.Manager {
171 return s.vault
172}
173
174// Notifier returns the notifier instance.
175func (s *Spindle) Notifier() *notifier.Notifier {
176 return s.n
177}
178
179// Enforcer returns the RBAC enforcer instance.
180func (s *Spindle) Enforcer() *rbac2.Enforcer {
181 return s.e
182}
183
184// SetMotdContent sets custom MOTD content, replacing the embedded default.
185func (s *Spindle) SetMotdContent(content []byte) {
186 s.motdMu.Lock()
187 defer s.motdMu.Unlock()
188 s.motd = content
189}
190
191// GetMotdContent returns the current MOTD content.
192func (s *Spindle) GetMotdContent() []byte {
193 s.motdMu.RLock()
194 defer s.motdMu.RUnlock()
195 return s.motd
196}
197
198// Start starts the Spindle server (blocking).
199func (s *Spindle) Start(ctx context.Context) error {
200 // starts a job queue runner in the background
201 s.jq.Start()
202 defer s.jq.Stop()
203
204 // Stop vault token renewal if it implements Stopper
205 if stopper, ok := s.vault.(secrets.Stopper); ok {
206 defer stopper.Stop()
207 }
208
209 go func() {
210 s.l.Info("starting knot event consumer")
211 s.ks.Start(ctx)
212 }()
213
214 // ensure server owner is tracked
215 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil {
216 return err
217 }
218
219 go func() {
220 s.l.Info("starting tap stream consumer")
221 s.tap.Connect(ctx, &tap.SimpleIndexer{
222 EventHandler: s.processEvent,
223 })
224 }()
225
226 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
227 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
228}
229
230func Run(ctx context.Context) error {
231 cfg, err := config.Load(ctx)
232 if err != nil {
233 return fmt.Errorf("failed to load config: %w", err)
234 }
235
236 nixeryEng, err := nixery.New(ctx, cfg)
237 if err != nil {
238 return err
239 }
240
241 s, err := New(ctx, cfg, map[string]models.Engine{
242 "nixery": nixeryEng,
243 })
244 if err != nil {
245 return err
246 }
247
248 return s.Start(ctx)
249}
250
251func (s *Spindle) Router() http.Handler {
252 mux := chi.NewRouter()
253
254 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
255 w.Write(s.GetMotdContent())
256 })
257 mux.HandleFunc("/events", s.Events)
258 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
259
260 mux.Mount("/xrpc", s.XrpcRouter())
261 return mux
262}
263
264func (s *Spindle) XrpcRouter() http.Handler {
265 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
266
267 l := log.SubLogger(s.l, "xrpc")
268
269 x := xrpc.Xrpc{
270 Logger: l,
271 Db: s.db,
272 Enforcer: s.e,
273 Engines: s.engs,
274 Config: s.cfg,
275 Resolver: s.res,
276 Vault: s.vault,
277 Notifier: s.Notifier(),
278 ServiceAuth: serviceAuth,
279 }
280
281 return x.Router()
282}
283
284func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
285 l := log.FromContext(ctx).With("handler", "processKnotStream")
286 l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey)
287 if msg.Nsid == tangled.PipelineNSID {
288 return nil
289 tpl := tangled.Pipeline{}
290 err := json.Unmarshal(msg.EventJson, &tpl)
291 if err != nil {
292 fmt.Println("error unmarshalling", err)
293 return err
294 }
295
296 if tpl.TriggerMetadata == nil {
297 return fmt.Errorf("no trigger metadata found")
298 }
299
300 if tpl.TriggerMetadata.Repo == nil {
301 return fmt.Errorf("no repo data found")
302 }
303
304 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
305 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
306 }
307
308 // filter by repos
309 _, err = s.db.GetRepoWithName(
310 syntax.DID(tpl.TriggerMetadata.Repo.Did),
311 tpl.TriggerMetadata.Repo.Repo,
312 )
313 if err != nil {
314 return fmt.Errorf("failed to get repo: %w", err)
315 }
316
317 pipelineId := models.PipelineId{
318 Knot: src.Key(),
319 Rkey: msg.Rkey,
320 }
321
322 workflows := make(map[models.Engine][]models.Workflow)
323
324 // Build pipeline environment variables once for all workflows
325 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
326
327 for _, w := range tpl.Workflows {
328 if w != nil {
329 if _, ok := s.engs[w.Engine]; !ok {
330 err = s.db.StatusFailed(models.WorkflowId{
331 PipelineId: pipelineId,
332 Name: w.Name,
333 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
334 if err != nil {
335 return fmt.Errorf("db.StatusFailed: %w", err)
336 }
337
338 continue
339 }
340
341 eng := s.engs[w.Engine]
342
343 if _, ok := workflows[eng]; !ok {
344 workflows[eng] = []models.Workflow{}
345 }
346
347 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
348 if err != nil {
349 return fmt.Errorf("init workflow: %w", err)
350 }
351
352 // inject TANGLED_* env vars after InitWorkflow
353 // This prevents user-defined env vars from overriding them
354 if ewf.Environment == nil {
355 ewf.Environment = make(map[string]string)
356 }
357 maps.Copy(ewf.Environment, pipelineEnv)
358
359 workflows[eng] = append(workflows[eng], *ewf)
360
361 err = s.db.StatusPending(models.WorkflowId{
362 PipelineId: pipelineId,
363 Name: w.Name,
364 }, s.n)
365 if err != nil {
366 return fmt.Errorf("db.StatusPending: %w", err)
367 }
368 }
369 }
370
371 ok := s.jq.Enqueue(queue.Job{
372 Run: func() error {
373 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
374 RepoOwner: tpl.TriggerMetadata.Repo.Did,
375 RepoName: tpl.TriggerMetadata.Repo.Repo,
376 Workflows: workflows,
377 }, pipelineId)
378 return nil
379 },
380 OnFail: func(jobError error) {
381 s.l.Error("pipeline run failed", "error", jobError)
382 },
383 })
384 if ok {
385 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
386 } else {
387 s.l.Error("failed to enqueue pipeline: queue is full")
388 }
389 } else if msg.Nsid == tangled.GitRefUpdateNSID {
390 event := tangled.GitRefUpdate{}
391 if err := json.Unmarshal(msg.EventJson, &event); err != nil {
392 l.Error("error unmarshalling", "err", err)
393 return err
394 }
395 l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName)
396
397 // resolve repo name to rkey
398 // TODO: git.refUpdate should respond with rkey instead of repo name
399 repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName)
400 if err != nil {
401 return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err)
402 }
403
404 // NOTE: we are blindly trusting the knot that it will return only repos it own
405 repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName)
406 repoPath := s.newRepoPath(repo.Did, repo.Rkey)
407 if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil {
408 return fmt.Errorf("sync git repo: %w", err)
409 }
410 l.Info("synced git repo")
411
412 // TODO: plan the pipeline
413 }
414
415 return nil
416}
417
418// newRepoPath creates a path to store repository by its did and rkey.
419// The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey
420func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string {
421 return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String())
422}
423
424func (s *Spindle) newRepoCloneUrl(knot, did, name string) string {
425 scheme := "https://"
426 if s.cfg.Server.Dev {
427 scheme = "http://"
428 }
429 return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name)
430}
431
// RequiredVersion is the minimum git version Spindle accepts; New refuses to
// construct a server when the installed git is older.
const RequiredVersion = "2.49.0"
433
434func ensureGitVersion() error {
435 v, err := git.Version()
436 if err != nil {
437 return fmt.Errorf("fetching git version: %w", err)
438 }
439 if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) {
440 return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion)
441 }
442 return nil
443}