A vibe coded tangled fork which supports pijul.
at master 353 lines 9.7 kB view raw
1package knotserver 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net/http" 9 "net/url" 10 "path/filepath" 11 "strings" 12 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/xrpc" 16 "github.com/bluesky-social/jetstream/pkg/models" 17 securejoin "github.com/cyphar/filepath-securejoin" 18 "tangled.org/core/api/tangled" 19 "tangled.org/core/knotserver/db" 20 "tangled.org/core/knotserver/git" 21 22 "tangled.org/core/log" 23 "tangled.org/core/rbac" 24 "tangled.org/core/workflow" 25) 26 27func (h *Knot) processPublicKey(ctx context.Context, event *models.Event) error { 28 l := log.FromContext(ctx) 29 raw := json.RawMessage(event.Commit.Record) 30 did := event.Did 31 32 var record tangled.PublicKey 33 if err := json.Unmarshal(raw, &record); err != nil { 34 return fmt.Errorf("failed to unmarshal record: %w", err) 35 } 36 37 pk := db.PublicKey{ 38 Did: did, 39 PublicKey: record, 40 } 41 if err := h.db.AddPublicKey(pk); err != nil { 42 l.Error("failed to add public key", "error", err) 43 return fmt.Errorf("failed to add public key: %w", err) 44 } 45 l.Info("added public key from firehose", "did", did) 46 return nil 47} 48 49func (h *Knot) processKnotMember(ctx context.Context, event *models.Event) error { 50 l := log.FromContext(ctx) 51 raw := json.RawMessage(event.Commit.Record) 52 did := event.Did 53 54 var record tangled.KnotMember 55 if err := json.Unmarshal(raw, &record); err != nil { 56 return fmt.Errorf("failed to unmarshal record: %w", err) 57 } 58 59 if record.Domain != h.c.Server.Hostname { 60 l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 61 return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 62 } 63 64 ok, err := h.e.E.Enforce(did, rbac.ThisServer, rbac.ThisServer, "server:invite") 65 if err != nil || !ok { 66 l.Error("failed to add member", "did", did) 67 return fmt.Errorf("failed to enforce permissions: %w", err) 68 } 69 70 if err := h.e.AddKnotMember(rbac.ThisServer, record.Subject); err != nil { 71 l.Error("failed to add member", "error", err) 72 return fmt.Errorf("failed to add member: %w", err) 73 } 74 l.Info("added member from firehose", "member", record.Subject) 75 76 if err := h.db.AddDid(record.Subject); err != nil { 77 l.Error("failed to add did", "error", err) 78 return fmt.Errorf("failed to add did: %w", err) 79 } 80 h.jc.AddDid(record.Subject) 81 82 if err := h.fetchAndAddKeys(ctx, record.Subject); err != nil { 83 return fmt.Errorf("failed to fetch and add keys: %w", err) 84 } 85 86 return nil 87} 88 89func (h *Knot) processPull(ctx context.Context, event *models.Event) error { 90 raw := json.RawMessage(event.Commit.Record) 91 did := event.Did 92 93 var record tangled.RepoPull 94 if err := json.Unmarshal(raw, &record); err != nil { 95 return fmt.Errorf("failed to unmarshal record: %w", err) 96 } 97 98 l := log.FromContext(ctx) 99 l = l.With("handler", "processPull") 100 l = l.With("did", did) 101 102 if record.Target == nil { 103 return fmt.Errorf("ignoring pull record: target repo is nil") 104 } 105 106 l = l.With("target_repo", record.Target.Repo) 107 l = l.With("target_branch", record.Target.Branch) 108 109 if record.Source == nil { 110 return fmt.Errorf("ignoring pull record: not a branch-based pull request") 111 } 112 113 if record.Source.Repo != nil { 114 return fmt.Errorf("ignoring pull record: fork based pull") 115 } 116 117 repoAt, err := syntax.ParseATURI(record.Target.Repo) 118 if err != nil { 119 return fmt.Errorf("failed to parse ATURI: %w", err) 120 } 121 122 // resolve this aturi to extract the repo record 123 ident, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 124 if err != nil || ident.Handle.IsInvalidHandle() { 125 return fmt.Errorf("failed to resolve handle: %w", err) 126 } 127 128 xrpcc := xrpc.Client{ 129 Host: ident.PDSEndpoint(), 130 } 131 132 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 133 if err != nil { 134 return fmt.Errorf("failed to resolver repo: %w", err) 135 } 136 137 repo := resp.Value.Val.(*tangled.Repo) 138 139 if repo.Knot != h.c.Server.Hostname { 140 return fmt.Errorf("rejected pull record: not this knot, %s != %s", repo.Knot, h.c.Server.Hostname) 141 } 142 143 didSlashRepo, err := securejoin.SecureJoin(ident.DID.String(), repo.Name) 144 if err != nil { 145 return fmt.Errorf("failed to construct relative repo path: %w", err) 146 } 147 148 repoPath, err := securejoin.SecureJoin(h.c.Repo.ScanPath, didSlashRepo) 149 if err != nil { 150 return fmt.Errorf("failed to construct absolute repo path: %w", err) 151 } 152 153 gr, err := git.Open(repoPath, record.Source.Sha) 154 if err != nil { 155 return fmt.Errorf("failed to open git repository: %w", err) 156 } 157 158 workflowDir, err := gr.FileTree(ctx, workflow.WorkflowDir) 159 if err != nil { 160 return fmt.Errorf("failed to open workflow directory: %w", err) 161 } 162 163 var pipeline workflow.RawPipeline 164 for _, e := range workflowDir { 165 if !e.IsFile() { 166 continue 167 } 168 169 fpath := filepath.Join(workflow.WorkflowDir, e.Name) 170 contents, err := gr.RawContent(fpath) 171 if err != nil { 172 continue 173 } 174 175 pipeline = append(pipeline, workflow.RawWorkflow{ 176 Name: e.Name, 177 Contents: contents, 178 }) 179 } 180 181 trigger := tangled.Pipeline_PullRequestTriggerData{ 182 Action: "create", 183 SourceBranch: record.Source.Branch, 184 SourceSha: record.Source.Sha, 185 TargetBranch: record.Target.Branch, 186 } 187 188 compiler := workflow.Compiler{ 189 Trigger: tangled.Pipeline_TriggerMetadata{ 190 Kind: string(workflow.TriggerKindPullRequest), 191 PullRequest: &trigger, 192 Repo: &tangled.Pipeline_TriggerRepo{ 193 Did: ident.DID.String(), 194 Knot: repo.Knot, 195 Repo: repo.Name, 196 }, 197 }, 198 } 199 200 cp := compiler.Compile(compiler.Parse(pipeline)) 201 eventJson, err := json.Marshal(cp) 202 if err != nil { 203 return fmt.Errorf("failed to marshal pipeline event: %w", err) 204 } 205 206 // do not run empty pipelines 207 if cp.Workflows == nil { 208 return nil 209 } 210 211 ev := db.Event{ 212 Rkey: TID(), 213 Nsid: tangled.PipelineNSID, 214 EventJson: string(eventJson), 215 } 216 217 return h.db.InsertEvent(ev, h.n) 218} 219 220// duplicated from add collaborator 221func (h *Knot) processCollaborator(ctx context.Context, event *models.Event) error { 222 raw := json.RawMessage(event.Commit.Record) 223 did := event.Did 224 225 var record tangled.RepoCollaborator 226 if err := json.Unmarshal(raw, &record); err != nil { 227 return fmt.Errorf("failed to unmarshal record: %w", err) 228 } 229 230 repoAt, err := syntax.ParseATURI(record.Repo) 231 if err != nil { 232 return err 233 } 234 235 subjectId, err := h.resolver.ResolveIdent(ctx, record.Subject) 236 if err != nil || subjectId.Handle.IsInvalidHandle() { 237 return err 238 } 239 240 // TODO: fix this for good, we need to fetch the record here unfortunately 241 // resolve this aturi to extract the repo record 242 owner, err := h.resolver.ResolveIdent(ctx, repoAt.Authority().String()) 243 if err != nil || owner.Handle.IsInvalidHandle() { 244 return fmt.Errorf("failed to resolve handle: %w", err) 245 } 246 247 xrpcc := xrpc.Client{ 248 Host: owner.PDSEndpoint(), 249 } 250 251 resp, err := comatproto.RepoGetRecord(ctx, &xrpcc, "", tangled.RepoNSID, repoAt.Authority().String(), repoAt.RecordKey().String()) 252 if err != nil { 253 return err 254 } 255 256 repo := resp.Value.Val.(*tangled.Repo) 257 didSlashRepo, _ := securejoin.SecureJoin(owner.DID.String(), repo.Name) 258 259 // check perms for this user 260 ok, err := h.e.IsCollaboratorInviteAllowed(did, rbac.ThisServer, didSlashRepo) 261 if err != nil { 262 return fmt.Errorf("failed to check permissions: %w", err) 263 } 264 if !ok { 265 return fmt.Errorf("insufficient permissions: %s, %s, %s", did, "IsCollaboratorInviteAllowed", didSlashRepo) 266 } 267 268 if err := h.db.AddDid(subjectId.DID.String()); err != nil { 269 return err 270 } 271 h.jc.AddDid(subjectId.DID.String()) 272 273 if err := h.e.AddCollaborator(subjectId.DID.String(), rbac.ThisServer, didSlashRepo); err != nil { 274 return err 275 } 276 277 return h.fetchAndAddKeys(ctx, subjectId.DID.String()) 278} 279 280func (h *Knot) fetchAndAddKeys(ctx context.Context, did string) error { 281 l := log.FromContext(ctx) 282 283 keysEndpoint, err := url.JoinPath(h.c.AppViewEndpoint, "keys", did) 284 if err != nil { 285 l.Error("error building endpoint url", "did", did, "error", err.Error()) 286 return fmt.Errorf("error building endpoint url: %w", err) 287 } 288 289 resp, err := http.Get(keysEndpoint) 290 if err != nil { 291 l.Error("error getting keys", "did", did, "error", err) 292 return fmt.Errorf("error getting keys: %w", err) 293 } 294 defer resp.Body.Close() 295 296 if resp.StatusCode == http.StatusNotFound { 297 l.Info("no keys found for did", "did", did) 298 return nil 299 } 300 301 plaintext, err := io.ReadAll(resp.Body) 302 if err != nil { 303 l.Error("error reading response body", "error", err) 304 return fmt.Errorf("error reading response body: %w", err) 305 } 306 307 for key := range strings.SplitSeq(string(plaintext), "\n") { 308 if key == "" { 309 continue 310 } 311 pk := db.PublicKey{ 312 Did: did, 313 } 314 pk.Key = key 315 if err := h.db.AddPublicKey(pk); err != nil { 316 l.Error("failed to add public key", "error", err) 317 return fmt.Errorf("failed to add public key: %w", err) 318 } 319 } 320 return nil 321} 322 323func (h *Knot) processMessages(ctx context.Context, event *models.Event) error { 324 if event.Kind != models.EventKindCommit { 325 return nil 326 } 327 328 var err error 329 defer func() { 330 eventTime := event.TimeUS 331 lastTimeUs := eventTime + 1 332 if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 333 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 334 } 335 }() 336 337 switch event.Commit.Collection { 338 case tangled.PublicKeyNSID: 339 err = h.processPublicKey(ctx, event) 340 case tangled.KnotMemberNSID: 341 err = h.processKnotMember(ctx, event) 342 case tangled.RepoPullNSID: 343 err = h.processPull(ctx, event) 344 case tangled.RepoCollaboratorNSID: 345 err = h.processCollaborator(ctx, event) 346 } 347 348 if err != nil { 349 h.l.Debug("failed to process event", "nsid", event.Commit.Collection, "err", err) 350 } 351 352 return nil 353}