A vibe coded tangled fork which supports pijul.
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8 "maps"
9 "slices"
10
11 "time"
12
13 "github.com/bluesky-social/indigo/atproto/syntax"
14 jmodels "github.com/bluesky-social/jetstream/pkg/models"
15 "github.com/go-git/go-git/v5/plumbing"
16 "github.com/ipfs/go-cid"
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/serververify"
22 "tangled.org/core/idresolver"
23 "tangled.org/core/orm"
24 "tangled.org/core/rbac"
25)
26
27type Ingester struct {
28 Db db.DbWrapper
29 Enforcer *rbac.Enforcer
30 IdResolver *idresolver.Resolver
31 Config *config.Config
32 Logger *slog.Logger
33}
34
35type processFunc func(ctx context.Context, e *jmodels.Event) error
36
37func (i *Ingester) Ingest() processFunc {
38 return func(ctx context.Context, e *jmodels.Event) error {
39 var err error
40 defer func() {
41 eventTime := e.TimeUS
42 lastTimeUs := eventTime + 1
43 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
44 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
45 }
46 }()
47
48 l := i.Logger.With("kind", e.Kind)
49 switch e.Kind {
50 case jmodels.EventKindAccount:
51 if !e.Account.Active && *e.Account.Status == "deactivated" {
52 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
53 }
54 case jmodels.EventKindIdentity:
55 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
56 case jmodels.EventKindCommit:
57 switch e.Commit.Collection {
58 case tangled.GraphFollowNSID:
59 err = i.ingestFollow(e)
60 case tangled.FeedStarNSID:
61 err = i.ingestStar(e)
62 case tangled.PublicKeyNSID:
63 err = i.ingestPublicKey(e)
64 case tangled.RepoArtifactNSID:
65 err = i.ingestArtifact(e)
66 case tangled.ActorProfileNSID:
67 err = i.ingestProfile(e)
68 case tangled.SpindleMemberNSID:
69 err = i.ingestSpindleMember(ctx, e)
70 case tangled.SpindleNSID:
71 err = i.ingestSpindle(ctx, e)
72 case tangled.KnotMemberNSID:
73 err = i.ingestKnotMember(e)
74 case tangled.KnotNSID:
75 err = i.ingestKnot(e)
76 case tangled.StringNSID:
77 err = i.ingestString(e)
78 case tangled.RepoIssueNSID:
79 err = i.ingestIssue(ctx, e)
80 case tangled.RepoIssueCommentNSID:
81 err = i.ingestIssueComment(e)
82 case tangled.LabelDefinitionNSID:
83 err = i.ingestLabelDefinition(e)
84 case tangled.LabelOpNSID:
85 err = i.ingestLabelOp(e)
86 }
87 l = i.Logger.With("nsid", e.Commit.Collection)
88 }
89
90 if err != nil {
91 l.Warn("refused to ingest record", "err", err)
92 }
93
94 return nil
95 }
96}
97
98func (i *Ingester) ingestStar(e *jmodels.Event) error {
99 var err error
100 did := e.Did
101
102 l := i.Logger.With("handler", "ingestStar")
103 l = l.With("nsid", e.Commit.Collection)
104
105 switch e.Commit.Operation {
106 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
107 var subjectUri syntax.ATURI
108
109 raw := json.RawMessage(e.Commit.Record)
110 record := tangled.FeedStar{}
111 err := json.Unmarshal(raw, &record)
112 if err != nil {
113 l.Error("invalid record", "err", err)
114 return err
115 }
116
117 subjectUri, err = syntax.ParseATURI(record.Subject)
118 if err != nil {
119 l.Error("invalid record", "err", err)
120 return err
121 }
122 err = db.AddStar(i.Db, &models.Star{
123 Did: did,
124 RepoAt: subjectUri,
125 Rkey: e.Commit.RKey,
126 })
127 case jmodels.CommitOperationDelete:
128 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
129 }
130
131 if err != nil {
132 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
133 }
134
135 return nil
136}
137
138func (i *Ingester) ingestFollow(e *jmodels.Event) error {
139 var err error
140 did := e.Did
141
142 l := i.Logger.With("handler", "ingestFollow")
143 l = l.With("nsid", e.Commit.Collection)
144
145 switch e.Commit.Operation {
146 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
147 raw := json.RawMessage(e.Commit.Record)
148 record := tangled.GraphFollow{}
149 err = json.Unmarshal(raw, &record)
150 if err != nil {
151 l.Error("invalid record", "err", err)
152 return err
153 }
154
155 err = db.AddFollow(i.Db, &models.Follow{
156 UserDid: did,
157 SubjectDid: record.Subject,
158 Rkey: e.Commit.RKey,
159 })
160 case jmodels.CommitOperationDelete:
161 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
162 }
163
164 if err != nil {
165 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
166 }
167
168 return nil
169}
170
171func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
172 did := e.Did
173 var err error
174
175 l := i.Logger.With("handler", "ingestPublicKey")
176 l = l.With("nsid", e.Commit.Collection)
177
178 switch e.Commit.Operation {
179 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
180 l.Debug("processing add of pubkey")
181 raw := json.RawMessage(e.Commit.Record)
182 record := tangled.PublicKey{}
183 err = json.Unmarshal(raw, &record)
184 if err != nil {
185 l.Error("invalid record", "err", err)
186 return err
187 }
188
189 name := record.Name
190 key := record.Key
191 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
192 case jmodels.CommitOperationDelete:
193 l.Debug("processing delete of pubkey")
194 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
195 }
196
197 if err != nil {
198 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
199 }
200
201 return nil
202}
203
204func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
205 did := e.Did
206 var err error
207
208 l := i.Logger.With("handler", "ingestArtifact")
209 l = l.With("nsid", e.Commit.Collection)
210
211 switch e.Commit.Operation {
212 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
213 raw := json.RawMessage(e.Commit.Record)
214 record := tangled.RepoArtifact{}
215 err = json.Unmarshal(raw, &record)
216 if err != nil {
217 l.Error("invalid record", "err", err)
218 return err
219 }
220
221 repoAt, err := syntax.ParseATURI(record.Repo)
222 if err != nil {
223 return err
224 }
225
226 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
227 if err != nil {
228 return err
229 }
230
231 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
232 if err != nil || !ok {
233 return err
234 }
235
236 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
237 if err != nil {
238 createdAt = time.Now()
239 }
240
241 artifact := models.Artifact{
242 Did: did,
243 Rkey: e.Commit.RKey,
244 RepoAt: repoAt,
245 Tag: plumbing.Hash(record.Tag),
246 CreatedAt: createdAt,
247 BlobCid: cid.Cid(record.Artifact.Ref),
248 Name: record.Name,
249 Size: uint64(record.Artifact.Size),
250 MimeType: record.Artifact.MimeType,
251 }
252
253 err = db.AddArtifact(i.Db, artifact)
254 case jmodels.CommitOperationDelete:
255 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
256 }
257
258 if err != nil {
259 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
260 }
261
262 return nil
263}
264
265func (i *Ingester) ingestProfile(e *jmodels.Event) error {
266 did := e.Did
267 var err error
268
269 l := i.Logger.With("handler", "ingestProfile")
270 l = l.With("nsid", e.Commit.Collection)
271
272 if e.Commit.RKey != "self" {
273 return fmt.Errorf("ingestProfile only ingests `self` record")
274 }
275
276 switch e.Commit.Operation {
277 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
278 raw := json.RawMessage(e.Commit.Record)
279 record := tangled.ActorProfile{}
280 err = json.Unmarshal(raw, &record)
281 if err != nil {
282 l.Error("invalid record", "err", err)
283 return err
284 }
285
286 avatar := ""
287 if record.Avatar != nil {
288 avatar = record.Avatar.Ref.String()
289 }
290
291 description := ""
292 if record.Description != nil {
293 description = *record.Description
294 }
295
296 includeBluesky := record.Bluesky
297
298 pronouns := ""
299 if record.Pronouns != nil {
300 pronouns = *record.Pronouns
301 }
302
303 location := ""
304 if record.Location != nil {
305 location = *record.Location
306 }
307
308 var links [5]string
309 for i, l := range record.Links {
310 if i < 5 {
311 links[i] = l
312 }
313 }
314
315 var stats [2]models.VanityStat
316 for i, s := range record.Stats {
317 if i < 2 {
318 stats[i].Kind = models.VanityStatKind(s)
319 }
320 }
321
322 var pinned [6]syntax.ATURI
323 for i, r := range record.PinnedRepositories {
324 if i < 6 {
325 pinned[i] = syntax.ATURI(r)
326 }
327 }
328
329 profile := models.Profile{
330 Did: did,
331 Avatar: avatar,
332 Description: description,
333 IncludeBluesky: includeBluesky,
334 Location: location,
335 Links: links,
336 Stats: stats,
337 PinnedRepos: pinned,
338 Pronouns: pronouns,
339 }
340
341 ddb, ok := i.Db.Execer.(*db.DB)
342 if !ok {
343 return fmt.Errorf("failed to index profile record, invalid db cast")
344 }
345
346 tx, err := ddb.Begin()
347 if err != nil {
348 return fmt.Errorf("failed to start transaction")
349 }
350 defer tx.Rollback()
351
352 err = db.ValidateProfile(tx, &profile)
353 if err != nil {
354 return fmt.Errorf("invalid profile record")
355 }
356
357 err = db.UpsertProfile(tx, &profile)
358 if err != nil {
359 return fmt.Errorf("upserting profile: %w", err)
360 }
361
362 err = tx.Commit()
363 case jmodels.CommitOperationDelete:
364 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
365 }
366
367 if err != nil {
368 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
369 }
370
371 return nil
372}
373
374func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
375 did := e.Did
376 var err error
377
378 l := i.Logger.With("handler", "ingestSpindleMember")
379 l = l.With("nsid", e.Commit.Collection)
380
381 switch e.Commit.Operation {
382 case jmodels.CommitOperationCreate:
383 raw := json.RawMessage(e.Commit.Record)
384 record := tangled.SpindleMember{}
385 err = json.Unmarshal(raw, &record)
386 if err != nil {
387 l.Error("invalid record", "err", err)
388 return err
389 }
390
391 // only spindle owner can invite to spindles
392 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
393 if err != nil || !ok {
394 return fmt.Errorf("failed to enforce permissions: %w", err)
395 }
396
397 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
398 if err != nil {
399 return err
400 }
401
402 if memberId.Handle.IsInvalidHandle() {
403 return err
404 }
405
406 ddb, ok := i.Db.Execer.(*db.DB)
407 if !ok {
408 return fmt.Errorf("failed to index profile record, invalid db cast")
409 }
410
411 err = db.AddSpindleMember(ddb, models.SpindleMember{
412 Did: syntax.DID(did),
413 Rkey: e.Commit.RKey,
414 Instance: record.Instance,
415 Subject: memberId.DID,
416 })
417 if !ok {
418 return fmt.Errorf("failed to add to db: %w", err)
419 }
420
421 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
422 if err != nil {
423 return fmt.Errorf("failed to update ACLs: %w", err)
424 }
425
426 l.Info("added spindle member")
427 case jmodels.CommitOperationDelete:
428 rkey := e.Commit.RKey
429
430 ddb, ok := i.Db.Execer.(*db.DB)
431 if !ok {
432 return fmt.Errorf("failed to index profile record, invalid db cast")
433 }
434
435 // get record from db first
436 members, err := db.GetSpindleMembers(
437 ddb,
438 orm.FilterEq("did", did),
439 orm.FilterEq("rkey", rkey),
440 )
441 if err != nil || len(members) != 1 {
442 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
443 }
444 member := members[0]
445
446 tx, err := ddb.Begin()
447 if err != nil {
448 return fmt.Errorf("failed to start txn: %w", err)
449 }
450
451 // remove record by rkey && update enforcer
452 if err = db.RemoveSpindleMember(
453 tx,
454 orm.FilterEq("did", did),
455 orm.FilterEq("rkey", rkey),
456 ); err != nil {
457 return fmt.Errorf("failed to remove from db: %w", err)
458 }
459
460 // update enforcer
461 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
462 if err != nil {
463 return fmt.Errorf("failed to update ACLs: %w", err)
464 }
465
466 if err = tx.Commit(); err != nil {
467 return fmt.Errorf("failed to commit txn: %w", err)
468 }
469
470 if err = i.Enforcer.E.SavePolicy(); err != nil {
471 return fmt.Errorf("failed to save ACLs: %w", err)
472 }
473
474 l.Info("removed spindle member")
475 }
476
477 return nil
478}
479
480func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
481 did := e.Did
482 var err error
483
484 l := i.Logger.With("handler", "ingestSpindle")
485 l = l.With("nsid", e.Commit.Collection)
486
487 switch e.Commit.Operation {
488 case jmodels.CommitOperationCreate:
489 raw := json.RawMessage(e.Commit.Record)
490 record := tangled.Spindle{}
491 err = json.Unmarshal(raw, &record)
492 if err != nil {
493 l.Error("invalid record", "err", err)
494 return err
495 }
496
497 instance := e.Commit.RKey
498
499 ddb, ok := i.Db.Execer.(*db.DB)
500 if !ok {
501 return fmt.Errorf("failed to index profile record, invalid db cast")
502 }
503
504 err := db.AddSpindle(ddb, models.Spindle{
505 Owner: syntax.DID(did),
506 Instance: instance,
507 })
508 if err != nil {
509 l.Error("failed to add spindle to db", "err", err, "instance", instance)
510 return err
511 }
512
513 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
514 if err != nil {
515 l.Error("failed to add spindle to db", "err", err, "instance", instance)
516 return err
517 }
518
519 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
520 if err != nil {
521 return fmt.Errorf("failed to mark verified: %w", err)
522 }
523
524 return nil
525
526 case jmodels.CommitOperationDelete:
527 instance := e.Commit.RKey
528
529 ddb, ok := i.Db.Execer.(*db.DB)
530 if !ok {
531 return fmt.Errorf("failed to index profile record, invalid db cast")
532 }
533
534 // get record from db first
535 spindles, err := db.GetSpindles(
536 ddb,
537 orm.FilterEq("owner", did),
538 orm.FilterEq("instance", instance),
539 )
540 if err != nil || len(spindles) != 1 {
541 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
542 }
543 spindle := spindles[0]
544
545 tx, err := ddb.Begin()
546 if err != nil {
547 return err
548 }
549 defer func() {
550 tx.Rollback()
551 i.Enforcer.E.LoadPolicy()
552 }()
553
554 // remove spindle members first
555 err = db.RemoveSpindleMember(
556 tx,
557 orm.FilterEq("owner", did),
558 orm.FilterEq("instance", instance),
559 )
560 if err != nil {
561 return err
562 }
563
564 err = db.DeleteSpindle(
565 tx,
566 orm.FilterEq("owner", did),
567 orm.FilterEq("instance", instance),
568 )
569 if err != nil {
570 return err
571 }
572
573 if spindle.Verified != nil {
574 err = i.Enforcer.RemoveSpindle(instance)
575 if err != nil {
576 return err
577 }
578 }
579
580 err = tx.Commit()
581 if err != nil {
582 return err
583 }
584
585 err = i.Enforcer.E.SavePolicy()
586 if err != nil {
587 return err
588 }
589 }
590
591 return nil
592}
593
594func (i *Ingester) ingestString(e *jmodels.Event) error {
595 did := e.Did
596 rkey := e.Commit.RKey
597
598 var err error
599
600 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
601 l.Info("ingesting record")
602
603 ddb, ok := i.Db.Execer.(*db.DB)
604 if !ok {
605 return fmt.Errorf("failed to index string record, invalid db cast")
606 }
607
608 switch e.Commit.Operation {
609 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
610 raw := json.RawMessage(e.Commit.Record)
611 record := tangled.String{}
612 err = json.Unmarshal(raw, &record)
613 if err != nil {
614 l.Error("invalid record", "err", err)
615 return err
616 }
617
618 string := models.StringFromRecord(did, rkey, record)
619
620 if err = string.Validate(); err != nil {
621 l.Error("invalid record", "err", err)
622 return err
623 }
624
625 if err = db.AddString(ddb, string); err != nil {
626 l.Error("failed to add string", "err", err)
627 return err
628 }
629
630 return nil
631
632 case jmodels.CommitOperationDelete:
633 if err := db.DeleteString(
634 ddb,
635 orm.FilterEq("did", did),
636 orm.FilterEq("rkey", rkey),
637 ); err != nil {
638 l.Error("failed to delete", "err", err)
639 return fmt.Errorf("failed to delete string record: %w", err)
640 }
641
642 return nil
643 }
644
645 return nil
646}
647
648func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
649 did := e.Did
650 var err error
651
652 l := i.Logger.With("handler", "ingestKnotMember")
653 l = l.With("nsid", e.Commit.Collection)
654
655 switch e.Commit.Operation {
656 case jmodels.CommitOperationCreate:
657 raw := json.RawMessage(e.Commit.Record)
658 record := tangled.KnotMember{}
659 err = json.Unmarshal(raw, &record)
660 if err != nil {
661 l.Error("invalid record", "err", err)
662 return err
663 }
664
665 // only knot owner can invite to knots
666 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
667 if err != nil || !ok {
668 return fmt.Errorf("failed to enforce permissions: %w", err)
669 }
670
671 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
672 if err != nil {
673 return err
674 }
675
676 if memberId.Handle.IsInvalidHandle() {
677 return err
678 }
679
680 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
681 if err != nil {
682 return fmt.Errorf("failed to update ACLs: %w", err)
683 }
684
685 l.Info("added knot member")
686 case jmodels.CommitOperationDelete:
687 // we don't store knot members in a table (like we do for spindle)
688 // and we can't remove this just yet. possibly fixed if we switch
689 // to either:
690 // 1. a knot_members table like with spindle and store the rkey
691 // 2. use the knot host as the rkey
692 //
693 // TODO: implement member deletion
694 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
695 }
696
697 return nil
698}
699
700func (i *Ingester) ingestKnot(e *jmodels.Event) error {
701 did := e.Did
702 var err error
703
704 l := i.Logger.With("handler", "ingestKnot")
705 l = l.With("nsid", e.Commit.Collection)
706
707 switch e.Commit.Operation {
708 case jmodels.CommitOperationCreate:
709 raw := json.RawMessage(e.Commit.Record)
710 record := tangled.Knot{}
711 err = json.Unmarshal(raw, &record)
712 if err != nil {
713 l.Error("invalid record", "err", err)
714 return err
715 }
716
717 domain := e.Commit.RKey
718
719 ddb, ok := i.Db.Execer.(*db.DB)
720 if !ok {
721 return fmt.Errorf("failed to index profile record, invalid db cast")
722 }
723
724 err := db.AddKnot(ddb, domain, did)
725 if err != nil {
726 l.Error("failed to add knot to db", "err", err, "domain", domain)
727 return err
728 }
729
730 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
731 if err != nil {
732 l.Error("failed to verify knot", "err", err, "domain", domain)
733 return err
734 }
735
736 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
737 if err != nil {
738 return fmt.Errorf("failed to mark verified: %w", err)
739 }
740
741 return nil
742
743 case jmodels.CommitOperationDelete:
744 domain := e.Commit.RKey
745
746 ddb, ok := i.Db.Execer.(*db.DB)
747 if !ok {
748 return fmt.Errorf("failed to index knot record, invalid db cast")
749 }
750
751 // get record from db first
752 registrations, err := db.GetRegistrations(
753 ddb,
754 orm.FilterEq("domain", domain),
755 orm.FilterEq("did", did),
756 )
757 if err != nil {
758 return fmt.Errorf("failed to get registration: %w", err)
759 }
760 if len(registrations) != 1 {
761 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
762 }
763 registration := registrations[0]
764
765 tx, err := ddb.Begin()
766 if err != nil {
767 return err
768 }
769 defer func() {
770 tx.Rollback()
771 i.Enforcer.E.LoadPolicy()
772 }()
773
774 err = db.DeleteKnot(
775 tx,
776 orm.FilterEq("did", did),
777 orm.FilterEq("domain", domain),
778 )
779 if err != nil {
780 return err
781 }
782
783 if registration.Registered != nil {
784 err = i.Enforcer.RemoveKnot(domain)
785 if err != nil {
786 return err
787 }
788 }
789
790 err = tx.Commit()
791 if err != nil {
792 return err
793 }
794
795 err = i.Enforcer.E.SavePolicy()
796 if err != nil {
797 return err
798 }
799 }
800
801 return nil
802}
803func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
804 did := e.Did
805 rkey := e.Commit.RKey
806
807 var err error
808
809 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
810 l.Info("ingesting record")
811
812 ddb, ok := i.Db.Execer.(*db.DB)
813 if !ok {
814 return fmt.Errorf("failed to index issue record, invalid db cast")
815 }
816
817 switch e.Commit.Operation {
818 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
819 raw := json.RawMessage(e.Commit.Record)
820 record := tangled.RepoIssue{}
821 err = json.Unmarshal(raw, &record)
822 if err != nil {
823 l.Error("invalid record", "err", err)
824 return err
825 }
826
827 issue := models.IssueFromRecord(did, rkey, record)
828
829 if err := issue.Validate(); err != nil {
830 return fmt.Errorf("failed to validate issue: %w", err)
831 }
832
833 tx, err := ddb.BeginTx(ctx, nil)
834 if err != nil {
835 l.Error("failed to begin transaction", "err", err)
836 return err
837 }
838 defer tx.Rollback()
839
840 err = db.PutIssue(tx, &issue)
841 if err != nil {
842 l.Error("failed to create issue", "err", err)
843 return err
844 }
845
846 err = tx.Commit()
847 if err != nil {
848 l.Error("failed to commit txn", "err", err)
849 return err
850 }
851
852 return nil
853
854 case jmodels.CommitOperationDelete:
855 tx, err := ddb.BeginTx(ctx, nil)
856 if err != nil {
857 l.Error("failed to begin transaction", "err", err)
858 return err
859 }
860 defer tx.Rollback()
861
862 if err := db.DeleteIssues(
863 tx,
864 did,
865 rkey,
866 ); err != nil {
867 l.Error("failed to delete", "err", err)
868 return fmt.Errorf("failed to delete issue record: %w", err)
869 }
870 if err := tx.Commit(); err != nil {
871 l.Error("failed to commit txn", "err", err)
872 return err
873 }
874
875 return nil
876 }
877
878 return nil
879}
880
881func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
882 did := e.Did
883 rkey := e.Commit.RKey
884
885 var err error
886
887 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
888 l.Info("ingesting record")
889
890 ddb, ok := i.Db.Execer.(*db.DB)
891 if !ok {
892 return fmt.Errorf("failed to index issue comment record, invalid db cast")
893 }
894
895 switch e.Commit.Operation {
896 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
897 raw := json.RawMessage(e.Commit.Record)
898 record := tangled.RepoIssueComment{}
899 err = json.Unmarshal(raw, &record)
900 if err != nil {
901 return fmt.Errorf("invalid record: %w", err)
902 }
903
904 comment, err := models.IssueCommentFromRecord(did, rkey, record)
905 if err != nil {
906 return fmt.Errorf("failed to parse comment from record: %w", err)
907 }
908
909 if err := comment.Validate(); err != nil {
910 return fmt.Errorf("failed to validate comment: %w", err)
911 }
912
913 tx, err := ddb.Begin()
914 if err != nil {
915 return fmt.Errorf("failed to start transaction: %w", err)
916 }
917 defer tx.Rollback()
918
919 _, err = db.AddIssueComment(tx, *comment)
920 if err != nil {
921 return fmt.Errorf("failed to create issue comment: %w", err)
922 }
923
924 return tx.Commit()
925
926 case jmodels.CommitOperationDelete:
927 if err := db.DeleteIssueComments(
928 ddb,
929 orm.FilterEq("did", did),
930 orm.FilterEq("rkey", rkey),
931 ); err != nil {
932 return fmt.Errorf("failed to delete issue comment record: %w", err)
933 }
934
935 return nil
936 }
937
938 return nil
939}
940
941func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
942 did := e.Did
943 rkey := e.Commit.RKey
944
945 var err error
946
947 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
948 l.Info("ingesting record")
949
950 ddb, ok := i.Db.Execer.(*db.DB)
951 if !ok {
952 return fmt.Errorf("failed to index label definition, invalid db cast")
953 }
954
955 switch e.Commit.Operation {
956 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
957 raw := json.RawMessage(e.Commit.Record)
958 record := tangled.LabelDefinition{}
959 err = json.Unmarshal(raw, &record)
960 if err != nil {
961 return fmt.Errorf("invalid record: %w", err)
962 }
963
964 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
965 if err != nil {
966 return fmt.Errorf("failed to parse labeldef from record: %w", err)
967 }
968
969 if err := def.Validate(); err != nil {
970 return fmt.Errorf("failed to validate labeldef: %w", err)
971 }
972
973 _, err = db.AddLabelDefinition(ddb, def)
974 if err != nil {
975 return fmt.Errorf("failed to create labeldef: %w", err)
976 }
977
978 return nil
979
980 case jmodels.CommitOperationDelete:
981 if err := db.DeleteLabelDefinition(
982 ddb,
983 orm.FilterEq("did", did),
984 orm.FilterEq("rkey", rkey),
985 ); err != nil {
986 return fmt.Errorf("failed to delete labeldef record: %w", err)
987 }
988
989 return nil
990 }
991
992 return nil
993}
994
995func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
996 did := e.Did
997 rkey := e.Commit.RKey
998
999 var err error
1000
1001 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1002 l.Info("ingesting record")
1003
1004 ddb, ok := i.Db.Execer.(*db.DB)
1005 if !ok {
1006 return fmt.Errorf("failed to index label op, invalid db cast")
1007 }
1008
1009 switch e.Commit.Operation {
1010 case jmodels.CommitOperationCreate:
1011 raw := json.RawMessage(e.Commit.Record)
1012 record := tangled.LabelOp{}
1013 err = json.Unmarshal(raw, &record)
1014 if err != nil {
1015 return fmt.Errorf("invalid record: %w", err)
1016 }
1017
1018 subject := syntax.ATURI(record.Subject)
1019 collection := subject.Collection()
1020
1021 var repo *models.Repo
1022 switch collection {
1023 case tangled.RepoIssueNSID:
1024 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1025 if err != nil || len(i) != 1 {
1026 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1027 }
1028 repo = i[0].Repo
1029 default:
1030 return fmt.Errorf("unsupport label subject: %s", collection)
1031 }
1032
1033 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1034 if err != nil {
1035 return fmt.Errorf("failed to build label application ctx: %w", err)
1036 }
1037
1038 ops := models.LabelOpsFromRecord(did, rkey, record)
1039
1040 for _, o := range ops {
1041 def, ok := actx.Defs[o.OperandKey]
1042 if !ok {
1043 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1044 }
1045
1046 // validate permissions: only collaborators can apply labels currently
1047 //
1048 // TODO: introduce a repo:triage permission
1049 ok, err := i.Enforcer.IsPushAllowed(o.Did, repo.Knot, repo.DidSlashRepo())
1050 if err != nil {
1051 return fmt.Errorf("enforcing permission: %w", err)
1052 }
1053 if !ok {
1054 return fmt.Errorf("unauthorized label operation")
1055 }
1056
1057 if err := def.ValidateOperandValue(&o); err != nil {
1058 return fmt.Errorf("failed to validate labelop: %w", err)
1059 }
1060 }
1061
1062 tx, err := ddb.Begin()
1063 if err != nil {
1064 return err
1065 }
1066 defer tx.Rollback()
1067
1068 for _, o := range ops {
1069 _, err = db.AddLabelOp(tx, &o)
1070 if err != nil {
1071 return fmt.Errorf("failed to add labelop: %w", err)
1072 }
1073 }
1074
1075 if err = tx.Commit(); err != nil {
1076 return err
1077 }
1078 }
1079
1080 return nil
1081}