A vibe-coded fork of Tangled that supports Pijul.
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "maps"
9 "slices"
10
11 "time"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 jmodels "github.com/bluesky-social/jetstream/pkg/models"
15 "github.com/go-git/go-git/v5/plumbing"
16 "github.com/ipfs/go-cid"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/serververify"
22 "tangled.org/core/idresolver"
23 "tangled.org/core/orm"
24 "tangled.org/core/rbac"
25)
26
// Ingester bundles the dependencies shared by the ingest handlers in this
// file. Its Ingest method returns the callback that processes jetstream
// events.
type Ingester struct {
	// Db is handed to the db package helpers; several handlers type-assert
	// Db.Execer to *db.DB in order to open transactions.
	Db db.DbWrapper
	// Enforcer is consulted for permission checks and updated when knots,
	// spindles or their members are added or removed.
	Enforcer *rbac.Enforcer
	// IdResolver resolves member identities and has cached identities
	// invalidated on account and identity events.
	IdResolver *idresolver.Resolver
	// Config supplies Core.Dev, which is passed to server verification.
	Config *config.Config
	// Logger is the base logger; each handler derives its own from it.
	Logger *slog.Logger
}
34
// processFunc is the signature of the callback returned by Ingester.Ingest:
// it takes a context and a single jetstream event and returns an error.
type processFunc func(ctx context.Context, e *jmodels.Event) error
36
37func (i *Ingester) Ingest() processFunc {
38 return func(ctx context.Context, e *jmodels.Event) error {
39 var err error
40 defer func() {
41 eventTime := e.TimeUS
42 lastTimeUs := eventTime + 1
43 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
44 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
45 }
46 }()
47
48 l := i.Logger.With("kind", e.Kind)
49 switch e.Kind {
50 case jmodels.EventKindAccount:
51 if !e.Account.Active && *e.Account.Status == "deactivated" {
52 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
53 }
54 case jmodels.EventKindIdentity:
55 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
56 case jmodels.EventKindCommit:
57 switch e.Commit.Collection {
58 case tangled.GraphFollowNSID:
59 err = i.ingestFollow(e)
60 case tangled.FeedStarNSID:
61 err = i.ingestStar(e)
62 case tangled.PublicKeyNSID:
63 err = i.ingestPublicKey(e)
64 case tangled.RepoArtifactNSID:
65 err = i.ingestArtifact(e)
66 case tangled.ActorProfileNSID:
67 err = i.ingestProfile(e)
68 case tangled.SpindleMemberNSID:
69 err = i.ingestSpindleMember(ctx, e)
70 case tangled.SpindleNSID:
71 err = i.ingestSpindle(ctx, e)
72 case tangled.KnotMemberNSID:
73 err = i.ingestKnotMember(e)
74 case tangled.KnotNSID:
75 err = i.ingestKnot(e)
76 case tangled.StringNSID:
77 err = i.ingestString(e)
78 case tangled.RepoIssueNSID:
79 err = i.ingestIssue(ctx, e)
80 case tangled.RepoIssueCommentNSID:
81 err = i.ingestIssueComment(e)
82 case tangled.LabelDefinitionNSID:
83 err = i.ingestLabelDefinition(e)
84 case tangled.LabelOpNSID:
85 err = i.ingestLabelOp(e)
86 }
87 l = i.Logger.With("nsid", e.Commit.Collection)
88 }
89
90 if err != nil {
91 l.Warn("refused to ingest record", "err", err)
92 }
93
94 return nil
95 }
96}
97
98func (i *Ingester) ingestStar(e *jmodels.Event) error {
99 var err error
100 did := e.Did
101
102 l := i.Logger.With("handler", "ingestStar")
103 l = l.With("nsid", e.Commit.Collection)
104
105 switch e.Commit.Operation {
106 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
107 var subjectUri syntax.ATURI
108
109 raw := json.RawMessage(e.Commit.Record)
110 record := tangled.FeedStar{}
111 err := json.Unmarshal(raw, &record)
112 if err != nil {
113 l.Error("invalid record", "err", err)
114 return err
115 }
116
117 subjectUri, err = syntax.ParseATURI(record.Subject)
118 if err != nil {
119 l.Error("invalid record", "err", err)
120 return err
121 }
122 err = db.UpsertStar(i.Db, models.Star{
123 Did: did,
124 RepoAt: subjectUri,
125 Rkey: e.Commit.RKey,
126 })
127 case jmodels.CommitOperationDelete:
128 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
129 }
130
131 if err != nil {
132 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
133 }
134 l.Info("processed star", "operation", e.Commit.Operation, "rkey", e.Commit.RKey)
135
136 return nil
137}
138
139func (i *Ingester) ingestFollow(e *jmodels.Event) error {
140 var err error
141 did := e.Did
142
143 l := i.Logger.With("handler", "ingestFollow")
144 l = l.With("nsid", e.Commit.Collection)
145
146 switch e.Commit.Operation {
147 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
148 raw := json.RawMessage(e.Commit.Record)
149 record := tangled.GraphFollow{}
150 err = json.Unmarshal(raw, &record)
151 if err != nil {
152 l.Error("invalid record", "err", err)
153 return err
154 }
155
156 err = db.UpsertFollow(i.Db, models.Follow{
157 UserDid: did,
158 SubjectDid: record.Subject,
159 Rkey: e.Commit.RKey,
160 })
161 case jmodels.CommitOperationDelete:
162 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
163 }
164
165 if err != nil {
166 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
167 }
168 l.Info("processed follow", "operation", e.Commit.Operation, "rkey", e.Commit.RKey)
169
170 return nil
171}
172
173func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
174 did := e.Did
175 var err error
176
177 l := i.Logger.With("handler", "ingestPublicKey")
178 l = l.With("nsid", e.Commit.Collection)
179
180 switch e.Commit.Operation {
181 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
182 l.Debug("processing add of pubkey")
183 raw := json.RawMessage(e.Commit.Record)
184 record := tangled.PublicKey{}
185 err = json.Unmarshal(raw, &record)
186 if err != nil {
187 l.Error("invalid record", "err", err)
188 return err
189 }
190 pubKey, err := models.PublicKeyFromRecord(syntax.DID(did), syntax.RecordKey(e.Commit.RKey), record)
191 if err != nil {
192 l.Error("invalid record", "err", err)
193 return err
194 }
195 if err := pubKey.Validate(); err != nil {
196 l.Error("invalid record", "err", err)
197 return err
198 }
199
200 err = db.UpsertPublicKey(i.Db, pubKey)
201 case jmodels.CommitOperationDelete:
202 l.Debug("processing delete of pubkey")
203 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
204 }
205
206 if err != nil {
207 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
208 }
209 l.Info("processed pubkey", "operation", e.Commit.Operation, "rkey", e.Commit.RKey)
210
211 return nil
212}
213
214func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
215 did := e.Did
216 var err error
217
218 l := i.Logger.With("handler", "ingestArtifact")
219 l = l.With("nsid", e.Commit.Collection)
220
221 switch e.Commit.Operation {
222 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
223 raw := json.RawMessage(e.Commit.Record)
224 record := tangled.RepoArtifact{}
225 err = json.Unmarshal(raw, &record)
226 if err != nil {
227 l.Error("invalid record", "err", err)
228 return err
229 }
230
231 repoAt, err := syntax.ParseATURI(record.Repo)
232 if err != nil {
233 return err
234 }
235
236 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
237 if err != nil {
238 return err
239 }
240
241 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
242 if err != nil || !ok {
243 return err
244 }
245
246 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
247 if err != nil {
248 createdAt = time.Now()
249 }
250
251 artifact := models.Artifact{
252 Did: did,
253 Rkey: e.Commit.RKey,
254 RepoAt: repoAt,
255 Tag: plumbing.Hash(record.Tag),
256 CreatedAt: createdAt,
257 BlobCid: cid.Cid(record.Artifact.Ref),
258 Name: record.Name,
259 Size: uint64(record.Artifact.Size),
260 MimeType: record.Artifact.MimeType,
261 }
262
263 err = db.AddArtifact(i.Db, artifact)
264 case jmodels.CommitOperationDelete:
265 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
266 }
267
268 if err != nil {
269 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
270 }
271
272 return nil
273}
274
275func (i *Ingester) ingestProfile(e *jmodels.Event) error {
276 did := e.Did
277 var err error
278
279 l := i.Logger.With("handler", "ingestProfile")
280 l = l.With("nsid", e.Commit.Collection)
281
282 if e.Commit.RKey != "self" {
283 return fmt.Errorf("ingestProfile only ingests `self` record")
284 }
285
286 switch e.Commit.Operation {
287 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
288 raw := json.RawMessage(e.Commit.Record)
289 record := tangled.ActorProfile{}
290 err = json.Unmarshal(raw, &record)
291 if err != nil {
292 l.Error("invalid record", "err", err)
293 return err
294 }
295
296 avatar := ""
297 if record.Avatar != nil {
298 avatar = record.Avatar.Ref.String()
299 }
300
301 description := ""
302 if record.Description != nil {
303 description = *record.Description
304 }
305
306 includeBluesky := record.Bluesky
307
308 pronouns := ""
309 if record.Pronouns != nil {
310 pronouns = *record.Pronouns
311 }
312
313 location := ""
314 if record.Location != nil {
315 location = *record.Location
316 }
317
318 var links [5]string
319 for i, l := range record.Links {
320 if i < 5 {
321 links[i] = l
322 }
323 }
324
325 var stats [2]models.VanityStat
326 for i, s := range record.Stats {
327 if i < 2 {
328 stats[i].Kind = models.VanityStatKind(s)
329 }
330 }
331
332 var pinned [6]syntax.ATURI
333 for i, r := range record.PinnedRepositories {
334 if i < 6 {
335 pinned[i] = syntax.ATURI(r)
336 }
337 }
338
339 profile := models.Profile{
340 Did: did,
341 Avatar: avatar,
342 Description: description,
343 IncludeBluesky: includeBluesky,
344 Location: location,
345 Links: links,
346 Stats: stats,
347 PinnedRepos: pinned,
348 Pronouns: pronouns,
349 }
350
351 ddb, ok := i.Db.Execer.(*db.DB)
352 if !ok {
353 return fmt.Errorf("failed to index profile record, invalid db cast")
354 }
355
356 tx, err := ddb.Begin()
357 if err != nil {
358 return fmt.Errorf("failed to start transaction")
359 }
360 defer tx.Rollback()
361
362 err = db.ValidateProfile(tx, &profile)
363 if err != nil {
364 return fmt.Errorf("invalid profile record")
365 }
366
367 err = db.UpsertProfile(tx, &profile)
368 if err != nil {
369 return fmt.Errorf("upserting profile: %w", err)
370 }
371
372 err = tx.Commit()
373 case jmodels.CommitOperationDelete:
374 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
375 }
376
377 if err != nil {
378 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
379 }
380
381 return nil
382}
383
384func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
385 did := e.Did
386 var err error
387
388 l := i.Logger.With("handler", "ingestSpindleMember")
389 l = l.With("nsid", e.Commit.Collection)
390
391 switch e.Commit.Operation {
392 case jmodels.CommitOperationCreate:
393 raw := json.RawMessage(e.Commit.Record)
394 record := tangled.SpindleMember{}
395 err = json.Unmarshal(raw, &record)
396 if err != nil {
397 l.Error("invalid record", "err", err)
398 return err
399 }
400
401 // only spindle owner can invite to spindles
402 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
403 if err != nil || !ok {
404 return fmt.Errorf("failed to enforce permissions: %w", err)
405 }
406
407 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
408 if err != nil {
409 return err
410 }
411
412 if memberId.Handle.IsInvalidHandle() {
413 return err
414 }
415
416 ddb, ok := i.Db.Execer.(*db.DB)
417 if !ok {
418 return fmt.Errorf("failed to index profile record, invalid db cast")
419 }
420
421 err = db.AddSpindleMember(ddb, models.SpindleMember{
422 Did: syntax.DID(did),
423 Rkey: e.Commit.RKey,
424 Instance: record.Instance,
425 Subject: memberId.DID,
426 })
427 if !ok {
428 return fmt.Errorf("failed to add to db: %w", err)
429 }
430
431 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
432 if err != nil {
433 return fmt.Errorf("failed to update ACLs: %w", err)
434 }
435
436 l.Info("added spindle member")
437 case jmodels.CommitOperationDelete:
438 rkey := e.Commit.RKey
439
440 ddb, ok := i.Db.Execer.(*db.DB)
441 if !ok {
442 return fmt.Errorf("failed to index profile record, invalid db cast")
443 }
444
445 // get record from db first
446 members, err := db.GetSpindleMembers(
447 ddb,
448 orm.FilterEq("did", did),
449 orm.FilterEq("rkey", rkey),
450 )
451 if err != nil || len(members) != 1 {
452 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
453 }
454 member := members[0]
455
456 tx, err := ddb.Begin()
457 if err != nil {
458 return fmt.Errorf("failed to start txn: %w", err)
459 }
460
461 // remove record by rkey && update enforcer
462 if err = db.RemoveSpindleMember(
463 tx,
464 orm.FilterEq("did", did),
465 orm.FilterEq("rkey", rkey),
466 ); err != nil {
467 return fmt.Errorf("failed to remove from db: %w", err)
468 }
469
470 // update enforcer
471 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
472 if err != nil {
473 return fmt.Errorf("failed to update ACLs: %w", err)
474 }
475
476 if err = tx.Commit(); err != nil {
477 return fmt.Errorf("failed to commit txn: %w", err)
478 }
479
480 if err = i.Enforcer.E.SavePolicy(); err != nil {
481 return fmt.Errorf("failed to save ACLs: %w", err)
482 }
483
484 l.Info("removed spindle member")
485 }
486
487 return nil
488}
489
490func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
491 did := e.Did
492 var err error
493
494 l := i.Logger.With("handler", "ingestSpindle")
495 l = l.With("nsid", e.Commit.Collection)
496
497 switch e.Commit.Operation {
498 case jmodels.CommitOperationCreate:
499 raw := json.RawMessage(e.Commit.Record)
500 record := tangled.Spindle{}
501 err = json.Unmarshal(raw, &record)
502 if err != nil {
503 l.Error("invalid record", "err", err)
504 return err
505 }
506
507 instance := e.Commit.RKey
508
509 ddb, ok := i.Db.Execer.(*db.DB)
510 if !ok {
511 return fmt.Errorf("failed to index profile record, invalid db cast")
512 }
513
514 err := db.AddSpindle(ddb, models.Spindle{
515 Owner: syntax.DID(did),
516 Instance: instance,
517 })
518 if err != nil {
519 l.Error("failed to add spindle to db", "err", err, "instance", instance)
520 return err
521 }
522
523 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
524 if err != nil {
525 l.Error("failed to add spindle to db", "err", err, "instance", instance)
526 return err
527 }
528
529 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
530 if err != nil {
531 return fmt.Errorf("failed to mark verified: %w", err)
532 }
533
534 return nil
535
536 case jmodels.CommitOperationDelete:
537 instance := e.Commit.RKey
538
539 ddb, ok := i.Db.Execer.(*db.DB)
540 if !ok {
541 return fmt.Errorf("failed to index profile record, invalid db cast")
542 }
543
544 // get record from db first
545 spindles, err := db.GetSpindles(
546 ddb,
547 orm.FilterEq("owner", did),
548 orm.FilterEq("instance", instance),
549 )
550 if err != nil || len(spindles) != 1 {
551 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
552 }
553 spindle := spindles[0]
554
555 tx, err := ddb.Begin()
556 if err != nil {
557 return err
558 }
559 defer func() {
560 tx.Rollback()
561 i.Enforcer.E.LoadPolicy()
562 }()
563
564 // remove spindle members first
565 err = db.RemoveSpindleMember(
566 tx,
567 orm.FilterEq("owner", did),
568 orm.FilterEq("instance", instance),
569 )
570 if err != nil {
571 return err
572 }
573
574 err = db.DeleteSpindle(
575 tx,
576 orm.FilterEq("owner", did),
577 orm.FilterEq("instance", instance),
578 )
579 if err != nil {
580 return err
581 }
582
583 if spindle.Verified != nil {
584 err = i.Enforcer.RemoveSpindle(instance)
585 if err != nil {
586 return err
587 }
588 }
589
590 err = tx.Commit()
591 if err != nil {
592 return err
593 }
594
595 err = i.Enforcer.E.SavePolicy()
596 if err != nil {
597 return err
598 }
599 }
600
601 return nil
602}
603
604func (i *Ingester) ingestString(e *jmodels.Event) error {
605 did := e.Did
606 rkey := e.Commit.RKey
607
608 var err error
609
610 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
611 l.Info("ingesting record")
612
613 ddb, ok := i.Db.Execer.(*db.DB)
614 if !ok {
615 return fmt.Errorf("failed to index string record, invalid db cast")
616 }
617
618 switch e.Commit.Operation {
619 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
620 raw := json.RawMessage(e.Commit.Record)
621 record := tangled.String{}
622 err = json.Unmarshal(raw, &record)
623 if err != nil {
624 l.Error("invalid record", "err", err)
625 return err
626 }
627
628 string := models.StringFromRecord(did, rkey, record)
629
630 if err = string.Validate(); err != nil {
631 l.Error("invalid record", "err", err)
632 return err
633 }
634
635 if err = db.AddString(ddb, string); err != nil {
636 l.Error("failed to add string", "err", err)
637 return err
638 }
639
640 return nil
641
642 case jmodels.CommitOperationDelete:
643 if err := db.DeleteString(
644 ddb,
645 orm.FilterEq("did", did),
646 orm.FilterEq("rkey", rkey),
647 ); err != nil {
648 l.Error("failed to delete", "err", err)
649 return fmt.Errorf("failed to delete string record: %w", err)
650 }
651
652 return nil
653 }
654
655 return nil
656}
657
658func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
659 did := e.Did
660 var err error
661
662 l := i.Logger.With("handler", "ingestKnotMember")
663 l = l.With("nsid", e.Commit.Collection)
664
665 switch e.Commit.Operation {
666 case jmodels.CommitOperationCreate:
667 raw := json.RawMessage(e.Commit.Record)
668 record := tangled.KnotMember{}
669 err = json.Unmarshal(raw, &record)
670 if err != nil {
671 l.Error("invalid record", "err", err)
672 return err
673 }
674
675 // only knot owner can invite to knots
676 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
677 if err != nil || !ok {
678 return fmt.Errorf("failed to enforce permissions: %w", err)
679 }
680
681 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
682 if err != nil {
683 return err
684 }
685
686 if memberId.Handle.IsInvalidHandle() {
687 return err
688 }
689
690 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
691 if err != nil {
692 return fmt.Errorf("failed to update ACLs: %w", err)
693 }
694
695 l.Info("added knot member")
696 case jmodels.CommitOperationDelete:
697 // we don't store knot members in a table (like we do for spindle)
698 // and we can't remove this just yet. possibly fixed if we switch
699 // to either:
700 // 1. a knot_members table like with spindle and store the rkey
701 // 2. use the knot host as the rkey
702 //
703 // TODO: implement member deletion
704 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
705 }
706
707 return nil
708}
709
710func (i *Ingester) ingestKnot(e *jmodels.Event) error {
711 did := e.Did
712 var err error
713
714 l := i.Logger.With("handler", "ingestKnot")
715 l = l.With("nsid", e.Commit.Collection)
716
717 switch e.Commit.Operation {
718 case jmodels.CommitOperationCreate:
719 raw := json.RawMessage(e.Commit.Record)
720 record := tangled.Knot{}
721 err = json.Unmarshal(raw, &record)
722 if err != nil {
723 l.Error("invalid record", "err", err)
724 return err
725 }
726
727 domain := e.Commit.RKey
728
729 ddb, ok := i.Db.Execer.(*db.DB)
730 if !ok {
731 return fmt.Errorf("failed to index profile record, invalid db cast")
732 }
733
734 err := db.AddKnot(ddb, domain, did)
735 if err != nil {
736 l.Error("failed to add knot to db", "err", err, "domain", domain)
737 return err
738 }
739
740 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
741 if err != nil {
742 l.Error("failed to verify knot", "err", err, "domain", domain)
743 return err
744 }
745
746 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
747 if err != nil {
748 return fmt.Errorf("failed to mark verified: %w", err)
749 }
750
751 return nil
752
753 case jmodels.CommitOperationDelete:
754 domain := e.Commit.RKey
755
756 ddb, ok := i.Db.Execer.(*db.DB)
757 if !ok {
758 return fmt.Errorf("failed to index knot record, invalid db cast")
759 }
760
761 // get record from db first
762 registrations, err := db.GetRegistrations(
763 ddb,
764 orm.FilterEq("domain", domain),
765 orm.FilterEq("did", did),
766 )
767 if err != nil {
768 return fmt.Errorf("failed to get registration: %w", err)
769 }
770 if len(registrations) != 1 {
771 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
772 }
773 registration := registrations[0]
774
775 tx, err := ddb.Begin()
776 if err != nil {
777 return err
778 }
779 defer func() {
780 tx.Rollback()
781 i.Enforcer.E.LoadPolicy()
782 }()
783
784 err = db.DeleteKnot(
785 tx,
786 orm.FilterEq("did", did),
787 orm.FilterEq("domain", domain),
788 )
789 if err != nil {
790 return err
791 }
792
793 if registration.Registered != nil {
794 err = i.Enforcer.RemoveKnot(domain)
795 if err != nil {
796 return err
797 }
798 }
799
800 err = tx.Commit()
801 if err != nil {
802 return err
803 }
804
805 err = i.Enforcer.E.SavePolicy()
806 if err != nil {
807 return err
808 }
809 }
810
811 return nil
812}
813func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
814 did := e.Did
815 rkey := e.Commit.RKey
816
817 var err error
818
819 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
820 l.Info("ingesting record")
821
822 ddb, ok := i.Db.Execer.(*db.DB)
823 if !ok {
824 return fmt.Errorf("failed to index issue record, invalid db cast")
825 }
826
827 switch e.Commit.Operation {
828 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
829 raw := json.RawMessage(e.Commit.Record)
830 record := tangled.RepoIssue{}
831 err = json.Unmarshal(raw, &record)
832 if err != nil {
833 l.Error("invalid record", "err", err)
834 return err
835 }
836
837 issue := models.IssueFromRecord(did, rkey, record)
838
839 if err := issue.Validate(); err != nil {
840 return fmt.Errorf("failed to validate issue: %w", err)
841 }
842
843 tx, err := ddb.BeginTx(ctx, nil)
844 if err != nil {
845 l.Error("failed to begin transaction", "err", err)
846 return err
847 }
848 defer tx.Rollback()
849
850 err = db.PutIssue(tx, &issue)
851 if err != nil {
852 l.Error("failed to create issue", "err", err)
853 return err
854 }
855
856 err = tx.Commit()
857 if err != nil {
858 l.Error("failed to commit txn", "err", err)
859 return err
860 }
861
862 return nil
863
864 case jmodels.CommitOperationDelete:
865 tx, err := ddb.BeginTx(ctx, nil)
866 if err != nil {
867 l.Error("failed to begin transaction", "err", err)
868 return err
869 }
870 defer tx.Rollback()
871
872 if err := db.DeleteIssues(
873 tx,
874 did,
875 rkey,
876 ); err != nil {
877 l.Error("failed to delete", "err", err)
878 return fmt.Errorf("failed to delete issue record: %w", err)
879 }
880 if err := tx.Commit(); err != nil {
881 l.Error("failed to commit txn", "err", err)
882 return err
883 }
884
885 return nil
886 }
887
888 return nil
889}
890
891func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
892 did := e.Did
893 rkey := e.Commit.RKey
894
895 var err error
896
897 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
898 l.Info("ingesting record")
899
900 ddb, ok := i.Db.Execer.(*db.DB)
901 if !ok {
902 return fmt.Errorf("failed to index issue comment record, invalid db cast")
903 }
904
905 switch e.Commit.Operation {
906 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
907 raw := json.RawMessage(e.Commit.Record)
908 record := tangled.RepoIssueComment{}
909 err = json.Unmarshal(raw, &record)
910 if err != nil {
911 return fmt.Errorf("invalid record: %w", err)
912 }
913
914 comment, err := models.IssueCommentFromRecord(did, rkey, record)
915 if err != nil {
916 return fmt.Errorf("failed to parse comment from record: %w", err)
917 }
918
919 if err := comment.Validate(); err != nil {
920 return fmt.Errorf("failed to validate comment: %w", err)
921 }
922
923 tx, err := ddb.Begin()
924 if err != nil {
925 return fmt.Errorf("failed to start transaction: %w", err)
926 }
927 defer tx.Rollback()
928
929 _, err = db.AddIssueComment(tx, *comment)
930 if err != nil {
931 return fmt.Errorf("failed to create issue comment: %w", err)
932 }
933
934 return tx.Commit()
935
936 case jmodels.CommitOperationDelete:
937 if err := db.DeleteIssueComments(
938 ddb,
939 orm.FilterEq("did", did),
940 orm.FilterEq("rkey", rkey),
941 ); err != nil {
942 return fmt.Errorf("failed to delete issue comment record: %w", err)
943 }
944
945 return nil
946 }
947
948 return nil
949}
950
951func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
952 did := e.Did
953 rkey := e.Commit.RKey
954
955 var err error
956
957 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
958 l.Info("ingesting record")
959
960 ddb, ok := i.Db.Execer.(*db.DB)
961 if !ok {
962 return fmt.Errorf("failed to index label definition, invalid db cast")
963 }
964
965 switch e.Commit.Operation {
966 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
967 raw := json.RawMessage(e.Commit.Record)
968 record := tangled.LabelDefinition{}
969 err = json.Unmarshal(raw, &record)
970 if err != nil {
971 return fmt.Errorf("invalid record: %w", err)
972 }
973
974 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
975 if err != nil {
976 return fmt.Errorf("failed to parse labeldef from record: %w", err)
977 }
978
979 if err := def.Validate(); err != nil {
980 return fmt.Errorf("failed to validate labeldef: %w", err)
981 }
982
983 _, err = db.AddLabelDefinition(ddb, def)
984 if err != nil {
985 return fmt.Errorf("failed to create labeldef: %w", err)
986 }
987
988 return nil
989
990 case jmodels.CommitOperationDelete:
991 if err := db.DeleteLabelDefinition(
992 ddb,
993 orm.FilterEq("did", did),
994 orm.FilterEq("rkey", rkey),
995 ); err != nil {
996 return fmt.Errorf("failed to delete labeldef record: %w", err)
997 }
998
999 return nil
1000 }
1001
1002 return nil
1003}
1004
1005func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1006 did := e.Did
1007 rkey := e.Commit.RKey
1008
1009 var err error
1010
1011 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1012 l.Info("ingesting record")
1013
1014 ddb, ok := i.Db.Execer.(*db.DB)
1015 if !ok {
1016 return fmt.Errorf("failed to index label op, invalid db cast")
1017 }
1018
1019 switch e.Commit.Operation {
1020 case jmodels.CommitOperationCreate:
1021 raw := json.RawMessage(e.Commit.Record)
1022 record := tangled.LabelOp{}
1023 err = json.Unmarshal(raw, &record)
1024 if err != nil {
1025 return fmt.Errorf("invalid record: %w", err)
1026 }
1027
1028 subject := syntax.ATURI(record.Subject)
1029 collection := subject.Collection()
1030
1031 var repo *models.Repo
1032 switch collection {
1033 case tangled.RepoIssueNSID:
1034 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1035 if err != nil || len(i) != 1 {
1036 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1037 }
1038 repo = i[0].Repo
1039 default:
1040 return fmt.Errorf("unsupport label subject: %s", collection)
1041 }
1042
1043 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1044 if err != nil {
1045 return fmt.Errorf("failed to build label application ctx: %w", err)
1046 }
1047
1048 ops := models.LabelOpsFromRecord(did, rkey, record)
1049
1050 for _, o := range ops {
1051 def, ok := actx.Defs[o.OperandKey]
1052 if !ok {
1053 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1054 }
1055
1056 // validate permissions: only collaborators can apply labels currently
1057 //
1058 // TODO: introduce a repo:triage permission
1059 ok, err := i.Enforcer.IsPushAllowed(o.Did, repo.Knot, repo.DidSlashRepo())
1060 if err != nil {
1061 return fmt.Errorf("enforcing permission: %w", err)
1062 }
1063 if !ok {
1064 return fmt.Errorf("unauthorized label operation")
1065 }
1066
1067 if err := def.ValidateOperandValue(&o); err != nil {
1068 return fmt.Errorf("failed to validate labelop: %w", err)
1069 }
1070 }
1071
1072 tx, err := ddb.Begin()
1073 if err != nil {
1074 return err
1075 }
1076 defer tx.Rollback()
1077
1078 for _, o := range ops {
1079 _, err = db.AddLabelOp(tx, &o)
1080 if err != nil {
1081 return fmt.Errorf("failed to add labelop: %w", err)
1082 }
1083 }
1084
1085 if err = tx.Commit(); err != nil {
1086 return err
1087 }
1088 }
1089
1090 return nil
1091}