A vibe-coded tangled fork that supports Pijul.
at 401bc223b6b5cc3a4b7332639f64d7e6409f273e 1075 lines 26 kB view raw
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "maps" 9 "slices" 10 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 jmodels "github.com/bluesky-social/jetstream/pkg/models" 15 "github.com/go-git/go-git/v5/plumbing" 16 "github.com/ipfs/go-cid" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/serververify" 22 "tangled.org/core/idresolver" 23 "tangled.org/core/orm" 24 "tangled.org/core/rbac" 25) 26 27type Ingester struct { 28 Db db.DbWrapper 29 Enforcer *rbac.Enforcer 30 IdResolver *idresolver.Resolver 31 Config *config.Config 32 Logger *slog.Logger 33} 34 35type processFunc func(ctx context.Context, e *jmodels.Event) error 36 37func (i *Ingester) Ingest() processFunc { 38 return func(ctx context.Context, e *jmodels.Event) error { 39 var err error 40 defer func() { 41 eventTime := e.TimeUS 42 lastTimeUs := eventTime + 1 43 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 44 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 45 } 46 }() 47 48 l := i.Logger.With("kind", e.Kind) 49 switch e.Kind { 50 case jmodels.EventKindAccount: 51 if !e.Account.Active && *e.Account.Status == "deactivated" { 52 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 53 } 54 case jmodels.EventKindIdentity: 55 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 56 case jmodels.EventKindCommit: 57 switch e.Commit.Collection { 58 case tangled.GraphFollowNSID: 59 err = i.ingestFollow(e) 60 case tangled.FeedStarNSID: 61 err = i.ingestStar(e) 62 case tangled.PublicKeyNSID: 63 err = i.ingestPublicKey(e) 64 case tangled.RepoArtifactNSID: 65 err = i.ingestArtifact(e) 66 case tangled.ActorProfileNSID: 67 err = i.ingestProfile(e) 68 case tangled.SpindleMemberNSID: 69 err = i.ingestSpindleMember(ctx, e) 70 case tangled.SpindleNSID: 71 err = i.ingestSpindle(ctx, e) 72 case tangled.KnotMemberNSID: 73 
err = i.ingestKnotMember(e) 74 case tangled.KnotNSID: 75 err = i.ingestKnot(e) 76 case tangled.StringNSID: 77 err = i.ingestString(e) 78 case tangled.RepoIssueNSID: 79 err = i.ingestIssue(ctx, e) 80 case tangled.RepoIssueCommentNSID: 81 err = i.ingestIssueComment(e) 82 case tangled.LabelDefinitionNSID: 83 err = i.ingestLabelDefinition(e) 84 case tangled.LabelOpNSID: 85 err = i.ingestLabelOp(e) 86 } 87 l = i.Logger.With("nsid", e.Commit.Collection) 88 } 89 90 if err != nil { 91 l.Warn("refused to ingest record", "err", err) 92 } 93 94 return nil 95 } 96} 97 98func (i *Ingester) ingestStar(e *jmodels.Event) error { 99 var err error 100 did := e.Did 101 102 l := i.Logger.With("handler", "ingestStar") 103 l = l.With("nsid", e.Commit.Collection) 104 105 switch e.Commit.Operation { 106 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 107 var subjectUri syntax.ATURI 108 109 raw := json.RawMessage(e.Commit.Record) 110 record := tangled.FeedStar{} 111 err := json.Unmarshal(raw, &record) 112 if err != nil { 113 l.Error("invalid record", "err", err) 114 return err 115 } 116 117 subjectUri, err = syntax.ParseATURI(record.Subject) 118 if err != nil { 119 l.Error("invalid record", "err", err) 120 return err 121 } 122 err = db.AddStar(i.Db, &models.Star{ 123 Did: did, 124 RepoAt: subjectUri, 125 Rkey: e.Commit.RKey, 126 }) 127 case jmodels.CommitOperationDelete: 128 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 129 } 130 131 if err != nil { 132 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 133 } 134 135 return nil 136} 137 138func (i *Ingester) ingestFollow(e *jmodels.Event) error { 139 var err error 140 did := e.Did 141 142 l := i.Logger.With("handler", "ingestFollow") 143 l = l.With("nsid", e.Commit.Collection) 144 145 switch e.Commit.Operation { 146 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 147 raw := json.RawMessage(e.Commit.Record) 148 record := tangled.GraphFollow{} 149 err = json.Unmarshal(raw, 
&record) 150 if err != nil { 151 l.Error("invalid record", "err", err) 152 return err 153 } 154 155 err = db.AddFollow(i.Db, &models.Follow{ 156 UserDid: did, 157 SubjectDid: record.Subject, 158 Rkey: e.Commit.RKey, 159 }) 160 case jmodels.CommitOperationDelete: 161 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 162 } 163 164 if err != nil { 165 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 166 } 167 168 return nil 169} 170 171func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 172 did := e.Did 173 var err error 174 175 l := i.Logger.With("handler", "ingestPublicKey") 176 l = l.With("nsid", e.Commit.Collection) 177 178 switch e.Commit.Operation { 179 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 180 l.Debug("processing add of pubkey") 181 raw := json.RawMessage(e.Commit.Record) 182 record := tangled.PublicKey{} 183 err = json.Unmarshal(raw, &record) 184 if err != nil { 185 l.Error("invalid record", "err", err) 186 return err 187 } 188 189 name := record.Name 190 key := record.Key 191 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 192 case jmodels.CommitOperationDelete: 193 l.Debug("processing delete of pubkey") 194 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 195 } 196 197 if err != nil { 198 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 199 } 200 201 return nil 202} 203 204func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 205 did := e.Did 206 var err error 207 208 l := i.Logger.With("handler", "ingestArtifact") 209 l = l.With("nsid", e.Commit.Collection) 210 211 switch e.Commit.Operation { 212 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 213 raw := json.RawMessage(e.Commit.Record) 214 record := tangled.RepoArtifact{} 215 err = json.Unmarshal(raw, &record) 216 if err != nil { 217 l.Error("invalid record", "err", err) 218 return err 219 } 220 221 repoAt, err := syntax.ParseATURI(record.Repo) 222 if err != nil { 
223 return err 224 } 225 226 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 227 if err != nil { 228 return err 229 } 230 231 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 232 if err != nil || !ok { 233 return err 234 } 235 236 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 237 if err != nil { 238 createdAt = time.Now() 239 } 240 241 artifact := models.Artifact{ 242 Did: did, 243 Rkey: e.Commit.RKey, 244 RepoAt: repoAt, 245 Tag: plumbing.Hash(record.Tag), 246 CreatedAt: createdAt, 247 BlobCid: cid.Cid(record.Artifact.Ref), 248 Name: record.Name, 249 Size: uint64(record.Artifact.Size), 250 MimeType: record.Artifact.MimeType, 251 } 252 253 err = db.AddArtifact(i.Db, artifact) 254 case jmodels.CommitOperationDelete: 255 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 256 } 257 258 if err != nil { 259 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 260 } 261 262 return nil 263} 264 265func (i *Ingester) ingestProfile(e *jmodels.Event) error { 266 did := e.Did 267 var err error 268 269 l := i.Logger.With("handler", "ingestProfile") 270 l = l.With("nsid", e.Commit.Collection) 271 272 if e.Commit.RKey != "self" { 273 return fmt.Errorf("ingestProfile only ingests `self` record") 274 } 275 276 switch e.Commit.Operation { 277 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 278 raw := json.RawMessage(e.Commit.Record) 279 record := tangled.ActorProfile{} 280 err = json.Unmarshal(raw, &record) 281 if err != nil { 282 l.Error("invalid record", "err", err) 283 return err 284 } 285 286 avatar := "" 287 if record.Avatar != nil { 288 avatar = record.Avatar.Ref.String() 289 } 290 291 description := "" 292 if record.Description != nil { 293 description = *record.Description 294 } 295 296 includeBluesky := record.Bluesky 297 298 pronouns := "" 299 if record.Pronouns != nil { 300 pronouns = *record.Pronouns 301 } 302 303 location := "" 
304 if record.Location != nil { 305 location = *record.Location 306 } 307 308 var links [5]string 309 for i, l := range record.Links { 310 if i < 5 { 311 links[i] = l 312 } 313 } 314 315 var stats [2]models.VanityStat 316 for i, s := range record.Stats { 317 if i < 2 { 318 stats[i].Kind = models.ParseVanityStatKind(s) 319 } 320 } 321 322 var pinned [6]syntax.ATURI 323 for i, r := range record.PinnedRepositories { 324 if i < 6 { 325 pinned[i] = syntax.ATURI(r) 326 } 327 } 328 329 profile := models.Profile{ 330 Did: did, 331 Avatar: avatar, 332 Description: description, 333 IncludeBluesky: includeBluesky, 334 Location: location, 335 Links: links, 336 Stats: stats, 337 PinnedRepos: pinned, 338 Pronouns: pronouns, 339 } 340 341 ddb, ok := i.Db.Execer.(*db.DB) 342 if !ok { 343 return fmt.Errorf("failed to index profile record, invalid db cast") 344 } 345 346 tx, err := ddb.Begin() 347 if err != nil { 348 return fmt.Errorf("failed to start transaction") 349 } 350 351 err = db.ValidateProfile(tx, &profile) 352 if err != nil { 353 return fmt.Errorf("invalid profile record") 354 } 355 356 err = db.UpsertProfile(tx, &profile) 357 case jmodels.CommitOperationDelete: 358 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 359 } 360 361 if err != nil { 362 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 363 } 364 365 return nil 366} 367 368func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 369 did := e.Did 370 var err error 371 372 l := i.Logger.With("handler", "ingestSpindleMember") 373 l = l.With("nsid", e.Commit.Collection) 374 375 switch e.Commit.Operation { 376 case jmodels.CommitOperationCreate: 377 raw := json.RawMessage(e.Commit.Record) 378 record := tangled.SpindleMember{} 379 err = json.Unmarshal(raw, &record) 380 if err != nil { 381 l.Error("invalid record", "err", err) 382 return err 383 } 384 385 // only spindle owner can invite to spindles 386 ok, err := 
i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 387 if err != nil || !ok { 388 return fmt.Errorf("failed to enforce permissions: %w", err) 389 } 390 391 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 392 if err != nil { 393 return err 394 } 395 396 if memberId.Handle.IsInvalidHandle() { 397 return err 398 } 399 400 ddb, ok := i.Db.Execer.(*db.DB) 401 if !ok { 402 return fmt.Errorf("invalid db cast") 403 } 404 405 err = db.AddSpindleMember(ddb, models.SpindleMember{ 406 Did: syntax.DID(did), 407 Rkey: e.Commit.RKey, 408 Instance: record.Instance, 409 Subject: memberId.DID, 410 }) 411 if !ok { 412 return fmt.Errorf("failed to add to db: %w", err) 413 } 414 415 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 416 if err != nil { 417 return fmt.Errorf("failed to update ACLs: %w", err) 418 } 419 420 l.Info("added spindle member") 421 case jmodels.CommitOperationDelete: 422 rkey := e.Commit.RKey 423 424 ddb, ok := i.Db.Execer.(*db.DB) 425 if !ok { 426 return fmt.Errorf("failed to index profile record, invalid db cast") 427 } 428 429 // get record from db first 430 members, err := db.GetSpindleMembers( 431 ddb, 432 orm.FilterEq("did", did), 433 orm.FilterEq("rkey", rkey), 434 ) 435 if err != nil || len(members) != 1 { 436 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 437 } 438 member := members[0] 439 440 tx, err := ddb.Begin() 441 if err != nil { 442 return fmt.Errorf("failed to start txn: %w", err) 443 } 444 445 // remove record by rkey && update enforcer 446 if err = db.RemoveSpindleMember( 447 tx, 448 orm.FilterEq("did", did), 449 orm.FilterEq("rkey", rkey), 450 ); err != nil { 451 return fmt.Errorf("failed to remove from db: %w", err) 452 } 453 454 // update enforcer 455 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 456 if err != nil { 457 return fmt.Errorf("failed to update ACLs: %w", err) 458 } 459 460 if err = tx.Commit(); err != nil { 461 
return fmt.Errorf("failed to commit txn: %w", err) 462 } 463 464 if err = i.Enforcer.E.SavePolicy(); err != nil { 465 return fmt.Errorf("failed to save ACLs: %w", err) 466 } 467 468 l.Info("removed spindle member") 469 } 470 471 return nil 472} 473 474func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 475 did := e.Did 476 var err error 477 478 l := i.Logger.With("handler", "ingestSpindle") 479 l = l.With("nsid", e.Commit.Collection) 480 481 switch e.Commit.Operation { 482 case jmodels.CommitOperationCreate: 483 raw := json.RawMessage(e.Commit.Record) 484 record := tangled.Spindle{} 485 err = json.Unmarshal(raw, &record) 486 if err != nil { 487 l.Error("invalid record", "err", err) 488 return err 489 } 490 491 instance := e.Commit.RKey 492 493 ddb, ok := i.Db.Execer.(*db.DB) 494 if !ok { 495 return fmt.Errorf("failed to index profile record, invalid db cast") 496 } 497 498 err := db.AddSpindle(ddb, models.Spindle{ 499 Owner: syntax.DID(did), 500 Instance: instance, 501 }) 502 if err != nil { 503 l.Error("failed to add spindle to db", "err", err, "instance", instance) 504 return err 505 } 506 507 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 508 if err != nil { 509 l.Error("failed to add spindle to db", "err", err, "instance", instance) 510 return err 511 } 512 513 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 514 if err != nil { 515 return fmt.Errorf("failed to mark verified: %w", err) 516 } 517 518 return nil 519 520 case jmodels.CommitOperationDelete: 521 instance := e.Commit.RKey 522 523 ddb, ok := i.Db.Execer.(*db.DB) 524 if !ok { 525 return fmt.Errorf("failed to index profile record, invalid db cast") 526 } 527 528 // get record from db first 529 spindles, err := db.GetSpindles( 530 ddb, 531 orm.FilterEq("owner", did), 532 orm.FilterEq("instance", instance), 533 ) 534 if err != nil || len(spindles) != 1 { 535 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", 
err, len(spindles)) 536 } 537 spindle := spindles[0] 538 539 tx, err := ddb.Begin() 540 if err != nil { 541 return err 542 } 543 defer func() { 544 tx.Rollback() 545 i.Enforcer.E.LoadPolicy() 546 }() 547 548 // remove spindle members first 549 err = db.RemoveSpindleMember( 550 tx, 551 orm.FilterEq("owner", did), 552 orm.FilterEq("instance", instance), 553 ) 554 if err != nil { 555 return err 556 } 557 558 err = db.DeleteSpindle( 559 tx, 560 orm.FilterEq("owner", did), 561 orm.FilterEq("instance", instance), 562 ) 563 if err != nil { 564 return err 565 } 566 567 if spindle.Verified != nil { 568 err = i.Enforcer.RemoveSpindle(instance) 569 if err != nil { 570 return err 571 } 572 } 573 574 err = tx.Commit() 575 if err != nil { 576 return err 577 } 578 579 err = i.Enforcer.E.SavePolicy() 580 if err != nil { 581 return err 582 } 583 } 584 585 return nil 586} 587 588func (i *Ingester) ingestString(e *jmodels.Event) error { 589 did := e.Did 590 rkey := e.Commit.RKey 591 592 var err error 593 594 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 595 l.Info("ingesting record") 596 597 ddb, ok := i.Db.Execer.(*db.DB) 598 if !ok { 599 return fmt.Errorf("failed to index string record, invalid db cast") 600 } 601 602 switch e.Commit.Operation { 603 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 604 raw := json.RawMessage(e.Commit.Record) 605 record := tangled.String{} 606 err = json.Unmarshal(raw, &record) 607 if err != nil { 608 l.Error("invalid record", "err", err) 609 return err 610 } 611 612 string := models.StringFromRecord(did, rkey, record) 613 614 if err = string.Validate(); err != nil { 615 l.Error("invalid record", "err", err) 616 return err 617 } 618 619 if err = db.AddString(ddb, string); err != nil { 620 l.Error("failed to add string", "err", err) 621 return err 622 } 623 624 return nil 625 626 case jmodels.CommitOperationDelete: 627 if err := db.DeleteString( 628 ddb, 629 orm.FilterEq("did", 
did), 630 orm.FilterEq("rkey", rkey), 631 ); err != nil { 632 l.Error("failed to delete", "err", err) 633 return fmt.Errorf("failed to delete string record: %w", err) 634 } 635 636 return nil 637 } 638 639 return nil 640} 641 642func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 643 did := e.Did 644 var err error 645 646 l := i.Logger.With("handler", "ingestKnotMember") 647 l = l.With("nsid", e.Commit.Collection) 648 649 switch e.Commit.Operation { 650 case jmodels.CommitOperationCreate: 651 raw := json.RawMessage(e.Commit.Record) 652 record := tangled.KnotMember{} 653 err = json.Unmarshal(raw, &record) 654 if err != nil { 655 l.Error("invalid record", "err", err) 656 return err 657 } 658 659 // only knot owner can invite to knots 660 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 661 if err != nil || !ok { 662 return fmt.Errorf("failed to enforce permissions: %w", err) 663 } 664 665 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 666 if err != nil { 667 return err 668 } 669 670 if memberId.Handle.IsInvalidHandle() { 671 return err 672 } 673 674 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 675 if err != nil { 676 return fmt.Errorf("failed to update ACLs: %w", err) 677 } 678 679 l.Info("added knot member") 680 case jmodels.CommitOperationDelete: 681 // we don't store knot members in a table (like we do for spindle) 682 // and we can't remove this just yet. possibly fixed if we switch 683 // to either: 684 // 1. a knot_members table like with spindle and store the rkey 685 // 2. 
use the knot host as the rkey 686 // 687 // TODO: implement member deletion 688 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 689 } 690 691 return nil 692} 693 694func (i *Ingester) ingestKnot(e *jmodels.Event) error { 695 did := e.Did 696 var err error 697 698 l := i.Logger.With("handler", "ingestKnot") 699 l = l.With("nsid", e.Commit.Collection) 700 701 switch e.Commit.Operation { 702 case jmodels.CommitOperationCreate: 703 raw := json.RawMessage(e.Commit.Record) 704 record := tangled.Knot{} 705 err = json.Unmarshal(raw, &record) 706 if err != nil { 707 l.Error("invalid record", "err", err) 708 return err 709 } 710 711 domain := e.Commit.RKey 712 713 ddb, ok := i.Db.Execer.(*db.DB) 714 if !ok { 715 return fmt.Errorf("failed to index profile record, invalid db cast") 716 } 717 718 err := db.AddKnot(ddb, domain, did) 719 if err != nil { 720 l.Error("failed to add knot to db", "err", err, "domain", domain) 721 return err 722 } 723 724 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 725 if err != nil { 726 l.Error("failed to verify knot", "err", err, "domain", domain) 727 return err 728 } 729 730 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 731 if err != nil { 732 return fmt.Errorf("failed to mark verified: %w", err) 733 } 734 735 return nil 736 737 case jmodels.CommitOperationDelete: 738 domain := e.Commit.RKey 739 740 ddb, ok := i.Db.Execer.(*db.DB) 741 if !ok { 742 return fmt.Errorf("failed to index knot record, invalid db cast") 743 } 744 745 // get record from db first 746 registrations, err := db.GetRegistrations( 747 ddb, 748 orm.FilterEq("domain", domain), 749 orm.FilterEq("did", did), 750 ) 751 if err != nil { 752 return fmt.Errorf("failed to get registration: %w", err) 753 } 754 if len(registrations) != 1 { 755 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 756 } 757 registration := registrations[0] 758 759 tx, err := 
ddb.Begin() 760 if err != nil { 761 return err 762 } 763 defer func() { 764 tx.Rollback() 765 i.Enforcer.E.LoadPolicy() 766 }() 767 768 err = db.DeleteKnot( 769 tx, 770 orm.FilterEq("did", did), 771 orm.FilterEq("domain", domain), 772 ) 773 if err != nil { 774 return err 775 } 776 777 if registration.Registered != nil { 778 err = i.Enforcer.RemoveKnot(domain) 779 if err != nil { 780 return err 781 } 782 } 783 784 err = tx.Commit() 785 if err != nil { 786 return err 787 } 788 789 err = i.Enforcer.E.SavePolicy() 790 if err != nil { 791 return err 792 } 793 } 794 795 return nil 796} 797func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 798 did := e.Did 799 rkey := e.Commit.RKey 800 801 var err error 802 803 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 804 l.Info("ingesting record") 805 806 ddb, ok := i.Db.Execer.(*db.DB) 807 if !ok { 808 return fmt.Errorf("failed to index issue record, invalid db cast") 809 } 810 811 switch e.Commit.Operation { 812 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 813 raw := json.RawMessage(e.Commit.Record) 814 record := tangled.RepoIssue{} 815 err = json.Unmarshal(raw, &record) 816 if err != nil { 817 l.Error("invalid record", "err", err) 818 return err 819 } 820 821 issue := models.IssueFromRecord(did, rkey, record) 822 823 if err := issue.Validate(); err != nil { 824 return fmt.Errorf("failed to validate issue: %w", err) 825 } 826 827 tx, err := ddb.BeginTx(ctx, nil) 828 if err != nil { 829 l.Error("failed to begin transaction", "err", err) 830 return err 831 } 832 defer tx.Rollback() 833 834 err = db.PutIssue(tx, &issue) 835 if err != nil { 836 l.Error("failed to create issue", "err", err) 837 return err 838 } 839 840 err = tx.Commit() 841 if err != nil { 842 l.Error("failed to commit txn", "err", err) 843 return err 844 } 845 846 return nil 847 848 case jmodels.CommitOperationDelete: 849 tx, err := ddb.BeginTx(ctx, nil) 850 if err 
!= nil { 851 l.Error("failed to begin transaction", "err", err) 852 return err 853 } 854 defer tx.Rollback() 855 856 if err := db.DeleteIssues( 857 tx, 858 did, 859 rkey, 860 ); err != nil { 861 l.Error("failed to delete", "err", err) 862 return fmt.Errorf("failed to delete issue record: %w", err) 863 } 864 if err := tx.Commit(); err != nil { 865 l.Error("failed to commit txn", "err", err) 866 return err 867 } 868 869 return nil 870 } 871 872 return nil 873} 874 875func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 876 did := e.Did 877 rkey := e.Commit.RKey 878 879 var err error 880 881 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 882 l.Info("ingesting record") 883 884 ddb, ok := i.Db.Execer.(*db.DB) 885 if !ok { 886 return fmt.Errorf("failed to index issue comment record, invalid db cast") 887 } 888 889 switch e.Commit.Operation { 890 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 891 raw := json.RawMessage(e.Commit.Record) 892 record := tangled.RepoIssueComment{} 893 err = json.Unmarshal(raw, &record) 894 if err != nil { 895 return fmt.Errorf("invalid record: %w", err) 896 } 897 898 comment, err := models.IssueCommentFromRecord(did, rkey, record) 899 if err != nil { 900 return fmt.Errorf("failed to parse comment from record: %w", err) 901 } 902 903 if err := comment.Validate(); err != nil { 904 return fmt.Errorf("failed to validate comment: %w", err) 905 } 906 907 tx, err := ddb.Begin() 908 if err != nil { 909 return fmt.Errorf("failed to start transaction: %w", err) 910 } 911 defer tx.Rollback() 912 913 _, err = db.AddIssueComment(tx, *comment) 914 if err != nil { 915 return fmt.Errorf("failed to create issue comment: %w", err) 916 } 917 918 return tx.Commit() 919 920 case jmodels.CommitOperationDelete: 921 if err := db.DeleteIssueComments( 922 ddb, 923 orm.FilterEq("did", did), 924 orm.FilterEq("rkey", rkey), 925 ); err != nil { 926 return fmt.Errorf("failed to delete 
issue comment record: %w", err) 927 } 928 929 return nil 930 } 931 932 return nil 933} 934 935func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 936 did := e.Did 937 rkey := e.Commit.RKey 938 939 var err error 940 941 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 942 l.Info("ingesting record") 943 944 ddb, ok := i.Db.Execer.(*db.DB) 945 if !ok { 946 return fmt.Errorf("failed to index label definition, invalid db cast") 947 } 948 949 switch e.Commit.Operation { 950 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 951 raw := json.RawMessage(e.Commit.Record) 952 record := tangled.LabelDefinition{} 953 err = json.Unmarshal(raw, &record) 954 if err != nil { 955 return fmt.Errorf("invalid record: %w", err) 956 } 957 958 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 959 if err != nil { 960 return fmt.Errorf("failed to parse labeldef from record: %w", err) 961 } 962 963 if err := def.Validate(); err != nil { 964 return fmt.Errorf("failed to validate labeldef: %w", err) 965 } 966 967 _, err = db.AddLabelDefinition(ddb, def) 968 if err != nil { 969 return fmt.Errorf("failed to create labeldef: %w", err) 970 } 971 972 return nil 973 974 case jmodels.CommitOperationDelete: 975 if err := db.DeleteLabelDefinition( 976 ddb, 977 orm.FilterEq("did", did), 978 orm.FilterEq("rkey", rkey), 979 ); err != nil { 980 return fmt.Errorf("failed to delete labeldef record: %w", err) 981 } 982 983 return nil 984 } 985 986 return nil 987} 988 989func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 990 did := e.Did 991 rkey := e.Commit.RKey 992 993 var err error 994 995 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 996 l.Info("ingesting record") 997 998 ddb, ok := i.Db.Execer.(*db.DB) 999 if !ok { 1000 return fmt.Errorf("failed to index label op, invalid db cast") 1001 } 1002 1003 switch e.Commit.Operation { 1004 case 
jmodels.CommitOperationCreate: 1005 raw := json.RawMessage(e.Commit.Record) 1006 record := tangled.LabelOp{} 1007 err = json.Unmarshal(raw, &record) 1008 if err != nil { 1009 return fmt.Errorf("invalid record: %w", err) 1010 } 1011 1012 subject := syntax.ATURI(record.Subject) 1013 collection := subject.Collection() 1014 1015 var repo *models.Repo 1016 switch collection { 1017 case tangled.RepoIssueNSID: 1018 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1019 if err != nil || len(i) != 1 { 1020 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1021 } 1022 repo = i[0].Repo 1023 default: 1024 return fmt.Errorf("unsupport label subject: %s", collection) 1025 } 1026 1027 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1028 if err != nil { 1029 return fmt.Errorf("failed to build label application ctx: %w", err) 1030 } 1031 1032 ops := models.LabelOpsFromRecord(did, rkey, record) 1033 1034 for _, o := range ops { 1035 def, ok := actx.Defs[o.OperandKey] 1036 if !ok { 1037 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1038 } 1039 1040 // validate permissions: only collaborators can apply labels currently 1041 // 1042 // TODO: introduce a repo:triage permission 1043 ok, err := i.Enforcer.IsPushAllowed(o.Did, repo.Knot, repo.DidSlashRepo()) 1044 if err != nil { 1045 return fmt.Errorf("enforcing permission: %w", err) 1046 } 1047 if !ok { 1048 return fmt.Errorf("unauthorized label operation") 1049 } 1050 1051 if err := def.ValidateOperandValue(&o); err != nil { 1052 return fmt.Errorf("failed to validate labelop: %w", err) 1053 } 1054 } 1055 1056 tx, err := ddb.Begin() 1057 if err != nil { 1058 return err 1059 } 1060 defer tx.Rollback() 1061 1062 for _, o := range ops { 1063 _, err = db.AddLabelOp(tx, &o) 1064 if err != nil { 1065 return fmt.Errorf("failed to add labelop: %w", err) 1066 } 1067 } 1068 1069 if err = 
tx.Commit(); err != nil { 1070 return err 1071 } 1072 } 1073 1074 return nil 1075}