A vibe coded tangled fork which supports pijul.
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "maps"
10 "net/http"
11 "sync"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 "github.com/go-chi/chi/v5"
15 "tangled.org/core/api/tangled"
16 "tangled.org/core/eventconsumer"
17 "tangled.org/core/eventconsumer/cursor"
18 "tangled.org/core/idresolver"
19 "tangled.org/core/log"
20 "tangled.org/core/notifier"
21 "tangled.org/core/rbac2"
22 "tangled.org/core/spindle/config"
23 "tangled.org/core/spindle/db"
24 "tangled.org/core/spindle/engine"
25 "tangled.org/core/spindle/engines/nixery"
26 "tangled.org/core/spindle/models"
27 "tangled.org/core/spindle/queue"
28 "tangled.org/core/spindle/secrets"
29 "tangled.org/core/spindle/xrpc"
30 "tangled.org/core/tap"
31 "tangled.org/core/xrpc/serviceauth"
32)
33
34//go:embed motd
35var defaultMotd []byte
36
37type Spindle struct {
38 tap *tap.Client
39 db *db.DB
40 e *rbac2.Enforcer
41 l *slog.Logger
42 n *notifier.Notifier
43 engs map[string]models.Engine
44 jq *queue.Queue
45 cfg *config.Config
46 ks *eventconsumer.Consumer
47 res *idresolver.Resolver
48 vault secrets.Manager
49 motd []byte
50 motdMu sync.RWMutex
51}
52
53// New creates a new Spindle server with the provided configuration and engines.
54func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
55 logger := log.FromContext(ctx)
56
57 d, err := db.Make(ctx, cfg.Server.DBPath)
58 if err != nil {
59 return nil, fmt.Errorf("failed to setup db: %w", err)
60 }
61
62 e, err := rbac2.NewEnforcer(cfg.Server.DBPath)
63 if err != nil {
64 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
65 }
66
67 n := notifier.New()
68
69 var vault secrets.Manager
70 switch cfg.Server.Secrets.Provider {
71 case "openbao":
72 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
73 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
74 }
75 vault, err = secrets.NewOpenBaoManager(
76 cfg.Server.Secrets.OpenBao.ProxyAddr,
77 logger,
78 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
79 )
80 if err != nil {
81 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
82 }
83 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
84 case "sqlite", "":
85 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
86 if err != nil {
87 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
88 }
89 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
90 default:
91 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
92 }
93
94 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
95 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
96
97 tap := tap.NewClient(cfg.Server.TapUrl, "")
98
99 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
100
101 spindle := &Spindle{
102 tap: &tap,
103 e: e,
104 db: d,
105 l: logger,
106 n: &n,
107 engs: engines,
108 jq: jq,
109 cfg: cfg,
110 res: resolver,
111 vault: vault,
112 motd: defaultMotd,
113 }
114
115 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
116 if err != nil {
117 return nil, err
118 }
119 logger.Info("owner set", "did", cfg.Server.Owner)
120
121 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
122 if err != nil {
123 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
124 }
125
126 // for each incoming sh.tangled.pipeline, we execute
127 // spindle.processPipeline, which in turn enqueues the pipeline
128 // job in the above registered queue.
129 ccfg := eventconsumer.NewConsumerConfig()
130 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
131 ccfg.Dev = cfg.Server.Dev
132 ccfg.ProcessFunc = spindle.processPipeline
133 ccfg.CursorStore = cursorStore
134 knownKnots, err := d.Knots()
135 if err != nil {
136 return nil, err
137 }
138 for _, knot := range knownKnots {
139 logger.Info("adding source start", "knot", knot)
140 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
141 }
142 spindle.ks = eventconsumer.NewConsumer(*ccfg)
143
144 return spindle, nil
145}
146
147// DB returns the database instance.
148func (s *Spindle) DB() *db.DB {
149 return s.db
150}
151
152// Queue returns the job queue instance.
153func (s *Spindle) Queue() *queue.Queue {
154 return s.jq
155}
156
157// Engines returns the map of available engines.
158func (s *Spindle) Engines() map[string]models.Engine {
159 return s.engs
160}
161
162// Vault returns the secrets manager instance.
163func (s *Spindle) Vault() secrets.Manager {
164 return s.vault
165}
166
167// Notifier returns the notifier instance.
168func (s *Spindle) Notifier() *notifier.Notifier {
169 return s.n
170}
171
172// Enforcer returns the RBAC enforcer instance.
173func (s *Spindle) Enforcer() *rbac2.Enforcer {
174 return s.e
175}
176
177// SetMotdContent sets custom MOTD content, replacing the embedded default.
178func (s *Spindle) SetMotdContent(content []byte) {
179 s.motdMu.Lock()
180 defer s.motdMu.Unlock()
181 s.motd = content
182}
183
184// GetMotdContent returns the current MOTD content.
185func (s *Spindle) GetMotdContent() []byte {
186 s.motdMu.RLock()
187 defer s.motdMu.RUnlock()
188 return s.motd
189}
190
191// Start starts the Spindle server (blocking).
192func (s *Spindle) Start(ctx context.Context) error {
193 // starts a job queue runner in the background
194 s.jq.Start()
195 defer s.jq.Stop()
196
197 // Stop vault token renewal if it implements Stopper
198 if stopper, ok := s.vault.(secrets.Stopper); ok {
199 defer stopper.Stop()
200 }
201
202 go func() {
203 s.l.Info("starting knot event consumer")
204 s.ks.Start(ctx)
205 }()
206
207 // ensure server owner is tracked
208 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil {
209 return err
210 }
211
212 go func() {
213 s.l.Info("starting tap stream consumer")
214 s.tap.Connect(ctx, &tap.SimpleIndexer{
215 EventHandler: s.processEvent,
216 })
217 }()
218
219 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
220 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
221}
222
223func Run(ctx context.Context) error {
224 cfg, err := config.Load(ctx)
225 if err != nil {
226 return fmt.Errorf("failed to load config: %w", err)
227 }
228
229 nixeryEng, err := nixery.New(ctx, cfg)
230 if err != nil {
231 return err
232 }
233
234 s, err := New(ctx, cfg, map[string]models.Engine{
235 "nixery": nixeryEng,
236 })
237 if err != nil {
238 return err
239 }
240
241 return s.Start(ctx)
242}
243
244func (s *Spindle) Router() http.Handler {
245 mux := chi.NewRouter()
246
247 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
248 w.Write(s.GetMotdContent())
249 })
250 mux.HandleFunc("/events", s.Events)
251 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
252
253 mux.Mount("/xrpc", s.XrpcRouter())
254 return mux
255}
256
257func (s *Spindle) XrpcRouter() http.Handler {
258 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
259
260 l := log.SubLogger(s.l, "xrpc")
261
262 x := xrpc.Xrpc{
263 Logger: l,
264 Db: s.db,
265 Enforcer: s.e,
266 Engines: s.engs,
267 Config: s.cfg,
268 Resolver: s.res,
269 Vault: s.vault,
270 Notifier: s.Notifier(),
271 ServiceAuth: serviceAuth,
272 }
273
274 return x.Router()
275}
276
277func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
278 if msg.Nsid == tangled.PipelineNSID {
279 tpl := tangled.Pipeline{}
280 err := json.Unmarshal(msg.EventJson, &tpl)
281 if err != nil {
282 fmt.Println("error unmarshalling", err)
283 return err
284 }
285
286 if tpl.TriggerMetadata == nil {
287 return fmt.Errorf("no trigger metadata found")
288 }
289
290 if tpl.TriggerMetadata.Repo == nil {
291 return fmt.Errorf("no repo data found")
292 }
293
294 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
295 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
296 }
297
298 // filter by repos
299 _, err = s.db.GetRepoWithName(
300 syntax.DID(tpl.TriggerMetadata.Repo.Did),
301 tpl.TriggerMetadata.Repo.Repo,
302 )
303 if err != nil {
304 return fmt.Errorf("failed to get repo: %w", err)
305 }
306
307 pipelineId := models.PipelineId{
308 Knot: src.Key(),
309 Rkey: msg.Rkey,
310 }
311
312 workflows := make(map[models.Engine][]models.Workflow)
313
314 // Build pipeline environment variables once for all workflows
315 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
316
317 for _, w := range tpl.Workflows {
318 if w != nil {
319 if _, ok := s.engs[w.Engine]; !ok {
320 err = s.db.StatusFailed(models.WorkflowId{
321 PipelineId: pipelineId,
322 Name: w.Name,
323 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
324 if err != nil {
325 return fmt.Errorf("db.StatusFailed: %w", err)
326 }
327
328 continue
329 }
330
331 eng := s.engs[w.Engine]
332
333 if _, ok := workflows[eng]; !ok {
334 workflows[eng] = []models.Workflow{}
335 }
336
337 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
338 if err != nil {
339 return fmt.Errorf("init workflow: %w", err)
340 }
341
342 // inject TANGLED_* env vars after InitWorkflow
343 // This prevents user-defined env vars from overriding them
344 if ewf.Environment == nil {
345 ewf.Environment = make(map[string]string)
346 }
347 maps.Copy(ewf.Environment, pipelineEnv)
348
349 workflows[eng] = append(workflows[eng], *ewf)
350
351 err = s.db.StatusPending(models.WorkflowId{
352 PipelineId: pipelineId,
353 Name: w.Name,
354 }, s.n)
355 if err != nil {
356 return fmt.Errorf("db.StatusPending: %w", err)
357 }
358 }
359 }
360
361 ok := s.jq.Enqueue(queue.Job{
362 Run: func() error {
363 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
364 RepoOwner: tpl.TriggerMetadata.Repo.Did,
365 RepoName: tpl.TriggerMetadata.Repo.Repo,
366 Workflows: workflows,
367 }, pipelineId)
368 return nil
369 },
370 OnFail: func(jobError error) {
371 s.l.Error("pipeline run failed", "error", jobError)
372 },
373 })
374 if ok {
375 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
376 } else {
377 s.l.Error("failed to enqueue pipeline: queue is full")
378 }
379 }
380
381 return nil
382}