A vibe coded tangled fork which supports pijul.
1package appview
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "log/slog"
9 "maps"
10 "net/http"
11 "net/url"
12 "slices"
13 "sync"
14
15 "time"
16
17 "github.com/bluesky-social/indigo/atproto/syntax"
18 jmodels "github.com/bluesky-social/jetstream/pkg/models"
19 "github.com/go-git/go-git/v5/plumbing"
20 "github.com/ipfs/go-cid"
21 "golang.org/x/sync/errgroup"
22 "tangled.org/core/api/tangled"
23 "tangled.org/core/appview/config"
24 "tangled.org/core/appview/db"
25 "tangled.org/core/appview/models"
26 "tangled.org/core/appview/serververify"
27 "tangled.org/core/appview/validator"
28 "tangled.org/core/idresolver"
29 "tangled.org/core/orm"
30 "tangled.org/core/rbac"
31)
32
33type Ingester struct {
34 Db db.DbWrapper
35 Enforcer *rbac.Enforcer
36 IdResolver *idresolver.Resolver
37 Config *config.Config
38 Logger *slog.Logger
39 Validator *validator.Validator
40}
41
42type processFunc func(ctx context.Context, e *jmodels.Event) error
43
44func (i *Ingester) Ingest() processFunc {
45 return func(ctx context.Context, e *jmodels.Event) error {
46 var err error
47 defer func() {
48 eventTime := e.TimeUS
49 lastTimeUs := eventTime + 1
50 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil {
51 err = fmt.Errorf("(deferred) failed to save last time us: %w", err)
52 }
53 }()
54
55 l := i.Logger.With("kind", e.Kind)
56 switch e.Kind {
57 case jmodels.EventKindAccount:
58 if !e.Account.Active && *e.Account.Status == "deactivated" {
59 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did)
60 }
61 case jmodels.EventKindIdentity:
62 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did)
63 case jmodels.EventKindCommit:
64 switch e.Commit.Collection {
65 case tangled.GraphFollowNSID:
66 err = i.ingestFollow(e)
67 case tangled.FeedStarNSID:
68 err = i.ingestStar(e)
69 case tangled.PublicKeyNSID:
70 err = i.ingestPublicKey(e)
71 case tangled.RepoArtifactNSID:
72 err = i.ingestArtifact(e)
73 case tangled.ActorProfileNSID:
74 err = i.ingestProfile(e)
75 case tangled.SpindleMemberNSID:
76 err = i.ingestSpindleMember(ctx, e)
77 case tangled.SpindleNSID:
78 err = i.ingestSpindle(ctx, e)
79 case tangled.KnotMemberNSID:
80 err = i.ingestKnotMember(e)
81 case tangled.KnotNSID:
82 err = i.ingestKnot(e)
83 case tangled.StringNSID:
84 err = i.ingestString(e)
85 case tangled.RepoIssueNSID:
86 err = i.ingestIssue(ctx, e)
87 case tangled.RepoPullNSID:
88 err = i.ingestPull(ctx, e)
89 case tangled.RepoIssueCommentNSID:
90 err = i.ingestIssueComment(e)
91 case tangled.LabelDefinitionNSID:
92 err = i.ingestLabelDefinition(e)
93 case tangled.LabelOpNSID:
94 err = i.ingestLabelOp(e)
95 }
96 l = i.Logger.With("nsid", e.Commit.Collection)
97 }
98
99 if err != nil {
100 l.Warn("refused to ingest record", "err", err)
101 }
102
103 return nil
104 }
105}
106
107func (i *Ingester) ingestStar(e *jmodels.Event) error {
108 var err error
109 did := e.Did
110
111 l := i.Logger.With("handler", "ingestStar")
112 l = l.With("nsid", e.Commit.Collection)
113
114 switch e.Commit.Operation {
115 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
116 var subjectUri syntax.ATURI
117
118 raw := json.RawMessage(e.Commit.Record)
119 record := tangled.FeedStar{}
120 err := json.Unmarshal(raw, &record)
121 if err != nil {
122 l.Error("invalid record", "err", err)
123 return err
124 }
125
126 subjectUri, err = syntax.ParseATURI(record.Subject)
127 if err != nil {
128 l.Error("invalid record", "err", err)
129 return err
130 }
131 err = db.AddStar(i.Db, &models.Star{
132 Did: did,
133 RepoAt: subjectUri,
134 Rkey: e.Commit.RKey,
135 })
136 case jmodels.CommitOperationDelete:
137 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
138 }
139
140 if err != nil {
141 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err)
142 }
143
144 return nil
145}
146
147func (i *Ingester) ingestFollow(e *jmodels.Event) error {
148 var err error
149 did := e.Did
150
151 l := i.Logger.With("handler", "ingestFollow")
152 l = l.With("nsid", e.Commit.Collection)
153
154 switch e.Commit.Operation {
155 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
156 raw := json.RawMessage(e.Commit.Record)
157 record := tangled.GraphFollow{}
158 err = json.Unmarshal(raw, &record)
159 if err != nil {
160 l.Error("invalid record", "err", err)
161 return err
162 }
163
164 err = db.AddFollow(i.Db, &models.Follow{
165 UserDid: did,
166 SubjectDid: record.Subject,
167 Rkey: e.Commit.RKey,
168 })
169 case jmodels.CommitOperationDelete:
170 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey)
171 }
172
173 if err != nil {
174 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err)
175 }
176
177 return nil
178}
179
180func (i *Ingester) ingestPublicKey(e *jmodels.Event) error {
181 did := e.Did
182 var err error
183
184 l := i.Logger.With("handler", "ingestPublicKey")
185 l = l.With("nsid", e.Commit.Collection)
186
187 switch e.Commit.Operation {
188 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
189 l.Debug("processing add of pubkey")
190 raw := json.RawMessage(e.Commit.Record)
191 record := tangled.PublicKey{}
192 err = json.Unmarshal(raw, &record)
193 if err != nil {
194 l.Error("invalid record", "err", err)
195 return err
196 }
197
198 name := record.Name
199 key := record.Key
200 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey)
201 case jmodels.CommitOperationDelete:
202 l.Debug("processing delete of pubkey")
203 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey)
204 }
205
206 if err != nil {
207 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err)
208 }
209
210 return nil
211}
212
213func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
214 did := e.Did
215 var err error
216
217 l := i.Logger.With("handler", "ingestArtifact")
218 l = l.With("nsid", e.Commit.Collection)
219
220 switch e.Commit.Operation {
221 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
222 raw := json.RawMessage(e.Commit.Record)
223 record := tangled.RepoArtifact{}
224 err = json.Unmarshal(raw, &record)
225 if err != nil {
226 l.Error("invalid record", "err", err)
227 return err
228 }
229
230 repoAt, err := syntax.ParseATURI(record.Repo)
231 if err != nil {
232 return err
233 }
234
235 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String())
236 if err != nil {
237 return err
238 }
239
240 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push")
241 if err != nil || !ok {
242 return err
243 }
244
245 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt)
246 if err != nil {
247 createdAt = time.Now()
248 }
249
250 artifact := models.Artifact{
251 Did: did,
252 Rkey: e.Commit.RKey,
253 RepoAt: repoAt,
254 Tag: plumbing.Hash(record.Tag),
255 CreatedAt: createdAt,
256 BlobCid: cid.Cid(record.Artifact.Ref),
257 Name: record.Name,
258 Size: uint64(record.Artifact.Size),
259 MimeType: record.Artifact.MimeType,
260 }
261
262 err = db.AddArtifact(i.Db, artifact)
263 case jmodels.CommitOperationDelete:
264 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
265 }
266
267 if err != nil {
268 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err)
269 }
270
271 return nil
272}
273
274func (i *Ingester) ingestProfile(e *jmodels.Event) error {
275 did := e.Did
276 var err error
277
278 l := i.Logger.With("handler", "ingestProfile")
279 l = l.With("nsid", e.Commit.Collection)
280
281 if e.Commit.RKey != "self" {
282 return fmt.Errorf("ingestProfile only ingests `self` record")
283 }
284
285 switch e.Commit.Operation {
286 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
287 raw := json.RawMessage(e.Commit.Record)
288 record := tangled.ActorProfile{}
289 err = json.Unmarshal(raw, &record)
290 if err != nil {
291 l.Error("invalid record", "err", err)
292 return err
293 }
294
295 avatar := ""
296 if record.Avatar != nil {
297 avatar = record.Avatar.Ref.String()
298 }
299
300 description := ""
301 if record.Description != nil {
302 description = *record.Description
303 }
304
305 includeBluesky := record.Bluesky
306
307 pronouns := ""
308 if record.Pronouns != nil {
309 pronouns = *record.Pronouns
310 }
311
312 location := ""
313 if record.Location != nil {
314 location = *record.Location
315 }
316
317 var links [5]string
318 for i, l := range record.Links {
319 if i < 5 {
320 links[i] = l
321 }
322 }
323
324 var stats [2]models.VanityStat
325 for i, s := range record.Stats {
326 if i < 2 {
327 stats[i].Kind = models.ParseVanityStatKind(s)
328 }
329 }
330
331 var pinned [6]syntax.ATURI
332 for i, r := range record.PinnedRepositories {
333 if i < 6 {
334 pinned[i] = syntax.ATURI(r)
335 }
336 }
337
338 profile := models.Profile{
339 Did: did,
340 Avatar: avatar,
341 Description: description,
342 IncludeBluesky: includeBluesky,
343 Location: location,
344 Links: links,
345 Stats: stats,
346 PinnedRepos: pinned,
347 Pronouns: pronouns,
348 }
349
350 ddb, ok := i.Db.Execer.(*db.DB)
351 if !ok {
352 return fmt.Errorf("failed to index profile record, invalid db cast")
353 }
354
355 tx, err := ddb.Begin()
356 if err != nil {
357 return fmt.Errorf("failed to start transaction")
358 }
359
360 err = db.ValidateProfile(tx, &profile)
361 if err != nil {
362 return fmt.Errorf("invalid profile record")
363 }
364
365 err = db.UpsertProfile(tx, &profile)
366 case jmodels.CommitOperationDelete:
367 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey))
368 }
369
370 if err != nil {
371 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err)
372 }
373
374 return nil
375}
376
377func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error {
378 did := e.Did
379 var err error
380
381 l := i.Logger.With("handler", "ingestSpindleMember")
382 l = l.With("nsid", e.Commit.Collection)
383
384 switch e.Commit.Operation {
385 case jmodels.CommitOperationCreate:
386 raw := json.RawMessage(e.Commit.Record)
387 record := tangled.SpindleMember{}
388 err = json.Unmarshal(raw, &record)
389 if err != nil {
390 l.Error("invalid record", "err", err)
391 return err
392 }
393
394 // only spindle owner can invite to spindles
395 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance)
396 if err != nil || !ok {
397 return fmt.Errorf("failed to enforce permissions: %w", err)
398 }
399
400 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject)
401 if err != nil {
402 return err
403 }
404
405 if memberId.Handle.IsInvalidHandle() {
406 return err
407 }
408
409 ddb, ok := i.Db.Execer.(*db.DB)
410 if !ok {
411 return fmt.Errorf("invalid db cast")
412 }
413
414 err = db.AddSpindleMember(ddb, models.SpindleMember{
415 Did: syntax.DID(did),
416 Rkey: e.Commit.RKey,
417 Instance: record.Instance,
418 Subject: memberId.DID,
419 })
420 if !ok {
421 return fmt.Errorf("failed to add to db: %w", err)
422 }
423
424 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String())
425 if err != nil {
426 return fmt.Errorf("failed to update ACLs: %w", err)
427 }
428
429 l.Info("added spindle member")
430 case jmodels.CommitOperationDelete:
431 rkey := e.Commit.RKey
432
433 ddb, ok := i.Db.Execer.(*db.DB)
434 if !ok {
435 return fmt.Errorf("failed to index profile record, invalid db cast")
436 }
437
438 // get record from db first
439 members, err := db.GetSpindleMembers(
440 ddb,
441 orm.FilterEq("did", did),
442 orm.FilterEq("rkey", rkey),
443 )
444 if err != nil || len(members) != 1 {
445 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members))
446 }
447 member := members[0]
448
449 tx, err := ddb.Begin()
450 if err != nil {
451 return fmt.Errorf("failed to start txn: %w", err)
452 }
453
454 // remove record by rkey && update enforcer
455 if err = db.RemoveSpindleMember(
456 tx,
457 orm.FilterEq("did", did),
458 orm.FilterEq("rkey", rkey),
459 ); err != nil {
460 return fmt.Errorf("failed to remove from db: %w", err)
461 }
462
463 // update enforcer
464 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String())
465 if err != nil {
466 return fmt.Errorf("failed to update ACLs: %w", err)
467 }
468
469 if err = tx.Commit(); err != nil {
470 return fmt.Errorf("failed to commit txn: %w", err)
471 }
472
473 if err = i.Enforcer.E.SavePolicy(); err != nil {
474 return fmt.Errorf("failed to save ACLs: %w", err)
475 }
476
477 l.Info("removed spindle member")
478 }
479
480 return nil
481}
482
483func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error {
484 did := e.Did
485 var err error
486
487 l := i.Logger.With("handler", "ingestSpindle")
488 l = l.With("nsid", e.Commit.Collection)
489
490 switch e.Commit.Operation {
491 case jmodels.CommitOperationCreate:
492 raw := json.RawMessage(e.Commit.Record)
493 record := tangled.Spindle{}
494 err = json.Unmarshal(raw, &record)
495 if err != nil {
496 l.Error("invalid record", "err", err)
497 return err
498 }
499
500 instance := e.Commit.RKey
501
502 ddb, ok := i.Db.Execer.(*db.DB)
503 if !ok {
504 return fmt.Errorf("failed to index profile record, invalid db cast")
505 }
506
507 err := db.AddSpindle(ddb, models.Spindle{
508 Owner: syntax.DID(did),
509 Instance: instance,
510 })
511 if err != nil {
512 l.Error("failed to add spindle to db", "err", err, "instance", instance)
513 return err
514 }
515
516 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev)
517 if err != nil {
518 l.Error("failed to add spindle to db", "err", err, "instance", instance)
519 return err
520 }
521
522 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
523 if err != nil {
524 return fmt.Errorf("failed to mark verified: %w", err)
525 }
526
527 return nil
528
529 case jmodels.CommitOperationDelete:
530 instance := e.Commit.RKey
531
532 ddb, ok := i.Db.Execer.(*db.DB)
533 if !ok {
534 return fmt.Errorf("failed to index profile record, invalid db cast")
535 }
536
537 // get record from db first
538 spindles, err := db.GetSpindles(
539 ddb,
540 orm.FilterEq("owner", did),
541 orm.FilterEq("instance", instance),
542 )
543 if err != nil || len(spindles) != 1 {
544 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles))
545 }
546 spindle := spindles[0]
547
548 tx, err := ddb.Begin()
549 if err != nil {
550 return err
551 }
552 defer func() {
553 tx.Rollback()
554 i.Enforcer.E.LoadPolicy()
555 }()
556
557 // remove spindle members first
558 err = db.RemoveSpindleMember(
559 tx,
560 orm.FilterEq("owner", did),
561 orm.FilterEq("instance", instance),
562 )
563 if err != nil {
564 return err
565 }
566
567 err = db.DeleteSpindle(
568 tx,
569 orm.FilterEq("owner", did),
570 orm.FilterEq("instance", instance),
571 )
572 if err != nil {
573 return err
574 }
575
576 if spindle.Verified != nil {
577 err = i.Enforcer.RemoveSpindle(instance)
578 if err != nil {
579 return err
580 }
581 }
582
583 err = tx.Commit()
584 if err != nil {
585 return err
586 }
587
588 err = i.Enforcer.E.SavePolicy()
589 if err != nil {
590 return err
591 }
592 }
593
594 return nil
595}
596
597func (i *Ingester) ingestString(e *jmodels.Event) error {
598 did := e.Did
599 rkey := e.Commit.RKey
600
601 var err error
602
603 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
604 l.Info("ingesting record")
605
606 ddb, ok := i.Db.Execer.(*db.DB)
607 if !ok {
608 return fmt.Errorf("failed to index string record, invalid db cast")
609 }
610
611 switch e.Commit.Operation {
612 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
613 raw := json.RawMessage(e.Commit.Record)
614 record := tangled.String{}
615 err = json.Unmarshal(raw, &record)
616 if err != nil {
617 l.Error("invalid record", "err", err)
618 return err
619 }
620
621 string := models.StringFromRecord(did, rkey, record)
622
623 if err = i.Validator.ValidateString(&string); err != nil {
624 l.Error("invalid record", "err", err)
625 return err
626 }
627
628 if err = db.AddString(ddb, string); err != nil {
629 l.Error("failed to add string", "err", err)
630 return err
631 }
632
633 return nil
634
635 case jmodels.CommitOperationDelete:
636 if err := db.DeleteString(
637 ddb,
638 orm.FilterEq("did", did),
639 orm.FilterEq("rkey", rkey),
640 ); err != nil {
641 l.Error("failed to delete", "err", err)
642 return fmt.Errorf("failed to delete string record: %w", err)
643 }
644
645 return nil
646 }
647
648 return nil
649}
650
651func (i *Ingester) ingestKnotMember(e *jmodels.Event) error {
652 did := e.Did
653 var err error
654
655 l := i.Logger.With("handler", "ingestKnotMember")
656 l = l.With("nsid", e.Commit.Collection)
657
658 switch e.Commit.Operation {
659 case jmodels.CommitOperationCreate:
660 raw := json.RawMessage(e.Commit.Record)
661 record := tangled.KnotMember{}
662 err = json.Unmarshal(raw, &record)
663 if err != nil {
664 l.Error("invalid record", "err", err)
665 return err
666 }
667
668 // only knot owner can invite to knots
669 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain)
670 if err != nil || !ok {
671 return fmt.Errorf("failed to enforce permissions: %w", err)
672 }
673
674 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject)
675 if err != nil {
676 return err
677 }
678
679 if memberId.Handle.IsInvalidHandle() {
680 return err
681 }
682
683 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String())
684 if err != nil {
685 return fmt.Errorf("failed to update ACLs: %w", err)
686 }
687
688 l.Info("added knot member")
689 case jmodels.CommitOperationDelete:
690 // we don't store knot members in a table (like we do for spindle)
691 // and we can't remove this just yet. possibly fixed if we switch
692 // to either:
693 // 1. a knot_members table like with spindle and store the rkey
694 // 2. use the knot host as the rkey
695 //
696 // TODO: implement member deletion
697 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey)
698 }
699
700 return nil
701}
702
703func (i *Ingester) ingestKnot(e *jmodels.Event) error {
704 did := e.Did
705 var err error
706
707 l := i.Logger.With("handler", "ingestKnot")
708 l = l.With("nsid", e.Commit.Collection)
709
710 switch e.Commit.Operation {
711 case jmodels.CommitOperationCreate:
712 raw := json.RawMessage(e.Commit.Record)
713 record := tangled.Knot{}
714 err = json.Unmarshal(raw, &record)
715 if err != nil {
716 l.Error("invalid record", "err", err)
717 return err
718 }
719
720 domain := e.Commit.RKey
721
722 ddb, ok := i.Db.Execer.(*db.DB)
723 if !ok {
724 return fmt.Errorf("failed to index profile record, invalid db cast")
725 }
726
727 err := db.AddKnot(ddb, domain, did)
728 if err != nil {
729 l.Error("failed to add knot to db", "err", err, "domain", domain)
730 return err
731 }
732
733 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev)
734 if err != nil {
735 l.Error("failed to verify knot", "err", err, "domain", domain)
736 return err
737 }
738
739 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
740 if err != nil {
741 return fmt.Errorf("failed to mark verified: %w", err)
742 }
743
744 return nil
745
746 case jmodels.CommitOperationDelete:
747 domain := e.Commit.RKey
748
749 ddb, ok := i.Db.Execer.(*db.DB)
750 if !ok {
751 return fmt.Errorf("failed to index knot record, invalid db cast")
752 }
753
754 // get record from db first
755 registrations, err := db.GetRegistrations(
756 ddb,
757 orm.FilterEq("domain", domain),
758 orm.FilterEq("did", did),
759 )
760 if err != nil {
761 return fmt.Errorf("failed to get registration: %w", err)
762 }
763 if len(registrations) != 1 {
764 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
765 }
766 registration := registrations[0]
767
768 tx, err := ddb.Begin()
769 if err != nil {
770 return err
771 }
772 defer func() {
773 tx.Rollback()
774 i.Enforcer.E.LoadPolicy()
775 }()
776
777 err = db.DeleteKnot(
778 tx,
779 orm.FilterEq("did", did),
780 orm.FilterEq("domain", domain),
781 )
782 if err != nil {
783 return err
784 }
785
786 if registration.Registered != nil {
787 err = i.Enforcer.RemoveKnot(domain)
788 if err != nil {
789 return err
790 }
791 }
792
793 err = tx.Commit()
794 if err != nil {
795 return err
796 }
797
798 err = i.Enforcer.E.SavePolicy()
799 if err != nil {
800 return err
801 }
802 }
803
804 return nil
805}
806func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error {
807 did := e.Did
808 rkey := e.Commit.RKey
809
810 var err error
811
812 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
813 l.Info("ingesting record")
814
815 ddb, ok := i.Db.Execer.(*db.DB)
816 if !ok {
817 return fmt.Errorf("failed to index issue record, invalid db cast")
818 }
819
820 switch e.Commit.Operation {
821 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
822 raw := json.RawMessage(e.Commit.Record)
823 record := tangled.RepoIssue{}
824 err = json.Unmarshal(raw, &record)
825 if err != nil {
826 l.Error("invalid record", "err", err)
827 return err
828 }
829
830 issue := models.IssueFromRecord(did, rkey, record)
831
832 if err := i.Validator.ValidateIssue(&issue); err != nil {
833 return fmt.Errorf("failed to validate issue: %w", err)
834 }
835
836 tx, err := ddb.BeginTx(ctx, nil)
837 if err != nil {
838 l.Error("failed to begin transaction", "err", err)
839 return err
840 }
841 defer tx.Rollback()
842
843 err = db.PutIssue(tx, &issue)
844 if err != nil {
845 l.Error("failed to create issue", "err", err)
846 return err
847 }
848
849 err = tx.Commit()
850 if err != nil {
851 l.Error("failed to commit txn", "err", err)
852 return err
853 }
854
855 return nil
856
857 case jmodels.CommitOperationDelete:
858 tx, err := ddb.BeginTx(ctx, nil)
859 if err != nil {
860 l.Error("failed to begin transaction", "err", err)
861 return err
862 }
863 defer tx.Rollback()
864
865 if err := db.DeleteIssues(
866 tx,
867 did,
868 rkey,
869 ); err != nil {
870 l.Error("failed to delete", "err", err)
871 return fmt.Errorf("failed to delete issue record: %w", err)
872 }
873 if err := tx.Commit(); err != nil {
874 l.Error("failed to commit txn", "err", err)
875 return err
876 }
877
878 return nil
879 }
880
881 return nil
882}
883
884func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error {
885 did := e.Did
886 rkey := e.Commit.RKey
887
888 var err error
889
890 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
891 l.Info("ingesting record")
892
893 ddb, ok := i.Db.Execer.(*db.DB)
894 if !ok {
895 return fmt.Errorf("failed to index pull record, invalid db cast")
896 }
897
898 switch e.Commit.Operation {
899 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
900 raw := json.RawMessage(e.Commit.Record)
901 record := tangled.RepoPull{}
902 err = json.Unmarshal(raw, &record)
903 if err != nil {
904 l.Error("invalid record", "err", err)
905 return err
906 }
907
908 ownerId, err := i.IdResolver.ResolveIdent(ctx, did)
909 if err != nil {
910 l.Error("failed to resolve did")
911 return err
912 }
913
914 // go through and fetch all blobs in parallel
915 readers := make([]*io.ReadCloser, len(record.Rounds))
916 var mu sync.Mutex
917
918 g, gctx := errgroup.WithContext(ctx)
919
920 for idx, b := range record.Rounds {
921 g.Go(func() error {
922 ownerPds := ownerId.PDSEndpoint()
923 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds))
924 q := url.Query()
925 q.Set("cid", b.PatchBlob.Ref.String())
926 q.Set("did", did)
927 url.RawQuery = q.Encode()
928
929 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil)
930 if err != nil {
931 l.Error("failed to create request")
932 return err
933 }
934 req.Header.Set("Content-Type", "application/json")
935
936 resp, err := http.DefaultClient.Do(req)
937 if err != nil {
938 l.Error("failed to make request")
939 return err
940 }
941
942 mu.Lock()
943 readers[idx] = &resp.Body
944 mu.Unlock()
945
946 return nil
947 })
948 }
949
950 if err := g.Wait(); err != nil {
951 for _, r := range readers {
952 if r != nil && *r != nil {
953 (*r).Close()
954 }
955 }
956 return err
957 }
958
959 defer func() {
960 for _, r := range readers {
961 if r != nil && *r != nil {
962 (*r).Close()
963 }
964 }
965 }()
966
967 pull := models.PullFromRecord(did, rkey, record, readers)
968 // if err := i.Validator.ValidateIssue(&issue); err != nil {
969 // return fmt.Errorf("failed to validate issue: %w", err)
970 // }
971
972 tx, err := ddb.BeginTx(ctx, nil)
973 if err != nil {
974 l.Error("failed to begin transaction", "err", err)
975 return err
976 }
977 defer tx.Rollback()
978
979 err = db.PutPull(tx, &pull)
980 if err != nil {
981 l.Error("failed to create pull", "err", err)
982 return err
983 }
984
985 err = tx.Commit()
986 if err != nil {
987 l.Error("failed to commit txn", "err", err)
988 return err
989 }
990
991 return nil
992
993 case jmodels.CommitOperationDelete:
994 tx, err := ddb.BeginTx(ctx, nil)
995 if err != nil {
996 l.Error("failed to begin transaction", "err", err)
997 return err
998 }
999 defer tx.Rollback()
1000
1001 if err := db.AbandonPulls(
1002 tx,
1003 orm.FilterEq("owner_did", did),
1004 orm.FilterEq("rkey", rkey),
1005 ); err != nil {
1006 l.Error("failed to abandon", "err", err)
1007 return fmt.Errorf("failed to abandon pull record: %w", err)
1008 }
1009 if err := tx.Commit(); err != nil {
1010 l.Error("failed to commit txn", "err", err)
1011 return err
1012 }
1013
1014 return nil
1015 }
1016
1017 return nil
1018}
1019
1020func (i *Ingester) ingestIssueComment(e *jmodels.Event) error {
1021 did := e.Did
1022 rkey := e.Commit.RKey
1023
1024 var err error
1025
1026 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1027 l.Info("ingesting record")
1028
1029 ddb, ok := i.Db.Execer.(*db.DB)
1030 if !ok {
1031 return fmt.Errorf("failed to index issue comment record, invalid db cast")
1032 }
1033
1034 switch e.Commit.Operation {
1035 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1036 raw := json.RawMessage(e.Commit.Record)
1037 record := tangled.RepoIssueComment{}
1038 err = json.Unmarshal(raw, &record)
1039 if err != nil {
1040 return fmt.Errorf("invalid record: %w", err)
1041 }
1042
1043 comment, err := models.IssueCommentFromRecord(did, rkey, record)
1044 if err != nil {
1045 return fmt.Errorf("failed to parse comment from record: %w", err)
1046 }
1047
1048 if err := i.Validator.ValidateIssueComment(comment); err != nil {
1049 return fmt.Errorf("failed to validate comment: %w", err)
1050 }
1051
1052 tx, err := ddb.Begin()
1053 if err != nil {
1054 return fmt.Errorf("failed to start transaction: %w", err)
1055 }
1056 defer tx.Rollback()
1057
1058 _, err = db.AddIssueComment(tx, *comment)
1059 if err != nil {
1060 return fmt.Errorf("failed to create issue comment: %w", err)
1061 }
1062
1063 return tx.Commit()
1064
1065 case jmodels.CommitOperationDelete:
1066 if err := db.DeleteIssueComments(
1067 ddb,
1068 orm.FilterEq("did", did),
1069 orm.FilterEq("rkey", rkey),
1070 ); err != nil {
1071 return fmt.Errorf("failed to delete issue comment record: %w", err)
1072 }
1073
1074 return nil
1075 }
1076
1077 return nil
1078}
1079
1080func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error {
1081 did := e.Did
1082 rkey := e.Commit.RKey
1083
1084 var err error
1085
1086 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1087 l.Info("ingesting record")
1088
1089 ddb, ok := i.Db.Execer.(*db.DB)
1090 if !ok {
1091 return fmt.Errorf("failed to index label definition, invalid db cast")
1092 }
1093
1094 switch e.Commit.Operation {
1095 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1096 raw := json.RawMessage(e.Commit.Record)
1097 record := tangled.LabelDefinition{}
1098 err = json.Unmarshal(raw, &record)
1099 if err != nil {
1100 return fmt.Errorf("invalid record: %w", err)
1101 }
1102
1103 def, err := models.LabelDefinitionFromRecord(did, rkey, record)
1104 if err != nil {
1105 return fmt.Errorf("failed to parse labeldef from record: %w", err)
1106 }
1107
1108 if err := i.Validator.ValidateLabelDefinition(def); err != nil {
1109 return fmt.Errorf("failed to validate labeldef: %w", err)
1110 }
1111
1112 _, err = db.AddLabelDefinition(ddb, def)
1113 if err != nil {
1114 return fmt.Errorf("failed to create labeldef: %w", err)
1115 }
1116
1117 return nil
1118
1119 case jmodels.CommitOperationDelete:
1120 if err := db.DeleteLabelDefinition(
1121 ddb,
1122 orm.FilterEq("did", did),
1123 orm.FilterEq("rkey", rkey),
1124 ); err != nil {
1125 return fmt.Errorf("failed to delete labeldef record: %w", err)
1126 }
1127
1128 return nil
1129 }
1130
1131 return nil
1132}
1133
1134func (i *Ingester) ingestLabelOp(e *jmodels.Event) error {
1135 did := e.Did
1136 rkey := e.Commit.RKey
1137
1138 var err error
1139
1140 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1141 l.Info("ingesting record")
1142
1143 ddb, ok := i.Db.Execer.(*db.DB)
1144 if !ok {
1145 return fmt.Errorf("failed to index label op, invalid db cast")
1146 }
1147
1148 switch e.Commit.Operation {
1149 case jmodels.CommitOperationCreate:
1150 raw := json.RawMessage(e.Commit.Record)
1151 record := tangled.LabelOp{}
1152 err = json.Unmarshal(raw, &record)
1153 if err != nil {
1154 return fmt.Errorf("invalid record: %w", err)
1155 }
1156
1157 subject := syntax.ATURI(record.Subject)
1158 collection := subject.Collection()
1159
1160 var repo *models.Repo
1161 switch collection {
1162 case tangled.RepoIssueNSID:
1163 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1164 if err != nil || len(i) != 1 {
1165 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1166 }
1167 repo = i[0].Repo
1168 default:
1169 return fmt.Errorf("unsupport label subject: %s", collection)
1170 }
1171
1172 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1173 if err != nil {
1174 return fmt.Errorf("failed to build label application ctx: %w", err)
1175 }
1176
1177 ops := models.LabelOpsFromRecord(did, rkey, record)
1178
1179 for _, o := range ops {
1180 def, ok := actx.Defs[o.OperandKey]
1181 if !ok {
1182 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs)))
1183 }
1184 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil {
1185 return fmt.Errorf("failed to validate labelop: %w", err)
1186 }
1187 }
1188
1189 tx, err := ddb.Begin()
1190 if err != nil {
1191 return err
1192 }
1193 defer tx.Rollback()
1194
1195 for _, o := range ops {
1196 _, err = db.AddLabelOp(tx, &o)
1197 if err != nil {
1198 return fmt.Errorf("failed to add labelop: %w", err)
1199 }
1200 }
1201
1202 if err = tx.Commit(); err != nil {
1203 return err
1204 }
1205 }
1206
1207 return nil
1208}