A vibe-coded fork of Tangled that adds Pijul support.
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "maps"
9 "slices"
10
11 "time"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 jmodels "github.com/bluesky-social/jetstream/pkg/models"
15 "github.com/go-git/go-git/v5/plumbing"
16 "github.com/ipfs/go-cid"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/serververify"
22 "tangled.org/core/idresolver"
23 "tangled.org/core/orm"
24 "tangled.org/core/rbac"
25)
26
27type Ingester struct {
28 Db db.DbWrapper
29 Enforcer *rbac.Enforcer
30 IdResolver *idresolver.Resolver
31 Config *config.Config
32 Logger *slog.Logger
33}
34
35type processFunc func(ctx context.Context, e *jmodels.Event) error
36
37func (i *Ingester) Ingest() processFunc {
38 return func(ctx context.Context, e *jmodels.Event) error {
39 var err error
40 defer func() {
41 eventTime := e.TimeUS
42 lastTimeUs := eventTime + 1
43 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
44 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
45 }
46 }()
47
48 l := i.Logger.With("kind", e.Kind)
49 switch e.Kind {
50 case jmodels.EventKindAccount:
51 if !e.Account.Active && *e.Account.Status == "deactivated" {
52 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
53 }
54 case jmodels.EventKindIdentity:
55 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
56 case jmodels.EventKindCommit:
57 switch e.Commit.Collection {
58 case tangled.GraphFollowNSID:
59 err = i.ingestFollow(e)
60 case tangled.FeedStarNSID:
61 err = i.ingestStar(e)
62 case tangled.PublicKeyNSID:
63 err = i.ingestPublicKey(e)
64 case tangled.RepoArtifactNSID:
65 err = i.ingestArtifact(e)
66 case tangled.ActorProfileNSID:
67 err = i.ingestProfile(e)
68 case tangled.SpindleMemberNSID:
69 err = i.ingestSpindleMember(ctx, e)
70 case tangled.SpindleNSID:
71 err = i.ingestSpindle(ctx, e)
72 case tangled.KnotMemberNSID:
73 err = i.ingestKnotMember(e)
74 case tangled.KnotNSID:
75 err = i.ingestKnot(e)
76 case tangled.StringNSID:
77 err = i.ingestString(e)
78 case tangled.RepoIssueNSID:
79 err = i.ingestIssue(ctx, e)
80 case tangled.RepoIssueCommentNSID:
81 err = i.ingestIssueComment(e)
82 case tangled.LabelDefinitionNSID:
83 err = i.ingestLabelDefinition(e)
84 case tangled.LabelOpNSID:
85 err = i.ingestLabelOp(e)
86 }
87 l = i.Logger.With("nsid", e.Commit.Collection)
88 }
89
90 if err != nil {
91 l.Warn("refused to ingest record", "err", err)
92 }
93
94 return nil
95 }
96}
97
98func (i *Ingester) ingestStar(e *jmodels.Event) error {
99 var err error
100 did := e.Did
101
102 l := i.Logger.With("handler", "ingestStar")
103 l = l.With("nsid", e.Commit.Collection)
104
105 switch e.Commit.Operation {
106 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
107 var subjectUri syntax.ATURI
108
109 raw := json.RawMessage(e.Commit.Record)
110 record := tangled.FeedStar{}
111 err := json.Unmarshal(raw, &record)
112 if err != nil {
113 l.Error("invalid record", "err", err)
114 return err
115 }
116
117 subjectUri, err = syntax.ParseATURI(record.Subject)
118 if err != nil {
119 l.Error("invalid record", "err", err)
120 return err
121 }
122 err = db.AddStar(i.Db, &models.Star{
123 Did: did,
124 RepoAt: subjectUri,
125 Rkey: e.Commit.RKey,
126 })
127 case jmodels.CommitOperationDelete:
128 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
129 }
130
131 if err != nil {
132 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
133 }
134
135 return nil
136}
137
138func (i *Ingester) ingestFollow(e *jmodels.Event) error {
139 var err error
140 did := e.Did
141
142 l := i.Logger.With("handler", "ingestFollow")
143 l = l.With("nsid", e.Commit.Collection)
144
145 switch e.Commit.Operation {
146 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
147 raw := json.RawMessage(e.Commit.Record)
148 record := tangled.GraphFollow{}
149 err = json.Unmarshal(raw, &record)
150 if err != nil {
151 l.Error("invalid record", "err", err)
152 return err
153 }
154
155 err = db.AddFollow(i.Db, &models.Follow{
156 UserDid: did,
157 SubjectDid: record.Subject,
158 Rkey: e.Commit.RKey,
159 })
160 case jmodels.CommitOperationDelete:
161 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
162 }
163
164 if err != nil {
165 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
166 }
167
168 return nil
169}
170
171func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
172 did := e.Did
173 var err error
174
175 l := i.Logger.With("handler", "ingestPublicKey")
176 l = l.With("nsid", e.Commit.Collection)
177
178 switch e.Commit.Operation {
179 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
180 l.Debug("processing add of pubkey")
181 raw := json.RawMessage(e.Commit.Record)
182 record := tangled.PublicKey{}
183 err = json.Unmarshal(raw, &record)
184 if err != nil {
185 l.Error("invalid record", "err", err)
186 return err
187 }
188
189 name := record.Name
190 key := record.Key
191 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
192 case jmodels.CommitOperationDelete:
193 l.Debug("processing delete of pubkey")
194 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
195 }
196
197 if err != nil {
198 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
199 }
200
201 return nil
202}
203
204func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
205 did := e.Did
206 var err error
207
208 l := i.Logger.With("handler", "ingestArtifact")
209 l = l.With("nsid", e.Commit.Collection)
210
211 switch e.Commit.Operation {
212 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
213 raw := json.RawMessage(e.Commit.Record)
214 record := tangled.RepoArtifact{}
215 err = json.Unmarshal(raw, &record)
216 if err != nil {
217 l.Error("invalid record", "err", err)
218 return err
219 }
220
221 repoAt, err := syntax.ParseATURI(record.Repo)
222 if err != nil {
223 return err
224 }
225
226 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
227 if err != nil {
228 return err
229 }
230
231 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
232 if err != nil || !ok {
233 return err
234 }
235
236 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
237 if err != nil {
238 createdAt = time.Now()
239 }
240
241 artifact := models.Artifact{
242 Did: did,
243 Rkey: e.Commit.RKey,
244 RepoAt: repoAt,
245 Tag: plumbing.Hash(record.Tag),
246 CreatedAt: createdAt,
247 BlobCid: cid.Cid(record.Artifact.Ref),
248 Name: record.Name,
249 Size: uint64(record.Artifact.Size),
250 MimeType: record.Artifact.MimeType,
251 }
252
253 err = db.AddArtifact(i.Db, artifact)
254 case jmodels.CommitOperationDelete:
255 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
256 }
257
258 if err != nil {
259 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
260 }
261
262 return nil
263}
264
265func (i *Ingester) ingestProfile(e *jmodels.Event) error {
266 did := e.Did
267 var err error
268
269 l := i.Logger.With("handler", "ingestProfile")
270 l = l.With("nsid", e.Commit.Collection)
271
272 if e.Commit.RKey != "self" {
273 return fmt.Errorf("ingestProfile only ingests `self` record")
274 }
275
276 switch e.Commit.Operation {
277 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
278 raw := json.RawMessage(e.Commit.Record)
279 record := tangled.ActorProfile{}
280 err = json.Unmarshal(raw, &record)
281 if err != nil {
282 l.Error("invalid record", "err", err)
283 return err
284 }
285
286 avatar := ""
287 if record.Avatar != nil {
288 avatar = record.Avatar.Ref.String()
289 }
290
291 description := ""
292 if record.Description != nil {
293 description = *record.Description
294 }
295
296 includeBluesky := record.Bluesky
297
298 pronouns := ""
299 if record.Pronouns != nil {
300 pronouns = *record.Pronouns
301 }
302
303 location := ""
304 if record.Location != nil {
305 location = *record.Location
306 }
307
308 var links [5]string
309 for i, l := range record.Links {
310 if i < 5 {
311 links[i] = l
312 }
313 }
314
315 var stats [2]models.VanityStat
316 for i, s := range record.Stats {
317 if i < 2 {
318 stats[i].Kind = models.ParseVanityStatKind(s)
319 }
320 }
321
322 var pinned [6]syntax.ATURI
323 for i, r := range record.PinnedRepositories {
324 if i < 6 {
325 pinned[i] = syntax.ATURI(r)
326 }
327 }
328
329 profile := models.Profile{
330 Did: did,
331 Avatar: avatar,
332 Description: description,
333 IncludeBluesky: includeBluesky,
334 Location: location,
335 Links: links,
336 Stats: stats,
337 PinnedRepos: pinned,
338 Pronouns: pronouns,
339 }
340
341 ddb, ok := i.Db.Execer.(*db.DB)
342 if !ok {
343 return fmt.Errorf("failed to index profile record, invalid db cast")
344 }
345
346 tx, err := ddb.Begin()
347 if err != nil {
348 return fmt.Errorf("failed to start transaction")
349 }
350
351 err = db.ValidateProfile(tx, &profile)
352 if err != nil {
353 return fmt.Errorf("invalid profile record")
354 }
355
356 err = db.UpsertProfile(tx, &profile)
357 case jmodels.CommitOperationDelete:
358 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
359 }
360
361 if err != nil {
362 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
363 }
364
365 return nil
366}
367
368func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
369 did := e.Did
370 var err error
371
372 l := i.Logger.With("handler", "ingestSpindleMember")
373 l = l.With("nsid", e.Commit.Collection)
374
375 switch e.Commit.Operation {
376 case jmodels.CommitOperationCreate:
377 raw := json.RawMessage(e.Commit.Record)
378 record := tangled.SpindleMember{}
379 err = json.Unmarshal(raw, &record)
380 if err != nil {
381 l.Error("invalid record", "err", err)
382 return err
383 }
384
385 // only spindle owner can invite to spindles
386 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
387 if err != nil || !ok {
388 return fmt.Errorf("failed to enforce permissions: %w", err)
389 }
390
391 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
392 if err != nil {
393 return err
394 }
395
396 if memberId.Handle.IsInvalidHandle() {
397 return err
398 }
399
400 ddb, ok := i.Db.Execer.(*db.DB)
401 if !ok {
402 return fmt.Errorf("invalid db cast")
403 }
404
405 err = db.AddSpindleMember(ddb, models.SpindleMember{
406 Did: syntax.DID(did),
407 Rkey: e.Commit.RKey,
408 Instance: record.Instance,
409 Subject: memberId.DID,
410 })
411 if !ok {
412 return fmt.Errorf("failed to add to db: %w", err)
413 }
414
415 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
416 if err != nil {
417 return fmt.Errorf("failed to update ACLs: %w", err)
418 }
419
420 l.Info("added spindle member")
421 case jmodels.CommitOperationDelete:
422 rkey := e.Commit.RKey
423
424 ddb, ok := i.Db.Execer.(*db.DB)
425 if !ok {
426 return fmt.Errorf("failed to index profile record, invalid db cast")
427 }
428
429 // get record from db first
430 members, err := db.GetSpindleMembers(
431 ddb,
432 orm.FilterEq("did", did),
433 orm.FilterEq("rkey", rkey),
434 )
435 if err != nil || len(members) != 1 {
436 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
437 }
438 member := members[0]
439
440 tx, err := ddb.Begin()
441 if err != nil {
442 return fmt.Errorf("failed to start txn: %w", err)
443 }
444
445 // remove record by rkey && update enforcer
446 if err = db.RemoveSpindleMember(
447 tx,
448 orm.FilterEq("did", did),
449 orm.FilterEq("rkey", rkey),
450 ); err != nil {
451 return fmt.Errorf("failed to remove from db: %w", err)
452 }
453
454 // update enforcer
455 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
456 if err != nil {
457 return fmt.Errorf("failed to update ACLs: %w", err)
458 }
459
460 if err = tx.Commit(); err != nil {
461 return fmt.Errorf("failed to commit txn: %w", err)
462 }
463
464 if err = i.Enforcer.E.SavePolicy(); err != nil {
465 return fmt.Errorf("failed to save ACLs: %w", err)
466 }
467
468 l.Info("removed spindle member")
469 }
470
471 return nil
472}
473
474func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
475 did := e.Did
476 var err error
477
478 l := i.Logger.With("handler", "ingestSpindle")
479 l = l.With("nsid", e.Commit.Collection)
480
481 switch e.Commit.Operation {
482 case jmodels.CommitOperationCreate:
483 raw := json.RawMessage(e.Commit.Record)
484 record := tangled.Spindle{}
485 err = json.Unmarshal(raw, &record)
486 if err != nil {
487 l.Error("invalid record", "err", err)
488 return err
489 }
490
491 instance := e.Commit.RKey
492
493 ddb, ok := i.Db.Execer.(*db.DB)
494 if !ok {
495 return fmt.Errorf("failed to index profile record, invalid db cast")
496 }
497
498 err := db.AddSpindle(ddb, models.Spindle{
499 Owner: syntax.DID(did),
500 Instance: instance,
501 })
502 if err != nil {
503 l.Error("failed to add spindle to db", "err", err, "instance", instance)
504 return err
505 }
506
507 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
508 if err != nil {
509 l.Error("failed to add spindle to db", "err", err, "instance", instance)
510 return err
511 }
512
513 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
514 if err != nil {
515 return fmt.Errorf("failed to mark verified: %w", err)
516 }
517
518 return nil
519
520 case jmodels.CommitOperationDelete:
521 instance := e.Commit.RKey
522
523 ddb, ok := i.Db.Execer.(*db.DB)
524 if !ok {
525 return fmt.Errorf("failed to index profile record, invalid db cast")
526 }
527
528 // get record from db first
529 spindles, err := db.GetSpindles(
530 ddb,
531 orm.FilterEq("owner", did),
532 orm.FilterEq("instance", instance),
533 )
534 if err != nil || len(spindles) != 1 {
535 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
536 }
537 spindle := spindles[0]
538
539 tx, err := ddb.Begin()
540 if err != nil {
541 return err
542 }
543 defer func() {
544 tx.Rollback()
545 i.Enforcer.E.LoadPolicy()
546 }()
547
548 // remove spindle members first
549 err = db.RemoveSpindleMember(
550 tx,
551 orm.FilterEq("owner", did),
552 orm.FilterEq("instance", instance),
553 )
554 if err != nil {
555 return err
556 }
557
558 err = db.DeleteSpindle(
559 tx,
560 orm.FilterEq("owner", did),
561 orm.FilterEq("instance", instance),
562 )
563 if err != nil {
564 return err
565 }
566
567 if spindle.Verified != nil {
568 err = i.Enforcer.RemoveSpindle(instance)
569 if err != nil {
570 return err
571 }
572 }
573
574 err = tx.Commit()
575 if err != nil {
576 return err
577 }
578
579 err = i.Enforcer.E.SavePolicy()
580 if err != nil {
581 return err
582 }
583 }
584
585 return nil
586}
587
588func (i *Ingester) ingestString(e *jmodels.Event) error {
589 did := e.Did
590 rkey := e.Commit.RKey
591
592 var err error
593
594 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
595 l.Info("ingesting record")
596
597 ddb, ok := i.Db.Execer.(*db.DB)
598 if !ok {
599 return fmt.Errorf("failed to index string record, invalid db cast")
600 }
601
602 switch e.Commit.Operation {
603 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
604 raw := json.RawMessage(e.Commit.Record)
605 record := tangled.String{}
606 err = json.Unmarshal(raw, &record)
607 if err != nil {
608 l.Error("invalid record", "err", err)
609 return err
610 }
611
612 string := models.StringFromRecord(did, rkey, record)
613
614 if err = string.Validate(); err != nil {
615 l.Error("invalid record", "err", err)
616 return err
617 }
618
619 if err = db.AddString(ddb, string); err != nil {
620 l.Error("failed to add string", "err", err)
621 return err
622 }
623
624 return nil
625
626 case jmodels.CommitOperationDelete:
627 if err := db.DeleteString(
628 ddb,
629 orm.FilterEq("did", did),
630 orm.FilterEq("rkey", rkey),
631 ); err != nil {
632 l.Error("failed to delete", "err", err)
633 return fmt.Errorf("failed to delete string record: %w", err)
634 }
635
636 return nil
637 }
638
639 return nil
640}
641
642func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
643 did := e.Did
644 var err error
645
646 l := i.Logger.With("handler", "ingestKnotMember")
647 l = l.With("nsid", e.Commit.Collection)
648
649 switch e.Commit.Operation {
650 case jmodels.CommitOperationCreate:
651 raw := json.RawMessage(e.Commit.Record)
652 record := tangled.KnotMember{}
653 err = json.Unmarshal(raw, &record)
654 if err != nil {
655 l.Error("invalid record", "err", err)
656 return err
657 }
658
659 // only knot owner can invite to knots
660 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
661 if err != nil || !ok {
662 return fmt.Errorf("failed to enforce permissions: %w", err)
663 }
664
665 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
666 if err != nil {
667 return err
668 }
669
670 if memberId.Handle.IsInvalidHandle() {
671 return err
672 }
673
674 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
675 if err != nil {
676 return fmt.Errorf("failed to update ACLs: %w", err)
677 }
678
679 l.Info("added knot member")
680 case jmodels.CommitOperationDelete:
681 // we don't store knot members in a table (like we do for spindle)
682 // and we can't remove this just yet. possibly fixed if we switch
683 // to either:
684 // 1. a knot_members table like with spindle and store the rkey
685 // 2. use the knot host as the rkey
686 //
687 // TODO: implement member deletion
688 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
689 }
690
691 return nil
692}
693
694func (i *Ingester) ingestKnot(e *jmodels.Event) error {
695 did := e.Did
696 var err error
697
698 l := i.Logger.With("handler", "ingestKnot")
699 l = l.With("nsid", e.Commit.Collection)
700
701 switch e.Commit.Operation {
702 case jmodels.CommitOperationCreate:
703 raw := json.RawMessage(e.Commit.Record)
704 record := tangled.Knot{}
705 err = json.Unmarshal(raw, &record)
706 if err != nil {
707 l.Error("invalid record", "err", err)
708 return err
709 }
710
711 domain := e.Commit.RKey
712
713 ddb, ok := i.Db.Execer.(*db.DB)
714 if !ok {
715 return fmt.Errorf("failed to index profile record, invalid db cast")
716 }
717
718 err := db.AddKnot(ddb, domain, did)
719 if err != nil {
720 l.Error("failed to add knot to db", "err", err, "domain", domain)
721 return err
722 }
723
724 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
725 if err != nil {
726 l.Error("failed to verify knot", "err", err, "domain", domain)
727 return err
728 }
729
730 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
731 if err != nil {
732 return fmt.Errorf("failed to mark verified: %w", err)
733 }
734
735 return nil
736
737 case jmodels.CommitOperationDelete:
738 domain := e.Commit.RKey
739
740 ddb, ok := i.Db.Execer.(*db.DB)
741 if !ok {
742 return fmt.Errorf("failed to index knot record, invalid db cast")
743 }
744
745 // get record from db first
746 registrations, err := db.GetRegistrations(
747 ddb,
748 orm.FilterEq("domain", domain),
749 orm.FilterEq("did", did),
750 )
751 if err != nil {
752 return fmt.Errorf("failed to get registration: %w", err)
753 }
754 if len(registrations) != 1 {
755 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
756 }
757 registration := registrations[0]
758
759 tx, err := ddb.Begin()
760 if err != nil {
761 return err
762 }
763 defer func() {
764 tx.Rollback()
765 i.Enforcer.E.LoadPolicy()
766 }()
767
768 err = db.DeleteKnot(
769 tx,
770 orm.FilterEq("did", did),
771 orm.FilterEq("domain", domain),
772 )
773 if err != nil {
774 return err
775 }
776
777 if registration.Registered != nil {
778 err = i.Enforcer.RemoveKnot(domain)
779 if err != nil {
780 return err
781 }
782 }
783
784 err = tx.Commit()
785 if err != nil {
786 return err
787 }
788
789 err = i.Enforcer.E.SavePolicy()
790 if err != nil {
791 return err
792 }
793 }
794
795 return nil
796}
797func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
798 did := e.Did
799 rkey := e.Commit.RKey
800
801 var err error
802
803 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
804 l.Info("ingesting record")
805
806 ddb, ok := i.Db.Execer.(*db.DB)
807 if !ok {
808 return fmt.Errorf("failed to index issue record, invalid db cast")
809 }
810
811 switch e.Commit.Operation {
812 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
813 raw := json.RawMessage(e.Commit.Record)
814 record := tangled.RepoIssue{}
815 err = json.Unmarshal(raw, &record)
816 if err != nil {
817 l.Error("invalid record", "err", err)
818 return err
819 }
820
821 issue := models.IssueFromRecord(did, rkey, record)
822
823 if err := issue.Validate(); err != nil {
824 return fmt.Errorf("failed to validate issue: %w", err)
825 }
826
827 tx, err := ddb.BeginTx(ctx, nil)
828 if err != nil {
829 l.Error("failed to begin transaction", "err", err)
830 return err
831 }
832 defer tx.Rollback()
833
834 err = db.PutIssue(tx, &issue)
835 if err != nil {
836 l.Error("failed to create issue", "err", err)
837 return err
838 }
839
840 err = tx.Commit()
841 if err != nil {
842 l.Error("failed to commit txn", "err", err)
843 return err
844 }
845
846 return nil
847
848 case jmodels.CommitOperationDelete:
849 tx, err := ddb.BeginTx(ctx, nil)
850 if err != nil {
851 l.Error("failed to begin transaction", "err", err)
852 return err
853 }
854 defer tx.Rollback()
855
856 if err := db.DeleteIssues(
857 tx,
858 did,
859 rkey,
860 ); err != nil {
861 l.Error("failed to delete", "err", err)
862 return fmt.Errorf("failed to delete issue record: %w", err)
863 }
864 if err := tx.Commit(); err != nil {
865 l.Error("failed to commit txn", "err", err)
866 return err
867 }
868
869 return nil
870 }
871
872 return nil
873}
874
875func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
876 did := e.Did
877 rkey := e.Commit.RKey
878
879 var err error
880
881 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
882 l.Info("ingesting record")
883
884 ddb, ok := i.Db.Execer.(*db.DB)
885 if !ok {
886 return fmt.Errorf("failed to index issue comment record, invalid db cast")
887 }
888
889 switch e.Commit.Operation {
890 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
891 raw := json.RawMessage(e.Commit.Record)
892 record := tangled.RepoIssueComment{}
893 err = json.Unmarshal(raw, &record)
894 if err != nil {
895 return fmt.Errorf("invalid record: %w", err)
896 }
897
898 comment, err := models.IssueCommentFromRecord(did, rkey, record)
899 if err != nil {
900 return fmt.Errorf("failed to parse comment from record: %w", err)
901 }
902
903 if err := comment.Validate(); err != nil {
904 return fmt.Errorf("failed to validate comment: %w", err)
905 }
906
907 tx, err := ddb.Begin()
908 if err != nil {
909 return fmt.Errorf("failed to start transaction: %w", err)
910 }
911 defer tx.Rollback()
912
913 _, err = db.AddIssueComment(tx, *comment)
914 if err != nil {
915 return fmt.Errorf("failed to create issue comment: %w", err)
916 }
917
918 return tx.Commit()
919
920 case jmodels.CommitOperationDelete:
921 if err := db.DeleteIssueComments(
922 ddb,
923 orm.FilterEq("did", did),
924 orm.FilterEq("rkey", rkey),
925 ); err != nil {
926 return fmt.Errorf("failed to delete issue comment record: %w", err)
927 }
928
929 return nil
930 }
931
932 return nil
933}
934
935func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
936 did := e.Did
937 rkey := e.Commit.RKey
938
939 var err error
940
941 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
942 l.Info("ingesting record")
943
944 ddb, ok := i.Db.Execer.(*db.DB)
945 if !ok {
946 return fmt.Errorf("failed to index label definition, invalid db cast")
947 }
948
949 switch e.Commit.Operation {
950 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
951 raw := json.RawMessage(e.Commit.Record)
952 record := tangled.LabelDefinition{}
953 err = json.Unmarshal(raw, &record)
954 if err != nil {
955 return fmt.Errorf("invalid record: %w", err)
956 }
957
958 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
959 if err != nil {
960 return fmt.Errorf("failed to parse labeldef from record: %w", err)
961 }
962
963 if err := def.Validate(); err != nil {
964 return fmt.Errorf("failed to validate labeldef: %w", err)
965 }
966
967 _, err = db.AddLabelDefinition(ddb, def)
968 if err != nil {
969 return fmt.Errorf("failed to create labeldef: %w", err)
970 }
971
972 return nil
973
974 case jmodels.CommitOperationDelete:
975 if err := db.DeleteLabelDefinition(
976 ddb,
977 orm.FilterEq("did", did),
978 orm.FilterEq("rkey", rkey),
979 ); err != nil {
980 return fmt.Errorf("failed to delete labeldef record: %w", err)
981 }
982
983 return nil
984 }
985
986 return nil
987}
988
989func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
990 did := e.Did
991 rkey := e.Commit.RKey
992
993 var err error
994
995 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
996 l.Info("ingesting record")
997
998 ddb, ok := i.Db.Execer.(*db.DB)
999 if !ok {
1000 return fmt.Errorf("failed to index label op, invalid db cast")
1001 }
1002
1003 switch e.Commit.Operation {
1004 case jmodels.CommitOperationCreate:
1005 raw := json.RawMessage(e.Commit.Record)
1006 record := tangled.LabelOp{}
1007 err = json.Unmarshal(raw, &record)
1008 if err != nil {
1009 return fmt.Errorf("invalid record: %w", err)
1010 }
1011
1012 subject := syntax.ATURI(record.Subject)
1013 collection := subject.Collection()
1014
1015 var repo *models.Repo
1016 switch collection {
1017 case tangled.RepoIssueNSID:
1018 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1019 if err != nil || len(i) != 1 {
1020 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1021 }
1022 repo = i[0].Repo
1023 default:
1024 return fmt.Errorf("unsupport label subject: %s", collection)
1025 }
1026
1027 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1028 if err != nil {
1029 return fmt.Errorf("failed to build label application ctx: %w", err)
1030 }
1031
1032 ops := models.LabelOpsFromRecord(did, rkey, record)
1033
1034 for _, o := range ops {
1035 def, ok := actx.Defs[o.OperandKey]
1036 if !ok {
1037 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1038 }
1039
1040 // validate permissions: only collaborators can apply labels currently
1041 //
1042 // TODO: introduce a repo:triage permission
1043 ok, err := i.Enforcer.IsPushAllowed(o.Did, repo.Knot, repo.DidSlashRepo())
1044 if err != nil {
1045 return fmt.Errorf("enforcing permission: %w", err)
1046 }
1047 if !ok {
1048 return fmt.Errorf("unauthorized label operation")
1049 }
1050
1051 if err := def.ValidateOperandValue(&o); err != nil {
1052 return fmt.Errorf("failed to validate labelop: %w", err)
1053 }
1054 }
1055
1056 tx, err := ddb.Begin()
1057 if err != nil {
1058 return err
1059 }
1060 defer tx.Rollback()
1061
1062 for _, o := range ops {
1063 _, err = db.AddLabelOp(tx, &o)
1064 if err != nil {
1065 return fmt.Errorf("failed to add labelop: %w", err)
1066 }
1067 }
1068
1069 if err = tx.Commit(); err != nil {
1070 return err
1071 }
1072 }
1073
1074 return nil
1075}