A vibe-coded Tangled fork that supports Pijul.
at 2a2718a4a548236a0c704a596fd836f263e9740d 1208 lines 28 kB view raw
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "log/slog" 9 "maps" 10 "net/http" 11 "net/url" 12 "slices" 13 "sync" 14 15 "time" 16 17 "github.com/bluesky-social/indigo/atproto/syntax" 18 jmodels "github.com/bluesky-social/jetstream/pkg/models" 19 "github.com/go-git/go-git/v5/plumbing" 20 "github.com/ipfs/go-cid" 21 "golang.org/x/sync/errgroup" 22 "tangled.org/core/api/tangled" 23 "tangled.org/core/appview/config" 24 "tangled.org/core/appview/db" 25 "tangled.org/core/appview/models" 26 "tangled.org/core/appview/serververify" 27 "tangled.org/core/appview/validator" 28 "tangled.org/core/idresolver" 29 "tangled.org/core/orm" 30 "tangled.org/core/rbac" 31) 32 33type Ingester struct { 34 Db db.DbWrapper 35 Enforcer *rbac.Enforcer 36 IdResolver *idresolver.Resolver 37 Config *config.Config 38 Logger *slog.Logger 39 Validator *validator.Validator 40} 41 42type processFunc func(ctx context.Context, e *jmodels.Event) error 43 44func (i *Ingester) Ingest() processFunc { 45 return func(ctx context.Context, e *jmodels.Event) error { 46 var err error 47 defer func() { 48 eventTime := e.TimeUS 49 lastTimeUs := eventTime + 1 50 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 51 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 52 } 53 }() 54 55 l := i.Logger.With("kind", e.Kind) 56 switch e.Kind { 57 case jmodels.EventKindAccount: 58 if !e.Account.Active && *e.Account.Status == "deactivated" { 59 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 60 } 61 case jmodels.EventKindIdentity: 62 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 63 case jmodels.EventKindCommit: 64 switch e.Commit.Collection { 65 case tangled.GraphFollowNSID: 66 err = i.ingestFollow(e) 67 case tangled.FeedStarNSID: 68 err = i.ingestStar(e) 69 case tangled.PublicKeyNSID: 70 err = i.ingestPublicKey(e) 71 case tangled.RepoArtifactNSID: 72 err = i.ingestArtifact(e) 73 case tangled.ActorProfileNSID: 74 err = i.ingestProfile(e) 75 case 
tangled.SpindleMemberNSID: 76 err = i.ingestSpindleMember(ctx, e) 77 case tangled.SpindleNSID: 78 err = i.ingestSpindle(ctx, e) 79 case tangled.KnotMemberNSID: 80 err = i.ingestKnotMember(e) 81 case tangled.KnotNSID: 82 err = i.ingestKnot(e) 83 case tangled.StringNSID: 84 err = i.ingestString(e) 85 case tangled.RepoIssueNSID: 86 err = i.ingestIssue(ctx, e) 87 case tangled.RepoPullNSID: 88 err = i.ingestPull(ctx, e) 89 case tangled.RepoIssueCommentNSID: 90 err = i.ingestIssueComment(e) 91 case tangled.LabelDefinitionNSID: 92 err = i.ingestLabelDefinition(e) 93 case tangled.LabelOpNSID: 94 err = i.ingestLabelOp(e) 95 } 96 l = i.Logger.With("nsid", e.Commit.Collection) 97 } 98 99 if err != nil { 100 l.Warn("refused to ingest record", "err", err) 101 } 102 103 return nil 104 } 105} 106 107func (i *Ingester) ingestStar(e *jmodels.Event) error { 108 var err error 109 did := e.Did 110 111 l := i.Logger.With("handler", "ingestStar") 112 l = l.With("nsid", e.Commit.Collection) 113 114 switch e.Commit.Operation { 115 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 116 var subjectUri syntax.ATURI 117 118 raw := json.RawMessage(e.Commit.Record) 119 record := tangled.FeedStar{} 120 err := json.Unmarshal(raw, &record) 121 if err != nil { 122 l.Error("invalid record", "err", err) 123 return err 124 } 125 126 subjectUri, err = syntax.ParseATURI(record.Subject) 127 if err != nil { 128 l.Error("invalid record", "err", err) 129 return err 130 } 131 err = db.AddStar(i.Db, &models.Star{ 132 Did: did, 133 RepoAt: subjectUri, 134 Rkey: e.Commit.RKey, 135 }) 136 case jmodels.CommitOperationDelete: 137 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 138 } 139 140 if err != nil { 141 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 142 } 143 144 return nil 145} 146 147func (i *Ingester) ingestFollow(e *jmodels.Event) error { 148 var err error 149 did := e.Did 150 151 l := i.Logger.With("handler", "ingestFollow") 152 l = l.With("nsid", 
e.Commit.Collection) 153 154 switch e.Commit.Operation { 155 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 156 raw := json.RawMessage(e.Commit.Record) 157 record := tangled.GraphFollow{} 158 err = json.Unmarshal(raw, &record) 159 if err != nil { 160 l.Error("invalid record", "err", err) 161 return err 162 } 163 164 err = db.AddFollow(i.Db, &models.Follow{ 165 UserDid: did, 166 SubjectDid: record.Subject, 167 Rkey: e.Commit.RKey, 168 }) 169 case jmodels.CommitOperationDelete: 170 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 171 } 172 173 if err != nil { 174 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 175 } 176 177 return nil 178} 179 180func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 181 did := e.Did 182 var err error 183 184 l := i.Logger.With("handler", "ingestPublicKey") 185 l = l.With("nsid", e.Commit.Collection) 186 187 switch e.Commit.Operation { 188 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 189 l.Debug("processing add of pubkey") 190 raw := json.RawMessage(e.Commit.Record) 191 record := tangled.PublicKey{} 192 err = json.Unmarshal(raw, &record) 193 if err != nil { 194 l.Error("invalid record", "err", err) 195 return err 196 } 197 198 name := record.Name 199 key := record.Key 200 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 201 case jmodels.CommitOperationDelete: 202 l.Debug("processing delete of pubkey") 203 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 204 } 205 206 if err != nil { 207 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 208 } 209 210 return nil 211} 212 213func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 214 did := e.Did 215 var err error 216 217 l := i.Logger.With("handler", "ingestArtifact") 218 l = l.With("nsid", e.Commit.Collection) 219 220 switch e.Commit.Operation { 221 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 222 raw := 
json.RawMessage(e.Commit.Record) 223 record := tangled.RepoArtifact{} 224 err = json.Unmarshal(raw, &record) 225 if err != nil { 226 l.Error("invalid record", "err", err) 227 return err 228 } 229 230 repoAt, err := syntax.ParseATURI(record.Repo) 231 if err != nil { 232 return err 233 } 234 235 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 236 if err != nil { 237 return err 238 } 239 240 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 241 if err != nil || !ok { 242 return err 243 } 244 245 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 246 if err != nil { 247 createdAt = time.Now() 248 } 249 250 artifact := models.Artifact{ 251 Did: did, 252 Rkey: e.Commit.RKey, 253 RepoAt: repoAt, 254 Tag: plumbing.Hash(record.Tag), 255 CreatedAt: createdAt, 256 BlobCid: cid.Cid(record.Artifact.Ref), 257 Name: record.Name, 258 Size: uint64(record.Artifact.Size), 259 MimeType: record.Artifact.MimeType, 260 } 261 262 err = db.AddArtifact(i.Db, artifact) 263 case jmodels.CommitOperationDelete: 264 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 265 } 266 267 if err != nil { 268 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 269 } 270 271 return nil 272} 273 274func (i *Ingester) ingestProfile(e *jmodels.Event) error { 275 did := e.Did 276 var err error 277 278 l := i.Logger.With("handler", "ingestProfile") 279 l = l.With("nsid", e.Commit.Collection) 280 281 if e.Commit.RKey != "self" { 282 return fmt.Errorf("ingestProfile only ingests `self` record") 283 } 284 285 switch e.Commit.Operation { 286 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 287 raw := json.RawMessage(e.Commit.Record) 288 record := tangled.ActorProfile{} 289 err = json.Unmarshal(raw, &record) 290 if err != nil { 291 l.Error("invalid record", "err", err) 292 return err 293 } 294 295 avatar := "" 296 if record.Avatar != nil { 297 avatar = record.Avatar.Ref.String() 298 
} 299 300 description := "" 301 if record.Description != nil { 302 description = *record.Description 303 } 304 305 includeBluesky := record.Bluesky 306 307 pronouns := "" 308 if record.Pronouns != nil { 309 pronouns = *record.Pronouns 310 } 311 312 location := "" 313 if record.Location != nil { 314 location = *record.Location 315 } 316 317 var links [5]string 318 for i, l := range record.Links { 319 if i < 5 { 320 links[i] = l 321 } 322 } 323 324 var stats [2]models.VanityStat 325 for i, s := range record.Stats { 326 if i < 2 { 327 stats[i].Kind = models.ParseVanityStatKind(s) 328 } 329 } 330 331 var pinned [6]syntax.ATURI 332 for i, r := range record.PinnedRepositories { 333 if i < 6 { 334 pinned[i] = syntax.ATURI(r) 335 } 336 } 337 338 profile := models.Profile{ 339 Did: did, 340 Avatar: avatar, 341 Description: description, 342 IncludeBluesky: includeBluesky, 343 Location: location, 344 Links: links, 345 Stats: stats, 346 PinnedRepos: pinned, 347 Pronouns: pronouns, 348 } 349 350 ddb, ok := i.Db.Execer.(*db.DB) 351 if !ok { 352 return fmt.Errorf("failed to index profile record, invalid db cast") 353 } 354 355 tx, err := ddb.Begin() 356 if err != nil { 357 return fmt.Errorf("failed to start transaction") 358 } 359 360 err = db.ValidateProfile(tx, &profile) 361 if err != nil { 362 return fmt.Errorf("invalid profile record") 363 } 364 365 err = db.UpsertProfile(tx, &profile) 366 case jmodels.CommitOperationDelete: 367 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 368 } 369 370 if err != nil { 371 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 372 } 373 374 return nil 375} 376 377func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 378 did := e.Did 379 var err error 380 381 l := i.Logger.With("handler", "ingestSpindleMember") 382 l = l.With("nsid", e.Commit.Collection) 383 384 switch e.Commit.Operation { 385 case jmodels.CommitOperationCreate: 386 raw := 
json.RawMessage(e.Commit.Record) 387 record := tangled.SpindleMember{} 388 err = json.Unmarshal(raw, &record) 389 if err != nil { 390 l.Error("invalid record", "err", err) 391 return err 392 } 393 394 // only spindle owner can invite to spindles 395 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 396 if err != nil || !ok { 397 return fmt.Errorf("failed to enforce permissions: %w", err) 398 } 399 400 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 401 if err != nil { 402 return err 403 } 404 405 if memberId.Handle.IsInvalidHandle() { 406 return err 407 } 408 409 ddb, ok := i.Db.Execer.(*db.DB) 410 if !ok { 411 return fmt.Errorf("invalid db cast") 412 } 413 414 err = db.AddSpindleMember(ddb, models.SpindleMember{ 415 Did: syntax.DID(did), 416 Rkey: e.Commit.RKey, 417 Instance: record.Instance, 418 Subject: memberId.DID, 419 }) 420 if !ok { 421 return fmt.Errorf("failed to add to db: %w", err) 422 } 423 424 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 425 if err != nil { 426 return fmt.Errorf("failed to update ACLs: %w", err) 427 } 428 429 l.Info("added spindle member") 430 case jmodels.CommitOperationDelete: 431 rkey := e.Commit.RKey 432 433 ddb, ok := i.Db.Execer.(*db.DB) 434 if !ok { 435 return fmt.Errorf("failed to index profile record, invalid db cast") 436 } 437 438 // get record from db first 439 members, err := db.GetSpindleMembers( 440 ddb, 441 orm.FilterEq("did", did), 442 orm.FilterEq("rkey", rkey), 443 ) 444 if err != nil || len(members) != 1 { 445 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 446 } 447 member := members[0] 448 449 tx, err := ddb.Begin() 450 if err != nil { 451 return fmt.Errorf("failed to start txn: %w", err) 452 } 453 454 // remove record by rkey && update enforcer 455 if err = db.RemoveSpindleMember( 456 tx, 457 orm.FilterEq("did", did), 458 orm.FilterEq("rkey", rkey), 459 ); err != nil { 460 return fmt.Errorf("failed to remove from db: 
%w", err) 461 } 462 463 // update enforcer 464 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 465 if err != nil { 466 return fmt.Errorf("failed to update ACLs: %w", err) 467 } 468 469 if err = tx.Commit(); err != nil { 470 return fmt.Errorf("failed to commit txn: %w", err) 471 } 472 473 if err = i.Enforcer.E.SavePolicy(); err != nil { 474 return fmt.Errorf("failed to save ACLs: %w", err) 475 } 476 477 l.Info("removed spindle member") 478 } 479 480 return nil 481} 482 483func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 484 did := e.Did 485 var err error 486 487 l := i.Logger.With("handler", "ingestSpindle") 488 l = l.With("nsid", e.Commit.Collection) 489 490 switch e.Commit.Operation { 491 case jmodels.CommitOperationCreate: 492 raw := json.RawMessage(e.Commit.Record) 493 record := tangled.Spindle{} 494 err = json.Unmarshal(raw, &record) 495 if err != nil { 496 l.Error("invalid record", "err", err) 497 return err 498 } 499 500 instance := e.Commit.RKey 501 502 ddb, ok := i.Db.Execer.(*db.DB) 503 if !ok { 504 return fmt.Errorf("failed to index profile record, invalid db cast") 505 } 506 507 err := db.AddSpindle(ddb, models.Spindle{ 508 Owner: syntax.DID(did), 509 Instance: instance, 510 }) 511 if err != nil { 512 l.Error("failed to add spindle to db", "err", err, "instance", instance) 513 return err 514 } 515 516 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 517 if err != nil { 518 l.Error("failed to add spindle to db", "err", err, "instance", instance) 519 return err 520 } 521 522 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 523 if err != nil { 524 return fmt.Errorf("failed to mark verified: %w", err) 525 } 526 527 return nil 528 529 case jmodels.CommitOperationDelete: 530 instance := e.Commit.RKey 531 532 ddb, ok := i.Db.Execer.(*db.DB) 533 if !ok { 534 return fmt.Errorf("failed to index profile record, invalid db cast") 535 } 536 537 // get 
record from db first 538 spindles, err := db.GetSpindles( 539 ddb, 540 orm.FilterEq("owner", did), 541 orm.FilterEq("instance", instance), 542 ) 543 if err != nil || len(spindles) != 1 { 544 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 545 } 546 spindle := spindles[0] 547 548 tx, err := ddb.Begin() 549 if err != nil { 550 return err 551 } 552 defer func() { 553 tx.Rollback() 554 i.Enforcer.E.LoadPolicy() 555 }() 556 557 // remove spindle members first 558 err = db.RemoveSpindleMember( 559 tx, 560 orm.FilterEq("owner", did), 561 orm.FilterEq("instance", instance), 562 ) 563 if err != nil { 564 return err 565 } 566 567 err = db.DeleteSpindle( 568 tx, 569 orm.FilterEq("owner", did), 570 orm.FilterEq("instance", instance), 571 ) 572 if err != nil { 573 return err 574 } 575 576 if spindle.Verified != nil { 577 err = i.Enforcer.RemoveSpindle(instance) 578 if err != nil { 579 return err 580 } 581 } 582 583 err = tx.Commit() 584 if err != nil { 585 return err 586 } 587 588 err = i.Enforcer.E.SavePolicy() 589 if err != nil { 590 return err 591 } 592 } 593 594 return nil 595} 596 597func (i *Ingester) ingestString(e *jmodels.Event) error { 598 did := e.Did 599 rkey := e.Commit.RKey 600 601 var err error 602 603 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 604 l.Info("ingesting record") 605 606 ddb, ok := i.Db.Execer.(*db.DB) 607 if !ok { 608 return fmt.Errorf("failed to index string record, invalid db cast") 609 } 610 611 switch e.Commit.Operation { 612 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 613 raw := json.RawMessage(e.Commit.Record) 614 record := tangled.String{} 615 err = json.Unmarshal(raw, &record) 616 if err != nil { 617 l.Error("invalid record", "err", err) 618 return err 619 } 620 621 string := models.StringFromRecord(did, rkey, record) 622 623 if err = i.Validator.ValidateString(&string); err != nil { 624 l.Error("invalid record", "err", 
err) 625 return err 626 } 627 628 if err = db.AddString(ddb, string); err != nil { 629 l.Error("failed to add string", "err", err) 630 return err 631 } 632 633 return nil 634 635 case jmodels.CommitOperationDelete: 636 if err := db.DeleteString( 637 ddb, 638 orm.FilterEq("did", did), 639 orm.FilterEq("rkey", rkey), 640 ); err != nil { 641 l.Error("failed to delete", "err", err) 642 return fmt.Errorf("failed to delete string record: %w", err) 643 } 644 645 return nil 646 } 647 648 return nil 649} 650 651func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 652 did := e.Did 653 var err error 654 655 l := i.Logger.With("handler", "ingestKnotMember") 656 l = l.With("nsid", e.Commit.Collection) 657 658 switch e.Commit.Operation { 659 case jmodels.CommitOperationCreate: 660 raw := json.RawMessage(e.Commit.Record) 661 record := tangled.KnotMember{} 662 err = json.Unmarshal(raw, &record) 663 if err != nil { 664 l.Error("invalid record", "err", err) 665 return err 666 } 667 668 // only knot owner can invite to knots 669 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 670 if err != nil || !ok { 671 return fmt.Errorf("failed to enforce permissions: %w", err) 672 } 673 674 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 675 if err != nil { 676 return err 677 } 678 679 if memberId.Handle.IsInvalidHandle() { 680 return err 681 } 682 683 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 684 if err != nil { 685 return fmt.Errorf("failed to update ACLs: %w", err) 686 } 687 688 l.Info("added knot member") 689 case jmodels.CommitOperationDelete: 690 // we don't store knot members in a table (like we do for spindle) 691 // and we can't remove this just yet. possibly fixed if we switch 692 // to either: 693 // 1. a knot_members table like with spindle and store the rkey 694 // 2. 
use the knot host as the rkey 695 // 696 // TODO: implement member deletion 697 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 698 } 699 700 return nil 701} 702 703func (i *Ingester) ingestKnot(e *jmodels.Event) error { 704 did := e.Did 705 var err error 706 707 l := i.Logger.With("handler", "ingestKnot") 708 l = l.With("nsid", e.Commit.Collection) 709 710 switch e.Commit.Operation { 711 case jmodels.CommitOperationCreate: 712 raw := json.RawMessage(e.Commit.Record) 713 record := tangled.Knot{} 714 err = json.Unmarshal(raw, &record) 715 if err != nil { 716 l.Error("invalid record", "err", err) 717 return err 718 } 719 720 domain := e.Commit.RKey 721 722 ddb, ok := i.Db.Execer.(*db.DB) 723 if !ok { 724 return fmt.Errorf("failed to index profile record, invalid db cast") 725 } 726 727 err := db.AddKnot(ddb, domain, did) 728 if err != nil { 729 l.Error("failed to add knot to db", "err", err, "domain", domain) 730 return err 731 } 732 733 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 734 if err != nil { 735 l.Error("failed to verify knot", "err", err, "domain", domain) 736 return err 737 } 738 739 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 740 if err != nil { 741 return fmt.Errorf("failed to mark verified: %w", err) 742 } 743 744 return nil 745 746 case jmodels.CommitOperationDelete: 747 domain := e.Commit.RKey 748 749 ddb, ok := i.Db.Execer.(*db.DB) 750 if !ok { 751 return fmt.Errorf("failed to index knot record, invalid db cast") 752 } 753 754 // get record from db first 755 registrations, err := db.GetRegistrations( 756 ddb, 757 orm.FilterEq("domain", domain), 758 orm.FilterEq("did", did), 759 ) 760 if err != nil { 761 return fmt.Errorf("failed to get registration: %w", err) 762 } 763 if len(registrations) != 1 { 764 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 765 } 766 registration := registrations[0] 767 768 tx, err := 
ddb.Begin() 769 if err != nil { 770 return err 771 } 772 defer func() { 773 tx.Rollback() 774 i.Enforcer.E.LoadPolicy() 775 }() 776 777 err = db.DeleteKnot( 778 tx, 779 orm.FilterEq("did", did), 780 orm.FilterEq("domain", domain), 781 ) 782 if err != nil { 783 return err 784 } 785 786 if registration.Registered != nil { 787 err = i.Enforcer.RemoveKnot(domain) 788 if err != nil { 789 return err 790 } 791 } 792 793 err = tx.Commit() 794 if err != nil { 795 return err 796 } 797 798 err = i.Enforcer.E.SavePolicy() 799 if err != nil { 800 return err 801 } 802 } 803 804 return nil 805} 806func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 807 did := e.Did 808 rkey := e.Commit.RKey 809 810 var err error 811 812 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 813 l.Info("ingesting record") 814 815 ddb, ok := i.Db.Execer.(*db.DB) 816 if !ok { 817 return fmt.Errorf("failed to index issue record, invalid db cast") 818 } 819 820 switch e.Commit.Operation { 821 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 822 raw := json.RawMessage(e.Commit.Record) 823 record := tangled.RepoIssue{} 824 err = json.Unmarshal(raw, &record) 825 if err != nil { 826 l.Error("invalid record", "err", err) 827 return err 828 } 829 830 issue := models.IssueFromRecord(did, rkey, record) 831 832 if err := i.Validator.ValidateIssue(&issue); err != nil { 833 return fmt.Errorf("failed to validate issue: %w", err) 834 } 835 836 tx, err := ddb.BeginTx(ctx, nil) 837 if err != nil { 838 l.Error("failed to begin transaction", "err", err) 839 return err 840 } 841 defer tx.Rollback() 842 843 err = db.PutIssue(tx, &issue) 844 if err != nil { 845 l.Error("failed to create issue", "err", err) 846 return err 847 } 848 849 err = tx.Commit() 850 if err != nil { 851 l.Error("failed to commit txn", "err", err) 852 return err 853 } 854 855 return nil 856 857 case jmodels.CommitOperationDelete: 858 tx, err := ddb.BeginTx(ctx, 
nil) 859 if err != nil { 860 l.Error("failed to begin transaction", "err", err) 861 return err 862 } 863 defer tx.Rollback() 864 865 if err := db.DeleteIssues( 866 tx, 867 did, 868 rkey, 869 ); err != nil { 870 l.Error("failed to delete", "err", err) 871 return fmt.Errorf("failed to delete issue record: %w", err) 872 } 873 if err := tx.Commit(); err != nil { 874 l.Error("failed to commit txn", "err", err) 875 return err 876 } 877 878 return nil 879 } 880 881 return nil 882} 883 884func (i *Ingester) ingestPull(ctx context.Context, e *jmodels.Event) error { 885 did := e.Did 886 rkey := e.Commit.RKey 887 888 var err error 889 890 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 891 l.Info("ingesting record") 892 893 ddb, ok := i.Db.Execer.(*db.DB) 894 if !ok { 895 return fmt.Errorf("failed to index pull record, invalid db cast") 896 } 897 898 switch e.Commit.Operation { 899 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 900 raw := json.RawMessage(e.Commit.Record) 901 record := tangled.RepoPull{} 902 err = json.Unmarshal(raw, &record) 903 if err != nil { 904 l.Error("invalid record", "err", err) 905 return err 906 } 907 908 ownerId, err := i.IdResolver.ResolveIdent(ctx, did) 909 if err != nil { 910 l.Error("failed to resolve did") 911 return err 912 } 913 914 // go through and fetch all blobs in parallel 915 readers := make([]*io.ReadCloser, len(record.Rounds)) 916 var mu sync.Mutex 917 918 g, gctx := errgroup.WithContext(ctx) 919 920 for idx, b := range record.Rounds { 921 g.Go(func() error { 922 ownerPds := ownerId.PDSEndpoint() 923 url, _ := url.Parse(fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob", ownerPds)) 924 q := url.Query() 925 q.Set("cid", b.PatchBlob.Ref.String()) 926 q.Set("did", did) 927 url.RawQuery = q.Encode() 928 929 req, err := http.NewRequestWithContext(gctx, http.MethodGet, url.String(), nil) 930 if err != nil { 931 l.Error("failed to create request") 932 return err 933 } 934 
req.Header.Set("Content-Type", "application/json") 935 936 resp, err := http.DefaultClient.Do(req) 937 if err != nil { 938 l.Error("failed to make request") 939 return err 940 } 941 942 mu.Lock() 943 readers[idx] = &resp.Body 944 mu.Unlock() 945 946 return nil 947 }) 948 } 949 950 if err := g.Wait(); err != nil { 951 for _, r := range readers { 952 if r != nil && *r != nil { 953 (*r).Close() 954 } 955 } 956 return err 957 } 958 959 defer func() { 960 for _, r := range readers { 961 if r != nil && *r != nil { 962 (*r).Close() 963 } 964 } 965 }() 966 967 pull := models.PullFromRecord(did, rkey, record, readers) 968 // if err := i.Validator.ValidateIssue(&issue); err != nil { 969 // return fmt.Errorf("failed to validate issue: %w", err) 970 // } 971 972 tx, err := ddb.BeginTx(ctx, nil) 973 if err != nil { 974 l.Error("failed to begin transaction", "err", err) 975 return err 976 } 977 defer tx.Rollback() 978 979 err = db.PutPull(tx, &pull) 980 if err != nil { 981 l.Error("failed to create pull", "err", err) 982 return err 983 } 984 985 err = tx.Commit() 986 if err != nil { 987 l.Error("failed to commit txn", "err", err) 988 return err 989 } 990 991 return nil 992 993 case jmodels.CommitOperationDelete: 994 tx, err := ddb.BeginTx(ctx, nil) 995 if err != nil { 996 l.Error("failed to begin transaction", "err", err) 997 return err 998 } 999 defer tx.Rollback() 1000 1001 if err := db.AbandonPulls( 1002 tx, 1003 orm.FilterEq("owner_did", did), 1004 orm.FilterEq("rkey", rkey), 1005 ); err != nil { 1006 l.Error("failed to abandon", "err", err) 1007 return fmt.Errorf("failed to abandon pull record: %w", err) 1008 } 1009 if err := tx.Commit(); err != nil { 1010 l.Error("failed to commit txn", "err", err) 1011 return err 1012 } 1013 1014 return nil 1015 } 1016 1017 return nil 1018} 1019 1020func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 1021 did := e.Did 1022 rkey := e.Commit.RKey 1023 1024 var err error 1025 1026 l := i.Logger.With("handler", 
"ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1027 l.Info("ingesting record") 1028 1029 ddb, ok := i.Db.Execer.(*db.DB) 1030 if !ok { 1031 return fmt.Errorf("failed to index issue comment record, invalid db cast") 1032 } 1033 1034 switch e.Commit.Operation { 1035 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1036 raw := json.RawMessage(e.Commit.Record) 1037 record := tangled.RepoIssueComment{} 1038 err = json.Unmarshal(raw, &record) 1039 if err != nil { 1040 return fmt.Errorf("invalid record: %w", err) 1041 } 1042 1043 comment, err := models.IssueCommentFromRecord(did, rkey, record) 1044 if err != nil { 1045 return fmt.Errorf("failed to parse comment from record: %w", err) 1046 } 1047 1048 if err := i.Validator.ValidateIssueComment(comment); err != nil { 1049 return fmt.Errorf("failed to validate comment: %w", err) 1050 } 1051 1052 tx, err := ddb.Begin() 1053 if err != nil { 1054 return fmt.Errorf("failed to start transaction: %w", err) 1055 } 1056 defer tx.Rollback() 1057 1058 _, err = db.AddIssueComment(tx, *comment) 1059 if err != nil { 1060 return fmt.Errorf("failed to create issue comment: %w", err) 1061 } 1062 1063 return tx.Commit() 1064 1065 case jmodels.CommitOperationDelete: 1066 if err := db.DeleteIssueComments( 1067 ddb, 1068 orm.FilterEq("did", did), 1069 orm.FilterEq("rkey", rkey), 1070 ); err != nil { 1071 return fmt.Errorf("failed to delete issue comment record: %w", err) 1072 } 1073 1074 return nil 1075 } 1076 1077 return nil 1078} 1079 1080func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 1081 did := e.Did 1082 rkey := e.Commit.RKey 1083 1084 var err error 1085 1086 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1087 l.Info("ingesting record") 1088 1089 ddb, ok := i.Db.Execer.(*db.DB) 1090 if !ok { 1091 return fmt.Errorf("failed to index label definition, invalid db cast") 1092 } 1093 1094 switch e.Commit.Operation { 
1095 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1096 raw := json.RawMessage(e.Commit.Record) 1097 record := tangled.LabelDefinition{} 1098 err = json.Unmarshal(raw, &record) 1099 if err != nil { 1100 return fmt.Errorf("invalid record: %w", err) 1101 } 1102 1103 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 1104 if err != nil { 1105 return fmt.Errorf("failed to parse labeldef from record: %w", err) 1106 } 1107 1108 if err := i.Validator.ValidateLabelDefinition(def); err != nil { 1109 return fmt.Errorf("failed to validate labeldef: %w", err) 1110 } 1111 1112 _, err = db.AddLabelDefinition(ddb, def) 1113 if err != nil { 1114 return fmt.Errorf("failed to create labeldef: %w", err) 1115 } 1116 1117 return nil 1118 1119 case jmodels.CommitOperationDelete: 1120 if err := db.DeleteLabelDefinition( 1121 ddb, 1122 orm.FilterEq("did", did), 1123 orm.FilterEq("rkey", rkey), 1124 ); err != nil { 1125 return fmt.Errorf("failed to delete labeldef record: %w", err) 1126 } 1127 1128 return nil 1129 } 1130 1131 return nil 1132} 1133 1134func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1135 did := e.Did 1136 rkey := e.Commit.RKey 1137 1138 var err error 1139 1140 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1141 l.Info("ingesting record") 1142 1143 ddb, ok := i.Db.Execer.(*db.DB) 1144 if !ok { 1145 return fmt.Errorf("failed to index label op, invalid db cast") 1146 } 1147 1148 switch e.Commit.Operation { 1149 case jmodels.CommitOperationCreate: 1150 raw := json.RawMessage(e.Commit.Record) 1151 record := tangled.LabelOp{} 1152 err = json.Unmarshal(raw, &record) 1153 if err != nil { 1154 return fmt.Errorf("invalid record: %w", err) 1155 } 1156 1157 subject := syntax.ATURI(record.Subject) 1158 collection := subject.Collection() 1159 1160 var repo *models.Repo 1161 switch collection { 1162 case tangled.RepoIssueNSID: 1163 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", 
subject)) 1164 if err != nil || len(i) != 1 { 1165 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1166 } 1167 repo = i[0].Repo 1168 default: 1169 return fmt.Errorf("unsupport label subject: %s", collection) 1170 } 1171 1172 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1173 if err != nil { 1174 return fmt.Errorf("failed to build label application ctx: %w", err) 1175 } 1176 1177 ops := models.LabelOpsFromRecord(did, rkey, record) 1178 1179 for _, o := range ops { 1180 def, ok := actx.Defs[o.OperandKey] 1181 if !ok { 1182 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1183 } 1184 if err := i.Validator.ValidateLabelOp(def, repo, &o); err != nil { 1185 return fmt.Errorf("failed to validate labelop: %w", err) 1186 } 1187 } 1188 1189 tx, err := ddb.Begin() 1190 if err != nil { 1191 return err 1192 } 1193 defer tx.Rollback() 1194 1195 for _, o := range ops { 1196 _, err = db.AddLabelOp(tx, &o) 1197 if err != nil { 1198 return fmt.Errorf("failed to add labelop: %w", err) 1199 } 1200 } 1201 1202 if err = tx.Commit(); err != nil { 1203 return err 1204 } 1205 } 1206 1207 return nil 1208}