// Package nixery is a vibe-coded tangled fork which supports pijul.
1package nixery
2
import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path"
	"path/filepath"
	"regexp"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/client"
	"github.com/stretchr/testify/assert/yaml"
	"tangled.org/core/api/tangled"
	"tangled.org/core/sets"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/repomanager"
	"tangled.org/core/tid"
	"tangled.org/core/workflow"
)
32
// AdapterID identifies this adapter implementation in workflow-run records.
const AdapterID = "nixery"
34
// Adapter executes tangled CI workflow runs inside Docker containers whose
// images are served by a nixery instance.
type Adapter struct {
	l           *slog.Logger
	repoManager *repomanager.RepoManager
	docker      client.APIClient
	// Timeout bounds the step-execution phase of each workflow run.
	Timeout    time.Duration
	spindleDid syntax.DID
	cfg        config.NixeryPipelines

	// mu guards activeRuns and subscribers.
	mu sync.RWMutex
	// activeRuns tracks runs that have not yet reached a terminal status,
	// keyed by the run's AT-URI.
	activeRuns map[syntax.ATURI]models.WorkflowRun
	// subscribers holds the channels that emit broadcasts run updates to.
	subscribers sets.Set[chan<- models.WorkflowRun]
}
47
// Compile-time check that *Adapter satisfies models.Adapter.
var _ models.Adapter = (*Adapter)(nil)
49
50func New(l *slog.Logger, cfg config.Config, repoManager *repomanager.RepoManager) (*Adapter, error) {
51 dc, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
52 if err != nil {
53 return nil, fmt.Errorf("creating docker client: %w", err)
54 }
55 return &Adapter{
56 l: l,
57 repoManager: repoManager,
58 docker: dc,
59 Timeout: time.Minute * 5, // TODO: set timeout from config
60 spindleDid: cfg.Server.Did(),
61 cfg: cfg.NixeryPipelines,
62
63 activeRuns: make(map[syntax.ATURI]models.WorkflowRun),
64 subscribers: sets.New[chan<- models.WorkflowRun](),
65 }, nil
66}
67
68func (a *Adapter) Init() error {
69 // no-op
70 return nil
71}
72
73func (a *Adapter) Shutdown(ctx context.Context) error {
74 // TODO: cleanup spawned containers just in case
75 panic("unimplemented")
76}
77
78func (a *Adapter) SetupRepo(ctx context.Context, repo syntax.ATURI) error {
79 if err := a.repoManager.RegisterRepo(ctx, repo, []string{"/.tangled/workflows"}); err != nil {
80 return fmt.Errorf("syncing repo: %w", err)
81 }
82 return nil
83}
84
85func (a *Adapter) ListWorkflowDefs(ctx context.Context, repo syntax.ATURI, rev string) ([]models.WorkflowDef, error) {
86 defs, err := a.listWorkflowDefs(ctx, repo, rev)
87 if err != nil {
88 return nil, err
89 }
90 retDefs := make([]models.WorkflowDef, len(defs))
91 for i, def := range defs {
92 retDefs[i] = def.AsInfo()
93 }
94 return retDefs, nil
95}
96
97func (a *Adapter) listWorkflowDefs(ctx context.Context, repo syntax.ATURI, rev string) ([]WorkflowDef, error) {
98 workflowDir, err := a.repoManager.FileTree(ctx, repo, rev, workflow.WorkflowDir)
99 if err != nil {
100 return nil, fmt.Errorf("loading file tree: %w", err)
101 }
102
103 if len(workflowDir) == 0 {
104 return nil, nil
105 }
106
107 // TODO(boltless): repoManager.FileTree() should be smart enough so we don't need to do this:
108 gr, err := a.repoManager.Open(repo, rev)
109 if err != nil {
110 return nil, fmt.Errorf("opening git repo: %w", err)
111 }
112
113 var defs []WorkflowDef
114 for _, e := range workflowDir {
115 if !e.IsFile() {
116 continue
117 }
118
119 fpath := filepath.Join(workflow.WorkflowDir, e.Name)
120 contents, err := gr.RawContent(fpath)
121 if err != nil {
122 return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err)
123 }
124
125 var wf WorkflowDef
126 if err := yaml.Unmarshal(contents, &wf); err != nil {
127 return nil, fmt.Errorf("parsing yaml: %w", err)
128 }
129 wf.Name = e.Name
130
131 defs = append(defs, wf)
132 }
133
134 return defs, nil
135}
136
137func (a *Adapter) EvaluateEvent(ctx context.Context, event models.Event) ([]models.WorkflowRun, error) {
138 defs, err := a.listWorkflowDefs(ctx, event.SourceRepo, event.SourceSha)
139 if err != nil {
140 return nil, fmt.Errorf("fetching workflow definitions: %w", err)
141 }
142
143 // filter out triggered workflows
144 var triggered []nixeryWorkflow
145 for _, def := range defs {
146 if def.ShouldRunOn(event) {
147 triggered = append(triggered, nixeryWorkflow{
148 event: event,
149 def: def,
150 })
151 }
152 }
153
154 // TODO: append more workflows from "on_workflow" event
155
156 // schedule workflows and return immediately
157 runs := make([]models.WorkflowRun, len(triggered))
158 for i, workflow := range triggered {
159 runs[i] = a.scheduleWorkflow(ctx, workflow)
160 }
161 return runs, nil
162}
163
164// NOTE: nixery adapter is volatile. GetActiveWorkflowRun will return error
165// when the workflow is terminated. It lets spindle to mark lost workflow.run
166// as "Failed".
167func (a *Adapter) GetActiveWorkflowRun(ctx context.Context, runId syntax.ATURI) (models.WorkflowRun, error) {
168 a.mu.RLock()
169 run, exists := a.activeRuns[runId]
170 a.mu.RUnlock()
171 if !exists {
172 return run, fmt.Errorf("unknown or terminated workflow")
173 }
174 return run, nil
175}
176
177func (a *Adapter) ListActiveWorkflowRuns(ctx context.Context) ([]models.WorkflowRun, error) {
178 a.mu.RLock()
179 defer a.mu.RUnlock()
180
181 runs := make([]models.WorkflowRun, 0, len(a.activeRuns))
182 for _, run := range a.activeRuns {
183 runs = append(runs, run)
184 }
185 return runs, nil
186}
187
// SubscribeWorkflowRun returns a channel that receives run-state updates
// broadcast by emit. The subscription lasts until ctx is cancelled, after
// which the channel is removed from the subscriber set and closed.
//
// The channel has a buffer of one; emit drops updates rather than block when
// the buffer is full, and spindle recovers missed states by polling.
//
// NOTE(review): close(ch) below happens after the lock is released, while
// emit broadcasts from a snapshot taken under the lock — a send on this
// just-closed channel could panic; confirm emit's send path stays
// synchronized with this close.
func (a *Adapter) SubscribeWorkflowRun(ctx context.Context) <-chan models.WorkflowRun {
	ch := make(chan models.WorkflowRun, 1)

	a.mu.Lock()
	a.subscribers.Insert(ch)
	a.mu.Unlock()

	// clean up once spindle stops listening
	go func() {
		<-ctx.Done()
		a.mu.Lock()
		a.subscribers.Remove(ch)
		a.mu.Unlock()
		close(ch)
	}()

	return ch
}
206
207func (a *Adapter) emit(run models.WorkflowRun) {
208 a.mu.Lock()
209 if run.Status.IsActive() {
210 a.activeRuns[run.AtUri()] = run
211 } else {
212 delete(a.activeRuns, run.AtUri())
213 }
214
215 // Snapshot subscribers to broadcast outside the lock
216 subs := make([]chan<- models.WorkflowRun, 0, a.subscribers.Len())
217 for ch := range a.subscribers.All() {
218 subs = append(subs, ch)
219 }
220 a.mu.Unlock()
221
222 for _, ch := range subs {
223 select {
224 case ch <- run:
225 default:
226 // avoid blocking if channel is full
227 // spindle will catch the state by regular GetWorkflowRun poll
228 }
229 }
230}
231
// StreamWorkflowRunLogs feeds each log line of the given run to handle.
//
// TODO: not implemented yet.
func (a *Adapter) StreamWorkflowRunLogs(ctx context.Context, runId syntax.ATURI, handle func(line models.LogLine) error) error {
	panic("unimplemented")
}
235
236func (a *Adapter) CancelWorkflowRun(ctx context.Context, runId syntax.ATURI) error {
237 // remove network
238 if err := a.docker.NetworkRemove(ctx, networkName(runId)); err != nil {
239 return fmt.Errorf("removing network: %w", err)
240 }
241
242 // stop & remove docker containers with label
243 containers, err := a.docker.ContainerList(ctx, container.ListOptions{
244 Filters: labelFilter(tangled.CiWorkflowRunNSID, runId.String()),
245 })
246 if err != nil {
247 return fmt.Errorf("finding container with label: %w", err)
248 }
249 for _, c := range containers {
250 if err := a.docker.ContainerStop(ctx, c.ID, container.StopOptions{}); err != nil {
251 return fmt.Errorf("stopping container: %w", err)
252 }
253
254 if err := a.docker.ContainerRemove(ctx, c.ID, container.RemoveOptions{
255 RemoveVolumes: true,
256 RemoveLinks: false,
257 Force: false,
258 }); err != nil {
259 return fmt.Errorf("removing container: %w", err)
260 }
261 }
262 return nil
263}
264
265func labelFilter(labelKey, labelVal string) filters.Args {
266 filterArgs := filters.NewArgs()
267 filterArgs.Add("label", fmt.Sprintf("%s=%s", labelKey, labelVal))
268 return filterArgs
269}
270
// Paths created inside each workflow container (see initNixeryContainer).
const (
	// workspaceDir is the container's working directory for workflow steps.
	workspaceDir = "/tangled/workspace"
	// homeDir is a writable home-style directory created next to the
	// workspace. assumes steps use it as $HOME — TODO confirm.
	homeDir = "/tangled/home"
)
275
276// scheduleWorkflow schedules a workflow run in job queue and return queued run
277func (a *Adapter) scheduleWorkflow(ctx context.Context, workflow nixeryWorkflow) models.WorkflowRun {
278 l := a.l
279
280 run := models.WorkflowRun{
281 Did: a.spindleDid,
282 Rkey: syntax.RecordKey(tid.TID()),
283 AdapterId: AdapterID,
284 Name: workflow.def.Name,
285 Status: models.WorkflowStatusPending,
286 }
287
288 a.mu.Lock()
289 a.activeRuns[run.AtUri()] = run
290 a.mu.Unlock()
291
292 go func() {
293 defer a.CancelWorkflowRun(ctx, run.AtUri())
294
295 containerId, err := a.initNixeryContainer(ctx, workflow.def, run.AtUri())
296 if err != nil {
297 l.Error("failed to intialize container", "err", err)
298 // TODO: put user-facing logs in workflow log
299 a.emit(run.WithStatus(models.WorkflowStatusFailed))
300 return
301 }
302
303 ctx, cancel := context.WithTimeout(ctx, a.Timeout)
304 defer cancel()
305
306 for stepIdx, step := range workflow.def.Steps {
307 if err := a.runStep(ctx, containerId, stepIdx, step); err != nil {
308 l.Error("failed to run step", "stepIdx", stepIdx, "err", err)
309 return
310 }
311 }
312 l.Info("all steps completed successfully")
313 }()
314
315 l.Info("workflow scheduled to background", "workflow.run", run.AtUri())
316
317 return run
318}
319
320func (a *Adapter) runStep(ctx context.Context, containerId string, stepIdx int, step Step) error {
321 // TODO: implement this
322
323 // TODO: configure envs
324 var envs []string
325
326 select {
327 case <-ctx.Done():
328 return ctx.Err()
329 default:
330 }
331
332 mkExecResp, err := a.docker.ContainerExecCreate(ctx, containerId, container.ExecOptions{
333 Cmd: []string{"bash", "-c", step.Command},
334 AttachStdout: true,
335 AttachStderr: true,
336 Env: envs,
337 })
338 if err != nil {
339 return fmt.Errorf("creating exec: %w", err)
340 }
341
342 panic("unimplemented")
343}
344
345// initNixeryContainer pulls the image from nixery and start the container.
346func (a *Adapter) initNixeryContainer(ctx context.Context, def WorkflowDef, runAt syntax.ATURI) (string, error) {
347 imageName := workflowImageName(def.Dependencies, a.cfg.Nixery)
348
349 _, err := a.docker.NetworkCreate(ctx, networkName(runAt), network.CreateOptions{
350 Driver: "bridge",
351 })
352 if err != nil {
353 return "", fmt.Errorf("creating network: %w", err)
354 }
355
356 reader, err := a.docker.ImagePull(ctx, imageName, image.PullOptions{})
357 if err != nil {
358 return "", fmt.Errorf("pulling image: %w", err)
359 }
360 defer reader.Close()
361 io.Copy(os.Stdout, reader)
362
363 resp, err := a.docker.ContainerCreate(ctx, &container.Config{
364 Image: imageName,
365 Cmd: []string{"cat"},
366 OpenStdin: true, // so cat stays alive :3
367 Tty: false,
368 Hostname: "spindle",
369 WorkingDir: workspaceDir,
370 Labels: map[string]string{
371 tangled.CiWorkflowRunNSID: runAt.String(),
372 },
373 // TODO(winter): investigate whether environment variables passed here
374 // get propagated to ContainerExec processes
375 }, &container.HostConfig{
376 Mounts: []mount.Mount{
377 {
378 Type: mount.TypeTmpfs,
379 Target: "/tmp",
380 ReadOnly: false,
381 TmpfsOptions: &mount.TmpfsOptions{
382 Mode: 0o1777, // world-writeable sticky bit
383 Options: [][]string{
384 {"exec"},
385 },
386 },
387 },
388 },
389 ReadonlyRootfs: false,
390 CapDrop: []string{"ALL"},
391 CapAdd: []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"},
392 SecurityOpt: []string{"no-new-privileges"},
393 ExtraHosts: []string{"host.docker.internal:host-gateway"},
394 }, nil, nil, "")
395 if err != nil {
396 return "", fmt.Errorf("creating container: %w", err)
397 }
398
399 if err := a.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
400 return "", fmt.Errorf("starting container: %w", err)
401 }
402
403 mkExecResp, err := a.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
404 Cmd: []string{"mkdir", "-p", workspaceDir, homeDir},
405 AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
406 AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
407 })
408 if err != nil {
409 return "", err
410 }
411
412 // This actually *starts* the command. Thanks, Docker!
413 execResp, err := a.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
414 if err != nil {
415 return "", err
416 }
417 defer execResp.Close()
418
419 // This is apparently best way to wait for the command to complete.
420 _, err = io.ReadAll(execResp.Reader)
421 if err != nil {
422 return "", err
423 }
424
425 execInspectResp, err := a.docker.ContainerExecInspect(ctx, mkExecResp.ID)
426 if err != nil {
427 return "", err
428 }
429
430 if execInspectResp.ExitCode != 0 {
431 return "", fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
432 } else if execInspectResp.Running {
433 return "", fmt.Errorf("mkdir is somehow still running??")
434 }
435
436 return resp.ID, nil
437}
438
// workflowImageName builds the nixery image reference for a workflow's
// dependency set: the "nixpkgs" dependencies, followed by the base
// toolchain (bash/git/coreutils/nix), prefixed with "arm64" on arm64 hosts.
func workflowImageName(deps map[string][]string, nixery string) string {
	var pkgs string
	if ds, ok := deps["nixpkgs"]; ok {
		pkgs = path.Join(ds...)
	}
	// NOTE: shouldn't base dependencies come first?
	// like: nixery.tangled.sh/arm64/bash/git/coreutils/nix
	pkgs = path.Join(pkgs, "bash", "git", "coreutils", "nix")
	if runtime.GOARCH == "arm64" {
		pkgs = path.Join("arm64", pkgs)
	}

	return path.Join(nixery, pkgs)
}
455
456var re = regexp.MustCompile(`[^a-zA-Z0-9_.-]`)
457func networkName(runId syntax.ATURI) string {
458 return re.ReplaceAllString(runId.String()[5:], "-")
459}