// Package nixery implements a spindle workflow adapter that executes
// workflow steps inside Docker containers whose images are assembled on
// demand by a Nixery instance.
package nixery

import (
	"context"
	"fmt"
	"io"
	"log/slog"
	"os"
	"path"
	"path/filepath"
	"regexp"
	"runtime"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/api/types/image"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/network"
	"github.com/docker/docker/client"
	// NOTE(review): this is testify's vendored YAML shim; consider depending
	// on a YAML library directly instead of reaching through testify.
	"github.com/stretchr/testify/assert/yaml"

	"tangled.org/core/api/tangled"
	"tangled.org/core/sets"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/repomanager"
	"tangled.org/core/tid"
	"tangled.org/core/workflow"
)

// AdapterID identifies this adapter implementation to spindle.
const AdapterID = "nixery"

// Adapter runs workflows in Docker containers built from Nixery images.
//
// All run state is held in memory only, so it is lost when the process
// restarts; see GetActiveWorkflowRun for how spindle detects lost runs.
type Adapter struct {
	l           *slog.Logger
	repoManager *repomanager.RepoManager
	docker      client.APIClient
	Timeout     time.Duration // per-run wall-clock budget for executing steps
	spindleDid  syntax.DID
	cfg         config.NixeryPipelines

	// mu guards activeRuns and subscribers.
	mu          sync.RWMutex
	activeRuns  map[syntax.ATURI]models.WorkflowRun
	subscribers sets.Set[chan<- models.WorkflowRun]
}

// Compile-time check that Adapter satisfies the spindle adapter contract.
var _ models.Adapter = (*Adapter)(nil)

// New builds an Adapter backed by a Docker client configured from the
// environment (DOCKER_HOST etc.), with API version negotiation enabled.
func New(l *slog.Logger, cfg config.Config, repoManager *repomanager.RepoManager) (*Adapter, error) {
	dc, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		return nil, fmt.Errorf("creating docker client: %w", err)
	}
	return &Adapter{
		l:           l,
		repoManager: repoManager,
		docker:      dc,
		Timeout:     time.Minute * 5, // TODO: set timeout from config
		spindleDid:  cfg.Server.Did(),
		cfg:         cfg.NixeryPipelines,
		activeRuns:  make(map[syntax.ATURI]models.WorkflowRun),
		subscribers: sets.New[chan<- models.WorkflowRun](),
	}, nil
}

// Init implements models.Adapter; this adapter needs no startup work.
func (a *Adapter) Init() error {
	// no-op
	return nil
}

// Shutdown implements models.Adapter.
func (a *Adapter) Shutdown(ctx context.Context) error {
	// TODO: cleanup spawned containers just in case
	panic("unimplemented")
}

// SetupRepo registers the repository with the repo manager, syncing only
// the workflow directory.
func (a *Adapter) SetupRepo(ctx context.Context, repo syntax.ATURI) error {
	if err := a.repoManager.RegisterRepo(ctx, repo, []string{"/.tangled/workflows"}); err != nil {
		return fmt.Errorf("syncing repo: %w", err)
	}
	return nil
}

// ListWorkflowDefs returns the workflow definitions found in the repo at
// the given revision, converted to their adapter-agnostic info form.
func (a *Adapter) ListWorkflowDefs(ctx context.Context, repo syntax.ATURI, rev string) ([]models.WorkflowDef, error) {
	defs, err := a.listWorkflowDefs(ctx, repo, rev)
	if err != nil {
		return nil, err
	}
	retDefs := make([]models.WorkflowDef, len(defs))
	for i, def := range defs {
		retDefs[i] = def.AsInfo()
	}
	return retDefs, nil
}

// listWorkflowDefs loads and parses every YAML file in the repo's workflow
// directory at the given revision. Returns nil (no error) when the
// directory is absent or empty.
func (a *Adapter) listWorkflowDefs(ctx context.Context, repo syntax.ATURI, rev string) ([]WorkflowDef, error) {
	workflowDir, err := a.repoManager.FileTree(ctx, repo, rev, workflow.WorkflowDir)
	if err != nil {
		return nil, fmt.Errorf("loading file tree: %w", err)
	}
	if len(workflowDir) == 0 {
		return nil, nil
	}

	// TODO(boltless): repoManager.FileTree() should be smart enough so we don't need to do this:
	gr, err := a.repoManager.Open(repo, rev)
	if err != nil {
		return nil, fmt.Errorf("opening git repo: %w", err)
	}

	var defs []WorkflowDef
	for _, e := range workflowDir {
		if !e.IsFile() {
			continue
		}
		fpath := filepath.Join(workflow.WorkflowDir, e.Name)
		contents, err := gr.RawContent(fpath)
		if err != nil {
			return nil, fmt.Errorf("reading raw content of '%s': %w", fpath, err)
		}
		var wf WorkflowDef
		if err := yaml.Unmarshal(contents, &wf); err != nil {
			return nil, fmt.Errorf("parsing yaml: %w", err)
		}
		wf.Name = e.Name
		defs = append(defs, wf)
	}
	return defs, nil
}

// EvaluateEvent matches the event against the repo's workflow definitions
// and schedules every triggered workflow, returning the queued runs
// immediately (execution happens in the background).
func (a *Adapter) EvaluateEvent(ctx context.Context, event models.Event) ([]models.WorkflowRun, error) {
	defs, err := a.listWorkflowDefs(ctx, event.SourceRepo, event.SourceSha)
	if err != nil {
		return nil, fmt.Errorf("fetching workflow definitions: %w", err)
	}

	// filter out triggered workflows
	var triggered []nixeryWorkflow
	for _, def := range defs {
		if def.ShouldRunOn(event) {
			triggered = append(triggered, nixeryWorkflow{
				event: event,
				def:   def,
			})
		}
	}
	// TODO: append more workflows from "on_workflow" event

	// schedule workflows and return immediately
	// (loop variable renamed from `workflow`, which shadowed the imported package)
	runs := make([]models.WorkflowRun, len(triggered))
	for i, wf := range triggered {
		runs[i] = a.scheduleWorkflow(ctx, wf)
	}
	return runs, nil
}

// NOTE: nixery adapter is volatile. GetActiveWorkflowRun will return error
// when the workflow is terminated. It lets spindle to mark lost workflow.run
// as "Failed".
func (a *Adapter) GetActiveWorkflowRun(ctx context.Context, runId syntax.ATURI) (models.WorkflowRun, error) {
	a.mu.RLock()
	run, exists := a.activeRuns[runId]
	a.mu.RUnlock()
	if !exists {
		return run, fmt.Errorf("unknown or terminated workflow")
	}
	return run, nil
}

// ListActiveWorkflowRuns returns a snapshot of all in-memory active runs.
func (a *Adapter) ListActiveWorkflowRuns(ctx context.Context) ([]models.WorkflowRun, error) {
	a.mu.RLock()
	defer a.mu.RUnlock()
	runs := make([]models.WorkflowRun, 0, len(a.activeRuns))
	for _, run := range a.activeRuns {
		runs = append(runs, run)
	}
	return runs, nil
}

// SubscribeWorkflowRun registers a buffered channel that receives run
// status updates until ctx is canceled, at which point the channel is
// unregistered and closed.
func (a *Adapter) SubscribeWorkflowRun(ctx context.Context) <-chan models.WorkflowRun {
	ch := make(chan models.WorkflowRun, 1)
	a.mu.Lock()
	a.subscribers.Insert(ch)
	a.mu.Unlock()

	// cleanup when spindle stops listening; removal and close happen under
	// the same lock emit() broadcasts under, so emit can never send on a
	// closed channel.
	go func() {
		<-ctx.Done()
		a.mu.Lock()
		a.subscribers.Remove(ch)
		close(ch)
		a.mu.Unlock()
	}()
	return ch
}

// emit records the run's latest state (dropping it from activeRuns once it
// is no longer active) and broadcasts it to all subscribers.
func (a *Adapter) emit(run models.WorkflowRun) {
	a.mu.Lock()
	defer a.mu.Unlock()

	if run.Status.IsActive() {
		a.activeRuns[run.AtUri()] = run
	} else {
		delete(a.activeRuns, run.AtUri())
	}

	// Broadcast while holding the lock. Sends are non-blocking, so this is
	// cheap, and it prevents a racing unsubscribe from closing a channel
	// between a snapshot and a send (send on a closed channel panics).
	for ch := range a.subscribers.All() {
		select {
		case ch <- run:
		default:
			// avoid blocking if channel is full
			// spindle will catch the state by regular GetWorkflowRun poll
		}
	}
}

// StreamWorkflowRunLogs implements models.Adapter.
func (a *Adapter) StreamWorkflowRunLogs(ctx context.Context, runId syntax.ATURI, handle func(line models.LogLine) error) error {
	panic("unimplemented")
}

// CancelWorkflowRun tears down everything created for the run: its
// containers (matched by run label) and then its network.
func (a *Adapter) CancelWorkflowRun(ctx context.Context, runId syntax.ATURI) error {
	// Stop & remove the run's containers first: removing a network fails
	// while containers are still attached to it. All: true so containers
	// that already exited are found and removed too.
	containers, err := a.docker.ContainerList(ctx, container.ListOptions{
		All:     true,
		Filters: labelFilter(tangled.CiWorkflowRunNSID, runId.String()),
	})
	if err != nil {
		return fmt.Errorf("finding container with label: %w", err)
	}
	for _, c := range containers {
		if err := a.docker.ContainerStop(ctx, c.ID, container.StopOptions{}); err != nil {
			return fmt.Errorf("stopping container: %w", err)
		}
		if err := a.docker.ContainerRemove(ctx, c.ID, container.RemoveOptions{
			RemoveVolumes: true,
			RemoveLinks:   false,
			Force:         false,
		}); err != nil {
			return fmt.Errorf("removing container: %w", err)
		}
	}

	// remove network
	if err := a.docker.NetworkRemove(ctx, networkName(runId)); err != nil {
		return fmt.Errorf("removing network: %w", err)
	}
	return nil
}

// labelFilter builds a docker filter matching containers labeled
// labelKey=labelVal.
func labelFilter(labelKey, labelVal string) filters.Args {
	filterArgs := filters.NewArgs()
	filterArgs.Add("label", fmt.Sprintf("%s=%s", labelKey, labelVal))
	return filterArgs
}

const (
	// workspaceDir is the working directory steps execute in.
	workspaceDir = "/tangled/workspace"
	// homeDir is created alongside the workspace inside the container.
	homeDir = "/tangled/home"
)

// scheduleWorkflow registers a pending run and launches its execution in a
// background goroutine, returning the queued run immediately.
func (a *Adapter) scheduleWorkflow(ctx context.Context, wf nixeryWorkflow) models.WorkflowRun {
	l := a.l
	run := models.WorkflowRun{
		Did:       a.spindleDid,
		Rkey:      syntax.RecordKey(tid.TID()),
		AdapterId: AdapterID,
		Name:      wf.def.Name,
		Status:    models.WorkflowStatusPending,
	}

	a.mu.Lock()
	a.activeRuns[run.AtUri()] = run
	a.mu.Unlock()

	// Detach from the caller's (likely request-scoped) context so the run
	// survives the triggering request; the per-run timeout below still
	// bounds execution.
	bgCtx := context.WithoutCancel(ctx)

	go func() {
		defer func() {
			if err := a.CancelWorkflowRun(bgCtx, run.AtUri()); err != nil {
				l.Error("failed to clean up workflow resources", "err", err)
			}
		}()

		containerId, err := a.initNixeryContainer(bgCtx, wf.def, run.AtUri())
		if err != nil {
			l.Error("failed to initialize container", "err", err)
			// TODO: put user-facing logs in workflow log
			a.emit(run.WithStatus(models.WorkflowStatusFailed))
			return
		}

		runCtx, cancel := context.WithTimeout(bgCtx, a.Timeout)
		defer cancel()

		for stepIdx, step := range wf.def.Steps {
			if err := a.runStep(runCtx, containerId, stepIdx, step); err != nil {
				l.Error("failed to run step", "stepIdx", stepIdx, "err", err)
				// Emit a terminal status so the run is dropped from
				// activeRuns instead of lingering as active forever.
				a.emit(run.WithStatus(models.WorkflowStatusFailed))
				return
			}
		}
		l.Info("all steps completed successfully")
		// TODO(review): emit a terminal success status here as well; the
		// success constant is not visible in this file — confirm its name
		// in the models package.
	}()

	l.Info("workflow scheduled to background", "workflow.run", run.AtUri())
	return run
}

// runStep executes a single workflow step inside the run's container.
// Execution is not implemented yet; the exec is created but not attached.
func (a *Adapter) runStep(ctx context.Context, containerId string, stepIdx int, step Step) error {
	// TODO: implement this
	// TODO: configure envs
	var envs []string

	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	mkExecResp, err := a.docker.ContainerExecCreate(ctx, containerId, container.ExecOptions{
		Cmd:          []string{"bash", "-c", step.Command},
		AttachStdout: true,
		AttachStderr: true,
		Env:          envs,
	})
	if err != nil {
		return fmt.Errorf("creating exec: %w", err)
	}
	// TODO: attach to mkExecResp.ID, stream output into the workflow log,
	// and check the exit code (see initNixeryContainer for the pattern).
	_ = mkExecResp // referenced so the stub compiles until implemented
	panic("unimplemented")
}

// initNixeryContainer creates the run's network, pulls the Nixery image for
// the workflow's dependencies, starts the container, and prepares the
// workspace and home directories. Returns the container ID.
func (a *Adapter) initNixeryContainer(ctx context.Context, def WorkflowDef, runAt syntax.ATURI) (string, error) {
	imageName := workflowImageName(def.Dependencies, a.cfg.Nixery)

	_, err := a.docker.NetworkCreate(ctx, networkName(runAt), network.CreateOptions{
		Driver: "bridge",
	})
	if err != nil {
		return "", fmt.Errorf("creating network: %w", err)
	}

	reader, err := a.docker.ImagePull(ctx, imageName, image.PullOptions{})
	if err != nil {
		return "", fmt.Errorf("pulling image: %w", err)
	}
	defer reader.Close()
	// The pull only completes once the progress stream is fully drained;
	// an error here means the pull itself failed partway.
	if _, err := io.Copy(os.Stdout, reader); err != nil {
		return "", fmt.Errorf("reading image pull progress: %w", err)
	}

	resp, err := a.docker.ContainerCreate(ctx, &container.Config{
		Image:      imageName,
		Cmd:        []string{"cat"},
		OpenStdin:  true, // so cat stays alive :3
		Tty:        false,
		Hostname:   "spindle",
		WorkingDir: workspaceDir,
		Labels: map[string]string{
			tangled.CiWorkflowRunNSID: runAt.String(),
		},
		// TODO(winter): investigate whether environment variables passed here
		// get propagated to ContainerExec processes
	}, &container.HostConfig{
		Mounts: []mount.Mount{
			{
				Type:     mount.TypeTmpfs,
				Target:   "/tmp",
				ReadOnly: false,
				TmpfsOptions: &mount.TmpfsOptions{
					Mode: 0o1777, // world-writeable sticky bit
					Options: [][]string{
						{"exec"},
					},
				},
			},
		},
		ReadonlyRootfs: false,
		CapDrop:        []string{"ALL"},
		CapAdd:         []string{"CAP_DAC_OVERRIDE", "CAP_CHOWN", "CAP_FOWNER", "CAP_SETUID", "CAP_SETGID"},
		SecurityOpt:    []string{"no-new-privileges"},
		ExtraHosts:     []string{"host.docker.internal:host-gateway"},
	}, nil, nil, "")
	if err != nil {
		return "", fmt.Errorf("creating container: %w", err)
	}

	if err := a.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
		return "", fmt.Errorf("starting container: %w", err)
	}

	mkExecResp, err := a.docker.ContainerExecCreate(ctx, resp.ID, container.ExecOptions{
		Cmd:          []string{"mkdir", "-p", workspaceDir, homeDir},
		AttachStdout: true, // NOTE(winter): pretty sure this will make it so that when stdout read is done below, mkdir is done. maybe??
		AttachStderr: true, // for good measure, backed up by docker/cli ("If -d is not set, attach to everything by default")
	})
	if err != nil {
		return "", err
	}

	// This actually *starts* the command. Thanks, Docker!
	execResp, err := a.docker.ContainerExecAttach(ctx, mkExecResp.ID, container.ExecAttachOptions{})
	if err != nil {
		return "", err
	}
	defer execResp.Close()

	// This is apparently best way to wait for the command to complete.
	_, err = io.ReadAll(execResp.Reader)
	if err != nil {
		return "", err
	}

	execInspectResp, err := a.docker.ContainerExecInspect(ctx, mkExecResp.ID)
	if err != nil {
		return "", err
	}
	if execInspectResp.ExitCode != 0 {
		return "", fmt.Errorf("mkdir exited with exit code %d", execInspectResp.ExitCode)
	} else if execInspectResp.Running {
		return "", fmt.Errorf("mkdir is somehow still running??")
	}

	return resp.ID, nil
}

// workflowImageName builds the Nixery image reference for the workflow's
// nixpkgs dependencies, always appending bash/git/coreutils/nix and
// prefixing "arm64" on arm64 hosts.
func workflowImageName(deps map[string][]string, nixery string) string {
	var dependencies string
	for reg, ds := range deps {
		if reg == "nixpkgs" {
			dependencies = path.Join(ds...)
		}
	}
	// NOTE: shouldn't base dependencies come first?
	// like: nixery.tangled.sh/arm64/bash/git/coreutils/nix
	dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix")
	if runtime.GOARCH == "arm64" {
		dependencies = path.Join("arm64", dependencies)
	}
	return path.Join(nixery, dependencies)
}

// re matches characters that are not safe to use in a Docker network name.
var re = regexp.MustCompile(`[^a-zA-Z0-9_.-]`)

// networkName derives a Docker network name for a run by stripping the
// leading "at://" scheme (5 bytes) from its AT-URI and replacing every
// unsafe character with '-'.
func networkName(runId syntax.ATURI) string {
	return re.ReplaceAllString(runId.String()[5:], "-")
}