// A vibe-coded tangled fork which supports pijul.
1package nixery
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "io"
8 "log/slog"
9 "os"
10 "path"
11 "runtime"
12 "sync"
13 "time"
14
15 "github.com/docker/docker/api/types/container"
16 "github.com/docker/docker/api/types/image"
17 "github.com/docker/docker/api/types/mount"
18 "github.com/docker/docker/api/types/network"
19 "github.com/docker/docker/client"
20 "github.com/docker/docker/pkg/stdcopy"
21 "gopkg.in/yaml.v3"
22 "tangled.org/core/api/tangled"
23 "tangled.org/core/log"
24 "tangled.org/core/spindle/config"
25 "tangled.org/core/spindle/engine"
26 "tangled.org/core/spindle/models"
27 "tangled.org/core/spindle/secrets"
28)
29
const (
	// workspaceDir is the in-container working directory; the container is
	// created with this as WorkingDir and steps run inside it.
	workspaceDir = "/tangled/workspace"
	// homeDir is set as HOME for every step process (see RunStep).
	homeDir = "/tangled/home"
)
34
// cleanupFunc tears down one resource created for a workflow (network,
// container, ...). It receives the caller's context so teardown can be
// cancelled.
type cleanupFunc func(context.Context) error

// Engine runs workflows inside Docker containers whose image is assembled
// on the fly from a Nixery instance.
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	cfg    *config.Config

	// cleanupMu guards cleanup, which maps a workflow id (string form) to
	// the teardown funcs registered for it, in registration order.
	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}
45
// Step is a single user-defined workflow step parsed from the YAML manifest
// in InitWorkflow. It implements models.Step.
type Step struct {
	name        string
	kind        models.StepKind
	command     string
	environment map[string]string
}

// Name returns the step's display name.
func (s Step) Name() string {
	return s.name
}

// Command returns the shell command executed for this step (via `bash -c`).
func (s Step) Command() string {
	return s.command
}

// Kind reports the step's kind; InitWorkflow always sets StepKindUser for
// manifest-parsed steps.
func (s Step) Kind() models.StepKind {
	return s.kind
}
64
// setupSteps get added to start of Steps
type setupSteps []models.Step

// addStep appends a step to the end of the setup list; the whole list is
// later prepended, in order, to the workflow's steps (see InitWorkflow).
func (ss *setupSteps) addStep(step models.Step) {
	*ss = append(*ss, step)
}
72
// addlFields is engine-private state threaded through models.Workflow.Data:
// the resolved Nixery image reference (set by InitWorkflow) and, once
// created, the container ID (set by SetupWorkflow).
type addlFields struct {
	image     string
	container string
}
77
78func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
79 swf := &models.Workflow{}
80 addl := addlFields{}
81
82 dwf := &struct {
83 Steps []struct {
84 Command string `yaml:"command"`
85 Name string `yaml:"name"`
86 Environment map[string]string `yaml:"environment"`
87 } `yaml:"steps"`
88 Dependencies map[string][]string `yaml:"dependencies"`
89 Environment map[string]string `yaml:"environment"`
90 }{}
91 err := yaml.Unmarshal([]byte(twf.Raw), &dwf)
92 if err != nil {
93 return nil, err
94 }
95
96 for _, dstep := range dwf.Steps {
97 sstep := Step{}
98 sstep.environment = dstep.Environment
99 sstep.command = dstep.Command
100 sstep.name = dstep.Name
101 sstep.kind = models.StepKindUser
102 swf.Steps = append(swf.Steps, sstep)
103 }
104 swf.Name = twf.Name
105 swf.Environment = dwf.Environment
106 addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery)
107
108 setup := &setupSteps{}
109
110 setup.addStep(nixConfStep())
111 setup.addStep(models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev))
112 // this step could be empty
113 if s := dependencyStep(dwf.Dependencies); s != nil {
114 setup.addStep(*s)
115 }
116
117 // append setup steps in order to the start of workflow steps
118 swf.Steps = append(*setup, swf.Steps...)
119 swf.Data = addl
120
121 return swf, nil
122}
123
// WorkflowTimeout returns the configured wall-clock limit for one workflow.
func (e *Engine) WorkflowTimeout() time.Duration {
	return e.cfg.NixeryPipelines.WorkflowTimeout
}
127
// workflowImage builds the Nixery image reference for a workflow: the
// requested nixpkgs packages plus a baseline set (bash, git, coreutils,
// nix), path-joined under the configured Nixery host. On arm64 hosts the
// path is prefixed with "arm64" per Nixery's multi-arch convention.
//
// Only the "nixpkgs" registry key is honoured; other registries are
// silently ignored here.
func workflowImage(deps map[string][]string, nixery string) string {
	pkgs := append([]string{}, deps["nixpkgs"]...)

	// load defaults from somewhere else
	pkgs = append(pkgs, "bash", "git", "coreutils", "nix")

	if runtime.GOARCH == "arm64" {
		pkgs = append([]string{"arm64"}, pkgs...)
	}

	return path.Join(append([]string{nixery}, pkgs...)...)
}
145
146func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
147 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
148 if err != nil {
149 return nil, err
150 }
151
152 l := log.FromContext(ctx).With("component", "spindle")
153
154 e := &Engine{
155 docker: dcli,
156 l: l,
157 cfg: cfg,
158 }
159
160 e.cleanup = make(map[string][]cleanupFunc)
161
162 return e, nil
163}
164
// SetupWorkflow provisions the per-workflow resources: a bridge network, the
// pulled Nixery image, and a long-lived container (kept alive by `cat`) in
// which each step later runs as a docker exec. Every created resource
// registers a cleanup that DestroyWorkflow runs later. On success the
// container ID is stored back into wf.Data for RunStep.
//
// NOTE(review): the container is created with a nil networking config, so it
// does not appear to be attached to the network created here — confirm
// whether the network is connected elsewhere or is vestigial.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		if err := e.docker.NetworkRemove(ctx, networkName(wid)); err != nil {
			return fmt.Errorf("removing network: %w", err)
		}
		return nil
	})

	addl := wf.Data.(addlFields)

	reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
	if err != nil {
		e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())

		return fmt.Errorf("pulling image: %w", err)
	}
	defer reader.Close()
	// Draining the reader is what drives the pull to completion; progress
	// JSON is echoed to stdout and the copy error is deliberately ignored.
	io.Copy(os.Stdout, reader)

	resp, err := e.docker.ContainerCreate(ctx, &container.Config{
		Image:      addl.image,
		Cmd:        []string{"cat"},
		OpenStdin:  true, // so cat stays alive :3
		Tty:        false,
		Hostname:   "spindle",
		WorkingDir: workspaceDir,
		Labels: map[string]string{
			"sh.tangled.pipeline/workflow_id": wid.String(),
		},
		// TODO(winter): investigate whether environment variables passed here
		// get propagated to ContainerExec processes
	}, &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writeable sticky bit
					Options: [][]string{
						{"exec"},
					},
				},
			},
		},
		ReadonlyRootfs: false,
		// Drop everything, then add back only what builds need (file
		// ownership/permission changes and uid/gid switching).
		CapDrop:     []string{"ALL"},
		CapAdd:      []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"},
		SecurityOpt: []string{"no-new-privileges"},
		ExtraHosts:  []string{"host.docker.internal:host-gateway"},
	}, nil, nil, "")
	if err != nil {
		return fmt.Errorf("creating container: %w", err)
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		if err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}); err != nil {
			return fmt.Errorf("stopping container: %w", err)
		}

		err := e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
			RemoveVolumes: true,
			RemoveLinks:   false,
			Force:         false,
		})
		if err != nil {
			return fmt.Errorf("removing container: %w", err)
		}
		return nil
	})

	if err := e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
		return fmt.Errorf("starting container: %w", err)
	}

	// Create the workspace and home directories inside the container.
	mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
		Cmd:          []string{"mkdir", "-p", workspaceDir, homeDir},
		AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
		AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
	})
	if err != nil {
		return err
	}

	// This actually *starts* the command. Thanks, Docker!
	execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
	if err != nil {
		return err
	}
	defer execResp.Close()

	// This is apparently best way to wait for the command to complete.
	_, err = io.ReadAll(execResp.Reader)
	if err != nil {
		return err
	}

	execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
	if err != nil {
		return err
	}

	if execInspectResp.ExitCode != 0 {
		return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
	} else if execInspectResp.Running {
		return errors.New("mkdir is somehow still running??")
	}

	// Persist the container ID so RunStep can exec into it.
	addl.container = resp.ID
	wf.Data = addl

	return nil
}
285
286func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
287 addl := w.Data.(addlFields)
288 workflowEnvs := ConstructEnvs(w.Environment)
289 // TODO(winter): should SetupWorkflow also have secret access?
290 // IMO yes, but probably worth thinking on.
291 for _, s := range secrets {
292 workflowEnvs.AddEnv(s.Key, s.Value)
293 }
294
295 step := w.Steps[idx]
296
297 select {
298 case <-ctx.Done():
299 return ctx.Err()
300 default:
301 }
302
303 envs := append(EnvVars(nil), workflowEnvs...)
304 if nixStep, ok := step.(Step); ok {
305 for k, v := range nixStep.environment {
306 envs.AddEnv(k, v)
307 }
308 }
309 envs.AddEnv("HOME", homeDir)
310
311 mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
312 Cmd: []string{"bash", "-c", step.Command()},
313 AttachStdout: true,
314 AttachStderr: true,
315 Env: envs,
316 })
317 if err != nil {
318 return fmt.Errorf("creating exec: %w", err)
319 }
320
321 // start tailing logs in background
322 tailDone := make(chan error, 1)
323 go func() {
324 tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step)
325 }()
326
327 select {
328 case <-tailDone:
329
330 case <-ctx.Done():
331 // cleanup will be handled by DestroyWorkflow, since
332 // Docker doesn't provide an API to kill an exec run
333 // (sure, we could grab the PID and kill it ourselves,
334 // but that's wasted effort)
335 e.l.Warn("step timed out", "step", step.Name())
336
337 <-tailDone
338
339 return engine.ErrTimedOut
340 }
341
342 select {
343 case <-ctx.Done():
344 return ctx.Err()
345 default:
346 }
347
348 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
349 if err != nil {
350 return err
351 }
352
353 if execInspectResp.ExitCode != 0 {
354 inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
355 if err != nil {
356 return err
357 }
358
359 e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)
360
361 if inspectResp.State.OOMKilled {
362 return ErrOOMKilled
363 }
364 return engine.ErrWorkflowFailed
365 }
366
367 return nil
368}
369
370func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
371 if wfLogger == nil {
372 return nil
373 }
374
375 // This actually *starts* the command. Thanks, Docker!
376 logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
377 if err != nil {
378 return err
379 }
380 defer logs.Close()
381
382 _, err = stdcopy.StdCopy(
383 wfLogger.DataWriter(stepIdx, "stdout"),
384 wfLogger.DataWriter(stepIdx, "stderr"),
385 logs.Reader,
386 )
387 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
388 return fmt.Errorf("failed to copy logs: %w", err)
389 }
390
391 return nil
392}
393
394func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
395 fns := e.drainCleanups(wid)
396
397 for _, fn := range fns {
398 if err := fn(ctx); err != nil {
399 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
400 }
401 }
402 return nil
403}
404
405func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
406 e.cleanupMu.Lock()
407 defer e.cleanupMu.Unlock()
408
409 key := wid.String()
410 e.cleanup[key] = append(e.cleanup[key], fn)
411}
412
413func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc {
414 e.cleanupMu.Lock()
415 key := wid.String()
416
417 fns := e.cleanup[key]
418 delete(e.cleanup, key)
419 e.cleanupMu.Unlock()
420
421 return fns
422}
423
// networkName derives the per-workflow Docker bridge network name from the
// workflow id.
func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}