A vibe-coded fork of tangled with support for pijul.
at icy/pwvyvo 424 lines 11 kB view raw
1package nixery 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "io" 8 "log/slog" 9 "os" 10 "path" 11 "runtime" 12 "sync" 13 "time" 14 15 "github.com/docker/docker/api/types/container" 16 "github.com/docker/docker/api/types/image" 17 "github.com/docker/docker/api/types/mount" 18 "github.com/docker/docker/api/types/network" 19 "github.com/docker/docker/client" 20 "github.com/docker/docker/pkg/stdcopy" 21 "gopkg.in/yaml.v3" 22 "tangled.org/core/api/tangled" 23 "tangled.org/core/log" 24 "tangled.org/core/spindle/config" 25 "tangled.org/core/spindle/engine" 26 "tangled.org/core/spindle/engines/common" 27 engineerrors "tangled.org/core/spindle/engines/errors" 28 "tangled.org/core/spindle/models" 29 "tangled.org/core/spindle/secrets" 30) 31 32const ( 33 workspaceDir = "/tangled/workspace" 34 homeDir = "/tangled/home" 35) 36 37type cleanupFunc func(context.Context) error 38 39type Engine struct { 40 docker client.APIClient 41 l *slog.Logger 42 cfg *config.Config 43 44 cleanupMu sync.Mutex 45 cleanup map[string][]cleanupFunc 46} 47 48type Step struct { 49 name string 50 kind models.StepKind 51 command string 52 environment map[string]string 53} 54 55func (s Step) Name() string { 56 return s.name 57} 58 59func (s Step) Command() string { 60 return s.command 61} 62 63func (s Step) Kind() models.StepKind { 64 return s.kind 65} 66 67// setupSteps get added to start of Steps 68type setupSteps []models.Step 69 70// addStep adds a step to the beginning of the workflow's steps. 
71func (ss *setupSteps) addStep(step models.Step) { 72 *ss = append(*ss, step) 73} 74 75type addlFields struct { 76 image string 77 container string 78} 79 80func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { 81 swf := &models.Workflow{} 82 addl := addlFields{} 83 84 dwf := &struct { 85 Steps []struct { 86 Command string `yaml:"command"` 87 Name string `yaml:"name"` 88 Environment map[string]string `yaml:"environment"` 89 } `yaml:"steps"` 90 Dependencies map[string][]string `yaml:"dependencies"` 91 Environment map[string]string `yaml:"environment"` 92 }{} 93 err := yaml.Unmarshal([]byte(twf.Raw), &dwf) 94 if err != nil { 95 return nil, err 96 } 97 98 for _, dstep := range dwf.Steps { 99 sstep := Step{} 100 sstep.environment = dstep.Environment 101 sstep.command = dstep.Command 102 sstep.name = dstep.Name 103 sstep.kind = models.StepKindUser 104 swf.Steps = append(swf.Steps, sstep) 105 } 106 swf.Name = twf.Name 107 swf.Environment = dwf.Environment 108 addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery) 109 110 setup := &setupSteps{} 111 112 setup.addStep(nixConfStep()) 113 setup.addStep(models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev)) 114 // this step could be empty 115 if s := dependencyStep(dwf.Dependencies); s != nil { 116 setup.addStep(*s) 117 } 118 119 // append setup steps in order to the start of workflow steps 120 swf.Steps = append(*setup, swf.Steps...) 
121 swf.Data = addl 122 123 return swf, nil 124} 125 126func (e *Engine) WorkflowTimeout() time.Duration { 127 workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout 128 workflowTimeout, err := time.ParseDuration(workflowTimeoutStr) 129 if err != nil { 130 e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr) 131 workflowTimeout = 5 * time.Minute 132 } 133 134 return workflowTimeout 135} 136 137func workflowImage(deps map[string][]string, nixery string) string { 138 var dependencies string 139 for reg, ds := range deps { 140 if reg == "nixpkgs" { 141 dependencies = path.Join(ds...) 142 } 143 } 144 145 // load defaults from somewhere else 146 dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix") 147 148 if runtime.GOARCH == "arm64" { 149 dependencies = path.Join("arm64", dependencies) 150 } 151 152 return path.Join(nixery, dependencies) 153} 154 155func New(ctx context.Context, cfg *config.Config) (*Engine, error) { 156 dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 157 if err != nil { 158 return nil, err 159 } 160 161 l := log.FromContext(ctx).With("component", "spindle") 162 163 e := &Engine{ 164 docker: dcli, 165 l: l, 166 cfg: cfg, 167 } 168 169 e.cleanup = make(map[string][]cleanupFunc) 170 171 return e, nil 172} 173 174func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error { 175 e.l.Info("setting up workflow", "workflow", wid) 176 177 _, err := e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 178 Driver: "bridge", 179 }) 180 if err != nil { 181 return err 182 } 183 e.registerCleanup(wid, func(ctx context.Context) error { 184 return e.docker.NetworkRemove(ctx, networkName(wid)) 185 }) 186 187 addl := wf.Data.(addlFields) 188 189 reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{}) 190 if err != nil { 191 e.l.Error("pipeline image pull failed!", "image", addl.image, 
"workflowId", wid, "error", err.Error()) 192 193 return fmt.Errorf("pulling image: %w", err) 194 } 195 defer reader.Close() 196 io.Copy(os.Stdout, reader) 197 198 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 199 Image: addl.image, 200 Cmd: []string{"cat"}, 201 OpenStdin: true, // so cat stays alive :3 202 Tty: false, 203 Hostname: "spindle", 204 WorkingDir: workspaceDir, 205 Labels: map[string]string{ 206 "sh.tangled.pipeline/workflow_id": wid.String(), 207 }, 208 // TODO(winter): investigate whether environment variables passed here 209 // get propagated to ContainerExec processes 210 }, &container.HostConfig{ 211 Mounts: []mount.Mount{ 212 { 213 Type: mount.TypeTmpfs, 214 Target: "/tmp", 215 ReadOnly: false, 216 TmpfsOptions: &mount.TmpfsOptions{ 217 Mode: 0o1777, // world-writeable sticky bit 218 Options: [][]string{ 219 {"exec"}, 220 }, 221 }, 222 }, 223 }, 224 ReadonlyRootfs: false, 225 CapDrop: []string{"ALL"}, 226 CapAdd: []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"}, 227 SecurityOpt: []string{"no-new-privileges"}, 228 ExtraHosts: []string{"host.docker.internal:host-gateway"}, 229 }, nil, nil, "") 230 if err != nil { 231 return fmt.Errorf("creating container: %w", err) 232 } 233 e.registerCleanup(wid, func(ctx context.Context) error { 234 err = e.docker.ContainerStop(ctx, resp.ID, container.StopOptions{}) 235 if err != nil { 236 return err 237 } 238 239 return e.docker.ContainerRemove(ctx, resp.ID, container.RemoveOptions{ 240 RemoveVolumes: true, 241 RemoveLinks: false, 242 Force: false, 243 }) 244 }) 245 246 err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 247 if err != nil { 248 return fmt.Errorf("starting container: %w", err) 249 } 250 251 mkExecResp, err := e.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{ 252 Cmd: []string{"mkdir", "-p", workspaceDir, homeDir}, 253 AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is 
done below, mkdir is done. maybe?? 254 AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default") 255 }) 256 if err != nil { 257 return err 258 } 259 260 // This actually *starts* the command. Thanks, Docker! 261 execResp, err := e.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{}) 262 if err != nil { 263 return err 264 } 265 defer execResp.Close() 266 267 // This is apparently best way to wait for the command to complete. 268 _, err = io.ReadAll(execResp.Reader) 269 if err != nil { 270 return err 271 } 272 273 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 274 if err != nil { 275 return err 276 } 277 278 if execInspectResp.ExitCode != 0 { 279 return fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode) 280 } else if execInspectResp.Running { 281 return errors.New("mkdir is somehow still running??") 282 } 283 284 addl.container = resp.ID 285 wf.Data = addl 286 287 return nil 288} 289 290func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error { 291 addl := w.Data.(addlFields) 292 workflowEnvs := common.ConstructEnvs(w.Environment) 293 // TODO(winter): should SetupWorkflow also have secret access? 294 // IMO yes, but probably worth thinking on. 295 for _, s := range secrets { 296 workflowEnvs.AddEnv(s.Key, s.Value) 297 } 298 299 step := w.Steps[idx] 300 301 select { 302 case <-ctx.Done(): 303 return ctx.Err() 304 default: 305 } 306 307 envs := append(common.EnvVars(nil), workflowEnvs...) 
308 if nixStep, ok := step.(Step); ok { 309 for k, v := range nixStep.environment { 310 envs.AddEnv(k, v) 311 } 312 } 313 envs.AddEnv("HOME", homeDir) 314 315 mkExecResp, err := e.docker.ContainerExecCreate(ctx, addl.container, container.ExecOptions{ 316 Cmd: []string{"bash", "-c", step.Command()}, 317 AttachStdout: true, 318 AttachStderr: true, 319 Env: envs, 320 }) 321 if err != nil { 322 return fmt.Errorf("creating exec: %w", err) 323 } 324 325 // start tailing logs in background 326 tailDone := make(chan error, 1) 327 go func() { 328 tailDone <- e.tailStep(ctx, wfLogger, mkExecResp.ID, wid, idx, step) 329 }() 330 331 select { 332 case <-tailDone: 333 334 case <-ctx.Done(): 335 // cleanup will be handled by DestroyWorkflow, since 336 // Docker doesn't provide an API to kill an exec run 337 // (sure, we could grab the PID and kill it ourselves, 338 // but that's wasted effort) 339 e.l.Warn("step timed out", "step", step.Name()) 340 341 <-tailDone 342 343 return engine.ErrTimedOut 344 } 345 346 select { 347 case <-ctx.Done(): 348 return ctx.Err() 349 default: 350 } 351 352 execInspectResp, err := e.docker.ContainerExecInspect(ctx, mkExecResp.ID) 353 if err != nil { 354 return err 355 } 356 357 if execInspectResp.ExitCode != 0 { 358 inspectResp, err := e.docker.ContainerInspect(ctx, addl.container) 359 if err != nil { 360 return err 361 } 362 363 e.l.Error("workflow failed!", "workflow_id", wid.String(), "exit_code", execInspectResp.ExitCode, "oom_killed", inspectResp.State.OOMKilled) 364 365 if inspectResp.State.OOMKilled { 366 return engineerrors.ErrOOMKilled 367 } 368 return engine.ErrWorkflowFailed 369 } 370 371 return nil 372} 373 374func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, execID string, wid models.WorkflowId, stepIdx int, step models.Step) error { 375 if wfLogger == nil { 376 return nil 377 } 378 379 // This actually *starts* the command. Thanks, Docker! 
380 logs, err := e.docker.ContainerExecAttach(ctx, execID, container.ExecAttachOptions{}) 381 if err != nil { 382 return err 383 } 384 defer logs.Close() 385 386 _, err = stdcopy.StdCopy( 387 wfLogger.DataWriter(stepIdx, "stdout"), 388 wfLogger.DataWriter(stepIdx, "stderr"), 389 logs.Reader, 390 ) 391 if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { 392 return fmt.Errorf("failed to copy logs: %w", err) 393 } 394 395 return nil 396} 397 398func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 399 e.cleanupMu.Lock() 400 key := wid.String() 401 402 fns := e.cleanup[key] 403 delete(e.cleanup, key) 404 e.cleanupMu.Unlock() 405 406 for _, fn := range fns { 407 if err := fn(ctx); err != nil { 408 e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 409 } 410 } 411 return nil 412} 413 414func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 415 e.cleanupMu.Lock() 416 defer e.cleanupMu.Unlock() 417 418 key := wid.String() 419 e.cleanup[key] = append(e.cleanup[key], fn) 420} 421 422func networkName(wid models.WorkflowId) string { 423 return fmt.Sprintf("workflow-network-%s", wid) 424}