A vibe-coded fork of tangled which supports pijul.
at sl/spindle-adapters 426 lines 11 kB view raw
// Package nixery implements a spindle workflow engine that executes
// pipeline steps inside a Docker container whose image is assembled on
// demand by a Nixery instance (one image per workflow, derived from the
// workflow's declared nixpkgs dependencies).
package nixery

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path"
	"runtime"
	"sync"
	"time"

	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"
	"gopkg.in/yaml.v3"
	"tangled.org/core/api/tangled"
	"tangled.org/core/log"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/secrets"
)

// Fixed paths inside the workflow container.
const (
	workspaceDir = "/tangled/workspace" // working directory for all step execs
	homeDir      = "/tangled/home"      // exported as HOME to step processes
)

// cleanupFunc tears down one resource (network, container, ...) created
// while setting up a workflow. Cleanups are registered during
// SetupWorkflow and drained by DestroyWorkflow.
type cleanupFunc func(context.Context) error

// Engine runs workflows against a Docker daemon. It satisfies the
// spindle engine interface (InitWorkflow / SetupWorkflow / RunStep /
// DestroyWorkflow — assumed from the method set; confirm against the
// engine package).
type Engine struct {
	docker client.APIClient
	l      *slog.Logger
	cfg    *config.Config

	// cleanupMu guards cleanup, which maps a workflow id (string form)
	// to the teardown functions registered for that workflow, in
	// registration order.
	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}

// Step is this engine's concrete implementation of models.Step, built
// from the workflow's YAML. environment holds per-step variables that
// are merged into the exec environment in RunStep.
type Step struct {
	name        string
	kind        models.StepKind
	command     string
	environment map[string]string
}

// Name returns the step's display name.
func (s Step) Name() string {
	return s.name
}

// Command returns the shell command the step runs (via `bash -c`).
func (s Step) Command() string {
	return s.command
}

// Kind returns the step's kind (user-authored steps are tagged
// models.StepKindUser in InitWorkflow).
func (s Step) Kind() models.StepKind {
	return s.kind
}

// setupSteps get added to start of Steps
type setupSteps []models.Step

// addStep adds a step to the beginning of the workflow's steps.
func (ss *setupSteps) addStep(step models.Step) {
	// Despite the name, this appends; the whole setupSteps slice is
	// later prepended to the workflow's steps, so insertion order here
	// is the final execution order of the setup phase.
	*ss = append(*ss, step)
}

// addlFields is engine-private state smuggled through models.Workflow.Data:
// the resolved Nixery image reference and, after SetupWorkflow, the
// running container's ID.
type addlFields struct {
	image     string
	container string
}

// InitWorkflow parses the workflow's raw YAML into a models.Workflow:
// user steps are converted to Step values, the Nixery image reference is
// computed from the declared dependencies, and the setup steps (nix
// config, repo clone, optional dependency step) are prepended to the
// step list. The result's Data field carries addlFields.
func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
	swf := &models.Workflow{}
	addl := addlFields{}

	// Anonymous schema for the subset of the workflow YAML this engine
	// cares about: steps, dependencies (registry -> packages), and
	// workflow-level environment.
	dwf := &struct {
		Steps []struct {
			Command     string            `yaml:"command"`
			Name        string            `yaml:"name"`
			Environment map[string]string `yaml:"environment"`
		} `yaml:"steps"`
		Dependencies map[string][]string `yaml:"dependencies"`
		Environment  map[string]string   `yaml:"environment"`
	}{}
	err := yaml.Unmarshal([]byte(twf.Raw), &dwf)
	if err != nil {
		return nil, err
	}

	for _, dstep := range dwf.Steps {
		sstep := Step{}
		sstep.environment = dstep.Environment
		sstep.command = dstep.Command
		sstep.name = dstep.Name
		sstep.kind = models.StepKindUser
		swf.Steps = append(swf.Steps, sstep)
	}
	swf.Name = twf.Name
	swf.Environment = dwf.Environment
	addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery)

	setup := &setupSteps{}

	// nixConfStep and dependencyStep are defined elsewhere in this
	// package (not visible in this chunk).
	setup.addStep(nixConfStep())
	setup.addStep(models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev))
	// this step could be empty
	if s := dependencyStep(dwf.Dependencies); s != nil {
		setup.addStep(*s)
	}

	// append setup steps in order to the start of workflow steps
	swf.Steps = append(*setup, swf.Steps...)
	swf.Data = addl

	return swf, nil
}

// WorkflowTimeout returns the configured per-workflow execution timeout.
func (e *Engine) WorkflowTimeout() time.Duration {
	return e.cfg.NixeryPipelines.WorkflowTimeout
}

// workflowImage builds the Nixery image reference for a workflow: the
// packages listed under the "nixpkgs" registry key (other registry keys
// are ignored), plus a fixed set of defaults, joined into a single
// image path under the configured Nixery host. On arm64 hosts the path
// is prefixed with "arm64" (Nixery's architecture selector —
// NOTE(review): confirm against the Nixery docs).
func workflowImage(deps map[string][]string, nixery string) string {
	var dependencies string
	for reg, ds := range deps {
		if reg == "nixpkgs" {
			dependencies = path.Join(ds...)
		}
	}

	// load defaults from somewhere else
	dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix")

	if runtime.GOARCH == "arm64" {
		dependencies = path.Join("arm64", dependencies)
	}

	return path.Join(nixery, dependencies)
}

// New constructs an Engine backed by a Docker client configured from
// the environment (DOCKER_HOST etc.), with API version negotiation.
func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
	dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, err
	}

	l := log.FromContext(ctx).With("component", "spindle")

	e := &Engine{
		docker: dcli,
		l:      l,
		cfg:    cfg,
	}

	e.cleanup = make(map[string][]cleanupFunc)

	return e, nil
}

// SetupWorkflow provisions the per-workflow resources: a bridge
// network, the pulled Nixery image, and a long-lived container (kept
// alive by `cat` reading stdin) in which every step later runs as an
// exec. It also creates workspaceDir and homeDir inside the container.
// Each created resource registers a cleanup for DestroyWorkflow.
// On success the container ID is stored back into wf.Data.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error {
	e.l.Info("setting up workflow", "workflow", wid)

	_, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return err
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		if err := e.docker.NetworkRemove(ctx, networkName(wid)); err != nil {
			return fmt.Errorf("removing network: %w", err)
		}
		return nil
	})

	addl := wf.Data.(addlFields)

	reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{})
	if err != nil {
		e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error())

		return fmt.Errorf("pulling image: %w", err)
	}
	defer reader.Close()
	// Drain the pull progress stream; the pull isn't complete until the
	// reader is consumed. NOTE(review): the copy error is silently
	// ignored and progress goes to the process stdout rather than the
	// logger — consider io.Discard or structured logging here.
	io.Copy(os.Stdout, reader)

	resp, err := e.docker.ContainerCreate(ctx, &container.Config{
		Image:      addl.image,
		Cmd:        []string{"cat"},
		OpenStdin:  true, // so cat stays alive :3
		Tty:        false,
		Hostname:   "spindle",
		WorkingDir: workspaceDir,
		Labels: map[string]string{
			"sh.tangled.pipeline/workflow_id": wid.String(),
		},
		// TODO(winter): investigate whether environment variables passed here
		// get propagated to ContainerExec processes
	}, &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writeable sticky bit
					Options: [][]string{
						{"exec"},
					},
				},
			},
		},
		ReadonlyRootfs: false,
		// Drop everything, then add back only what builds need
		// (file ownership/permission changes and uid/gid switching).
		CapDrop:     []string{"ALL"},
		CapAdd:      []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"},
		SecurityOpt: []string{"no-new-privileges"},
		ExtraHosts:  []string{"host.docker.internal:host-gateway"},
	}, nil, nil, "")
	if err != nil {
		return fmt.Errorf("creating container: %w", err)
	}
	e.registerCleanup(wid, func(ctx context.Context) error {
		if err := e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}); err != nil {
			return fmt.Errorf("stopping container: %w", err)
		}

		err := e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{
			RemoveVolumes: true,
			RemoveLinks:   false,
			Force:         false,
		})
		if err != nil {
			return fmt.Errorf("removing container: %w", err)
		}
		return nil
	})

	if err := e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
		return fmt.Errorf("starting container: %w", err)
	}

	// Create the workspace and home directories inside the container.
	mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
		Cmd:          []string{"mkdir", "-p", workspaceDir, homeDir},
		AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
		AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
	})
	if err != nil {
		return err
	}

	// This actually *starts* the command. Thanks, Docker!
	execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
	if err != nil {
		return err
	}
	defer execResp.Close()

	// This is apparently best way to wait for the command to complete.
	_, err = io.ReadAll(execResp.Reader)
	if err != nil {
		return err
	}

	execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
	if err != nil {
		return err
	}

	if execInspectResp.ExitCode != 0 {
		return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
	} else if execInspectResp.Running {
		return errors.New("mkdir is somehow still running??")
	}

	// Remember the container for RunStep, and write the augmented
	// fields back since addl is a value copy.
	addl.container = resp.ID
	wf.Data = addl

	return nil
}

// RunStep executes step idx of the workflow as `bash -c <command>` in
// the workflow's container. The exec environment is, in order of
// application: workflow environment, unlocked secrets, per-step
// environment, then HOME (later entries can override earlier ones —
// NOTE(review): confirm precedence semantics of EnvVars/AddEnv).
// Returns engine.ErrTimedOut on ctx cancellation, ErrOOMKilled or
// engine.ErrWorkflowFailed on a non-zero exit.
func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error {
	addl := w.Data.(addlFields)
	workflowEnvs := ConstructEnvs(w.Environment)
	// TODO(winter): should SetupWorkflow also have secret access?
	// IMO yes, but probably worth thinking on.
	for _, s := range secrets {
		workflowEnvs.AddEnv(s.Key, s.Value)
	}

	step := w.Steps[idx]

	// Bail out early if the workflow deadline already passed.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	// Copy into a fresh slice so per-step additions don't mutate
	// workflowEnvs' backing array across steps.
	envs := append(EnvVars(nil), workflowEnvs...)
	if nixStep, ok := step.(Step); ok {
		for k, v := range nixStep.environment {
			envs.AddEnv(k, v)
		}
	}
	envs.AddEnv("HOME", homeDir)

	mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{
		Cmd:          []string{"bash", "-c", step.Command()},
		AttachStdout: true,
		AttachStderr: true,
		Env:          envs,
	})
	if err != nil {
		return fmt.Errorf("creating exec: %w", err)
	}

	// start tailing logs in background
	// (buffered so the goroutine can't leak if nobody receives)
	tailDone := make(chan error, 1)
	go func() {
		tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step)
	}()

	select {
	case <-tailDone:
		// NOTE(review): tailStep's error is discarded here; a log-copy
		// failure goes unnoticed. Consider logging it.

	case <-ctx.Done():
		// cleanup will be handled by DestroyWorkflow, since
		// Docker doesn't provide an API to kill an exec run
		// (sure, we could grab the PID and kill it ourselves,
		// but that's wasted effort)
		e.l.Warn("step timed out", "step", step.Name())

		// Wait for the tail goroutine to finish (its attach uses the
		// same ctx, so cancellation unblocks it).
		<-tailDone

		return engine.ErrTimedOut
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID)
	if err != nil {
		return err
	}

	if execInspectResp.ExitCode != 0 {
		// Inspect the container to distinguish an OOM kill from an
		// ordinary step failure.
		inspectResp, err := e.docker.ContainerInspect(ctx, addl.container)
		if err != nil {
			return err
		}

		e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled)

		if inspectResp.State.OOMKilled {
			return ErrOOMKilled
		}
		return engine.ErrWorkflowFailed
	}

	return nil
}

// tailStep attaches to the exec (which also starts it) and demuxes the
// combined Docker stream into the step's stdout/stderr log writers.
// With a nil wfLogger it does nothing — NOTE(review): in that case the
// exec is never attached, and RunStep's "attach starts the command"
// assumption suggests the step would never run; verify callers always
// pass a logger.
func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error {
	if wfLogger == nil {
		return nil
	}

	// This actually *starts* the command. Thanks, Docker!
	logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{})
	if err != nil {
		return err
	}
	defer logs.Close()

	// stdcopy splits the multiplexed (non-TTY) stream back into
	// separate stdout/stderr payloads.
	_, err = stdcopy.StdCopy(
		wfLogger.DataWriter(stepIdx, "stdout"),
		wfLogger.DataWriter(stepIdx, "stderr"),
		logs.Reader,
	)
	// EOF and deadline-exceeded are expected terminations, not failures.
	if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("failed to copy logs: %w", err)
	}

	return nil
}

// DestroyWorkflow runs every cleanup registered for the workflow, in
// registration order. Failures are logged but do not stop the remaining
// cleanups; the method always returns nil.
// NOTE(review): registration order means the network removal (registered
// first) runs before the container stop — confirm the daemon tolerates
// this ordering.
func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
	fns := e.drainCleanups(wid)

	for _, fn := range fns {
		if err := fn(ctx); err != nil {
			e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
		}
	}
	return nil
}

// registerCleanup appends fn to the workflow's cleanup list.
func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
	e.cleanupMu.Lock()
	defer e.cleanupMu.Unlock()

	key := wid.String()
	e.cleanup[key] = append(e.cleanup[key], fn)
}

// drainCleanups removes and returns the workflow's cleanup list, so
// each cleanup runs at most once even if DestroyWorkflow is called
// multiple times.
func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc {
	e.cleanupMu.Lock()
	key := wid.String()

	fns := e.cleanup[key]
	delete(e.cleanup, key)
	e.cleanupMu.Unlock()

	return fns
}

// networkName derives the per-workflow Docker network name from the
// workflow id.
func networkName(wid models.WorkflowId) string {
	return fmt.Sprintf("workflow-network-%s", wid)
}