A vibe-coded Tangled fork which supports Pijul.
at sl/tap-appview 808 lines 23 kB view raw
1package ingester 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/api/tangled" 11 "tangled.org/core/appview/config" 12 "tangled.org/core/appview/db" 13 "tangled.org/core/appview/models" 14 "tangled.org/core/appview/notify" 15 "tangled.org/core/log" 16 "tangled.org/core/orm" 17 "tangled.org/core/rbac2" 18 "tangled.org/core/tapc" 19) 20 21type Ingester struct { 22 cfg *config.Config 23 db *db.DB 24 e *rbac2.Enforcer 25 notifier notify.Notifier 26 l *slog.Logger 27} 28 29// TODO: finish with rbac/v1 30// TODO: just don't notify state events for now (we need full object) 31 32func (i *Ingester) ProcessEvent(ctx context.Context, evt tapc.Event) error { 33 var err error 34 switch evt.Type { 35 case tapc.EvtRecord: 36 revt := evt.Record 37 ctx = log.IntoContext(ctx, i.l.With("record", revt.AtUri())) 38 // NOTE: sort by alphabetical order 39 switch revt.Collection.String() { 40 case tangled.ActorProfileNSID: 41 err = i.ingestActorProfile(ctx, revt) 42 case tangled.FeedReactionNSID: 43 err = i.ingestFeedReaction(ctx, revt) 44 case tangled.FeedStarNSID: 45 err = i.ingestFeedStar(ctx, revt) 46 case tangled.GraphFollowNSID: 47 err = i.ingestGraphFollow(ctx, revt) 48 case tangled.KnotMemberNSID: 49 err = i.ingestKnotMember(ctx, revt) 50 case tangled.KnotNSID: 51 err = i.ingestKnot(ctx, revt) 52 case tangled.LabelDefinitionNSID: 53 err = i.ingestLabelDefinition(ctx, revt) 54 case tangled.LabelOpNSID: 55 err = i.ingestLabelOp(ctx, revt) 56 case tangled.PublicKeyNSID: 57 err = i.ingestPublicKey(ctx, revt) 58 case tangled.RepoArtifactNSID: 59 err = i.ingestRepoArtifact(ctx, revt) 60 case tangled.RepoIssueCommentNSID: 61 err = i.ingestRepoIssueComment(ctx, revt) 62 case tangled.RepoIssueNSID: 63 err = i.ingestRepoIssue(ctx, revt) 64 case tangled.RepoIssueStateNSID: 65 err = i.ingestRepoIssueState(ctx, revt) 66 case tangled.RepoNSID: 67 err = i.ingestRepo(ctx, revt) 68 case 
tangled.RepoPullCommentNSID: 69 err = i.ingestRepoPullComment(ctx, revt) 70 case tangled.RepoPullNSID: 71 err = i.ingestRepoPull(ctx, revt) 72 case tangled.RepoPullStatusNSID: 73 err = i.ingestRepoPullStatus(ctx, revt) 74 case tangled.SpindleMemberNSID: 75 err = i.ingestSpindleMember(ctx, revt) 76 case tangled.SpindleNSID: 77 err = i.ingestSpindle(ctx, revt) 78 case tangled.StringNSID: 79 err = i.ingestString(ctx, revt) 80 } 81 case tapc.EvtIdentity: 82 // no-op 83 } 84 85 if err != nil { 86 i.l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) 87 return err 88 } 89 return nil 90} 91 92func (i *Ingester) ingestActorProfile(ctx context.Context, evt *tapc.RecordEventData) error { 93 // ignore invalid rkey 94 if evt.Rkey.String() != "self" { 95 return nil 96 } 97 98 switch evt.Action { 99 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 100 var record tangled.ActorProfile 101 if err := json.Unmarshal(evt.Record, &record); err != nil { 102 return fmt.Errorf("parsing record json: %w", err) 103 } 104 105 profile, err := models.ProfileFromRecord(evt.Did, record) 106 if err != nil { 107 i.l.Warn("ignoring invalid profile record", "err", err) 108 return nil 109 } 110 111 tx, err := i.db.BeginTx(ctx, nil) 112 if err != nil { 113 return fmt.Errorf("starting transaction: %w", err) 114 } 115 defer tx.Rollback() 116 117 if err := db.UpsertProfile(tx, &profile); err != nil { 118 return fmt.Errorf("upserting profile: %w", err) 119 } 120 121 if err := tx.Commit(); err != nil { 122 return fmt.Errorf("commiting profile upsert: %w", err) 123 } 124 case tapc.RecordDeleteAction: 125 if err := db.DeleteProfile(i.db, evt.Did); err != nil { 126 return fmt.Errorf("deleting profile from db: %w", err) 127 } 128 } 129 return nil 130} 131 132func (i *Ingester) ingestFeedReaction(_ context.Context, evt *tapc.RecordEventData) error { 133 switch evt.Action { 134 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 135 var record tangled.FeedReaction 136 
if err := json.Unmarshal(evt.Record, &record); err != nil { 137 return fmt.Errorf("parsing record json: %w", err) 138 } 139 140 reaction, err := models.ReactionFromRecord(evt.Did, evt.Rkey, record) 141 if err != nil { 142 i.l.Warn("ignoring invalid reaction record", "err", err) 143 return nil 144 } 145 146 if err := db.UpsertReaction(i.db, reaction); err != nil { 147 return fmt.Errorf("upserting reaction record") 148 } 149 case tapc.RecordDeleteAction: 150 if err := db.DeleteReactionByRkey( 151 i.db, 152 evt.Did.String(), 153 evt.Rkey.String(), 154 ); err != nil { 155 return fmt.Errorf("deleting reaction from db: %w", err) 156 } 157 } 158 return nil 159} 160 161func (i *Ingester) ingestFeedStar(_ context.Context, evt *tapc.RecordEventData) error { 162 switch evt.Action { 163 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 164 var record tangled.FeedStar 165 if err := json.Unmarshal(evt.Record, &record); err != nil { 166 return fmt.Errorf("parsing record json: %w", err) 167 } 168 169 star, err := models.StarFromRecord(evt.Did, evt.Rkey, record) 170 if err != nil { 171 i.l.Warn("ignoring invalid star record", "err", err) 172 return nil 173 } 174 175 if err := db.UpsertStar(i.db, star); err != nil { 176 return fmt.Errorf("upserting star record") 177 } 178 case tapc.RecordDeleteAction: 179 if err := db.DeleteStarByRkey( 180 i.db, 181 evt.Did.String(), 182 evt.Rkey.String(), 183 ); err != nil { 184 return fmt.Errorf("deleting record from db: %w", err) 185 } 186 } 187 return nil 188} 189 190func (i *Ingester) ingestGraphFollow(_ context.Context, evt *tapc.RecordEventData) error { 191 switch evt.Action { 192 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 193 var record tangled.GraphFollow 194 if err := json.Unmarshal(evt.Record, &record); err != nil { 195 return fmt.Errorf("parsing record json: %w", err) 196 } 197 198 follow, err := models.FollowFromRecord(evt.Did, evt.Rkey, record) 199 if err != nil { 200 i.l.Warn("ignoring invalid follow record", "err", 
err) 201 return nil 202 } 203 204 if err := db.UpsertFollow(i.db, follow); err != nil { 205 return fmt.Errorf("upserting follow record") 206 } 207 case tapc.RecordDeleteAction: 208 if err := db.DeleteFollowByRkey( 209 i.db, 210 evt.Did.String(), 211 evt.Rkey.String(), 212 ); err != nil { 213 return fmt.Errorf("deleting record from db: %w", err) 214 } 215 } 216 return nil 217} 218 219// TODO: let's just remove the knot.member record 220func (i *Ingester) ingestKnotMember(_ context.Context, evt *tapc.RecordEventData) error { 221 switch evt.Action { 222 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 223 var record tangled.KnotMember 224 if err := json.Unmarshal(evt.Record, &record); err != nil { 225 return fmt.Errorf("parsing record json: %w", err) 226 } 227 panic("unimplemented") 228 case tapc.RecordDeleteAction: 229 panic("unimplemented") 230 } 231 return nil 232} 233 234func (i *Ingester) ingestKnot(ctx context.Context, evt *tapc.RecordEventData) error { 235 switch evt.Action { 236 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 237 var record tangled.Knot 238 if err := json.Unmarshal(evt.Record, &record); err != nil { 239 return fmt.Errorf("parsing record json: %w", err) 240 } 241 242 domain := evt.Rkey.String() 243 244 if err := db.AddKnot(i.db, domain, evt.Did.String()); err != nil { 245 return fmt.Errorf("upserting knot: %w", err) 246 } 247 248 // TODO: hmmm should we run verification here? 249 // There can be unverified knot in user profile. 
250 panic("unimplemented") 251 case tapc.RecordDeleteAction: 252 domain := evt.Rkey.String() 253 254 // get record from db first 255 registration, err := func(domain string, did syntax.DID) (models.Registration, error) { 256 registrations, err := db.GetRegistrations( 257 i.db, 258 orm.FilterEq("domain", domain), 259 orm.FilterEq("did", evt.Did), 260 ) 261 if err != nil { 262 return models.Registration{}, err 263 } 264 if len(registrations) != 1 { 265 return models.Registration{}, fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 266 } 267 return registrations[0], nil 268 }(domain, evt.Did) 269 if err != nil { 270 return fmt.Errorf("getting registration: %w", err) 271 } 272 273 tx, err := i.db.BeginTx(ctx, nil) 274 if err != nil { 275 return fmt.Errorf("starting transaction: %w", err) 276 } 277 defer tx.Rollback() 278 // TODO: rollback enforcer 279 280 if err := db.DeleteKnot( 281 tx, 282 orm.FilterEq("did", evt.Did), 283 orm.FilterEq("domain", domain), 284 ); err != nil { 285 return fmt.Errorf("deleting knot: %w", err) 286 } 287 288 if registration.Registered != nil { 289 // TODO: clear from enforcer 290 panic("unimplemented") 291 } 292 293 if err := tx.Commit(); err != nil { 294 return fmt.Errorf("commiting transaction: %w", err) 295 } 296 } 297 return nil 298} 299 300func (i *Ingester) ingestLabelDefinition(_ context.Context, evt *tapc.RecordEventData) error { 301 switch evt.Action { 302 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 303 var record tangled.LabelDefinition 304 if err := json.Unmarshal(evt.Record, &record); err != nil { 305 return fmt.Errorf("parsing record json: %w", err) 306 } 307 308 def, err := models.LabelDefinitionFromRecord(evt.Did.String(), evt.Rkey.String(), record) 309 if err != nil { 310 return fmt.Errorf("failed to parse labeldef from record: %w", err) 311 } 312 313 if err := def.Validate(); err != nil { 314 i.l.Warn("ignoring invalid label def record", "err", err) 315 return nil 316 } 317 
318 if _, err := db.UpsertLabelDefinition(i.db, def); err != nil { 319 return fmt.Errorf("upserting label definition") 320 } 321 case tapc.RecordDeleteAction: 322 if err := db.DeleteLabelDefinition( 323 i.db, 324 orm.FilterEq("did", evt.Did), 325 orm.FilterEq("rkey", evt.Rkey), 326 ); err != nil { 327 return fmt.Errorf("deleting record from db: %w", err) 328 } 329 } 330 return nil 331} 332 333// TODO: label.op record is not designed to be mutable. should be reimplemented 334func (i *Ingester) ingestLabelOp(ctx context.Context, evt *tapc.RecordEventData) error { 335 switch evt.Action { 336 case tapc.RecordCreateAction: 337 var record tangled.LabelOp 338 if err := json.Unmarshal(evt.Record, &record); err != nil { 339 return fmt.Errorf("parsing record json: %w", err) 340 } 341 342 // TODO: 343 // 1. validate permissions 344 // 2. labelOp.Subject -> (subject).Repo 345 // 3. get all label definition for that repo, constructing actx 346 ops := models.LabelOpsFromRecord(evt.Did.String(), evt.Rkey.String(), record) 347 for _, o := range ops { 348 // 4. find label def based on o.OperandKey (AT-URI to the label definition) 349 // 5. validate labelOp from def 350 panic("unimplemented") 351 } 352 353 tx, err := i.db.BeginTx(ctx, nil) 354 if err != nil { 355 return fmt.Errorf("starting transaction: %w", err) 356 } 357 defer tx.Rollback() 358 359 for _, o := range ops { 360 _, err := db.AddLabelOp(tx, &o) 361 if err != nil { 362 return fmt.Errorf("adding label op: %w", err) 363 } 364 } 365 366 if err := tx.Commit(); err != nil { 367 return fmt.Errorf("commiting transaction: %w", err) 368 } 369 case tapc.RecordUpdateAction: 370 // no-op. we are ignoring update action for label.op records 371 case tapc.RecordDeleteAction: 372 // no-op. 
we are ignoring delete action for label.op records 373 } 374 return nil 375} 376 377func (i *Ingester) ingestPublicKey(_ context.Context, evt *tapc.RecordEventData) error { 378 switch evt.Action { 379 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 380 var record tangled.PublicKey 381 if err := json.Unmarshal(evt.Record, &record); err != nil { 382 return fmt.Errorf("parsing record json: %w", err) 383 } 384 385 pubKey, err := models.PublicKeyFromRecord(evt.Did, evt.Rkey, record) 386 if err != nil { 387 i.l.Warn("ignoring invalid publicKey record", "err", err) 388 return nil 389 } 390 if err := pubKey.Validate(); err != nil { 391 i.l.Warn("ignoring invalid publicKey record", "err", err) 392 return nil 393 } 394 395 if err := db.UpsertPublicKey(i.db, pubKey); err != nil { 396 return fmt.Errorf("upserting publicKey record") 397 } 398 case tapc.RecordDeleteAction: 399 if err := db.DeletePublicKeyByRkey( 400 i.db, 401 evt.Did.String(), 402 evt.Rkey.String(), 403 ); err != nil { 404 return fmt.Errorf("deleting record from db: %w", err) 405 } 406 } 407 return nil 408} 409 410// so this is one of the reasons why we need repo-DID 411// and possibly its own PDS. 412// for things need permission like repo artifact, even its not collaborative, 413// we need to pass the Knot to check the authority. 414// We cannot really distribute the arbitrary RBAC rules in meaningful way. 415// That's pretty hard and fragile. 416// Instead, when any data is belongs to the repo (no matter who created it), it should be owned 417// by the repo. I mean the actual data should be. 418// If someone removed the original artifact from their PDS, or if their PDS goes down, we lost 419// a way to backfill relateed data to construct the repo view. 
420 421func (i *Ingester) ingestRepoArtifact(_ context.Context, evt *tapc.RecordEventData) error { 422 switch evt.Action { 423 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 424 var record tangled.RepoArtifact 425 if err := json.Unmarshal(evt.Record, &record); err != nil { 426 return fmt.Errorf("parsing record json: %w", err) 427 } 428 429 artifact, err := models.ArtifactFromRecord(evt.Did, evt.Rkey, record) 430 if err != nil { 431 i.l.Warn("ignoring invalid artifact record", "err", err) 432 return nil 433 } 434 435 if err := db.UpsertArtifact(i.db, artifact); err != nil { 436 return fmt.Errorf("upserting artifact: %w", err) 437 } 438 case tapc.RecordDeleteAction: 439 if err := db.DeleteArtifact( 440 i.db, 441 orm.FilterEq("did", evt.Did), 442 orm.FilterEq("rkey", evt.Rkey), 443 ); err != nil { 444 return fmt.Errorf("deleting record from db: %w", err) 445 } 446 } 447 return nil 448} 449 450func (i *Ingester) ingestRepoIssueComment(ctx context.Context, evt *tapc.RecordEventData) error { 451 switch evt.Action { 452 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 453 var record tangled.RepoIssueComment 454 if err := json.Unmarshal(evt.Record, &record); err != nil { 455 return fmt.Errorf("parsing record json: %w", err) 456 } 457 458 comment, err := models.IssueCommentFromRecord(evt.Did.String(), evt.Rkey.String(), record) 459 if err != nil { 460 i.l.Warn("ignoring invalid issue.comment record", "err", err) 461 return nil 462 } 463 464 tx, err := i.db.BeginTx(ctx, nil) 465 if err != nil { 466 return fmt.Errorf("starting transaction: %w", err) 467 } 468 defer tx.Rollback() 469 470 if _, err = db.UpsertIssueComment(tx, *comment); err != nil { 471 return fmt.Errorf("upserting issue comment: %w", err) 472 } 473 474 if err := tx.Commit(); err != nil { 475 return fmt.Errorf("commiting transaction: %w", err) 476 } 477 case tapc.RecordDeleteAction: 478 if err := db.DeleteIssueComments( 479 i.db, 480 orm.FilterEq("did", evt.Did), 481 orm.FilterEq("rkey", 
evt.Rkey), 482 ); err != nil { 483 return fmt.Errorf("deleting issue comment: %w", err) 484 } 485 } 486 return nil 487} 488 489func (i *Ingester) ingestRepoIssue(ctx context.Context, evt *tapc.RecordEventData) error { 490 switch evt.Action { 491 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 492 var record tangled.RepoIssue 493 if err := json.Unmarshal(evt.Record, &record); err != nil { 494 return fmt.Errorf("parsing record json: %w", err) 495 } 496 497 issue := models.IssueFromRecord(evt.Did.String(), evt.Rkey.String(), record) 498 if err := issue.Validate(); err != nil { 499 i.l.Warn("ignoring invalid issue record", "err", err) 500 return nil 501 } 502 503 tx, err := i.db.BeginTx(ctx, nil) 504 if err != nil { 505 return fmt.Errorf("starting transaction: %w", err) 506 } 507 defer tx.Rollback() 508 509 if err := db.PutIssue(tx, &issue); err != nil { 510 return fmt.Errorf("upserting issue: %w", err) 511 } 512 513 if err := tx.Commit(); err != nil { 514 return fmt.Errorf("commiting issue upsert: %w", err) 515 } 516 517 if evt.Action == tapc.RecordCreateAction { 518 i.notifier.NewIssue(ctx, &issue, issue.Mentions) 519 } 520 case tapc.RecordDeleteAction: 521 tx, err := i.db.BeginTx(ctx, nil) 522 if err != nil { 523 return fmt.Errorf("starting transaction: %w", err) 524 } 525 defer tx.Rollback() 526 527 if err := db.DeleteIssues( 528 tx, 529 evt.Did.String(), 530 evt.Rkey.String(), 531 ); err != nil { 532 return fmt.Errorf("deleting issue: %w", err) 533 } 534 535 if err := tx.Commit(); err != nil { 536 return fmt.Errorf("commiting issue delete: %w", err) 537 } 538 } 539 return nil 540} 541 542func (i *Ingester) ingestRepoIssueState(_ context.Context, evt *tapc.RecordEventData) error { 543 switch evt.Action { 544 case tapc.RecordCreateAction: 545 var record tangled.RepoIssueState 546 if err := json.Unmarshal(evt.Record, &record); err != nil { 547 return fmt.Errorf("parsing record json: %w", err) 548 } 549 550 // TODO: check permission 551 552 switch record.State 
{ 553 case tangled.RepoIssueStateOpen: 554 if err := db.ReopenIssues( 555 i.db, 556 orm.FilterEq("at_uri", record.Issue), 557 ); err != nil { 558 return fmt.Errorf("opening issue: %w", err) 559 } 560 case tangled.RepoIssueStateClosed: 561 if err := db.CloseIssues( 562 i.db, 563 orm.FilterEq("at_uri", record.Issue), 564 ); err != nil { 565 return fmt.Errorf("closing issue: %w", err) 566 } 567 default: 568 return nil 569 } 570 571 // TODO: notify 572 case tapc.RecordUpdateAction: 573 // no-op 574 case tapc.RecordDeleteAction: 575 // no-op 576 } 577 return nil 578} 579 580func (i *Ingester) ingestRepo(ctx context.Context, evt *tapc.RecordEventData) error { 581 switch evt.Action { 582 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 583 var record tangled.Repo 584 if err := json.Unmarshal(evt.Record, &record); err != nil { 585 return fmt.Errorf("parsing record json: %w", err) 586 } 587 588 repo, err := models.RepoFromRecord(evt.Did, evt.Rkey, record) 589 if err != nil { 590 i.l.Warn("ignoring invalid repo record", "err", err) 591 return nil 592 } 593 594 tx, err := i.db.BeginTx(ctx, nil) 595 if err != nil { 596 return fmt.Errorf("starting transaction: %w", err) 597 } 598 defer tx.Rollback() 599 600 if err := db.UpsertRepo(tx, &repo); err != nil { 601 return fmt.Errorf("upserting repo: %w", err) 602 } 603 604 if err := tx.Commit(); err != nil { 605 return fmt.Errorf("commiting repo upsert: %w", err) 606 } 607 case tapc.RecordDeleteAction: 608 if err := db.RemoveRepo(i.db, evt.Did, evt.Rkey); err != nil { 609 return fmt.Errorf("deleting repo: %w", err) 610 } 611 } 612 return nil 613} 614 615func (i *Ingester) ingestRepoPullComment(ctx context.Context, evt *tapc.RecordEventData) error { 616 switch evt.Action { 617 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 618 var record tangled.RepoPullComment 619 if err := json.Unmarshal(evt.Record, &record); err != nil { 620 return fmt.Errorf("parsing record json: %w", err) 621 } 622 623 comment, err := 
models.PullCommentFromRecord(evt.Did, evt.Rkey, record) 624 if err != nil { 625 i.l.Warn("ignoring invalid issue.comment record", "err", err) 626 return nil 627 } 628 629 tx, err := i.db.BeginTx(ctx, nil) 630 if err != nil { 631 return fmt.Errorf("starting transaction: %w", err) 632 } 633 defer tx.Rollback() 634 635 if err := db.UpsertPullComment(tx, &comment); err != nil { 636 return fmt.Errorf("upserting pull comment: %w", err) 637 } 638 639 if err := tx.Commit(); err != nil { 640 return fmt.Errorf("commiting transaction: %w", err) 641 } 642 643 if evt.Action == tapc.RecordCreateAction { 644 i.notifier.NewPullComment(ctx, &comment, comment.Mentions) 645 } 646 case tapc.RecordDeleteAction: 647 if err := db.DeletePullComments( 648 i.db, 649 orm.FilterEq("did", evt.Did), 650 orm.FilterEq("rkey", evt.Rkey), 651 ); err != nil { 652 return fmt.Errorf("deleting pull comment: %w", err) 653 } 654 } 655 return nil 656} 657 658func (i *Ingester) ingestRepoPull(_ context.Context, evt *tapc.RecordEventData) error { 659 switch evt.Action { 660 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 661 var record tangled.RepoPull 662 if err := json.Unmarshal(evt.Record, &record); err != nil { 663 return fmt.Errorf("parsing record json: %w", err) 664 } 665 panic("unimplemented") 666 case tapc.RecordDeleteAction: 667 panic("unimplemented") 668 } 669 return nil 670} 671 672func (i *Ingester) ingestRepoPullStatus(_ context.Context, evt *tapc.RecordEventData) error { 673 switch evt.Action { 674 case tapc.RecordCreateAction: 675 var record tangled.RepoPullStatus 676 if err := json.Unmarshal(evt.Record, &record); err != nil { 677 return fmt.Errorf("parsing record json: %w", err) 678 } 679 panic("unimplemented") 680 case tapc.RecordUpdateAction: 681 // no-op 682 case tapc.RecordDeleteAction: 683 // no-op 684 } 685 return nil 686} 687 688func (i *Ingester) ingestSpindleMember(_ context.Context, evt *tapc.RecordEventData) error { 689 switch evt.Action { 690 case tapc.RecordCreateAction, 
tapc.RecordUpdateAction: 691 var record tangled.SpindleMember 692 if err := json.Unmarshal(evt.Record, &record); err != nil { 693 return fmt.Errorf("parsing record json: %w", err) 694 } 695 // TODO: let's just remove the spindle.member record 696 panic("unimplemented") 697 case tapc.RecordDeleteAction: 698 panic("unimplemented") 699 } 700 return nil 701} 702 703func (i *Ingester) ingestSpindle(ctx context.Context, evt *tapc.RecordEventData) error { 704 switch evt.Action { 705 case tapc.RecordCreateAction: 706 var record tangled.Spindle 707 if err := json.Unmarshal(evt.Record, &record); err != nil { 708 return fmt.Errorf("parsing record json: %w", err) 709 } 710 instance := evt.Rkey.String() 711 712 if err := db.AddSpindle(i.db, models.Spindle{ 713 Owner: evt.Did, 714 Instance: instance, 715 }); err != nil { 716 return fmt.Errorf("adding spindle: %w", err) 717 } 718 719 panic("unimplemented") 720 case tapc.RecordDeleteAction: 721 instance := evt.Rkey.String() 722 723 // get record from db first 724 spindle, err := func(instance string, did syntax.DID) (models.Spindle, error) { 725 spindles, err := db.GetSpindles( 726 i.db, 727 orm.FilterEq("owner", did), 728 orm.FilterEq("instance", instance), 729 ) 730 if err != nil { 731 return models.Spindle{}, err 732 } 733 if len(spindles) != 1 { 734 return models.Spindle{}, fmt.Errorf("got incorret number of spindles: %d, expected 1", len(spindles)) 735 } 736 return spindles[0], nil 737 }(instance, evt.Did) 738 if err != nil { 739 return fmt.Errorf("getting spindle: %w", err) 740 } 741 742 tx, err := i.db.BeginTx(ctx, nil) 743 if err != nil { 744 return fmt.Errorf("starting transaction: %w", err) 745 } 746 defer tx.Rollback() 747 // TODO: rollback enforcer 748 749 // remove spindle members first 750 if err := db.RemoveSpindleMember( 751 tx, 752 orm.FilterEq("owner", evt.Did), 753 orm.FilterEq("instance", instance), 754 ); err != nil { 755 return fmt.Errorf("deleting spindle members: %w", err) 756 } 757 if err := 
db.DeleteSpindle( 758 tx, 759 orm.FilterEq("owner", evt.Did), 760 orm.FilterEq("instance", instance), 761 ); err != nil { 762 return fmt.Errorf("deleting spindle: %w", err) 763 } 764 765 if spindle.Verified != nil { 766 // TODO: clear from enforcer 767 panic("unimplemented") 768 } 769 770 if err := tx.Commit(); err != nil { 771 return fmt.Errorf("commiting transaction: %w", err) 772 } 773 } 774 return nil 775} 776 777func (i *Ingester) ingestString(_ context.Context, evt *tapc.RecordEventData) error { 778 switch evt.Action { 779 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 780 var record tangled.String 781 if err := json.Unmarshal(evt.Record, &record); err != nil { 782 return fmt.Errorf("parsing record json: %w", err) 783 } 784 785 string := models.StringFromRecord(evt.Did.String(), evt.Rkey.String(), record) 786 if err := string.Validate(); err != nil { 787 i.l.Warn("invalid record", "err", err) 788 return nil 789 } 790 791 if err := db.UpsertString(i.db, string); err != nil { 792 return fmt.Errorf("upserting string: %w", err) 793 } 794 795 return nil 796 case tapc.RecordDeleteAction: 797 if err := db.DeleteString( 798 i.db, 799 orm.FilterEq("did", evt.Did), 800 orm.FilterEq("rkey", evt.Rkey), 801 ); err != nil { 802 return fmt.Errorf("deleting string: %w", err) 803 } 804 805 return nil 806 } 807 return nil 808}