A vibe-coded Tangled fork that supports Pijul.
1package spindle
2
3import (
4 "context"
5 _ "embed"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "maps"
10 "net/http"
11 "sync"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/service/tap"
16 "github.com/go-chi/chi/v5"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/eventconsumer"
19 "tangled.org/core/eventconsumer/cursor"
20 "tangled.org/core/idresolver"
21 "tangled.org/core/log"
22 "tangled.org/core/notifier"
23 "tangled.org/core/rbac2"
24 "tangled.org/core/spindle/config"
25 "tangled.org/core/spindle/db"
26 "tangled.org/core/spindle/engine"
27 "tangled.org/core/spindle/engines/nixery"
28 "tangled.org/core/spindle/models"
29 "tangled.org/core/spindle/queue"
30 "tangled.org/core/spindle/secrets"
31 "tangled.org/core/spindle/xrpc"
32 "tangled.org/core/tapc"
33 "tangled.org/core/xrpc/serviceauth"
34)
35
36//go:embed motd
37var defaultMotd []byte
38
39type Spindle struct {
40 tap *tapc.Client
41 db *db.DB
42 e *rbac2.Enforcer
43 l *slog.Logger
44 n *notifier.Notifier
45 engs map[string]models.Engine
46 jq *queue.Queue
47 cfg *config.Config
48 ks *eventconsumer.Consumer
49 res *idresolver.Resolver
50 vault secrets.Manager
51 motd []byte
52 motdMu sync.RWMutex
53}
54
55// New creates a new Spindle server with the provided configuration and engines.
56func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
57 logger := log.FromContext(ctx)
58
59 d, err := db.Make(ctx, cfg.Server.DBPath)
60 if err != nil {
61 return nil, fmt.Errorf("failed to setup db: %w", err)
62 }
63
64 e, err := rbac2.NewEnforcer(cfg.Server.DBPath)
65 if err != nil {
66 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
67 }
68
69 n := notifier.New()
70
71 var vault secrets.Manager
72 switch cfg.Server.Secrets.Provider {
73 case "openbao":
74 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
75 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
76 }
77 vault, err = secrets.NewOpenBaoManager(
78 cfg.Server.Secrets.OpenBao.ProxyAddr,
79 logger,
80 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
81 )
82 if err != nil {
83 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
84 }
85 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
86 case "sqlite", "":
87 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
88 if err != nil {
89 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
90 }
91 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
92 default:
93 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
94 }
95
96 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
97 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
98
99 tap := tapc.NewClient("http://localhost:"+cfg.Server.TapPort, "")
100
101 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
102
103 spindle := &Spindle{
104 tap: &tap,
105 e: e,
106 db: d,
107 l: logger,
108 n: &n,
109 engs: engines,
110 jq: jq,
111 cfg: cfg,
112 res: resolver,
113 vault: vault,
114 motd: defaultMotd,
115 }
116
117 err = e.SetSpindleOwner(spindle.cfg.Server.Owner, spindle.cfg.Server.Did())
118 if err != nil {
119 return nil, err
120 }
121 logger.Info("owner set", "did", cfg.Server.Owner)
122
123 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
124 if err != nil {
125 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
126 }
127
128 // for each incoming sh.tangled.pipeline, we execute
129 // spindle.processPipeline, which in turn enqueues the pipeline
130 // job in the above registered queue.
131 ccfg := eventconsumer.NewConsumerConfig()
132 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
133 ccfg.Dev = cfg.Server.Dev
134 ccfg.ProcessFunc = spindle.processPipeline
135 ccfg.CursorStore = cursorStore
136 knownKnots, err := d.Knots()
137 if err != nil {
138 return nil, err
139 }
140 for _, knot := range knownKnots {
141 logger.Info("adding source start", "knot", knot)
142 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
143 }
144 spindle.ks = eventconsumer.NewConsumer(*ccfg)
145
146 return spindle, nil
147}
148
149// DB returns the database instance.
150func (s *Spindle) DB() *db.DB {
151 return s.db
152}
153
154// Queue returns the job queue instance.
155func (s *Spindle) Queue() *queue.Queue {
156 return s.jq
157}
158
159// Engines returns the map of available engines.
160func (s *Spindle) Engines() map[string]models.Engine {
161 return s.engs
162}
163
164// Vault returns the secrets manager instance.
165func (s *Spindle) Vault() secrets.Manager {
166 return s.vault
167}
168
169// Notifier returns the notifier instance.
170func (s *Spindle) Notifier() *notifier.Notifier {
171 return s.n
172}
173
174// Enforcer returns the RBAC enforcer instance.
175func (s *Spindle) Enforcer() *rbac2.Enforcer {
176 return s.e
177}
178
179// SetMotdContent sets custom MOTD content, replacing the embedded default.
180func (s *Spindle) SetMotdContent(content []byte) {
181 s.motdMu.Lock()
182 defer s.motdMu.Unlock()
183 s.motd = content
184}
185
186// GetMotdContent returns the current MOTD content.
187func (s *Spindle) GetMotdContent() []byte {
188 s.motdMu.RLock()
189 defer s.motdMu.RUnlock()
190 return s.motd
191}
192
193// Start starts the Spindle server (blocking).
194func (s *Spindle) Start(ctx context.Context) error {
195 svcErr := make(chan error, 1)
196
197 // starts a job queue runner in the background
198 s.jq.Start()
199 defer s.jq.Stop()
200
201 // Stop vault token renewal if it implements Stopper
202 if stopper, ok := s.vault.(secrets.Stopper); ok {
203 defer stopper.Stop()
204 }
205
206 go func() {
207 s.l.Info("starting knot event consumer")
208 s.ks.Start(ctx)
209 }()
210
211 tap, err := tap.New(tap.Config{
212 DatabaseURL: s.cfg.Server.TapDBUrl,
213 DBMaxConns: 32,
214 PLCURL: s.cfg.Server.PlcUrl,
215 RelayUrl: s.cfg.Server.RelayUrl,
216 FirehoseParallelism: 10,
217 ResyncParallelism: 5,
218 OutboxParallelism: 1,
219 FirehoseCursorSaveInterval: 1 * time.Second,
220 RepoFetchTimeout: 300 * time.Second,
221 IdentityCacheSize: 2_000_000,
222 EventCacheSize: 100_000,
223 CollectionFilters: []string{tangled.RepoNSID, tangled.RepoCollaboratorNSID, tangled.SpindleMemberNSID},
224 RetryTimeout: 3 * time.Second,
225 })
226 if err != nil {
227 return err
228 }
229 go func() {
230 if err := tap.RunTap(ctx, ":"+s.cfg.Server.TapPort); err != nil {
231 svcErr <- err
232 }
233 }()
234 go func() {
235 s.l.Info("starting embedded tap server")
236
237 s.l.Info("starting tap stream consumer")
238 s.tap.Connect(ctx, &tapc.SimpleIndexer{
239 EventHandler: s.processEvent,
240 })
241 }()
242
243 s.l.Debug("waiting for tap readiness")
244 tapReadyCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
245 defer cancel()
246 if err := s.tap.WaitReady(tapReadyCtx); err != nil {
247 return fmt.Errorf("tap not ready: %w", err)
248 }
249
250 // ensure server owner is tracked
251 s.l.Debug("adding server owner to tap")
252 if err := s.tap.AddRepos(ctx, []syntax.DID{s.cfg.Server.Owner}); err != nil {
253 return err
254 }
255
256 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
257 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
258}
259
260func Run(ctx context.Context) error {
261 cfg, err := config.Load(ctx)
262 if err != nil {
263 return fmt.Errorf("failed to load config: %w", err)
264 }
265
266 nixeryEng, err := nixery.New(ctx, cfg)
267 if err != nil {
268 return err
269 }
270
271 s, err := New(ctx, cfg, map[string]models.Engine{
272 "nixery": nixeryEng,
273 })
274 if err != nil {
275 return err
276 }
277
278 return s.Start(ctx)
279}
280
281func (s *Spindle) Router() http.Handler {
282 mux := chi.NewRouter()
283
284 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
285 w.Write(s.GetMotdContent())
286 })
287 mux.HandleFunc("/events", s.Events)
288 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
289
290 mux.Mount("/xrpc", s.XrpcRouter())
291 return mux
292}
293
294func (s *Spindle) XrpcRouter() http.Handler {
295 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
296
297 l := log.SubLogger(s.l, "xrpc")
298
299 x := xrpc.Xrpc{
300 Logger: l,
301 Db: s.db,
302 Enforcer: s.e,
303 Engines: s.engs,
304 Config: s.cfg,
305 Resolver: s.res,
306 Vault: s.vault,
307 Notifier: s.Notifier(),
308 ServiceAuth: serviceAuth,
309 }
310
311 return x.Router()
312}
313
314func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
315 if msg.Nsid == tangled.PipelineNSID {
316 tpl := tangled.Pipeline{}
317 err := json.Unmarshal(msg.EventJson, &tpl)
318 if err != nil {
319 fmt.Println("error unmarshalling", err)
320 return err
321 }
322
323 if tpl.TriggerMetadata == nil {
324 return fmt.Errorf("no trigger metadata found")
325 }
326
327 if tpl.TriggerMetadata.Repo == nil {
328 return fmt.Errorf("no repo data found")
329 }
330
331 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
332 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
333 }
334
335 // filter by repos
336 _, err = s.db.GetRepoWithName(
337 syntax.DID(tpl.TriggerMetadata.Repo.Did),
338 tpl.TriggerMetadata.Repo.Repo,
339 )
340 if err != nil {
341 return fmt.Errorf("failed to get repo: %w", err)
342 }
343
344 pipelineId := models.PipelineId{
345 Knot: src.Key(),
346 Rkey: msg.Rkey,
347 }
348
349 workflows := make(map[models.Engine][]models.Workflow)
350
351 // Build pipeline environment variables once for all workflows
352 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId, s.cfg.Server.Dev)
353
354 for _, w := range tpl.Workflows {
355 if w != nil {
356 if _, ok := s.engs[w.Engine]; !ok {
357 err = s.db.StatusFailed(models.WorkflowId{
358 PipelineId: pipelineId,
359 Name: w.Name,
360 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
361 if err != nil {
362 return fmt.Errorf("db.StatusFailed: %w", err)
363 }
364
365 continue
366 }
367
368 eng := s.engs[w.Engine]
369
370 if _, ok := workflows[eng]; !ok {
371 workflows[eng] = []models.Workflow{}
372 }
373
374 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
375 if err != nil {
376 return fmt.Errorf("init workflow: %w", err)
377 }
378
379 // inject TANGLED_* env vars after InitWorkflow
380 // This prevents user-defined env vars from overriding them
381 if ewf.Environment == nil {
382 ewf.Environment = make(map[string]string)
383 }
384 maps.Copy(ewf.Environment, pipelineEnv)
385
386 workflows[eng] = append(workflows[eng], *ewf)
387
388 err = s.db.StatusPending(models.WorkflowId{
389 PipelineId: pipelineId,
390 Name: w.Name,
391 }, s.n)
392 if err != nil {
393 return fmt.Errorf("db.StatusPending: %w", err)
394 }
395 }
396 }
397
398 ok := s.jq.Enqueue(queue.Job{
399 Run: func() error {
400 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
401 RepoOwner: tpl.TriggerMetadata.Repo.Did,
402 RepoName: tpl.TriggerMetadata.Repo.Repo,
403 Workflows: workflows,
404 }, pipelineId)
405 return nil
406 },
407 OnFail: func(jobError error) {
408 s.l.Error("pipeline run failed", "error", jobError)
409 },
410 })
411 if ok {
412 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
413 } else {
414 s.l.Error("failed to enqueue pipeline: queue is full")
415 }
416 }
417
418 return nil
419}