A vibe-coded Tangled fork that supports Pijul.
at de80ec95c4696ed8214e88b413b7ed917dd9e606 398 lines 9.8 kB view raw
1package docker 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "io" 8 "log/slog" 9 "os" 10 "sync" 11 "time" 12 13 "github.com/docker/docker/api/types/container" 14 "github.com/docker/docker/api/types/image" 15 "github.com/docker/docker/api/types/mount" 16 "github.com/docker/docker/api/types/network" 17 "github.com/docker/docker/client" 18 "github.com/docker/docker/pkg/stdcopy" 19 "gopkg.in/yaml.v3" 20 "tangled.org/core/api/tangled" 21 "tangled.org/core/log" 22 "tangled.org/core/spindle/config" 23 "tangled.org/core/spindle/engine" 24 "tangled.org/core/spindle/engines/common" 25 engineerrors "tangled.org/core/spindle/engines/errors" 26 "tangled.org/core/spindle/models" 27 "tangled.org/core/spindle/secrets" 28) 29 30const ( 31 workspaceDir = "/tangled/workspace" 32 homeDir = "/tangled/home" 33 defaultImage = "ubuntu:22.04" 34) 35 36type cleanupFunc func(context.Context) error 37 38type Engine struct { 39 docker client.APIClient 40 l *slog.Logger 41 cfg *config.Config 42 43 cleanupMu sync.Mutex 44 cleanup map[string][]cleanupFunc 45} 46 47type Step struct { 48 name string 49 kind models.StepKind 50 command string 51 environment map[string]string 52} 53 54func (s Step) Name() string { 55 return s.name 56} 57 58func (s Step) Command() string { 59 return s.command 60} 61 62func (s Step) Kind() models.StepKind { 63 return s.kind 64} 65 66// setupSteps get added to start of Steps 67type setupSteps []models.Step 68 69// addStep adds a step to the beginning of the workflow's steps. 
70func (ss *setupSteps) addStep(step models.Step) { 71 *ss = append(*ss, step) 72} 73 74type addlFields struct { 75 image string 76 container string 77} 78 79func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { 80 swf := &models.Workflow{} 81 addl := addlFields{} 82 83 dwf := &struct { 84 Steps []struct { 85 Command string `yaml:"command"` 86 Name string `yaml:"name"` 87 Environment map[string]string `yaml:"environment"` 88 } `yaml:"steps"` 89 Environment map[string]string `yaml:"environment"` 90 Image string `yaml:"image"` 91 }{} 92 err := yaml.Unmarshal([]byte(twf.Raw), &dwf) 93 if err != nil { 94 return nil, err 95 } 96 97 for _, dstep := range dwf.Steps { 98 sstep := Step{} 99 sstep.environment = dstep.Environment 100 sstep.command = dstep.Command 101 sstep.name = dstep.Name 102 sstep.kind = models.StepKindUser 103 swf.Steps = append(swf.Steps, sstep) 104 } 105 swf.Name = twf.Name 106 swf.Environment = dwf.Environment 107 108 // Use specified image or default 109 if dwf.Image != "" { 110 addl.image = dwf.Image 111 } else { 112 addl.image = defaultImage 113 } 114 115 setup := &setupSteps{} 116 117 // Add clone step 118 setup.addStep(models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev)) 119 120 // append setup steps in order to the start of workflow steps 121 swf.Steps = append(*setup, swf.Steps...) 
122 swf.Data = addl 123 124 return swf, nil 125} 126 127func (e *Engine) WorkflowTimeout() time.Duration { 128 workflowTimeoutStr := e.cfg.DockerPipelines.WorkflowTimeout 129 workflowTimeout, err := time.ParseDuration(workflowTimeoutStr) 130 if err != nil { 131 e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr) 132 workflowTimeout = 5 * time.Minute 133 } 134 135 return workflowTimeout 136} 137 138func New(ctx context.Context, cfg *config.Config) (*Engine, error) { 139 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 140 if err != nil { 141 return nil, err 142 } 143 144 l := log.FromContext(ctx).With("component", "spindle-docker") 145 146 e := &Engine{ 147 docker: dcli, 148 l: l, 149 cfg: cfg, 150 } 151 152 e.cleanup = make(map[string][]cleanupFunc) 153 154 return e, nil 155} 156 157func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error { 158 e.l.Info("setting up workflow", "workflow", wid) 159 160 _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 161 Driver: "bridge", 162 }) 163 if err != nil { 164 return err 165 } 166 e.registerCleanup(wid, func(ctx context.Context) error { 167 return e.docker.NetworkRemove(ctx, networkName(wid)) 168 }) 169 170 addl := wf.Data.(addlFields) 171 172 reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{}) 173 if err != nil { 174 e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error()) 175 176 return fmt.Errorf("pulling image: %w", err) 177 } 178 defer reader.Close() 179 io.Copy(os.Stdout, reader) 180 181 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 182 Image: addl.image, 183 Cmd: []string{"sleep", "infinity"}, // Keep container running 184 OpenStdin: true, 185 Tty: false, 186 Hostname: "spindle", 187 WorkingDir: workspaceDir, 188 Labels: map[string]string{ 189 "sh.tangled.pipeline/workflow_id": 
wid.String(), 190 }, 191 }, &container.HostConfig{ 192 Mounts: []mount.Mount{ 193 { 194 Type: mount.TypeTmpfs, 195 Target: "/tmp", 196 ReadOnly: false, 197 TmpfsOptions: &mount.TmpfsOptions{ 198 Mode: 0o1777, // world-writeable sticky bit 199 Options: [][]string{ 200 {"exec"}, 201 }, 202 }, 203 }, 204 }, 205 ReadonlyRootfs: false, 206 CapDrop: []string{"ALL"}, 207 CapAdd: []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"}, 208 SecurityOpt: []string{"no-new-privileges"}, 209 ExtraHosts: []string{"host.docker.internal:host-gateway"}, 210 }, nil, nil, "") 211 if err != nil { 212 return fmt.Errorf("creating container: %w", err) 213 } 214 e.registerCleanup(wid, func(ctx context.Context) error { 215 err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}) 216 if err != nil { 217 return err 218 } 219 220 return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{ 221 RemoveVolumes: true, 222 RemoveLinks: false, 223 Force: false, 224 }) 225 }) 226 227 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 228 if err != nil { 229 return fmt.Errorf("starting container: %w", err) 230 } 231 232 // Create necessary directories 233 mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{ 234 Cmd: []string{"mkdir", "-p", workspaceDir, homeDir}, 235 AttachStdout: true, 236 AttachStderr: true, 237 }) 238 if err != nil { 239 return err 240 } 241 242 // This actually *starts* the command. Thanks, Docker! 243 execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{}) 244 if err != nil { 245 return err 246 } 247 defer execResp.Close() 248 249 // This is apparently best way to wait for the command to complete. 
250 _, err = io.ReadAll(execResp.Reader) 251 if err != nil { 252 return err 253 } 254 255 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 256 if err != nil { 257 return err 258 } 259 260 if execInspectResp.ExitCode != 0 { 261 return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode) 262 } else if execInspectResp.Running { 263 return errors.New("mkdir is somehow still running??") 264 } 265 266 addl.container = resp.ID 267 wf.Data = addl 268 269 return nil 270} 271 272func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error { 273 addl := w.Data.(addlFields) 274 workflowEnvs := common.ConstructEnvs(w.Environment) 275 for _, s := range secrets { 276 workflowEnvs.AddEnv(s.Key, s.Value) 277 } 278 279 step := w.Steps[idx] 280 281 select { 282 case <-ctx.Done(): 283 return ctx.Err() 284 default: 285 } 286 287 envs := append(common.EnvVars(nil), workflowEnvs...) 
288 if dockerStep, ok := step.(Step); ok { 289 for k, v := range dockerStep.environment { 290 envs.AddEnv(k, v) 291 } 292 } 293 envs.AddEnv("HOME", homeDir) 294 295 mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{ 296 Cmd: []string{"sh", "-c", step.Command()}, 297 AttachStdout: true, 298 AttachStderr: true, 299 Env: envs, 300 }) 301 if err != nil { 302 return fmt.Errorf("creating exec: %w", err) 303 } 304 305 // start tailing logs in background 306 tailDone := make(chan error, 1) 307 go func() { 308 tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step) 309 }() 310 311 select { 312 case <-tailDone: 313 314 case <-ctx.Done(): 315 e.l.Warn("step timed out", "step", step.Name()) 316 <-tailDone 317 return engine.ErrTimedOut 318 } 319 320 select { 321 case <-ctx.Done(): 322 return ctx.Err() 323 default: 324 } 325 326 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 327 if err != nil { 328 return err 329 } 330 331 if execInspectResp.ExitCode != 0 { 332 inspectResp, err := e.docker.ContainerInspect(ctx, addl.container) 333 if err != nil { 334 return err 335 } 336 337 e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled) 338 339 if inspectResp.State.OOMKilled { 340 return engineerrors.ErrOOMKilled 341 } 342 return engine.ErrWorkflowFailed 343 } 344 345 return nil 346} 347 348func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error { 349 if wfLogger == nil { 350 return nil 351 } 352 353 // This actually *starts* the command. Thanks, Docker! 
354 logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{}) 355 if err != nil { 356 return err 357 } 358 defer logs.Close() 359 360 _, err = stdcopy.StdCopy( 361 wfLogger.DataWriter(stepIdx, "stdout"), 362 wfLogger.DataWriter(stepIdx, "stderr"), 363 logs.Reader, 364 ) 365 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { 366 return fmt.Errorf("failed to copy logs: %w", err) 367 } 368 369 return nil 370} 371 372func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 373 e.cleanupMu.Lock() 374 key := wid.String() 375 376 fns := e.cleanup[key] 377 delete(e.cleanup, key) 378 e.cleanupMu.Unlock() 379 380 for _, fn := range fns { 381 if err := fn(ctx); err != nil { 382 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 383 } 384 } 385 return nil 386} 387 388func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 389 e.cleanupMu.Lock() 390 defer e.cleanupMu.Unlock() 391 392 key := wid.String() 393 e.cleanup[key] = append(e.cleanup[key], fn) 394} 395 396func networkName(wid models.WorkflowId) string { 397 return fmt.Sprintf("workflow-network-%s", wid) 398}