A vibe-coded Tangled fork that supports Pijul.
package nixery

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path"
	"runtime"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"gopkg.in/yaml.v3"
	"tangled.org/core/api/tangled"
	"tangled.org/core/log"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/common"
	engineerrors "tangled.org/core/spindle/engines/errors"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/secrets"
)

const (
	workspaceDir = "/tangled/workspace"
	homeDir      = "/tangled/home"
)

// cleanupFunc tears down a single resource created for a workflow.
type cleanupFunc func(context.Context) error

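// Engine runs workflows in Docker containers whose images are assembled
// on the fly by a Nixery instance. Cleanup callbacks are tracked per
// workflow so DestroyWorkflow can tear down everything SetupWorkflow
// created.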
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	cfg    *config.Config

	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

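// Step is a single user-defined step of a workflow, parsed from its
// YAML definition.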
type Step struct {
	name        string
	kind        models.StepKind
	command     string
	environment map[string]string
}

func (s Step) Name() string {
	return s.name
}

func (s Step) Command() string {
	return s.command
}

func (s Step) Kind() models.StepKind {
	return s.kind
}

// setupSteps are engine-generated steps that get prepended to the start
// of a workflow's Steps.
type setupSteps []models.Step

// addStep appends a step to the setup steps; the full set is later
// prepended, in order, to the workflow's user-defined steps.
func (ss *setupSteps) addStep(step models.Step) {
	*ss = append(*ss, step)
}

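// addlFields carries engine-specific state through models.Workflow.Data:
// the Nixery image to run the workflow in and, once SetupWorkflow has
// run, the ID of the container executing it.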
type addlFields struct {
	image     string
	container string
}

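// InitWorkflow parses a workflow's raw YAML into a models.Workflow and
// records the Nixery image it should run in. Judging by the inline
// struct below, a workflow looks roughly like this (a sketch; all
// values hypothetical):
//
//	steps:
//	  - name: build
//	    command: make
//	    environment:
//	      FOO: bar
//	dependencies:
//	  nixpkgs:
//	    - go
//	environment:
//	  CI: "1"
//
// Setup steps (nix config, repository clone, and optional dependency
// installation) are prepended to the user's steps.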
func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
	swf := &models.Workflow{}
	addl := addlFields{}

	dwf := &struct {
		Steps []struct {
			Command     string            `yaml:"command"`
			Name        string            `yaml:"name"`
			Environment map[string]string `yaml:"environment"`
		} `yaml:"steps"`
		Dependencies map[string][]string `yaml:"dependencies"`
		Environment  map[string]string   `yaml:"environment"`
	}{}
	err := yaml.Unmarshal([]byte(twf.Raw), dwf)
	if err != nil {
		return nil, err
	}

	for _, dstep := range dwf.Steps {
		sstep := Step{
			name:        dstep.Name,
			kind:        models.StepKindUser,
			command:     dstep.Command,
			environment: dstep.Environment,
		}
		swf.Steps = append(swf.Steps, sstep)
	}
	swf.Name = twf.Name
	swf.Environment = dwf.Environment
	addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery)

	setup := &setupSteps{}

	setup.addStep(nixConfStep())
	setup.addStep(models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev))
	// this step could be empty
	if s := dependencyStep(dwf.Dependencies); s != nil {
		setup.addStep(*s)
	}

	// prepend the setup steps, in order, to the workflow's steps
	swf.Steps = append(*setup, swf.Steps...)
	swf.Data = addl

	return swf, nil
}

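// WorkflowTimeout returns the configured per-workflow timeout, falling
// back to five minutes if the configured value fails to parse.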
func (e *Engine) WorkflowTimeout() time.Duration {
	workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout
	workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
	if err != nil {
		e.l.Error("failed to parse workflow timeout, falling back to 5m", "error", err, "timeout", workflowTimeoutStr)
		workflowTimeout = 5 * time.Minute
	}

	return workflowTimeout
}

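// workflowImage builds a Nixery image reference. Nixery resolves each
// path segment of the image name as a nixpkgs package, so the requested
// packages are joined into the path along with a default set (bash, git,
// coreutils, nix). On arm64 hosts the path gets the "arm64" prefix.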
func workflowImage(deps map[string][]string, nixery string) string {
	var dependencies string
	for reg, ds := range deps {
		if reg == "nixpkgs" {
			dependencies = path.Join(ds...)
		}
	}

	// TODO: load these defaults from somewhere else
	dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix")

	if runtime.GOARCH == "arm64" {
		dependencies = path.Join("arm64", dependencies)
	}

	return path.Join(nixery, dependencies)
}

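// New constructs an Engine backed by the local Docker daemon, with
// client configuration taken from the environment.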
func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker:  dcli,
		l:       l,
		cfg:     cfg,
		cleanup: make(map[string][]cleanupFunc),
	}

	return e, nil
}

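// SetupWorkflow prepares the workflow's runtime: it creates a dedicated
// bridge network, pulls the Nixery image, starts a long-lived container
// (kept alive by `cat` reading stdin), and creates the workspace and
// home directories inside it. Every resource is registered for cleanup
// so DestroyWorkflow can tear it down.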
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		return e.docker.NetworkRemove(ctx, networkName(wid))
	})

	addl := wf.Data.(addlFields)

	reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
	if err != nil {
		e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())

		return fmt.Errorf("pulling image: %w", err)
	}
	defer reader.Close()
	// drain the pull progress stream; the pull isn't complete until EOF
	_, _ = io.Copy(os.Stdout, reader)

	resp, err := e.docker.ContainerCreate(ctx, &container.Config{
		Image:      addl.image,
		Cmd:        []string{"cat"},
		OpenStdin:  true, // so cat stays alive :3
		Tty:        false,
		Hostname:   "spindle",
		WorkingDir: workspaceDir,
		Labels: map[string]string{
			"sh.tangled.pipeline/workflow_id": wid.String(),
		},
		// TODO(winter): investigate whether environment variables passed here
		// get propagated to ContainerExec processes
	}, &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writable with the sticky bit set
					Options: [][]string{
						{"exec"},
					},
				},
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		CapAdd:         []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"},
		SecurityOpt:    []string{"no-new-privileges"},
		ExtraHosts:     []string{"host.docker.internal:host-gateway"},
	}, nil, nil, "")
	if err != nil {
		return fmt.Errorf("creating container: %w", err)
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		// use a locally scoped error so the closure doesn't clobber the
		// enclosing function's err
		if err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}); err != nil {
			return err
		}

		return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
			RemoveVolumes: true,
			RemoveLinks:   false,
			Force:         false,
		})
	})

	err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
	if err != nil {
		return fmt.Errorf("starting container: %w", err)
	}

	mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
		Cmd:          []string{"mkdir", "-p", workspaceDir, homeDir},
		AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
		AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
	})
	if err != nil {
		return err
	}

	// This actually *starts* the command. Thanks, Docker!
	execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
	if err != nil {
		return err
	}
	defer execResp.Close()

	// This is apparently the best way to wait for the command to complete.
	_, err = io.ReadAll(execResp.Reader)
	if err != nil {
		return err
	}

	execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
	if err != nil {
		return err
	}

	if execInspectResp.ExitCode != 0 {
		return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
	} else if execInspectResp.Running {
		return errors.New("mkdir is somehow still running??")
	}

	addl.container = resp.ID
	wf.Data = addl

	return nil
}

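// RunStep executes step idx of the workflow inside the container created
// by SetupWorkflow, via `bash -c` in a fresh exec. Workflow-level
// environment, unlocked secrets, and step-level environment are merged
// into the exec's env, and output is streamed to wfLogger while the step
// runs.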
func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
	addl := w.Data.(addlFields)
	workflowEnvs := common.ConstructEnvs(w.Environment)
	// TODO(winter): should SetupWorkflow also have secret access?
	// IMO yes, but probably worth thinking on.
	for _, s := range secrets {
		workflowEnvs.AddEnv(s.Key, s.Value)
	}

	step := w.Steps[idx]

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	envs := append(common.EnvVars(nil), workflowEnvs...)
	if nixStep, ok := step.(Step); ok {
		for k, v := range nixStep.environment {
			envs.AddEnv(k, v)
		}
	}
	envs.AddEnv("HOME", homeDir)

	mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
		Cmd:          []string{"bash", "-c", step.Command()},
		AttachStdout: true,
		AttachStderr: true,
		Env:          envs,
	})
	if err != nil {
		return fmt.Errorf("creating exec: %w", err)
	}

	// start tailing logs in the background
	tailDone := make(chan error, 1)
	go func() {
		tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step)
	}()

	select {
	case tailErr := <-tailDone:
		if tailErr != nil {
			e.l.Error("failed to tail step logs", "step", step.Name(), "error", tailErr)
		}
	case <-ctx.Done():
		// cleanup will be handled by DestroyWorkflow, since
		// Docker doesn't provide an API to kill an exec run
		// (sure, we could grab the PID and kill it ourselves,
		// but that's wasted effort)
		e.l.Warn("step timed out", "step", step.Name())

		<-tailDone

		return engine.ErrTimedOut
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
	if err != nil {
		return err
	}

	if execInspectResp.ExitCode != 0 {
		inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
		if err != nil {
			return err
		}

		e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)

		if inspectResp.State.OOMKilled {
			return engineerrors.ErrOOMKilled
		}
		return engine.ErrWorkflowFailed
	}

	return nil
}

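// tailStep attaches to the exec (which is what actually starts it) and
// demultiplexes its combined output into the step's stdout and stderr
// log writers. With a nil wfLogger it returns immediately, leaving the
// exec unstarted.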
func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
	if wfLogger == nil {
		return nil
	}

	// This actually *starts* the command. Thanks, Docker!
	logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
	if err != nil {
		return err
	}
	defer logs.Close()

	_, err = stdcopy.StdCopy(
		wfLogger.DataWriter(stepIdx, "stdout"),
		wfLogger.DataWriter(stepIdx, "stderr"),
		logs.Reader,
	)
	if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("failed to copy logs: %w", err)
	}

	return nil
}

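// DestroyWorkflow runs (and forgets) all cleanup functions registered
// for the workflow. Cleanup failures are logged rather than returned so
// one failed teardown doesn't block the rest.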
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

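// registerCleanup records a cleanup function to run when the workflow
// is destroyed.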
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

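// networkName derives the per-workflow Docker network name.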
func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}