A vibe coded tangled fork which supports pijul.
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "maps"
10 "net/http"
11 "sync"
12
13 "github.com/go-chi/chi/v5"
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/eventconsumer"
16 "tangled.org/core/eventconsumer/cursor"
17 "tangled.org/core/idresolver"
18 "tangled.org/core/jetstream"
19 "tangled.org/core/log"
20 "tangled.org/core/notifier"
21 "tangled.org/core/rbac2"
22 "tangled.org/core/spindle/config"
23 "tangled.org/core/spindle/db"
24 "tangled.org/core/spindle/engine"
25 "tangled.org/core/spindle/engines/nixery"
26 "tangled.org/core/spindle/models"
27 "tangled.org/core/spindle/queue"
28 "tangled.org/core/spindle/secrets"
29 "tangled.org/core/spindle/xrpc"
30 "tangled.org/core/xrpc/serviceauth"
31)
32
33//go:embed motd
34var defaultMotd []byte
35
36type Spindle struct {
37 jc *jetstream.JetstreamClient
38 db *db.DB
39 e *rbac2.Enforcer
40 l *slog.Logger
41 n *notifier.Notifier
42 engs map[string]models.Engine
43 jq *queue.Queue
44 cfg *config.Config
45 ks *eventconsumer.Consumer
46 res *idresolver.Resolver
47 vault secrets.Manager
48 motd []byte
49 motdMu sync.RWMutex
50}
51
52// New creates a new Spindle server with the provided configuration and engines.
53func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
54 logger := log.FromContext(ctx)
55
56 d, err := db.Make(cfg.Server.DBPath)
57 if err != nil {
58 return nil, fmt.Errorf("failed to setup db: %w", err)
59 }
60
61 e, err := rbac2.NewEnforcer(cfg.Server.DBPath)
62 if err != nil {
63 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
64 }
65
66 n := notifier.New()
67
68 var vault secrets.Manager
69 switch cfg.Server.Secrets.Provider {
70 case "openbao":
71 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
72 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
73 }
74 vault, err = secrets.NewOpenBaoManager(
75 cfg.Server.Secrets.OpenBao.ProxyAddr,
76 logger,
77 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
78 )
79 if err != nil {
80 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
81 }
82 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
83 case "sqlite", "":
84 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
85 if err != nil {
86 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
87 }
88 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
89 default:
90 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
91 }
92
93 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
94 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
95
96 collections := []string{
97 tangled.SpindleMemberNSID,
98 tangled.RepoNSID,
99 tangled.RepoCollaboratorNSID,
100 }
101 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
102 if err != nil {
103 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
104 }
105 jc.AddDid(cfg.Server.Owner.String())
106
107 // Check if the spindle knows about any Dids;
108 dids, err := d.GetAllDids()
109 if err != nil {
110 return nil, fmt.Errorf("failed to get all dids: %w", err)
111 }
112 for _, d := range dids {
113 jc.AddDid(d)
114 }
115
116 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
117
118 spindle := &Spindle{
119 jc: jc,
120 e: e,
121 db: d,
122 l: logger,
123 n: &n,
124 engs: engines,
125 jq: jq,
126 cfg: cfg,
127 res: resolver,
128 vault: vault,
129 motd: defaultMotd,
130 }
131
132 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
133 if err != nil {
134 return nil, err
135 }
136 logger.Info("owner set", "did", cfg.Server.Owner)
137
138 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
139 if err != nil {
140 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
141 }
142
143 err = jc.StartJetstream(ctx, spindle.ingest())
144 if err != nil {
145 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
146 }
147
148 // for each incoming sh.tangled.pipeline, we execute
149 // spindle.processPipeline, which in turn enqueues the pipeline
150 // job in the above registered queue.
151 ccfg := eventconsumer.NewConsumerConfig()
152 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
153 ccfg.Dev = cfg.Server.Dev
154 ccfg.ProcessFunc = spindle.processPipeline
155 ccfg.CursorStore = cursorStore
156 knownKnots, err := d.Knots()
157 if err != nil {
158 return nil, err
159 }
160 for _, knot := range knownKnots {
161 logger.Info("adding source start", "knot", knot)
162 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
163 }
164 spindle.ks = eventconsumer.NewConsumer(*ccfg)
165
166 return spindle, nil
167}
168
169// DB returns the database instance.
170func (s *Spindle) DB() *db.DB {
171 return s.db
172}
173
174// Queue returns the job queue instance.
175func (s *Spindle) Queue() *queue.Queue {
176 return s.jq
177}
178
179// Engines returns the map of available engines.
180func (s *Spindle) Engines() map[string]models.Engine {
181 return s.engs
182}
183
184// Vault returns the secrets manager instance.
185func (s *Spindle) Vault() secrets.Manager {
186 return s.vault
187}
188
189// Notifier returns the notifier instance.
190func (s *Spindle) Notifier() *notifier.Notifier {
191 return s.n
192}
193
194// Enforcer returns the RBAC enforcer instance.
195func (s *Spindle) Enforcer() *rbac2.Enforcer {
196 return s.e
197}
198
199// SetMotdContent sets custom MOTD content, replacing the embedded default.
200func (s *Spindle) SetMotdContent(content []byte) {
201 s.motdMu.Lock()
202 defer s.motdMu.Unlock()
203 s.motd = content
204}
205
206// GetMotdContent returns the current MOTD content.
207func (s *Spindle) GetMotdContent() []byte {
208 s.motdMu.RLock()
209 defer s.motdMu.RUnlock()
210 return s.motd
211}
212
213// Start starts the Spindle server (blocking).
214func (s *Spindle) Start(ctx context.Context) error {
215 // starts a job queue runner in the background
216 s.jq.Start()
217 defer s.jq.Stop()
218
219 // Stop vault token renewal if it implements Stopper
220 if stopper, ok := s.vault.(secrets.Stopper); ok {
221 defer stopper.Stop()
222 }
223
224 go func() {
225 s.l.Info("starting knot event consumer")
226 s.ks.Start(ctx)
227 }()
228
229 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
230 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
231}
232
233func Run(ctx context.Context) error {
234 cfg, err := config.Load(ctx)
235 if err != nil {
236 return fmt.Errorf("failed to load config: %w", err)
237 }
238
239 nixeryEng, err := nixery.New(ctx, cfg)
240 if err != nil {
241 return err
242 }
243
244 s, err := New(ctx, cfg, map[string]models.Engine{
245 "nixery": nixeryEng,
246 })
247 if err != nil {
248 return err
249 }
250
251 return s.Start(ctx)
252}
253
254func (s *Spindle) Router() http.Handler {
255 mux := chi.NewRouter()
256
257 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
258 w.Write(s.GetMotdContent())
259 })
260 mux.HandleFunc("/events", s.Events)
261 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
262
263 mux.Mount("/xrpc", s.XrpcRouter())
264 return mux
265}
266
267func (s *Spindle) XrpcRouter() http.Handler {
268 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
269
270 l := log.SubLogger(s.l, "xrpc")
271
272 x := xrpc.Xrpc{
273 Logger: l,
274 Db: s.db,
275 Enforcer: s.e,
276 Engines: s.engs,
277 Config: s.cfg,
278 Resolver: s.res,
279 Vault: s.vault,
280 Notifier: s.Notifier(),
281 ServiceAuth: serviceAuth,
282 }
283
284 return x.Router()
285}
286
287func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
288 if msg.Nsid == tangled.PipelineNSID {
289 tpl := tangled.Pipeline{}
290 err := json.Unmarshal(msg.EventJson, &tpl)
291 if err != nil {
292 fmt.Println("error unmarshalling", err)
293 return err
294 }
295
296 if tpl.TriggerMetadata == nil {
297 return fmt.Errorf("no trigger metadata found")
298 }
299
300 if tpl.TriggerMetadata.Repo == nil {
301 return fmt.Errorf("no repo data found")
302 }
303
304 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
305 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
306 }
307
308 // filter by repos
309 _, err = s.db.GetRepo(
310 tpl.TriggerMetadata.Repo.Knot,
311 tpl.TriggerMetadata.Repo.Did,
312 tpl.TriggerMetadata.Repo.Repo,
313 )
314 if err != nil {
315 return fmt.Errorf("failed to get repo: %w", err)
316 }
317
318 pipelineId := models.PipelineId{
319 Knot: src.Key(),
320 Rkey: msg.Rkey,
321 }
322
323 workflows := make(map[models.Engine][]models.Workflow)
324
325 // Build pipeline environment variables once for all workflows
326 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
327
328 for _, w := range tpl.Workflows {
329 if w != nil {
330 if _, ok := s.engs[w.Engine]; !ok {
331 err = s.db.StatusFailed(models.WorkflowId{
332 PipelineId: pipelineId,
333 Name: w.Name,
334 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
335 if err != nil {
336 return fmt.Errorf("db.StatusFailed: %w", err)
337 }
338
339 continue
340 }
341
342 eng := s.engs[w.Engine]
343
344 if _, ok := workflows[eng]; !ok {
345 workflows[eng] = []models.Workflow{}
346 }
347
348 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
349 if err != nil {
350 return fmt.Errorf("init workflow: %w", err)
351 }
352
353 // inject TANGLED_* env vars after InitWorkflow
354 // This prevents user-defined env vars from overriding them
355 if ewf.Environment == nil {
356 ewf.Environment = make(map[string]string)
357 }
358 maps.Copy(ewf.Environment, pipelineEnv)
359
360 workflows[eng] = append(workflows[eng], *ewf)
361
362 err = s.db.StatusPending(models.WorkflowId{
363 PipelineId: pipelineId,
364 Name: w.Name,
365 }, s.n)
366 if err != nil {
367 return fmt.Errorf("db.StatusPending: %w", err)
368 }
369 }
370 }
371
372 ok := s.jq.Enqueue(queue.Job{
373 Run: func() error {
374 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
375 RepoOwner: tpl.TriggerMetadata.Repo.Did,
376 RepoName: tpl.TriggerMetadata.Repo.Repo,
377 Workflows: workflows,
378 }, pipelineId)
379 return nil
380 },
381 OnFail: func(jobError error) {
382 s.l.Error("pipeline run failed", "error", jobError)
383 },
384 })
385 if ok {
386 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
387 } else {
388 s.l.Error("failed to enqueue pipeline: queue is full")
389 }
390 }
391
392 return nil
393}