A vibe coded tangled fork which supports pijul.
1package ingester
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "log/slog"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.org/core/api/tangled"
11 "tangled.org/core/appview/config"
12 "tangled.org/core/appview/db"
13 "tangled.org/core/appview/models"
14 "tangled.org/core/appview/notify"
15 "tangled.org/core/log"
16 "tangled.org/core/orm"
17 "tangled.org/core/rbac2"
18 "tangled.org/core/tapc"
19)
20
21type Ingester struct {
22 cfg *config.Config
23 db *db.DB
24 e *rbac2.Enforcer
25 notifier notify.Notifier
26 l *slog.Logger
27}
28
29// TODO: finish with rbac/v1
30// TODO: just don't notify state events for now (we need full object)
31
32func (i *Ingester) ProcessEvent(ctx context.Context, evt tapc.Event) error {
33 var err error
34 switch evt.Type {
35 case tapc.EvtRecord:
36 revt := evt.Record
37 ctx = log.IntoContext(ctx, i.l.With("record", revt.AtUri()))
38 // NOTE: sort by alphabetical order
39 switch revt.Collection.String() {
40 case tangled.ActorProfileNSID:
41 err = i.ingestActorProfile(ctx, revt)
42 case tangled.FeedReactionNSID:
43 err = i.ingestFeedReaction(ctx, revt)
44 case tangled.FeedStarNSID:
45 err = i.ingestFeedStar(ctx, revt)
46 case tangled.GraphFollowNSID:
47 err = i.ingestGraphFollow(ctx, revt)
48 case tangled.KnotMemberNSID:
49 err = i.ingestKnotMember(ctx, revt)
50 case tangled.KnotNSID:
51 err = i.ingestKnot(ctx, revt)
52 case tangled.LabelDefinitionNSID:
53 err = i.ingestLabelDefinition(ctx, revt)
54 case tangled.LabelOpNSID:
55 err = i.ingestLabelOp(ctx, revt)
56 case tangled.PublicKeyNSID:
57 err = i.ingestPublicKey(ctx, revt)
58 case tangled.RepoArtifactNSID:
59 err = i.ingestRepoArtifact(ctx, revt)
60 case tangled.RepoIssueCommentNSID:
61 err = i.ingestRepoIssueComment(ctx, revt)
62 case tangled.RepoIssueNSID:
63 err = i.ingestRepoIssue(ctx, revt)
64 case tangled.RepoIssueStateNSID:
65 err = i.ingestRepoIssueState(ctx, revt)
66 case tangled.RepoNSID:
67 err = i.ingestRepo(ctx, revt)
68 case tangled.RepoPullCommentNSID:
69 err = i.ingestRepoPullComment(ctx, revt)
70 case tangled.RepoPullNSID:
71 err = i.ingestRepoPull(ctx, revt)
72 case tangled.RepoPullStatusNSID:
73 err = i.ingestRepoPullStatus(ctx, revt)
74 case tangled.SpindleMemberNSID:
75 err = i.ingestSpindleMember(ctx, revt)
76 case tangled.SpindleNSID:
77 err = i.ingestSpindle(ctx, revt)
78 case tangled.StringNSID:
79 err = i.ingestString(ctx, revt)
80 }
81 case tapc.EvtIdentity:
82 // no-op
83 }
84
85 if err != nil {
86 i.l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err)
87 return err
88 }
89 return nil
90}
91
92func (i *Ingester) ingestActorProfile(ctx context.Context, evt *tapc.RecordEventData) error {
93 // ignore invalid rkey
94 if evt.Rkey.String() != "self" {
95 return nil
96 }
97
98 switch evt.Action {
99 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
100 var record tangled.ActorProfile
101 if err := json.Unmarshal(evt.Record, &record); err != nil {
102 return fmt.Errorf("parsing record json: %w", err)
103 }
104
105 profile, err := models.ProfileFromRecord(evt.Did, record)
106 if err != nil {
107 i.l.Warn("ignoring invalid profile record", "err", err)
108 return nil
109 }
110
111 tx, err := i.db.BeginTx(ctx, nil)
112 if err != nil {
113 return fmt.Errorf("starting transaction: %w", err)
114 }
115 defer tx.Rollback()
116
117 if err := db.UpsertProfile(tx, &profile); err != nil {
118 return fmt.Errorf("upserting profile: %w", err)
119 }
120
121 if err := tx.Commit(); err != nil {
122 return fmt.Errorf("commiting profile upsert: %w", err)
123 }
124 case tapc.RecordDeleteAction:
125 if err := db.DeleteProfile(i.db, evt.Did); err != nil {
126 return fmt.Errorf("deleting profile from db: %w", err)
127 }
128 }
129 return nil
130}
131
132func (i *Ingester) ingestFeedReaction(_ context.Context, evt *tapc.RecordEventData) error {
133 switch evt.Action {
134 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
135 var record tangled.FeedReaction
136 if err := json.Unmarshal(evt.Record, &record); err != nil {
137 return fmt.Errorf("parsing record json: %w", err)
138 }
139
140 reaction, err := models.ReactionFromRecord(evt.Did, evt.Rkey, record)
141 if err != nil {
142 i.l.Warn("ignoring invalid reaction record", "err", err)
143 return nil
144 }
145
146 if err := db.UpsertReaction(i.db, reaction); err != nil {
147 return fmt.Errorf("upserting reaction record")
148 }
149 case tapc.RecordDeleteAction:
150 if err := db.DeleteReactionByRkey(
151 i.db,
152 evt.Did.String(),
153 evt.Rkey.String(),
154 ); err != nil {
155 return fmt.Errorf("deleting reaction from db: %w", err)
156 }
157 }
158 return nil
159}
160
161func (i *Ingester) ingestFeedStar(_ context.Context, evt *tapc.RecordEventData) error {
162 switch evt.Action {
163 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
164 var record tangled.FeedStar
165 if err := json.Unmarshal(evt.Record, &record); err != nil {
166 return fmt.Errorf("parsing record json: %w", err)
167 }
168
169 star, err := models.StarFromRecord(evt.Did, evt.Rkey, record)
170 if err != nil {
171 i.l.Warn("ignoring invalid star record", "err", err)
172 return nil
173 }
174
175 if err := db.UpsertStar(i.db, star); err != nil {
176 return fmt.Errorf("upserting star record")
177 }
178 case tapc.RecordDeleteAction:
179 if err := db.DeleteStarByRkey(
180 i.db,
181 evt.Did.String(),
182 evt.Rkey.String(),
183 ); err != nil {
184 return fmt.Errorf("deleting record from db: %w", err)
185 }
186 }
187 return nil
188}
189
190func (i *Ingester) ingestGraphFollow(_ context.Context, evt *tapc.RecordEventData) error {
191 switch evt.Action {
192 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
193 var record tangled.GraphFollow
194 if err := json.Unmarshal(evt.Record, &record); err != nil {
195 return fmt.Errorf("parsing record json: %w", err)
196 }
197
198 follow, err := models.FollowFromRecord(evt.Did, evt.Rkey, record)
199 if err != nil {
200 i.l.Warn("ignoring invalid follow record", "err", err)
201 return nil
202 }
203
204 if err := db.UpsertFollow(i.db, follow); err != nil {
205 return fmt.Errorf("upserting follow record")
206 }
207 case tapc.RecordDeleteAction:
208 if err := db.DeleteFollowByRkey(
209 i.db,
210 evt.Did.String(),
211 evt.Rkey.String(),
212 ); err != nil {
213 return fmt.Errorf("deleting record from db: %w", err)
214 }
215 }
216 return nil
217}
218
219// TODO: let's just remove the knot.member record
220func (i *Ingester) ingestKnotMember(_ context.Context, evt *tapc.RecordEventData) error {
221 switch evt.Action {
222 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
223 var record tangled.KnotMember
224 if err := json.Unmarshal(evt.Record, &record); err != nil {
225 return fmt.Errorf("parsing record json: %w", err)
226 }
227 panic("unimplemented")
228 case tapc.RecordDeleteAction:
229 panic("unimplemented")
230 }
231 return nil
232}
233
234func (i *Ingester) ingestKnot(ctx context.Context, evt *tapc.RecordEventData) error {
235 switch evt.Action {
236 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
237 var record tangled.Knot
238 if err := json.Unmarshal(evt.Record, &record); err != nil {
239 return fmt.Errorf("parsing record json: %w", err)
240 }
241
242 domain := evt.Rkey.String()
243
244 if err := db.AddKnot(i.db, domain, evt.Did.String()); err != nil {
245 return fmt.Errorf("upserting knot: %w", err)
246 }
247
248 // TODO: hmmm should we run verification here?
249 // There can be unverified knot in user profile.
250 panic("unimplemented")
251 case tapc.RecordDeleteAction:
252 domain := evt.Rkey.String()
253
254 // get record from db first
255 registration, err := func(domain string, did syntax.DID) (models.Registration, error) {
256 registrations, err := db.GetRegistrations(
257 i.db,
258 orm.FilterEq("domain", domain),
259 orm.FilterEq("did", evt.Did),
260 )
261 if err != nil {
262 return models.Registration{}, err
263 }
264 if len(registrations) != 1 {
265 return models.Registration{}, fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
266 }
267 return registrations[0], nil
268 }(domain, evt.Did)
269 if err != nil {
270 return fmt.Errorf("getting registration: %w", err)
271 }
272
273 tx, err := i.db.BeginTx(ctx, nil)
274 if err != nil {
275 return fmt.Errorf("starting transaction: %w", err)
276 }
277 defer tx.Rollback()
278 // TODO: rollback enforcer
279
280 if err := db.DeleteKnot(
281 tx,
282 orm.FilterEq("did", evt.Did),
283 orm.FilterEq("domain", domain),
284 ); err != nil {
285 return fmt.Errorf("deleting knot: %w", err)
286 }
287
288 if registration.Registered != nil {
289 // TODO: clear from enforcer
290 panic("unimplemented")
291 }
292
293 if err := tx.Commit(); err != nil {
294 return fmt.Errorf("commiting transaction: %w", err)
295 }
296 }
297 return nil
298}
299
300func (i *Ingester) ingestLabelDefinition(_ context.Context, evt *tapc.RecordEventData) error {
301 switch evt.Action {
302 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
303 var record tangled.LabelDefinition
304 if err := json.Unmarshal(evt.Record, &record); err != nil {
305 return fmt.Errorf("parsing record json: %w", err)
306 }
307
308 def, err := models.LabelDefinitionFromRecord(evt.Did.String(), evt.Rkey.String(), record)
309 if err != nil {
310 return fmt.Errorf("failed to parse labeldef from record: %w", err)
311 }
312
313 if err := def.Validate(); err != nil {
314 i.l.Warn("ignoring invalid label def record", "err", err)
315 return nil
316 }
317
318 if _, err := db.UpsertLabelDefinition(i.db, def); err != nil {
319 return fmt.Errorf("upserting label definition")
320 }
321 case tapc.RecordDeleteAction:
322 if err := db.DeleteLabelDefinition(
323 i.db,
324 orm.FilterEq("did", evt.Did),
325 orm.FilterEq("rkey", evt.Rkey),
326 ); err != nil {
327 return fmt.Errorf("deleting record from db: %w", err)
328 }
329 }
330 return nil
331}
332
333// TODO: label.op record is not designed to be mutable. should be reimplemented
334func (i *Ingester) ingestLabelOp(ctx context.Context, evt *tapc.RecordEventData) error {
335 switch evt.Action {
336 case tapc.RecordCreateAction:
337 var record tangled.LabelOp
338 if err := json.Unmarshal(evt.Record, &record); err != nil {
339 return fmt.Errorf("parsing record json: %w", err)
340 }
341
342 // TODO:
343 // 1. validate permissions
344 // 2. labelOp.Subject -> (subject).Repo
345 // 3. get all label definition for that repo, constructing actx
346 ops := models.LabelOpsFromRecord(evt.Did.String(), evt.Rkey.String(), record)
347 for _, o := range ops {
348 // 4. find label def based on o.OperandKey (AT-URI to the label definition)
349 // 5. validate labelOp from def
350 panic("unimplemented")
351 }
352
353 tx, err := i.db.BeginTx(ctx, nil)
354 if err != nil {
355 return fmt.Errorf("starting transaction: %w", err)
356 }
357 defer tx.Rollback()
358
359 for _, o := range ops {
360 _, err := db.AddLabelOp(tx, &o)
361 if err != nil {
362 return fmt.Errorf("adding label op: %w", err)
363 }
364 }
365
366 if err := tx.Commit(); err != nil {
367 return fmt.Errorf("commiting transaction: %w", err)
368 }
369 case tapc.RecordUpdateAction:
370 // no-op. we are ignoring update action for label.op records
371 case tapc.RecordDeleteAction:
372 // no-op. we are ignoring delete action for label.op records
373 }
374 return nil
375}
376
377func (i *Ingester) ingestPublicKey(_ context.Context, evt *tapc.RecordEventData) error {
378 switch evt.Action {
379 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
380 var record tangled.PublicKey
381 if err := json.Unmarshal(evt.Record, &record); err != nil {
382 return fmt.Errorf("parsing record json: %w", err)
383 }
384
385 pubKey, err := models.PublicKeyFromRecord(evt.Did, evt.Rkey, record)
386 if err != nil {
387 i.l.Warn("ignoring invalid publicKey record", "err", err)
388 return nil
389 }
390 if err := pubKey.Validate(); err != nil {
391 i.l.Warn("ignoring invalid publicKey record", "err", err)
392 return nil
393 }
394
395 if err := db.UpsertPublicKey(i.db, pubKey); err != nil {
396 return fmt.Errorf("upserting publicKey record")
397 }
398 case tapc.RecordDeleteAction:
399 if err := db.DeletePublicKeyByRkey(
400 i.db,
401 evt.Did.String(),
402 evt.Rkey.String(),
403 ); err != nil {
404 return fmt.Errorf("deleting record from db: %w", err)
405 }
406 }
407 return nil
408}
409
410// so this is one of the reasons why we need repo-DID
411// and possibly its own PDS.
412// for things need permission like repo artifact, even its not collaborative,
413// we need to pass the Knot to check the authority.
414// We cannot really distribute the arbitrary RBAC rules in meaningful way.
415// That's pretty hard and fragile.
416// Instead, when any data is belongs to the repo (no matter who created it), it should be owned
417// by the repo. I mean the actual data should be.
418// If someone removed the original artifact from their PDS, or if their PDS goes down, we lost
419// a way to backfill relateed data to construct the repo view.
420
421func (i *Ingester) ingestRepoArtifact(_ context.Context, evt *tapc.RecordEventData) error {
422 switch evt.Action {
423 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
424 var record tangled.RepoArtifact
425 if err := json.Unmarshal(evt.Record, &record); err != nil {
426 return fmt.Errorf("parsing record json: %w", err)
427 }
428
429 artifact, err := models.ArtifactFromRecord(evt.Did, evt.Rkey, record)
430 if err != nil {
431 i.l.Warn("ignoring invalid artifact record", "err", err)
432 return nil
433 }
434
435 if err := db.UpsertArtifact(i.db, artifact); err != nil {
436 return fmt.Errorf("upserting artifact: %w", err)
437 }
438 case tapc.RecordDeleteAction:
439 if err := db.DeleteArtifact(
440 i.db,
441 orm.FilterEq("did", evt.Did),
442 orm.FilterEq("rkey", evt.Rkey),
443 ); err != nil {
444 return fmt.Errorf("deleting record from db: %w", err)
445 }
446 }
447 return nil
448}
449
450func (i *Ingester) ingestRepoIssueComment(ctx context.Context, evt *tapc.RecordEventData) error {
451 switch evt.Action {
452 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
453 var record tangled.RepoIssueComment
454 if err := json.Unmarshal(evt.Record, &record); err != nil {
455 return fmt.Errorf("parsing record json: %w", err)
456 }
457
458 comment, err := models.IssueCommentFromRecord(evt.Did.String(), evt.Rkey.String(), record)
459 if err != nil {
460 i.l.Warn("ignoring invalid issue.comment record", "err", err)
461 return nil
462 }
463
464 tx, err := i.db.BeginTx(ctx, nil)
465 if err != nil {
466 return fmt.Errorf("starting transaction: %w", err)
467 }
468 defer tx.Rollback()
469
470 if _, err = db.UpsertIssueComment(tx, *comment); err != nil {
471 return fmt.Errorf("upserting issue comment: %w", err)
472 }
473
474 if err := tx.Commit(); err != nil {
475 return fmt.Errorf("commiting transaction: %w", err)
476 }
477 case tapc.RecordDeleteAction:
478 if err := db.DeleteIssueComments(
479 i.db,
480 orm.FilterEq("did", evt.Did),
481 orm.FilterEq("rkey", evt.Rkey),
482 ); err != nil {
483 return fmt.Errorf("deleting issue comment: %w", err)
484 }
485 }
486 return nil
487}
488
489func (i *Ingester) ingestRepoIssue(ctx context.Context, evt *tapc.RecordEventData) error {
490 switch evt.Action {
491 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
492 var record tangled.RepoIssue
493 if err := json.Unmarshal(evt.Record, &record); err != nil {
494 return fmt.Errorf("parsing record json: %w", err)
495 }
496
497 issue := models.IssueFromRecord(evt.Did.String(), evt.Rkey.String(), record)
498 if err := issue.Validate(); err != nil {
499 i.l.Warn("ignoring invalid issue record", "err", err)
500 return nil
501 }
502
503 tx, err := i.db.BeginTx(ctx, nil)
504 if err != nil {
505 return fmt.Errorf("starting transaction: %w", err)
506 }
507 defer tx.Rollback()
508
509 if err := db.PutIssue(tx, &issue); err != nil {
510 return fmt.Errorf("upserting issue: %w", err)
511 }
512
513 if err := tx.Commit(); err != nil {
514 return fmt.Errorf("commiting issue upsert: %w", err)
515 }
516
517 if evt.Action == tapc.RecordCreateAction {
518 i.notifier.NewIssue(ctx, &issue, issue.Mentions)
519 }
520 case tapc.RecordDeleteAction:
521 tx, err := i.db.BeginTx(ctx, nil)
522 if err != nil {
523 return fmt.Errorf("starting transaction: %w", err)
524 }
525 defer tx.Rollback()
526
527 if err := db.DeleteIssues(
528 tx,
529 evt.Did.String(),
530 evt.Rkey.String(),
531 ); err != nil {
532 return fmt.Errorf("deleting issue: %w", err)
533 }
534
535 if err := tx.Commit(); err != nil {
536 return fmt.Errorf("commiting issue delete: %w", err)
537 }
538 }
539 return nil
540}
541
542func (i *Ingester) ingestRepoIssueState(_ context.Context, evt *tapc.RecordEventData) error {
543 switch evt.Action {
544 case tapc.RecordCreateAction:
545 var record tangled.RepoIssueState
546 if err := json.Unmarshal(evt.Record, &record); err != nil {
547 return fmt.Errorf("parsing record json: %w", err)
548 }
549
550 // TODO: check permission
551
552 switch record.State {
553 case tangled.RepoIssueStateOpen:
554 if err := db.ReopenIssues(
555 i.db,
556 orm.FilterEq("at_uri", record.Issue),
557 ); err != nil {
558 return fmt.Errorf("opening issue: %w", err)
559 }
560 case tangled.RepoIssueStateClosed:
561 if err := db.CloseIssues(
562 i.db,
563 orm.FilterEq("at_uri", record.Issue),
564 ); err != nil {
565 return fmt.Errorf("closing issue: %w", err)
566 }
567 default:
568 return nil
569 }
570
571 // TODO: notify
572 case tapc.RecordUpdateAction:
573 // no-op
574 case tapc.RecordDeleteAction:
575 // no-op
576 }
577 return nil
578}
579
580func (i *Ingester) ingestRepo(ctx context.Context, evt *tapc.RecordEventData) error {
581 switch evt.Action {
582 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
583 var record tangled.Repo
584 if err := json.Unmarshal(evt.Record, &record); err != nil {
585 return fmt.Errorf("parsing record json: %w", err)
586 }
587
588 repo, err := models.RepoFromRecord(evt.Did, evt.Rkey, record)
589 if err != nil {
590 i.l.Warn("ignoring invalid repo record", "err", err)
591 return nil
592 }
593
594 tx, err := i.db.BeginTx(ctx, nil)
595 if err != nil {
596 return fmt.Errorf("starting transaction: %w", err)
597 }
598 defer tx.Rollback()
599
600 if err := db.UpsertRepo(tx, &repo); err != nil {
601 return fmt.Errorf("upserting repo: %w", err)
602 }
603
604 if err := tx.Commit(); err != nil {
605 return fmt.Errorf("commiting repo upsert: %w", err)
606 }
607 case tapc.RecordDeleteAction:
608 if err := db.RemoveRepo(i.db, evt.Did, evt.Rkey); err != nil {
609 return fmt.Errorf("deleting repo: %w", err)
610 }
611 }
612 return nil
613}
614
615func (i *Ingester) ingestRepoPullComment(ctx context.Context, evt *tapc.RecordEventData) error {
616 switch evt.Action {
617 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
618 var record tangled.RepoPullComment
619 if err := json.Unmarshal(evt.Record, &record); err != nil {
620 return fmt.Errorf("parsing record json: %w", err)
621 }
622
623 comment, err := models.PullCommentFromRecord(evt.Did, evt.Rkey, record)
624 if err != nil {
625 i.l.Warn("ignoring invalid issue.comment record", "err", err)
626 return nil
627 }
628
629 tx, err := i.db.BeginTx(ctx, nil)
630 if err != nil {
631 return fmt.Errorf("starting transaction: %w", err)
632 }
633 defer tx.Rollback()
634
635 if err := db.UpsertPullComment(tx, &comment); err != nil {
636 return fmt.Errorf("upserting pull comment: %w", err)
637 }
638
639 if err := tx.Commit(); err != nil {
640 return fmt.Errorf("commiting transaction: %w", err)
641 }
642
643 if evt.Action == tapc.RecordCreateAction {
644 i.notifier.NewPullComment(ctx, &comment, comment.Mentions)
645 }
646 case tapc.RecordDeleteAction:
647 if err := db.DeletePullComments(
648 i.db,
649 orm.FilterEq("did", evt.Did),
650 orm.FilterEq("rkey", evt.Rkey),
651 ); err != nil {
652 return fmt.Errorf("deleting pull comment: %w", err)
653 }
654 }
655 return nil
656}
657
658func (i *Ingester) ingestRepoPull(_ context.Context, evt *tapc.RecordEventData) error {
659 switch evt.Action {
660 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
661 var record tangled.RepoPull
662 if err := json.Unmarshal(evt.Record, &record); err != nil {
663 return fmt.Errorf("parsing record json: %w", err)
664 }
665 panic("unimplemented")
666 case tapc.RecordDeleteAction:
667 panic("unimplemented")
668 }
669 return nil
670}
671
672func (i *Ingester) ingestRepoPullStatus(_ context.Context, evt *tapc.RecordEventData) error {
673 switch evt.Action {
674 case tapc.RecordCreateAction:
675 var record tangled.RepoPullStatus
676 if err := json.Unmarshal(evt.Record, &record); err != nil {
677 return fmt.Errorf("parsing record json: %w", err)
678 }
679 panic("unimplemented")
680 case tapc.RecordUpdateAction:
681 // no-op
682 case tapc.RecordDeleteAction:
683 // no-op
684 }
685 return nil
686}
687
688func (i *Ingester) ingestSpindleMember(_ context.Context, evt *tapc.RecordEventData) error {
689 switch evt.Action {
690 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
691 var record tangled.SpindleMember
692 if err := json.Unmarshal(evt.Record, &record); err != nil {
693 return fmt.Errorf("parsing record json: %w", err)
694 }
695 // TODO: let's just remove the spindle.member record
696 panic("unimplemented")
697 case tapc.RecordDeleteAction:
698 panic("unimplemented")
699 }
700 return nil
701}
702
703func (i *Ingester) ingestSpindle(ctx context.Context, evt *tapc.RecordEventData) error {
704 switch evt.Action {
705 case tapc.RecordCreateAction:
706 var record tangled.Spindle
707 if err := json.Unmarshal(evt.Record, &record); err != nil {
708 return fmt.Errorf("parsing record json: %w", err)
709 }
710 instance := evt.Rkey.String()
711
712 if err := db.AddSpindle(i.db, models.Spindle{
713 Owner: evt.Did,
714 Instance: instance,
715 }); err != nil {
716 return fmt.Errorf("adding spindle: %w", err)
717 }
718
719 panic("unimplemented")
720 case tapc.RecordDeleteAction:
721 instance := evt.Rkey.String()
722
723 // get record from db first
724 spindle, err := func(instance string, did syntax.DID) (models.Spindle, error) {
725 spindles, err := db.GetSpindles(
726 i.db,
727 orm.FilterEq("owner", did),
728 orm.FilterEq("instance", instance),
729 )
730 if err != nil {
731 return models.Spindle{}, err
732 }
733 if len(spindles) != 1 {
734 return models.Spindle{}, fmt.Errorf("got incorret number of spindles: %d, expected 1", len(spindles))
735 }
736 return spindles[0], nil
737 }(instance, evt.Did)
738 if err != nil {
739 return fmt.Errorf("getting spindle: %w", err)
740 }
741
742 tx, err := i.db.BeginTx(ctx, nil)
743 if err != nil {
744 return fmt.Errorf("starting transaction: %w", err)
745 }
746 defer tx.Rollback()
747 // TODO: rollback enforcer
748
749 // remove spindle members first
750 if err := db.RemoveSpindleMember(
751 tx,
752 orm.FilterEq("owner", evt.Did),
753 orm.FilterEq("instance", instance),
754 ); err != nil {
755 return fmt.Errorf("deleting spindle members: %w", err)
756 }
757 if err := db.DeleteSpindle(
758 tx,
759 orm.FilterEq("owner", evt.Did),
760 orm.FilterEq("instance", instance),
761 ); err != nil {
762 return fmt.Errorf("deleting spindle: %w", err)
763 }
764
765 if spindle.Verified != nil {
766 // TODO: clear from enforcer
767 panic("unimplemented")
768 }
769
770 if err := tx.Commit(); err != nil {
771 return fmt.Errorf("commiting transaction: %w", err)
772 }
773 }
774 return nil
775}
776
777func (i *Ingester) ingestString(_ context.Context, evt *tapc.RecordEventData) error {
778 switch evt.Action {
779 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
780 var record tangled.String
781 if err := json.Unmarshal(evt.Record, &record); err != nil {
782 return fmt.Errorf("parsing record json: %w", err)
783 }
784
785 string := models.StringFromRecord(evt.Did.String(), evt.Rkey.String(), record)
786 if err := string.Validate(); err != nil {
787 i.l.Warn("invalid record", "err", err)
788 return nil
789 }
790
791 if err := db.UpsertString(i.db, string); err != nil {
792 return fmt.Errorf("upserting string: %w", err)
793 }
794
795 return nil
796 case tapc.RecordDeleteAction:
797 if err := db.DeleteString(
798 i.db,
799 orm.FilterEq("did", evt.Did),
800 orm.FilterEq("rkey", evt.Rkey),
801 ); err != nil {
802 return fmt.Errorf("deleting string: %w", err)
803 }
804
805 return nil
806 }
807 return nil
808}