A vibe-coded Tangled fork that supports Pijul.
1package docker
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "io"
8 "log/slog"
9 "os"
10 "sync"
11 "time"
12
13 "github.com/docker/docker/api/types/container"
14 "github.com/docker/docker/api/types/image"
15 "github.com/docker/docker/api/types/mount"
16 "github.com/docker/docker/api/types/network"
17 "github.com/docker/docker/client"
18 "github.com/docker/docker/pkg/stdcopy"
19 "gopkg.in/yaml.v3"
20 "tangled.org/core/api/tangled"
21 "tangled.org/core/log"
22 "tangled.org/core/spindle/config"
23 "tangled.org/core/spindle/engine"
24 "tangled.org/core/spindle/engines/common"
25 engineerrors "tangled.org/core/spindle/engines/errors"
26 "tangled.org/core/spindle/models"
27 "tangled.org/core/spindle/secrets"
28)
29
// Defaults shared by every workflow container started by this engine.
const (
	// defaultImage is the image used when a workflow does not name one.
	defaultImage = "ubuntu:22.04"

	// workspaceDir is the working directory of the workflow container.
	workspaceDir = "/tangled/workspace"

	// homeDir is exported as HOME to every step.
	homeDir = "/tangled/home"
)
35
// cleanupFunc releases one resource that was created for a workflow. It is
// called with the context handed to DestroyWorkflow and reports a failed
// release through its error result.
type cleanupFunc func(ctx context.Context) error
37
38type Engine struct {
39 docker client.APIClient
40 l *slog.Logger
41 cfg *config.Config
42
43 cleanupMu sync.Mutex
44 cleanup map[string][]cleanupFunc
45}
46
47type Step struct {
48 name string
49 kind models.StepKind
50 command string
51 environment map[string]string
52}
53
54func (s Step) Name() string {
55 return s.name
56}
57
58func (s Step) Command() string {
59 return s.command
60}
61
62func (s Step) Kind() models.StepKind {
63 return s.kind
64}
65
66// setupSteps get added to start of Steps
67type setupSteps []models.Step
68
69// addStep adds a step to the beginning of the workflow's steps.
70func (ss *setupSteps) addStep(step models.Step) {
71 *ss = append(*ss, step)
72}
73
// addlFields is the Docker-specific state the engine keeps in a workflow's
// Data field.
type addlFields struct {
	// image is the container image the workflow runs in; InitWorkflow sets it.
	image string
	// container is the ID of the workflow's container; it is empty until
	// SetupWorkflow has created the container.
	container string
}
78
79func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
80 swf := &models.Workflow{}
81 addl := addlFields{}
82
83 dwf := &struct {
84 Steps []struct {
85 Command string `yaml:"command"`
86 Name string `yaml:"name"`
87 Environment map[string]string `yaml:"environment"`
88 } `yaml:"steps"`
89 Environment map[string]string `yaml:"environment"`
90 Image string `yaml:"image"`
91 }{}
92 err := yaml.Unmarshal([]byte(twf.Raw), &dwf)
93 if err != nil {
94 return nil, err
95 }
96
97 for _, dstep := range dwf.Steps {
98 sstep := Step{}
99 sstep.environment = dstep.Environment
100 sstep.command = dstep.Command
101 sstep.name = dstep.Name
102 sstep.kind = models.StepKindUser
103 swf.Steps = append(swf.Steps, sstep)
104 }
105 swf.Name = twf.Name
106 swf.Environment = dwf.Environment
107
108 // Use specified image or default
109 if dwf.Image != "" {
110 addl.image = dwf.Image
111 } else {
112 addl.image = defaultImage
113 }
114
115 setup := &setupSteps{}
116
117 // Add clone step
118 setup.addStep(models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev))
119
120 // append setup steps in order to the start of workflow steps
121 swf.Steps = append(*setup, swf.Steps...)
122 swf.Data = addl
123
124 return swf, nil
125}
126
127func (e *Engine) WorkflowTimeout() time.Duration {
128 workflowTimeoutStr := e.cfg.DockerPipelines.WorkflowTimeout
129 workflowTimeout, err := time.ParseDuration(workflowTimeoutStr)
130 if err != nil {
131 e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr)
132 workflowTimeout = 5 * time.Minute
133 }
134
135 return workflowTimeout
136}
137
138func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
139 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
140 if err != nil {
141 return nil, err
142 }
143
144 l := log.FromContext(ctx).With("component", "spindle-docker")
145
146 e := &Engine{
147 docker: dcli,
148 l: l,
149 cfg: cfg,
150 }
151
152 e.cleanup = make(map[string][]cleanupFunc)
153
154 return e, nil
155}
156
157func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
158 e.l.Info("setting up workflow", "workflow", wid)
159
160 _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
161 Driver: "bridge",
162 })
163 if err != nil {
164 return err
165 }
166 e.registerCleanup(wid, func(ctx context.Context) error {
167 return e.docker.NetworkRemove(ctx, networkName(wid))
168 })
169
170 addl := wf.Data.(addlFields)
171
172 reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
173 if err != nil {
174 e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())
175
176 return fmt.Errorf("pulling image: %w", err)
177 }
178 defer reader.Close()
179 io.Copy(os.Stdout, reader)
180
181 resp, err := e.docker.ContainerCreate(ctx, &container.Config{
182 Image: addl.image,
183 Cmd: []string{"sleep", "infinity"}, // Keep container running
184 OpenStdin: true,
185 Tty: false,
186 Hostname: "spindle",
187 WorkingDir: workspaceDir,
188 Labels: map[string]string{
189 "sh.tangled.pipeline/workflow_id": wid.String(),
190 },
191 }, &container.HostConfig{
192 Mounts: []mount.Mount{
193 {
194 Type: mount.TypeTmpfs,
195 Target: "/tmp",
196 ReadOnly: false,
197 TmpfsOptions: &mount.TmpfsOptions{
198 Mode: 0o1777, // world-writeable sticky bit
199 Options: [][]string{
200 {"exec"},
201 },
202 },
203 },
204 },
205 ReadonlyRootfs: false,
206 CapDrop: []string{"ALL"},
207 CapAdd: []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"},
208 SecurityOpt: []string{"no-new-privileges"},
209 ExtraHosts: []string{"host.docker.internal:host-gateway"},
210 }, nil, nil, "")
211 if err != nil {
212 return fmt.Errorf("creating container: %w", err)
213 }
214 e.registerCleanup(wid, func(ctx context.Context) error {
215 err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{})
216 if err != nil {
217 return err
218 }
219
220 return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
221 RemoveVolumes: true,
222 RemoveLinks: false,
223 Force: false,
224 })
225 })
226
227 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{})
228 if err != nil {
229 return fmt.Errorf("starting container: %w", err)
230 }
231
232 // Create necessary directories
233 mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
234 Cmd: []string{"mkdir", "-p", workspaceDir, homeDir},
235 AttachStdout: true,
236 AttachStderr: true,
237 })
238 if err != nil {
239 return err
240 }
241
242 // This actually *starts* the command. Thanks, Docker!
243 execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
244 if err != nil {
245 return err
246 }
247 defer execResp.Close()
248
249 // This is apparently best way to wait for the command to complete.
250 _, err = io.ReadAll(execResp.Reader)
251 if err != nil {
252 return err
253 }
254
255 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
256 if err != nil {
257 return err
258 }
259
260 if execInspectResp.ExitCode != 0 {
261 return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
262 } else if execInspectResp.Running {
263 return errors.New("mkdir is somehow still running??")
264 }
265
266 addl.container = resp.ID
267 wf.Data = addl
268
269 return nil
270}
271
272func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
273 addl := w.Data.(addlFields)
274 workflowEnvs := common.ConstructEnvs(w.Environment)
275 for _, s := range secrets {
276 workflowEnvs.AddEnv(s.Key, s.Value)
277 }
278
279 step := w.Steps[idx]
280
281 select {
282 case <-ctx.Done():
283 return ctx.Err()
284 default:
285 }
286
287 envs := append(common.EnvVars(nil), workflowEnvs...)
288 if dockerStep, ok := step.(Step); ok {
289 for k, v := range dockerStep.environment {
290 envs.AddEnv(k, v)
291 }
292 }
293 envs.AddEnv("HOME", homeDir)
294
295 mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
296 Cmd: []string{"sh", "-c", step.Command()},
297 AttachStdout: true,
298 AttachStderr: true,
299 Env: envs,
300 })
301 if err != nil {
302 return fmt.Errorf("creating exec: %w", err)
303 }
304
305 // start tailing logs in background
306 tailDone := make(chan error, 1)
307 go func() {
308 tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step)
309 }()
310
311 select {
312 case <-tailDone:
313
314 case <-ctx.Done():
315 e.l.Warn("step timed out", "step", step.Name())
316 <-tailDone
317 return engine.ErrTimedOut
318 }
319
320 select {
321 case <-ctx.Done():
322 return ctx.Err()
323 default:
324 }
325
326 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
327 if err != nil {
328 return err
329 }
330
331 if execInspectResp.ExitCode != 0 {
332 inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
333 if err != nil {
334 return err
335 }
336
337 e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)
338
339 if inspectResp.State.OOMKilled {
340 return engineerrors.ErrOOMKilled
341 }
342 return engine.ErrWorkflowFailed
343 }
344
345 return nil
346}
347
348func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
349 if wfLogger == nil {
350 return nil
351 }
352
353 // This actually *starts* the command. Thanks, Docker!
354 logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
355 if err != nil {
356 return err
357 }
358 defer logs.Close()
359
360 _, err = stdcopy.StdCopy(
361 wfLogger.DataWriter(stepIdx, "stdout"),
362 wfLogger.DataWriter(stepIdx, "stderr"),
363 logs.Reader,
364 )
365 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
366 return fmt.Errorf("failed to copy logs: %w", err)
367 }
368
369 return nil
370}
371
372func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
373 e.cleanupMu.Lock()
374 key := wid.String()
375
376 fns := e.cleanup[key]
377 delete(e.cleanup, key)
378 e.cleanupMu.Unlock()
379
380 for _, fn := range fns {
381 if err := fn(ctx); err != nil {
382 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
383 }
384 }
385 return nil
386}
387
388func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
389 e.cleanupMu.Lock()
390 defer e.cleanupMu.Unlock()
391
392 key := wid.String()
393 e.cleanup[key] = append(e.cleanup[key], fn)
394}
395
396func networkName(wid models.WorkflowId) string {
397 return fmt.Sprintf("workflow-network-%s", wid)
398}