A vibe coded tangled fork which supports pijul.
at f4a09133a636451c495fa96c98b99379b7692497 1081 lines 26 kB view raw
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "maps" 9 "slices" 10 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 jmodels "github.com/bluesky-social/jetstream/pkg/models" 15 "github.com/go-git/go-git/v5/plumbing" 16 "github.com/ipfs/go-cid" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/serververify" 22 "tangled.org/core/idresolver" 23 "tangled.org/core/orm" 24 "tangled.org/core/rbac" 25) 26 27type Ingester struct { 28 Db db.DbWrapper 29 Enforcer *rbac.Enforcer 30 IdResolver *idresolver.Resolver 31 Config *config.Config 32 Logger *slog.Logger 33} 34 35type processFunc func(ctx context.Context, e *jmodels.Event) error 36 37func (i *Ingester) Ingest() processFunc { 38 return func(ctx context.Context, e *jmodels.Event) error { 39 var err error 40 defer func() { 41 eventTime := e.TimeUS 42 lastTimeUs := eventTime + 1 43 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 44 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 45 } 46 }() 47 48 l := i.Logger.With("kind", e.Kind) 49 switch e.Kind { 50 case jmodels.EventKindAccount: 51 if !e.Account.Active && *e.Account.Status == "deactivated" { 52 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 53 } 54 case jmodels.EventKindIdentity: 55 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 56 case jmodels.EventKindCommit: 57 switch e.Commit.Collection { 58 case tangled.GraphFollowNSID: 59 err = i.ingestFollow(e) 60 case tangled.FeedStarNSID: 61 err = i.ingestStar(e) 62 case tangled.PublicKeyNSID: 63 err = i.ingestPublicKey(e) 64 case tangled.RepoArtifactNSID: 65 err = i.ingestArtifact(e) 66 case tangled.ActorProfileNSID: 67 err = i.ingestProfile(e) 68 case tangled.SpindleMemberNSID: 69 err = i.ingestSpindleMember(ctx, e) 70 case tangled.SpindleNSID: 71 err = i.ingestSpindle(ctx, e) 72 case tangled.KnotMemberNSID: 73 err = i.ingestKnotMember(e) 74 case tangled.KnotNSID: 75 err = i.ingestKnot(e) 76 case tangled.StringNSID: 77 err = i.ingestString(e) 78 case tangled.RepoIssueNSID: 79 err = i.ingestIssue(ctx, e) 80 case tangled.RepoIssueCommentNSID: 81 err = i.ingestIssueComment(e) 82 case tangled.LabelDefinitionNSID: 83 err = i.ingestLabelDefinition(e) 84 case tangled.LabelOpNSID: 85 err = i.ingestLabelOp(e) 86 } 87 l = i.Logger.With("nsid", e.Commit.Collection) 88 } 89 90 if err != nil { 91 l.Warn("refused to ingest record", "err", err) 92 } 93 94 return nil 95 } 96} 97 98func (i *Ingester) ingestStar(e *jmodels.Event) error { 99 var err error 100 did := e.Did 101 102 l := i.Logger.With("handler", "ingestStar") 103 l = l.With("nsid", e.Commit.Collection) 104 105 switch e.Commit.Operation { 106 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 107 var subjectUri syntax.ATURI 108 109 raw := json.RawMessage(e.Commit.Record) 110 record := tangled.FeedStar{} 111 err := json.Unmarshal(raw, &record) 112 if err != nil { 113 l.Error("invalid record", "err", err) 114 return err 115 } 116 117 subjectUri, err = syntax.ParseATURI(record.Subject) 118 if err != nil { 119 l.Error("invalid record", "err", err) 120 return err 121 } 122 err = db.AddStar(i.Db, &models.Star{ 123 Did: did, 124 RepoAt: subjectUri, 125 Rkey: e.Commit.RKey, 126 }) 127 case jmodels.CommitOperationDelete: 128 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 129 } 130 131 if err != nil { 132 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 133 } 134 135 return nil 136} 137 138func (i *Ingester) ingestFollow(e *jmodels.Event) error { 139 var err error 140 did := e.Did 141 142 l := i.Logger.With("handler", "ingestFollow") 143 l = l.With("nsid", e.Commit.Collection) 144 145 switch e.Commit.Operation { 146 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 147 raw := json.RawMessage(e.Commit.Record) 148 record := tangled.GraphFollow{} 149 err = json.Unmarshal(raw, &record) 150 if err != nil { 151 l.Error("invalid record", "err", err) 152 return err 153 } 154 155 err = db.AddFollow(i.Db, &models.Follow{ 156 UserDid: did, 157 SubjectDid: record.Subject, 158 Rkey: e.Commit.RKey, 159 }) 160 case jmodels.CommitOperationDelete: 161 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 162 } 163 164 if err != nil { 165 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 166 } 167 168 return nil 169} 170 171func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 172 did := e.Did 173 var err error 174 175 l := i.Logger.With("handler", "ingestPublicKey") 176 l = l.With("nsid", e.Commit.Collection) 177 178 switch e.Commit.Operation { 179 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 180 l.Debug("processing add of pubkey") 181 raw := json.RawMessage(e.Commit.Record) 182 record := tangled.PublicKey{} 183 err = json.Unmarshal(raw, &record) 184 if err != nil { 185 l.Error("invalid record", "err", err) 186 return err 187 } 188 189 name := record.Name 190 key := record.Key 191 err = db.AddPublicKey(i.Db, did, name, key, e.Commit.RKey) 192 case jmodels.CommitOperationDelete: 193 l.Debug("processing delete of pubkey") 194 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 195 } 196 197 if err != nil { 198 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 199 } 200 201 return nil 202} 203 204func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 205 did := e.Did 206 var err error 207 208 l := i.Logger.With("handler", "ingestArtifact") 209 l = l.With("nsid", e.Commit.Collection) 210 211 switch e.Commit.Operation { 212 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 213 raw := json.RawMessage(e.Commit.Record) 214 record := tangled.RepoArtifact{} 215 err = json.Unmarshal(raw, &record) 216 if err != nil { 217 l.Error("invalid record", "err", err) 218 return err 219 } 220 221 repoAt, err := syntax.ParseATURI(record.Repo) 222 if err != nil { 223 return err 224 } 225 226 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 227 if err != nil { 228 return err 229 } 230 231 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 232 if err != nil || !ok { 233 return err 234 } 235 236 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 237 if err != nil { 238 createdAt = time.Now() 239 } 240 241 artifact := models.Artifact{ 242 Did: did, 243 Rkey: e.Commit.RKey, 244 RepoAt: repoAt, 245 Tag: plumbing.Hash(record.Tag), 246 CreatedAt: createdAt, 247 BlobCid: cid.Cid(record.Artifact.Ref), 248 Name: record.Name, 249 Size: uint64(record.Artifact.Size), 250 MimeType: record.Artifact.MimeType, 251 } 252 253 err = db.AddArtifact(i.Db, artifact) 254 case jmodels.CommitOperationDelete: 255 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 256 } 257 258 if err != nil { 259 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 260 } 261 262 return nil 263} 264 265func (i *Ingester) ingestProfile(e *jmodels.Event) error { 266 did := e.Did 267 var err error 268 269 l := i.Logger.With("handler", "ingestProfile") 270 l = l.With("nsid", e.Commit.Collection) 271 272 if e.Commit.RKey != "self" { 273 return fmt.Errorf("ingestProfile only ingests `self` record") 274 } 275 276 switch e.Commit.Operation { 277 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 278 raw := json.RawMessage(e.Commit.Record) 279 record := tangled.ActorProfile{} 280 err = json.Unmarshal(raw, &record) 281 if err != nil { 282 l.Error("invalid record", "err", err) 283 return err 284 } 285 286 avatar := "" 287 if record.Avatar != nil { 288 avatar = record.Avatar.Ref.String() 289 } 290 291 description := "" 292 if record.Description != nil { 293 description = *record.Description 294 } 295 296 includeBluesky := record.Bluesky 297 298 pronouns := "" 299 if record.Pronouns != nil { 300 pronouns = *record.Pronouns 301 } 302 303 location := "" 304 if record.Location != nil { 305 location = *record.Location 306 } 307 308 var links [5]string 309 for i, l := range record.Links { 310 if i < 5 { 311 links[i] = l 312 } 313 } 314 315 var stats [2]models.VanityStat 316 for i, s := range record.Stats { 317 if i < 2 { 318 stats[i].Kind = models.VanityStatKind(s) 319 } 320 } 321 322 var pinned [6]syntax.ATURI 323 for i, r := range record.PinnedRepositories { 324 if i < 6 { 325 pinned[i] = syntax.ATURI(r) 326 } 327 } 328 329 profile := models.Profile{ 330 Did: did, 331 Avatar: avatar, 332 Description: description, 333 IncludeBluesky: includeBluesky, 334 Location: location, 335 Links: links, 336 Stats: stats, 337 PinnedRepos: pinned, 338 Pronouns: pronouns, 339 } 340 341 ddb, ok := i.Db.Execer.(*db.DB) 342 if !ok { 343 return fmt.Errorf("failed to index profile record, invalid db cast") 344 } 345 346 tx, err := ddb.Begin() 347 if err != nil { 348 return fmt.Errorf("failed to start transaction") 349 } 350 defer tx.Rollback() 351 352 err = db.ValidateProfile(tx, &profile) 353 if err != nil { 354 return fmt.Errorf("invalid profile record") 355 } 356 357 err = db.UpsertProfile(tx, &profile) 358 if err != nil { 359 return fmt.Errorf("upserting profile: %w", err) 360 } 361 362 err = tx.Commit() 363 case jmodels.CommitOperationDelete: 364 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 365 } 366 367 if err != nil { 368 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 369 } 370 371 return nil 372} 373 374func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 375 did := e.Did 376 var err error 377 378 l := i.Logger.With("handler", "ingestSpindleMember") 379 l = l.With("nsid", e.Commit.Collection) 380 381 switch e.Commit.Operation { 382 case jmodels.CommitOperationCreate: 383 raw := json.RawMessage(e.Commit.Record) 384 record := tangled.SpindleMember{} 385 err = json.Unmarshal(raw, &record) 386 if err != nil { 387 l.Error("invalid record", "err", err) 388 return err 389 } 390 391 // only spindle owner can invite to spindles 392 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 393 if err != nil || !ok { 394 return fmt.Errorf("failed to enforce permissions: %w", err) 395 } 396 397 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 398 if err != nil { 399 return err 400 } 401 402 if memberId.Handle.IsInvalidHandle() { 403 return err 404 } 405 406 ddb, ok := i.Db.Execer.(*db.DB) 407 if !ok { 408 return fmt.Errorf("failed to index profile record, invalid db cast") 409 } 410 411 err = db.AddSpindleMember(ddb, models.SpindleMember{ 412 Did: syntax.DID(did), 413 Rkey: e.Commit.RKey, 414 Instance: record.Instance, 415 Subject: memberId.DID, 416 }) 417 if !ok { 418 return fmt.Errorf("failed to add to db: %w", err) 419 } 420 421 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 422 if err != nil { 423 return fmt.Errorf("failed to update ACLs: %w", err) 424 } 425 426 l.Info("added spindle member") 427 case jmodels.CommitOperationDelete: 428 rkey := e.Commit.RKey 429 430 ddb, ok := i.Db.Execer.(*db.DB) 431 if !ok { 432 return fmt.Errorf("failed to index profile record, invalid db cast") 433 } 434 435 // get record from db first 436 members, err := db.GetSpindleMembers( 437 ddb, 438 orm.FilterEq("did", did), 439 orm.FilterEq("rkey", rkey), 440 ) 441 if err != nil || len(members) != 1 { 442 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 443 } 444 member := members[0] 445 446 tx, err := ddb.Begin() 447 if err != nil { 448 return fmt.Errorf("failed to start txn: %w", err) 449 } 450 451 // remove record by rkey && update enforcer 452 if err = db.RemoveSpindleMember( 453 tx, 454 orm.FilterEq("did", did), 455 orm.FilterEq("rkey", rkey), 456 ); err != nil { 457 return fmt.Errorf("failed to remove from db: %w", err) 458 } 459 460 // update enforcer 461 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 462 if err != nil { 463 return fmt.Errorf("failed to update ACLs: %w", err) 464 } 465 466 if err = tx.Commit(); err != nil { 467 return fmt.Errorf("failed to commit txn: %w", err) 468 } 469 470 if err = i.Enforcer.E.SavePolicy(); err != nil { 471 return fmt.Errorf("failed to save ACLs: %w", err) 472 } 473 474 l.Info("removed spindle member") 475 } 476 477 return nil 478} 479 480func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 481 did := e.Did 482 var err error 483 484 l := i.Logger.With("handler", "ingestSpindle") 485 l = l.With("nsid", e.Commit.Collection) 486 487 switch e.Commit.Operation { 488 case jmodels.CommitOperationCreate: 489 raw := json.RawMessage(e.Commit.Record) 490 record := tangled.Spindle{} 491 err = json.Unmarshal(raw, &record) 492 if err != nil { 493 l.Error("invalid record", "err", err) 494 return err 495 } 496 497 instance := e.Commit.RKey 498 499 ddb, ok := i.Db.Execer.(*db.DB) 500 if !ok { 501 return fmt.Errorf("failed to index profile record, invalid db cast") 502 } 503 504 err := db.AddSpindle(ddb, models.Spindle{ 505 Owner: syntax.DID(did), 506 Instance: instance, 507 }) 508 if err != nil { 509 l.Error("failed to add spindle to db", "err", err, "instance", instance) 510 return err 511 } 512 513 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 514 if err != nil { 515 l.Error("failed to add spindle to db", "err", err, "instance", instance) 516 return err 517 } 518 519 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 520 if err != nil { 521 return fmt.Errorf("failed to mark verified: %w", err) 522 } 523 524 return nil 525 526 case jmodels.CommitOperationDelete: 527 instance := e.Commit.RKey 528 529 ddb, ok := i.Db.Execer.(*db.DB) 530 if !ok { 531 return fmt.Errorf("failed to index profile record, invalid db cast") 532 } 533 534 // get record from db first 535 spindles, err := db.GetSpindles( 536 ddb, 537 orm.FilterEq("owner", did), 538 orm.FilterEq("instance", instance), 539 ) 540 if err != nil || len(spindles) != 1 { 541 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 542 } 543 spindle := spindles[0] 544 545 tx, err := ddb.Begin() 546 if err != nil { 547 return err 548 } 549 defer func() { 550 tx.Rollback() 551 i.Enforcer.E.LoadPolicy() 552 }() 553 554 // remove spindle members first 555 err = db.RemoveSpindleMember( 556 tx, 557 orm.FilterEq("owner", did), 558 orm.FilterEq("instance", instance), 559 ) 560 if err != nil { 561 return err 562 } 563 564 err = db.DeleteSpindle( 565 tx, 566 orm.FilterEq("owner", did), 567 orm.FilterEq("instance", instance), 568 ) 569 if err != nil { 570 return err 571 } 572 573 if spindle.Verified != nil { 574 err = i.Enforcer.RemoveSpindle(instance) 575 if err != nil { 576 return err 577 } 578 } 579 580 err = tx.Commit() 581 if err != nil { 582 return err 583 } 584 585 err = i.Enforcer.E.SavePolicy() 586 if err != nil { 587 return err 588 } 589 } 590 591 return nil 592} 593 594func (i *Ingester) ingestString(e *jmodels.Event) error { 595 did := e.Did 596 rkey := e.Commit.RKey 597 598 var err error 599 600 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 601 l.Info("ingesting record") 602 603 ddb, ok := i.Db.Execer.(*db.DB) 604 if !ok { 605 return fmt.Errorf("failed to index string record, invalid db cast") 606 } 607 608 switch e.Commit.Operation { 609 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 610 raw := json.RawMessage(e.Commit.Record) 611 record := tangled.String{} 612 err = json.Unmarshal(raw, &record) 613 if err != nil { 614 l.Error("invalid record", "err", err) 615 return err 616 } 617 618 string := models.StringFromRecord(did, rkey, record) 619 620 if err = string.Validate(); err != nil { 621 l.Error("invalid record", "err", err) 622 return err 623 } 624 625 if err = db.AddString(ddb, string); err != nil { 626 l.Error("failed to add string", "err", err) 627 return err 628 } 629 630 return nil 631 632 case jmodels.CommitOperationDelete: 633 if err := db.DeleteString( 634 ddb, 635 orm.FilterEq("did", did), 636 orm.FilterEq("rkey", rkey), 637 ); err != nil { 638 l.Error("failed to delete", "err", err) 639 return fmt.Errorf("failed to delete string record: %w", err) 640 } 641 642 return nil 643 } 644 645 return nil 646} 647 648func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 649 did := e.Did 650 var err error 651 652 l := i.Logger.With("handler", "ingestKnotMember") 653 l = l.With("nsid", e.Commit.Collection) 654 655 switch e.Commit.Operation { 656 case jmodels.CommitOperationCreate: 657 raw := json.RawMessage(e.Commit.Record) 658 record := tangled.KnotMember{} 659 err = json.Unmarshal(raw, &record) 660 if err != nil { 661 l.Error("invalid record", "err", err) 662 return err 663 } 664 665 // only knot owner can invite to knots 666 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 667 if err != nil || !ok { 668 return fmt.Errorf("failed to enforce permissions: %w", err) 669 } 670 671 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 672 if err != nil { 673 return err 674 } 675 676 if memberId.Handle.IsInvalidHandle() { 677 return err 678 } 679 680 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 681 if err != nil { 682 return fmt.Errorf("failed to update ACLs: %w", err) 683 } 684 685 l.Info("added knot member") 686 case jmodels.CommitOperationDelete: 687 // we don't store knot members in a table (like we do for spindle) 688 // and we can't remove this just yet. possibly fixed if we switch 689 // to either: 690 // 1. a knot_members table like with spindle and store the rkey 691 // 2. use the knot host as the rkey 692 // 693 // TODO: implement member deletion 694 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 695 } 696 697 return nil 698} 699 700func (i *Ingester) ingestKnot(e *jmodels.Event) error { 701 did := e.Did 702 var err error 703 704 l := i.Logger.With("handler", "ingestKnot") 705 l = l.With("nsid", e.Commit.Collection) 706 707 switch e.Commit.Operation { 708 case jmodels.CommitOperationCreate: 709 raw := json.RawMessage(e.Commit.Record) 710 record := tangled.Knot{} 711 err = json.Unmarshal(raw, &record) 712 if err != nil { 713 l.Error("invalid record", "err", err) 714 return err 715 } 716 717 domain := e.Commit.RKey 718 719 ddb, ok := i.Db.Execer.(*db.DB) 720 if !ok { 721 return fmt.Errorf("failed to index profile record, invalid db cast") 722 } 723 724 err := db.AddKnot(ddb, domain, did) 725 if err != nil { 726 l.Error("failed to add knot to db", "err", err, "domain", domain) 727 return err 728 } 729 730 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 731 if err != nil { 732 l.Error("failed to verify knot", "err", err, "domain", domain) 733 return err 734 } 735 736 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 737 if err != nil { 738 return fmt.Errorf("failed to mark verified: %w", err) 739 } 740 741 return nil 742 743 case jmodels.CommitOperationDelete: 744 domain := e.Commit.RKey 745 746 ddb, ok := i.Db.Execer.(*db.DB) 747 if !ok { 748 return fmt.Errorf("failed to index knot record, invalid db cast") 749 } 750 751 // get record from db first 752 registrations, err := db.GetRegistrations( 753 ddb, 754 orm.FilterEq("domain", domain), 755 orm.FilterEq("did", did), 756 ) 757 if err != nil { 758 return fmt.Errorf("failed to get registration: %w", err) 759 } 760 if len(registrations) != 1 { 761 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 762 } 763 registration := registrations[0] 764 765 tx, err := ddb.Begin() 766 if err != nil { 767 return err 768 } 769 defer func() { 770 tx.Rollback() 771 i.Enforcer.E.LoadPolicy() 772 }() 773 774 err = db.DeleteKnot( 775 tx, 776 orm.FilterEq("did", did), 777 orm.FilterEq("domain", domain), 778 ) 779 if err != nil { 780 return err 781 } 782 783 if registration.Registered != nil { 784 err = i.Enforcer.RemoveKnot(domain) 785 if err != nil { 786 return err 787 } 788 } 789 790 err = tx.Commit() 791 if err != nil { 792 return err 793 } 794 795 err = i.Enforcer.E.SavePolicy() 796 if err != nil { 797 return err 798 } 799 } 800 801 return nil 802} 803func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 804 did := e.Did 805 rkey := e.Commit.RKey 806 807 var err error 808 809 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 810 l.Info("ingesting record") 811 812 ddb, ok := i.Db.Execer.(*db.DB) 813 if !ok { 814 return fmt.Errorf("failed to index issue record, invalid db cast") 815 } 816 817 switch e.Commit.Operation { 818 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 819 raw := json.RawMessage(e.Commit.Record) 820 record := tangled.RepoIssue{} 821 err = json.Unmarshal(raw, &record) 822 if err != nil { 823 l.Error("invalid record", "err", err) 824 return err 825 } 826 827 issue := models.IssueFromRecord(did, rkey, record) 828 829 if err := issue.Validate(); err != nil { 830 return fmt.Errorf("failed to validate issue: %w", err) 831 } 832 833 tx, err := ddb.BeginTx(ctx, nil) 834 if err != nil { 835 l.Error("failed to begin transaction", "err", err) 836 return err 837 } 838 defer tx.Rollback() 839 840 err = db.PutIssue(tx, &issue) 841 if err != nil { 842 l.Error("failed to create issue", "err", err) 843 return err 844 } 845 846 err = tx.Commit() 847 if err != nil { 848 l.Error("failed to commit txn", "err", err) 849 return err 850 } 851 852 return nil 853 854 case jmodels.CommitOperationDelete: 855 tx, err := ddb.BeginTx(ctx, nil) 856 if err != nil { 857 l.Error("failed to begin transaction", "err", err) 858 return err 859 } 860 defer tx.Rollback() 861 862 if err := db.DeleteIssues( 863 tx, 864 did, 865 rkey, 866 ); err != nil { 867 l.Error("failed to delete", "err", err) 868 return fmt.Errorf("failed to delete issue record: %w", err) 869 } 870 if err := tx.Commit(); err != nil { 871 l.Error("failed to commit txn", "err", err) 872 return err 873 } 874 875 return nil 876 } 877 878 return nil 879} 880 881func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 882 did := e.Did 883 rkey := e.Commit.RKey 884 885 var err error 886 887 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 888 l.Info("ingesting record") 889 890 ddb, ok := i.Db.Execer.(*db.DB) 891 if !ok { 892 return fmt.Errorf("failed to index issue comment record, invalid db cast") 893 } 894 895 switch e.Commit.Operation { 896 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 897 raw := json.RawMessage(e.Commit.Record) 898 record := tangled.RepoIssueComment{} 899 err = json.Unmarshal(raw, &record) 900 if err != nil { 901 return fmt.Errorf("invalid record: %w", err) 902 } 903 904 comment, err := models.IssueCommentFromRecord(did, rkey, record) 905 if err != nil { 906 return fmt.Errorf("failed to parse comment from record: %w", err) 907 } 908 909 if err := comment.Validate(); err != nil { 910 return fmt.Errorf("failed to validate comment: %w", err) 911 } 912 913 tx, err := ddb.Begin() 914 if err != nil { 915 return fmt.Errorf("failed to start transaction: %w", err) 916 } 917 defer tx.Rollback() 918 919 _, err = db.AddIssueComment(tx, *comment) 920 if err != nil { 921 return fmt.Errorf("failed to create issue comment: %w", err) 922 } 923 924 return tx.Commit() 925 926 case jmodels.CommitOperationDelete: 927 if err := db.DeleteIssueComments( 928 ddb, 929 orm.FilterEq("did", did), 930 orm.FilterEq("rkey", rkey), 931 ); err != nil { 932 return fmt.Errorf("failed to delete issue comment record: %w", err) 933 } 934 935 return nil 936 } 937 938 return nil 939} 940 941func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 942 did := e.Did 943 rkey := e.Commit.RKey 944 945 var err error 946 947 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 948 l.Info("ingesting record") 949 950 ddb, ok := i.Db.Execer.(*db.DB) 951 if !ok { 952 return fmt.Errorf("failed to index label definition, invalid db cast") 953 } 954 955 switch e.Commit.Operation { 956 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 957 raw := json.RawMessage(e.Commit.Record) 958 record := tangled.LabelDefinition{} 959 err = json.Unmarshal(raw, &record) 960 if err != nil { 961 return fmt.Errorf("invalid record: %w", err) 962 } 963 964 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 965 if err != nil { 966 return fmt.Errorf("failed to parse labeldef from record: %w", err) 967 } 968 969 if err := def.Validate(); err != nil { 970 return fmt.Errorf("failed to validate labeldef: %w", err) 971 } 972 973 _, err = db.AddLabelDefinition(ddb, def) 974 if err != nil { 975 return fmt.Errorf("failed to create labeldef: %w", err) 976 } 977 978 return nil 979 980 case jmodels.CommitOperationDelete: 981 if err := db.DeleteLabelDefinition( 982 ddb, 983 orm.FilterEq("did", did), 984 orm.FilterEq("rkey", rkey), 985 ); err != nil { 986 return fmt.Errorf("failed to delete labeldef record: %w", err) 987 } 988 989 return nil 990 } 991 992 return nil 993} 994 995func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 996 did := e.Did 997 rkey := e.Commit.RKey 998 999 var err error 1000 1001 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1002 l.Info("ingesting record") 1003 1004 ddb, ok := i.Db.Execer.(*db.DB) 1005 if !ok { 1006 return fmt.Errorf("failed to index label op, invalid db cast") 1007 } 1008 1009 switch e.Commit.Operation { 1010 case jmodels.CommitOperationCreate: 1011 raw := json.RawMessage(e.Commit.Record) 1012 record := tangled.LabelOp{} 1013 err = json.Unmarshal(raw, &record) 1014 if err != nil { 1015 return fmt.Errorf("invalid record: %w", err) 1016 } 1017 1018 subject := syntax.ATURI(record.Subject) 1019 collection := subject.Collection() 1020 1021 var repo *models.Repo 1022 switch collection { 1023 case tangled.RepoIssueNSID: 1024 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1025 if err != nil || len(i) != 1 { 1026 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1027 } 1028 repo = i[0].Repo 1029 default: 1030 return fmt.Errorf("unsupport label subject: %s", collection) 1031 } 1032 1033 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1034 if err != nil { 1035 return fmt.Errorf("failed to build label application ctx: %w", err) 1036 } 1037 1038 ops := models.LabelOpsFromRecord(did, rkey, record) 1039 1040 for _, o := range ops { 1041 def, ok := actx.Defs[o.OperandKey] 1042 if !ok { 1043 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1044 } 1045 1046 // validate permissions: only collaborators can apply labels currently 1047 // 1048 // TODO: introduce a repo:triage permission 1049 ok, err := i.Enforcer.IsPushAllowed(o.Did, repo.Knot, repo.DidSlashRepo()) 1050 if err != nil { 1051 return fmt.Errorf("enforcing permission: %w", err) 1052 } 1053 if !ok { 1054 return fmt.Errorf("unauthorized label operation") 1055 } 1056 1057 if err := def.ValidateOperandValue(&o); err != nil { 1058 return fmt.Errorf("failed to validate labelop: %w", err) 1059 } 1060 } 1061 1062 tx, err := ddb.Begin() 1063 if err != nil { 1064 return err 1065 } 1066 defer tx.Rollback() 1067 1068 for _, o := range ops { 1069 _, err = db.AddLabelOp(tx, &o) 1070 if err != nil { 1071 return fmt.Errorf("failed to add labelop: %w", err) 1072 } 1073 } 1074 1075 if err = tx.Commit(); err != nil { 1076 return err 1077 } 1078 } 1079 1080 return nil 1081}