A vibe-coded fork of Tangled with support for Pijul.
at sl/spindle-adapters 459 lines 13 kB view raw
1package nixery 2 3import ( 4 "context" 5 "fmt" 6 "io" 7 "log/slog" 8 "os" 9 "path" 10 "path/filepath" 11 "regexp" 12 "runtime" 13 "sync" 14 "time" 15 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 "github.com/docker/docker/api/types/container" 18 "github.com/docker/docker/api/types/filters" 19 "github.com/docker/docker/api/types/image" 20 "github.com/docker/docker/api/types/mount" 21 "github.com/docker/docker/api/types/network" 22 "github.com/docker/docker/client" 23 "github.com/stretchr/testify/assert/yaml" 24 "tangled.org/core/api/tangled" 25 "tangled.org/core/sets" 26 "tangled.org/core/spindle/config" 27 "tangled.org/core/spindle/models" 28 "tangled.org/core/spindle/repomanager" 29 "tangled.org/core/tid" 30 "tangled.org/core/workflow" 31) 32 33const AdapterID = "nixery" 34 35type Adapter struct { 36 l *slog.Logger 37 repoManager *repomanager.RepoManager 38 docker client.APIClient 39 Timeout time.Duration 40 spindleDid syntax.DID 41 cfg config.NixeryPipelines 42 43 mu sync.RWMutex 44 activeRuns map[syntax.ATURI]models.WorkflowRun 45 subscribers sets.Set[chan<- models.WorkflowRun] 46} 47 48var _ models.Adapter = (*Adapter)(nil) 49 50func New(l *slog.Logger, cfg config.Config, repoManager *repomanager.RepoManager) (*Adapter, error) { 51 dc, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 52 if err != nil { 53 return nil, fmt.Errorf("creating docker client: %w", err) 54 } 55 return &Adapter{ 56 l: l, 57 repoManager: repoManager, 58 docker: dc, 59 Timeout: time.Minute * 5, // TODO: set timeout from config 60 spindleDid: cfg.Server.Did(), 61 cfg: cfg.NixeryPipelines, 62 63 activeRuns: make(map[syntax.ATURI]models.WorkflowRun), 64 subscribers: sets.New[chan<- models.WorkflowRun](), 65 }, nil 66} 67 68func (a *Adapter) Init() error { 69 // no-op 70 return nil 71} 72 73func (a *Adapter) Shutdown(ctx context.Context) error { 74 // TODO: cleanup spawned containers just in case 75 panic("unimplemented") 76} 77 78func (a 
*Adapter) SetupRepo(ctx context.Context, repo syntax.ATURI) error { 79 if err := a.repoManager.RegisterRepo(ctx, repo, []string{"/.tangled/workflows"}); err != nil { 80 return fmt.Errorf("syncing repo: %w", err) 81 } 82 return nil 83} 84 85func (a *Adapter) ListWorkflowDefs(ctx context.Context, repo syntax.ATURI, rev string) ([]models.WorkflowDef, error) { 86 defs, err := a.listWorkflowDefs(ctx, repo, rev) 87 if err != nil { 88 return nil, err 89 } 90 retDefs := make([]models.WorkflowDef, len(defs)) 91 for i, def := range defs { 92 retDefs[i] = def.AsInfo() 93 } 94 return retDefs, nil 95} 96 97func (a *Adapter) listWorkflowDefs(ctx context.Context, repo syntax.ATURI, rev string) ([]WorkflowDef, error) { 98 workflowDir, err := a.repoManager.FileTree(ctx, repo, rev, workflow.WorkflowDir) 99 if err != nil { 100 return nil, fmt.Errorf("loading file tree: %w", err) 101 } 102 103 if len(workflowDir) == 0 { 104 return nil, nil 105 } 106 107 // TODO(boltless): repoManager.FileTree() should be smart enough so we don't need to do this: 108 gr, err := a.repoManager.Open(repo, rev) 109 if err != nil { 110 return nil, fmt.Errorf("opening git repo: %w", err) 111 } 112 113 var defs []WorkflowDef 114 for _, e := range workflowDir { 115 if !e.IsFile() { 116 continue 117 } 118 119 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 120 contents, err := gr.RawContent(fpath) 121 if err != nil { 122 return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err) 123 } 124 125 var wf WorkflowDef 126 if err := yaml.Unmarshal(contents, &wf); err != nil { 127 return nil, fmt.Errorf("parsing yaml: %w", err) 128 } 129 wf.Name = e.Name 130 131 defs = append(defs, wf) 132 } 133 134 return defs, nil 135} 136 137func (a *Adapter) EvaluateEvent(ctx context.Context, event models.Event) ([]models.WorkflowRun, error) { 138 defs, err := a.listWorkflowDefs(ctx, event.SourceRepo, event.SourceSha) 139 if err != nil { 140 return nil, fmt.Errorf("fetching workflow definitions: %w", err) 141 } 142 
143 // filter out triggered workflows 144 var triggered []nixeryWorkflow 145 for _, def := range defs { 146 if def.ShouldRunOn(event) { 147 triggered = append(triggered, nixeryWorkflow{ 148 event: event, 149 def: def, 150 }) 151 } 152 } 153 154 // TODO: append more workflows from "on_workflow" event 155 156 // schedule workflows and return immediately 157 runs := make([]models.WorkflowRun, len(triggered)) 158 for i, workflow := range triggered { 159 runs[i] = a.scheduleWorkflow(ctx, workflow) 160 } 161 return runs, nil 162} 163 164// NOTE: nixery adapter is volatile. GetActiveWorkflowRun will return error 165// when the workflow is terminated. It lets spindle to mark lost workflow.run 166// as "Failed". 167func (a *Adapter) GetActiveWorkflowRun(ctx context.Context, runId syntax.ATURI) (models.WorkflowRun, error) { 168 a.mu.RLock() 169 run, exists := a.activeRuns[runId] 170 a.mu.RUnlock() 171 if !exists { 172 return run, fmt.Errorf("unknown or terminated workflow") 173 } 174 return run, nil 175} 176 177func (a *Adapter) ListActiveWorkflowRuns(ctx context.Context) ([]models.WorkflowRun, error) { 178 a.mu.RLock() 179 defer a.mu.RUnlock() 180 181 runs := make([]models.WorkflowRun, 0, len(a.activeRuns)) 182 for _, run := range a.activeRuns { 183 runs = append(runs, run) 184 } 185 return runs, nil 186} 187 188func (a *Adapter) SubscribeWorkflowRun(ctx context.Context) <-chan models.WorkflowRun { 189 ch := make(chan models.WorkflowRun, 1) 190 191 a.mu.Lock() 192 a.subscribers.Insert(ch) 193 a.mu.Unlock() 194 195 // cleanup spindle stops listening 196 go func() { 197 <-ctx.Done() 198 a.mu.Lock() 199 a.subscribers.Remove(ch) 200 a.mu.Unlock() 201 close(ch) 202 }() 203 204 return ch 205} 206 207func (a *Adapter) emit(run models.WorkflowRun) { 208 a.mu.Lock() 209 if run.Status.IsActive() { 210 a.activeRuns[run.AtUri()] = run 211 } else { 212 delete(a.activeRuns, run.AtUri()) 213 } 214 215 // Snapshot subscribers to broadcast outside the lock 216 subs := make([]chan<- 
models.WorkflowRun, 0, a.subscribers.Len()) 217 for ch := range a.subscribers.All() { 218 subs = append(subs, ch) 219 } 220 a.mu.Unlock() 221 222 for _, ch := range subs { 223 select { 224 case ch <- run: 225 default: 226 // avoid blocking if channel is full 227 // spindle will catch the state by regular GetWorkflowRun poll 228 } 229 } 230} 231 232func (a *Adapter) StreamWorkflowRunLogs(ctx context.Context, runId syntax.ATURI, handle func(line models.LogLine) error) error { 233 panic("unimplemented") 234} 235 236func (a *Adapter) CancelWorkflowRun(ctx context.Context, runId syntax.ATURI) error { 237 // remove network 238 if err := a.docker.NetworkRemove(ctx, networkName(runId)); err != nil { 239 return fmt.Errorf("removing network: %w", err) 240 } 241 242 // stop & remove docker containers with label 243 containers, err := a.docker.ContainerList(ctx, container.ListOptions{ 244 Filters: labelFilter(tangled.CiWorkflowRunNSID, runId.String()), 245 }) 246 if err != nil { 247 return fmt.Errorf("finding container with label: %w", err) 248 } 249 for _, c := range containers { 250 if err := a.docker.ContainerStop(ctx, c.ID, container.StopOptions{}); err != nil { 251 return fmt.Errorf("stopping container: %w", err) 252 } 253 254 if err := a.docker.ContainerRemove(ctx, c.ID, container.RemoveOptions{ 255 RemoveVolumes: true, 256 RemoveLinks: false, 257 Force: false, 258 }); err != nil { 259 return fmt.Errorf("removing container: %w", err) 260 } 261 } 262 return nil 263} 264 265func labelFilter(labelKey, labelVal string) filters.Args { 266 filterArgs := filters.NewArgs() 267 filterArgs.Add("label", fmt.Sprintf("%s=%s", labelKey, labelVal)) 268 return filterArgs 269} 270 271const ( 272 workspaceDir = "/tangled/workspace" 273 homeDir = "/tangled/home" 274) 275 276// scheduleWorkflow schedules a workflow run in job queue and return queued run 277func (a *Adapter) scheduleWorkflow(ctx context.Context, workflow nixeryWorkflow) models.WorkflowRun { 278 l := a.l 279 280 run := 
models.WorkflowRun{ 281 Did: a.spindleDid, 282 Rkey: syntax.RecordKey(tid.TID()), 283 AdapterId: AdapterID, 284 Name: workflow.def.Name, 285 Status: models.WorkflowStatusPending, 286 } 287 288 a.mu.Lock() 289 a.activeRuns[run.AtUri()] = run 290 a.mu.Unlock() 291 292 go func() { 293 defer a.CancelWorkflowRun(ctx, run.AtUri()) 294 295 containerId, err := a.initNixeryContainer(ctx, workflow.def, run.AtUri()) 296 if err != nil { 297 l.Error("failed to intialize container", "err", err) 298 // TODO: put user-facing logs in workflow log 299 a.emit(run.WithStatus(models.WorkflowStatusFailed)) 300 return 301 } 302 303 ctx, cancel := context.WithTimeout(ctx, a.Timeout) 304 defer cancel() 305 306 for stepIdx, step := range workflow.def.Steps { 307 if err := a.runStep(ctx, containerId, stepIdx, step); err != nil { 308 l.Error("failed to run step", "stepIdx", stepIdx, "err", err) 309 return 310 } 311 } 312 l.Info("all steps completed successfully") 313 }() 314 315 l.Info("workflow scheduled to background", "workflow.run", run.AtUri()) 316 317 return run 318} 319 320func (a *Adapter) runStep(ctx context.Context, containerId string, stepIdx int, step Step) error { 321 // TODO: implement this 322 323 // TODO: configure envs 324 var envs []string 325 326 select { 327 case <-ctx.Done(): 328 return ctx.Err() 329 default: 330 } 331 332 mkExecResp, err := a.docker.ContainerExecCreate(ctx, containerId, container.ExecOptions{ 333 Cmd: []string{"bash", "-c", step.Command}, 334 AttachStdout: true, 335 AttachStderr: true, 336 Env: envs, 337 }) 338 if err != nil { 339 return fmt.Errorf("creating exec: %w", err) 340 } 341 342 panic("unimplemented") 343} 344 345// initNixeryContainer pulls the image from nixery and start the container. 
346func (a *Adapter) initNixeryContainer(ctx context.Context, def WorkflowDef, runAt syntax.ATURI) (string, error) { 347 imageName := workflowImageName(def.Dependencies, a.cfg.Nixery) 348 349 _, err := a.docker.NetworkCreate(ctx, networkName(runAt), network.CreateOptions{ 350 Driver: "bridge", 351 }) 352 if err != nil { 353 return "", fmt.Errorf("creating network: %w", err) 354 } 355 356 reader, err := a.docker.ImagePull(ctx, imageName, image.PullOptions{}) 357 if err != nil { 358 return "", fmt.Errorf("pulling image: %w", err) 359 } 360 defer reader.Close() 361 io.Copy(os.Stdout, reader) 362 363 resp, err := a.docker.ContainerCreate(ctx, &container.Config{ 364 Image: imageName, 365 Cmd: []string{"cat"}, 366 OpenStdin: true, // so cat stays alive :3 367 Tty: false, 368 Hostname: "spindle", 369 WorkingDir: workspaceDir, 370 Labels: map[string]string{ 371 tangled.CiWorkflowRunNSID: runAt.String(), 372 }, 373 // TODO(winter): investigate whether environment variables passed here 374 // get propagated to ContainerExec processes 375 }, &container.HostConfig{ 376 Mounts: []mount.Mount{ 377 { 378 Type: mount.TypeTmpfs, 379 Target: "/tmp", 380 ReadOnly: false, 381 TmpfsOptions: &mount.TmpfsOptions{ 382 Mode: 0o1777, // world-writeable sticky bit 383 Options: [][]string{ 384 {"exec"}, 385 }, 386 }, 387 }, 388 }, 389 ReadonlyRootfs: false, 390 CapDrop: []string{"ALL"}, 391 CapAdd: []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"}, 392 SecurityOpt: []string{"no-new-privileges"}, 393 ExtraHosts: []string{"host.docker.internal:host-gateway"}, 394 }, nil, nil, "") 395 if err != nil { 396 return "", fmt.Errorf("creating container: %w", err) 397 } 398 399 if err := a.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil { 400 return "", fmt.Errorf("starting container: %w", err) 401 } 402 403 mkExecResp, err := a.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{ 404 Cmd: []string{"mkdir", "-p", workspaceDir, 
homeDir}, 405 AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe?? 406 AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default") 407 }) 408 if err != nil { 409 return "", err 410 } 411 412 // This actually *starts* the command. Thanks, Docker! 413 execResp, err := a.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{}) 414 if err != nil { 415 return "", err 416 } 417 defer execResp.Close() 418 419 // This is apparently best way to wait for the command to complete. 420 _, err = io.ReadAll(execResp.Reader) 421 if err != nil { 422 return "", err 423 } 424 425 execInspectResp, err := a.docker.ContainerExecInspect(ctx, mkExecResp.ID) 426 if err != nil { 427 return "", err 428 } 429 430 if execInspectResp.ExitCode != 0 { 431 return "", fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode) 432 } else if execInspectResp.Running { 433 return "", fmt.Errorf("mkdir is somehow still running??") 434 } 435 436 return resp.ID, nil 437} 438 439func workflowImageName(deps map[string][]string, nixery string) string { 440 var dependencies string 441 for reg, ds := range deps { 442 if reg == "nixpkgs" { 443 dependencies = path.Join(ds...) 444 } 445 } 446 // NOTE: shouldn't base dependencies come first? 447 // like: nixery.tangled.sh/arm64/bash/git/coreutils/nix 448 dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix") 449 if runtime.GOARCH == "arm64" { 450 dependencies = path.Join("arm64", dependencies) 451 } 452 453 return path.Join(nixery, dependencies) 454} 455 456var re = regexp.MustCompile(`[^a-zA-Z0-9_.-]`) 457func networkName(runId syntax.ATURI) string { 458 return re.ReplaceAllString(runId.String()[5:], "-") 459}