A vibe coded tangled fork which supports pijul.
at 14cccfcf2c05afb13d84981482f0fdd9fe09232b 1091 lines 26 kB view raw
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log/slog" 8 "maps" 9 "slices" 10 11 "time" 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 jmodels "github.com/bluesky-social/jetstream/pkg/models" 15 "github.com/go-git/go-git/v5/plumbing" 16 "github.com/ipfs/go-cid" 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/serververify" 22 "tangled.org/core/idresolver" 23 "tangled.org/core/orm" 24 "tangled.org/core/rbac" 25) 26 27type Ingester struct { 28 Db db.DbWrapper 29 Enforcer *rbac.Enforcer 30 IdResolver *idresolver.Resolver 31 Config *config.Config 32 Logger *slog.Logger 33} 34 35type processFunc func(ctx context.Context, e *jmodels.Event) error 36 37func (i *Ingester) Ingest() processFunc { 38 return func(ctx context.Context, e *jmodels.Event) error { 39 var err error 40 defer func() { 41 eventTime := e.TimeUS 42 lastTimeUs := eventTime + 1 43 if err := i.Db.SaveLastTimeUs(lastTimeUs); err != nil { 44 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 45 } 46 }() 47 48 l := i.Logger.With("kind", e.Kind) 49 switch e.Kind { 50 case jmodels.EventKindAccount: 51 if !e.Account.Active && *e.Account.Status == "deactivated" { 52 err = i.IdResolver.InvalidateIdent(ctx, e.Account.Did) 53 } 54 case jmodels.EventKindIdentity: 55 err = i.IdResolver.InvalidateIdent(ctx, e.Identity.Did) 56 case jmodels.EventKindCommit: 57 switch e.Commit.Collection { 58 case tangled.GraphFollowNSID: 59 err = i.ingestFollow(e) 60 case tangled.FeedStarNSID: 61 err = i.ingestStar(e) 62 case tangled.PublicKeyNSID: 63 err = i.ingestPublicKey(e) 64 case tangled.RepoArtifactNSID: 65 err = i.ingestArtifact(e) 66 case tangled.ActorProfileNSID: 67 err = i.ingestProfile(e) 68 case tangled.SpindleMemberNSID: 69 err = i.ingestSpindleMember(ctx, e) 70 case tangled.SpindleNSID: 71 err = i.ingestSpindle(ctx, e) 72 case tangled.KnotMemberNSID: 73 err = i.ingestKnotMember(e) 74 case tangled.KnotNSID: 75 err = i.ingestKnot(e) 76 case tangled.StringNSID: 77 err = i.ingestString(e) 78 case tangled.RepoIssueNSID: 79 err = i.ingestIssue(ctx, e) 80 case tangled.RepoIssueCommentNSID: 81 err = i.ingestIssueComment(e) 82 case tangled.LabelDefinitionNSID: 83 err = i.ingestLabelDefinition(e) 84 case tangled.LabelOpNSID: 85 err = i.ingestLabelOp(e) 86 } 87 l = i.Logger.With("nsid", e.Commit.Collection) 88 } 89 90 if err != nil { 91 l.Warn("refused to ingest record", "err", err) 92 } 93 94 return nil 95 } 96} 97 98func (i *Ingester) ingestStar(e *jmodels.Event) error { 99 var err error 100 did := e.Did 101 102 l := i.Logger.With("handler", "ingestStar") 103 l = l.With("nsid", e.Commit.Collection) 104 105 switch e.Commit.Operation { 106 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 107 var subjectUri syntax.ATURI 108 109 raw := json.RawMessage(e.Commit.Record) 110 record := tangled.FeedStar{} 111 err := json.Unmarshal(raw, &record) 112 if err != nil { 113 l.Error("invalid record", "err", err) 114 return err 115 } 116 117 subjectUri, err = syntax.ParseATURI(record.Subject) 118 if err != nil { 119 l.Error("invalid record", "err", err) 120 return err 121 } 122 err = db.UpsertStar(i.Db, models.Star{ 123 Did: did, 124 RepoAt: subjectUri, 125 Rkey: e.Commit.RKey, 126 }) 127 case jmodels.CommitOperationDelete: 128 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) 129 } 130 131 if err != nil { 132 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 133 } 134 l.Info("processed star", "operation", e.Commit.Operation, "rkey", e.Commit.RKey) 135 136 return nil 137} 138 139func (i *Ingester) ingestFollow(e *jmodels.Event) error { 140 var err error 141 did := e.Did 142 143 l := i.Logger.With("handler", "ingestFollow") 144 l = l.With("nsid", e.Commit.Collection) 145 146 switch e.Commit.Operation { 147 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 148 raw := json.RawMessage(e.Commit.Record) 149 record := tangled.GraphFollow{} 150 err = json.Unmarshal(raw, &record) 151 if err != nil { 152 l.Error("invalid record", "err", err) 153 return err 154 } 155 156 err = db.UpsertFollow(i.Db, models.Follow{ 157 UserDid: did, 158 SubjectDid: record.Subject, 159 Rkey: e.Commit.RKey, 160 }) 161 case jmodels.CommitOperationDelete: 162 err = db.DeleteFollowByRkey(i.Db, did, e.Commit.RKey) 163 } 164 165 if err != nil { 166 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 167 } 168 l.Info("processed follow", "operation", e.Commit.Operation, "rkey", e.Commit.RKey) 169 170 return nil 171} 172 173func (i *Ingester) ingestPublicKey(e *jmodels.Event) error { 174 did := e.Did 175 var err error 176 177 l := i.Logger.With("handler", "ingestPublicKey") 178 l = l.With("nsid", e.Commit.Collection) 179 180 switch e.Commit.Operation { 181 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 182 l.Debug("processing add of pubkey") 183 raw := json.RawMessage(e.Commit.Record) 184 record := tangled.PublicKey{} 185 err = json.Unmarshal(raw, &record) 186 if err != nil { 187 l.Error("invalid record", "err", err) 188 return err 189 } 190 pubKey, err := models.PublicKeyFromRecord(syntax.DID(did), syntax.RecordKey(e.Commit.RKey), record) 191 if err != nil { 192 l.Error("invalid record", "err", err) 193 return err 194 } 195 if err := pubKey.Validate(); err != nil { 196 l.Error("invalid record", "err", err) 197 return err 198 } 199 200 err = db.UpsertPublicKey(i.Db, pubKey) 201 case jmodels.CommitOperationDelete: 202 l.Debug("processing delete of pubkey") 203 err = db.DeletePublicKeyByRkey(i.Db, did, e.Commit.RKey) 204 } 205 206 if err != nil { 207 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 208 } 209 l.Info("processed pubkey", "operation", e.Commit.Operation, "rkey", e.Commit.RKey) 210 211 return nil 212} 213 214func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 215 did := e.Did 216 var err error 217 218 l := i.Logger.With("handler", "ingestArtifact") 219 l = l.With("nsid", e.Commit.Collection) 220 221 switch e.Commit.Operation { 222 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 223 raw := json.RawMessage(e.Commit.Record) 224 record := tangled.RepoArtifact{} 225 err = json.Unmarshal(raw, &record) 226 if err != nil { 227 l.Error("invalid record", "err", err) 228 return err 229 } 230 231 repoAt, err := syntax.ParseATURI(record.Repo) 232 if err != nil { 233 return err 234 } 235 236 repo, err := db.GetRepoByAtUri(i.Db, repoAt.String()) 237 if err != nil { 238 return err 239 } 240 241 ok, err := i.Enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 242 if err != nil || !ok { 243 return err 244 } 245 246 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 247 if err != nil { 248 createdAt = time.Now() 249 } 250 251 artifact := models.Artifact{ 252 Did: did, 253 Rkey: e.Commit.RKey, 254 RepoAt: repoAt, 255 Tag: plumbing.Hash(record.Tag), 256 CreatedAt: createdAt, 257 BlobCid: cid.Cid(record.Artifact.Ref), 258 Name: record.Name, 259 Size: uint64(record.Artifact.Size), 260 MimeType: record.Artifact.MimeType, 261 } 262 263 err = db.AddArtifact(i.Db, artifact) 264 case jmodels.CommitOperationDelete: 265 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 266 } 267 268 if err != nil { 269 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 270 } 271 272 return nil 273} 274 275func (i *Ingester) ingestProfile(e *jmodels.Event) error { 276 did := e.Did 277 var err error 278 279 l := i.Logger.With("handler", "ingestProfile") 280 l = l.With("nsid", e.Commit.Collection) 281 282 if e.Commit.RKey != "self" { 283 return fmt.Errorf("ingestProfile only ingests `self` record") 284 } 285 286 switch e.Commit.Operation { 287 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 288 raw := json.RawMessage(e.Commit.Record) 289 record := tangled.ActorProfile{} 290 err = json.Unmarshal(raw, &record) 291 if err != nil { 292 l.Error("invalid record", "err", err) 293 return err 294 } 295 296 avatar := "" 297 if record.Avatar != nil { 298 avatar = record.Avatar.Ref.String() 299 } 300 301 description := "" 302 if record.Description != nil { 303 description = *record.Description 304 } 305 306 includeBluesky := record.Bluesky 307 308 pronouns := "" 309 if record.Pronouns != nil { 310 pronouns = *record.Pronouns 311 } 312 313 location := "" 314 if record.Location != nil { 315 location = *record.Location 316 } 317 318 var links [5]string 319 for i, l := range record.Links { 320 if i < 5 { 321 links[i] = l 322 } 323 } 324 325 var stats [2]models.VanityStat 326 for i, s := range record.Stats { 327 if i < 2 { 328 stats[i].Kind = models.VanityStatKind(s) 329 } 330 } 331 332 var pinned [6]syntax.ATURI 333 for i, r := range record.PinnedRepositories { 334 if i < 6 { 335 pinned[i] = syntax.ATURI(r) 336 } 337 } 338 339 profile := models.Profile{ 340 Did: did, 341 Avatar: avatar, 342 Description: description, 343 IncludeBluesky: includeBluesky, 344 Location: location, 345 Links: links, 346 Stats: stats, 347 PinnedRepos: pinned, 348 Pronouns: pronouns, 349 } 350 351 ddb, ok := i.Db.Execer.(*db.DB) 352 if !ok { 353 return fmt.Errorf("failed to index profile record, invalid db cast") 354 } 355 356 tx, err := ddb.Begin() 357 if err != nil { 358 return fmt.Errorf("failed to start transaction") 359 } 360 defer tx.Rollback() 361 362 err = db.ValidateProfile(tx, &profile) 363 if err != nil { 364 return fmt.Errorf("invalid profile record") 365 } 366 367 err = db.UpsertProfile(tx, &profile) 368 if err != nil { 369 return fmt.Errorf("upserting profile: %w", err) 370 } 371 372 err = tx.Commit() 373 case jmodels.CommitOperationDelete: 374 err = db.DeleteArtifact(i.Db, orm.FilterEq("did", did), orm.FilterEq("rkey", e.Commit.RKey)) 375 } 376 377 if err != nil { 378 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 379 } 380 381 return nil 382} 383 384func (i *Ingester) ingestSpindleMember(ctx context.Context, e *jmodels.Event) error { 385 did := e.Did 386 var err error 387 388 l := i.Logger.With("handler", "ingestSpindleMember") 389 l = l.With("nsid", e.Commit.Collection) 390 391 switch e.Commit.Operation { 392 case jmodels.CommitOperationCreate: 393 raw := json.RawMessage(e.Commit.Record) 394 record := tangled.SpindleMember{} 395 err = json.Unmarshal(raw, &record) 396 if err != nil { 397 l.Error("invalid record", "err", err) 398 return err 399 } 400 401 // only spindle owner can invite to spindles 402 ok, err := i.Enforcer.IsSpindleInviteAllowed(did, record.Instance) 403 if err != nil || !ok { 404 return fmt.Errorf("failed to enforce permissions: %w", err) 405 } 406 407 memberId, err := i.IdResolver.ResolveIdent(ctx, record.Subject) 408 if err != nil { 409 return err 410 } 411 412 if memberId.Handle.IsInvalidHandle() { 413 return err 414 } 415 416 ddb, ok := i.Db.Execer.(*db.DB) 417 if !ok { 418 return fmt.Errorf("failed to index profile record, invalid db cast") 419 } 420 421 err = db.AddSpindleMember(ddb, models.SpindleMember{ 422 Did: syntax.DID(did), 423 Rkey: e.Commit.RKey, 424 Instance: record.Instance, 425 Subject: memberId.DID, 426 }) 427 if !ok { 428 return fmt.Errorf("failed to add to db: %w", err) 429 } 430 431 err = i.Enforcer.AddSpindleMember(record.Instance, memberId.DID.String()) 432 if err != nil { 433 return fmt.Errorf("failed to update ACLs: %w", err) 434 } 435 436 l.Info("added spindle member") 437 case jmodels.CommitOperationDelete: 438 rkey := e.Commit.RKey 439 440 ddb, ok := i.Db.Execer.(*db.DB) 441 if !ok { 442 return fmt.Errorf("failed to index profile record, invalid db cast") 443 } 444 445 // get record from db first 446 members, err := db.GetSpindleMembers( 447 ddb, 448 orm.FilterEq("did", did), 449 orm.FilterEq("rkey", rkey), 450 ) 451 if err != nil || len(members) != 1 { 452 return fmt.Errorf("failed to get member: %w, len(members) = %d", err, len(members)) 453 } 454 member := members[0] 455 456 tx, err := ddb.Begin() 457 if err != nil { 458 return fmt.Errorf("failed to start txn: %w", err) 459 } 460 461 // remove record by rkey && update enforcer 462 if err = db.RemoveSpindleMember( 463 tx, 464 orm.FilterEq("did", did), 465 orm.FilterEq("rkey", rkey), 466 ); err != nil { 467 return fmt.Errorf("failed to remove from db: %w", err) 468 } 469 470 // update enforcer 471 err = i.Enforcer.RemoveSpindleMember(member.Instance, member.Subject.String()) 472 if err != nil { 473 return fmt.Errorf("failed to update ACLs: %w", err) 474 } 475 476 if err = tx.Commit(); err != nil { 477 return fmt.Errorf("failed to commit txn: %w", err) 478 } 479 480 if err = i.Enforcer.E.SavePolicy(); err != nil { 481 return fmt.Errorf("failed to save ACLs: %w", err) 482 } 483 484 l.Info("removed spindle member") 485 } 486 487 return nil 488} 489 490func (i *Ingester) ingestSpindle(ctx context.Context, e *jmodels.Event) error { 491 did := e.Did 492 var err error 493 494 l := i.Logger.With("handler", "ingestSpindle") 495 l = l.With("nsid", e.Commit.Collection) 496 497 switch e.Commit.Operation { 498 case jmodels.CommitOperationCreate: 499 raw := json.RawMessage(e.Commit.Record) 500 record := tangled.Spindle{} 501 err = json.Unmarshal(raw, &record) 502 if err != nil { 503 l.Error("invalid record", "err", err) 504 return err 505 } 506 507 instance := e.Commit.RKey 508 509 ddb, ok := i.Db.Execer.(*db.DB) 510 if !ok { 511 return fmt.Errorf("failed to index profile record, invalid db cast") 512 } 513 514 err := db.AddSpindle(ddb, models.Spindle{ 515 Owner: syntax.DID(did), 516 Instance: instance, 517 }) 518 if err != nil { 519 l.Error("failed to add spindle to db", "err", err, "instance", instance) 520 return err 521 } 522 523 err = serververify.RunVerification(ctx, instance, did, i.Config.Core.Dev) 524 if err != nil { 525 l.Error("failed to add spindle to db", "err", err, "instance", instance) 526 return err 527 } 528 529 _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 530 if err != nil { 531 return fmt.Errorf("failed to mark verified: %w", err) 532 } 533 534 return nil 535 536 case jmodels.CommitOperationDelete: 537 instance := e.Commit.RKey 538 539 ddb, ok := i.Db.Execer.(*db.DB) 540 if !ok { 541 return fmt.Errorf("failed to index profile record, invalid db cast") 542 } 543 544 // get record from db first 545 spindles, err := db.GetSpindles( 546 ddb, 547 orm.FilterEq("owner", did), 548 orm.FilterEq("instance", instance), 549 ) 550 if err != nil || len(spindles) != 1 { 551 return fmt.Errorf("failed to get spindles: %w, len(spindles) = %d", err, len(spindles)) 552 } 553 spindle := spindles[0] 554 555 tx, err := ddb.Begin() 556 if err != nil { 557 return err 558 } 559 defer func() { 560 tx.Rollback() 561 i.Enforcer.E.LoadPolicy() 562 }() 563 564 // remove spindle members first 565 err = db.RemoveSpindleMember( 566 tx, 567 orm.FilterEq("owner", did), 568 orm.FilterEq("instance", instance), 569 ) 570 if err != nil { 571 return err 572 } 573 574 err = db.DeleteSpindle( 575 tx, 576 orm.FilterEq("owner", did), 577 orm.FilterEq("instance", instance), 578 ) 579 if err != nil { 580 return err 581 } 582 583 if spindle.Verified != nil { 584 err = i.Enforcer.RemoveSpindle(instance) 585 if err != nil { 586 return err 587 } 588 } 589 590 err = tx.Commit() 591 if err != nil { 592 return err 593 } 594 595 err = i.Enforcer.E.SavePolicy() 596 if err != nil { 597 return err 598 } 599 } 600 601 return nil 602} 603 604func (i *Ingester) ingestString(e *jmodels.Event) error { 605 did := e.Did 606 rkey := e.Commit.RKey 607 608 var err error 609 610 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 611 l.Info("ingesting record") 612 613 ddb, ok := i.Db.Execer.(*db.DB) 614 if !ok { 615 return fmt.Errorf("failed to index string record, invalid db cast") 616 } 617 618 switch e.Commit.Operation { 619 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 620 raw := json.RawMessage(e.Commit.Record) 621 record := tangled.String{} 622 err = json.Unmarshal(raw, &record) 623 if err != nil { 624 l.Error("invalid record", "err", err) 625 return err 626 } 627 628 string := models.StringFromRecord(did, rkey, record) 629 630 if err = string.Validate(); err != nil { 631 l.Error("invalid record", "err", err) 632 return err 633 } 634 635 if err = db.AddString(ddb, string); err != nil { 636 l.Error("failed to add string", "err", err) 637 return err 638 } 639 640 return nil 641 642 case jmodels.CommitOperationDelete: 643 if err := db.DeleteString( 644 ddb, 645 orm.FilterEq("did", did), 646 orm.FilterEq("rkey", rkey), 647 ); err != nil { 648 l.Error("failed to delete", "err", err) 649 return fmt.Errorf("failed to delete string record: %w", err) 650 } 651 652 return nil 653 } 654 655 return nil 656} 657 658func (i *Ingester) ingestKnotMember(e *jmodels.Event) error { 659 did := e.Did 660 var err error 661 662 l := i.Logger.With("handler", "ingestKnotMember") 663 l = l.With("nsid", e.Commit.Collection) 664 665 switch e.Commit.Operation { 666 case jmodels.CommitOperationCreate: 667 raw := json.RawMessage(e.Commit.Record) 668 record := tangled.KnotMember{} 669 err = json.Unmarshal(raw, &record) 670 if err != nil { 671 l.Error("invalid record", "err", err) 672 return err 673 } 674 675 // only knot owner can invite to knots 676 ok, err := i.Enforcer.IsKnotInviteAllowed(did, record.Domain) 677 if err != nil || !ok { 678 return fmt.Errorf("failed to enforce permissions: %w", err) 679 } 680 681 memberId, err := i.IdResolver.ResolveIdent(context.Background(), record.Subject) 682 if err != nil { 683 return err 684 } 685 686 if memberId.Handle.IsInvalidHandle() { 687 return err 688 } 689 690 err = i.Enforcer.AddKnotMember(record.Domain, memberId.DID.String()) 691 if err != nil { 692 return fmt.Errorf("failed to update ACLs: %w", err) 693 } 694 695 l.Info("added knot member") 696 case jmodels.CommitOperationDelete: 697 // we don't store knot members in a table (like we do for spindle) 698 // and we can't remove this just yet. possibly fixed if we switch 699 // to either: 700 // 1. a knot_members table like with spindle and store the rkey 701 // 2. use the knot host as the rkey 702 // 703 // TODO: implement member deletion 704 l.Info("skipping knot member delete", "did", did, "rkey", e.Commit.RKey) 705 } 706 707 return nil 708} 709 710func (i *Ingester) ingestKnot(e *jmodels.Event) error { 711 did := e.Did 712 var err error 713 714 l := i.Logger.With("handler", "ingestKnot") 715 l = l.With("nsid", e.Commit.Collection) 716 717 switch e.Commit.Operation { 718 case jmodels.CommitOperationCreate: 719 raw := json.RawMessage(e.Commit.Record) 720 record := tangled.Knot{} 721 err = json.Unmarshal(raw, &record) 722 if err != nil { 723 l.Error("invalid record", "err", err) 724 return err 725 } 726 727 domain := e.Commit.RKey 728 729 ddb, ok := i.Db.Execer.(*db.DB) 730 if !ok { 731 return fmt.Errorf("failed to index profile record, invalid db cast") 732 } 733 734 err := db.AddKnot(ddb, domain, did) 735 if err != nil { 736 l.Error("failed to add knot to db", "err", err, "domain", domain) 737 return err 738 } 739 740 err = serververify.RunVerification(context.Background(), domain, did, i.Config.Core.Dev) 741 if err != nil { 742 l.Error("failed to verify knot", "err", err, "domain", domain) 743 return err 744 } 745 746 err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 747 if err != nil { 748 return fmt.Errorf("failed to mark verified: %w", err) 749 } 750 751 return nil 752 753 case jmodels.CommitOperationDelete: 754 domain := e.Commit.RKey 755 756 ddb, ok := i.Db.Execer.(*db.DB) 757 if !ok { 758 return fmt.Errorf("failed to index knot record, invalid db cast") 759 } 760 761 // get record from db first 762 registrations, err := db.GetRegistrations( 763 ddb, 764 orm.FilterEq("domain", domain), 765 orm.FilterEq("did", did), 766 ) 767 if err != nil { 768 return fmt.Errorf("failed to get registration: %w", err) 769 } 770 if len(registrations) != 1 { 771 return fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 772 } 773 registration := registrations[0] 774 775 tx, err := ddb.Begin() 776 if err != nil { 777 return err 778 } 779 defer func() { 780 tx.Rollback() 781 i.Enforcer.E.LoadPolicy() 782 }() 783 784 err = db.DeleteKnot( 785 tx, 786 orm.FilterEq("did", did), 787 orm.FilterEq("domain", domain), 788 ) 789 if err != nil { 790 return err 791 } 792 793 if registration.Registered != nil { 794 err = i.Enforcer.RemoveKnot(domain) 795 if err != nil { 796 return err 797 } 798 } 799 800 err = tx.Commit() 801 if err != nil { 802 return err 803 } 804 805 err = i.Enforcer.E.SavePolicy() 806 if err != nil { 807 return err 808 } 809 } 810 811 return nil 812} 813func (i *Ingester) ingestIssue(ctx context.Context, e *jmodels.Event) error { 814 did := e.Did 815 rkey := e.Commit.RKey 816 817 var err error 818 819 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 820 l.Info("ingesting record") 821 822 ddb, ok := i.Db.Execer.(*db.DB) 823 if !ok { 824 return fmt.Errorf("failed to index issue record, invalid db cast") 825 } 826 827 switch e.Commit.Operation { 828 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 829 raw := json.RawMessage(e.Commit.Record) 830 record := tangled.RepoIssue{} 831 err = json.Unmarshal(raw, &record) 832 if err != nil { 833 l.Error("invalid record", "err", err) 834 return err 835 } 836 837 issue := models.IssueFromRecord(did, rkey, record) 838 839 if err := issue.Validate(); err != nil { 840 return fmt.Errorf("failed to validate issue: %w", err) 841 } 842 843 tx, err := ddb.BeginTx(ctx, nil) 844 if err != nil { 845 l.Error("failed to begin transaction", "err", err) 846 return err 847 } 848 defer tx.Rollback() 849 850 err = db.PutIssue(tx, &issue) 851 if err != nil { 852 l.Error("failed to create issue", "err", err) 853 return err 854 } 855 856 err = tx.Commit() 857 if err != nil { 858 l.Error("failed to commit txn", "err", err) 859 return err 860 } 861 862 return nil 863 864 case jmodels.CommitOperationDelete: 865 tx, err := ddb.BeginTx(ctx, nil) 866 if err != nil { 867 l.Error("failed to begin transaction", "err", err) 868 return err 869 } 870 defer tx.Rollback() 871 872 if err := db.DeleteIssues( 873 tx, 874 did, 875 rkey, 876 ); err != nil { 877 l.Error("failed to delete", "err", err) 878 return fmt.Errorf("failed to delete issue record: %w", err) 879 } 880 if err := tx.Commit(); err != nil { 881 l.Error("failed to commit txn", "err", err) 882 return err 883 } 884 885 return nil 886 } 887 888 return nil 889} 890 891func (i *Ingester) ingestIssueComment(e *jmodels.Event) error { 892 did := e.Did 893 rkey := e.Commit.RKey 894 895 var err error 896 897 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 898 l.Info("ingesting record") 899 900 ddb, ok := i.Db.Execer.(*db.DB) 901 if !ok { 902 return fmt.Errorf("failed to index issue comment record, invalid db cast") 903 } 904 905 switch e.Commit.Operation { 906 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 907 raw := json.RawMessage(e.Commit.Record) 908 record := tangled.RepoIssueComment{} 909 err = json.Unmarshal(raw, &record) 910 if err != nil { 911 return fmt.Errorf("invalid record: %w", err) 912 } 913 914 comment, err := models.IssueCommentFromRecord(did, rkey, record) 915 if err != nil { 916 return fmt.Errorf("failed to parse comment from record: %w", err) 917 } 918 919 if err := comment.Validate(); err != nil { 920 return fmt.Errorf("failed to validate comment: %w", err) 921 } 922 923 tx, err := ddb.Begin() 924 if err != nil { 925 return fmt.Errorf("failed to start transaction: %w", err) 926 } 927 defer tx.Rollback() 928 929 _, err = db.AddIssueComment(tx, *comment) 930 if err != nil { 931 return fmt.Errorf("failed to create issue comment: %w", err) 932 } 933 934 return tx.Commit() 935 936 case jmodels.CommitOperationDelete: 937 if err := db.DeleteIssueComments( 938 ddb, 939 orm.FilterEq("did", did), 940 orm.FilterEq("rkey", rkey), 941 ); err != nil { 942 return fmt.Errorf("failed to delete issue comment record: %w", err) 943 } 944 945 return nil 946 } 947 948 return nil 949} 950 951func (i *Ingester) ingestLabelDefinition(e *jmodels.Event) error { 952 did := e.Did 953 rkey := e.Commit.RKey 954 955 var err error 956 957 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 958 l.Info("ingesting record") 959 960 ddb, ok := i.Db.Execer.(*db.DB) 961 if !ok { 962 return fmt.Errorf("failed to index label definition, invalid db cast") 963 } 964 965 switch e.Commit.Operation { 966 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 967 raw := json.RawMessage(e.Commit.Record) 968 record := tangled.LabelDefinition{} 969 err = json.Unmarshal(raw, &record) 970 if err != nil { 971 return fmt.Errorf("invalid record: %w", err) 972 } 973 974 def, err := models.LabelDefinitionFromRecord(did, rkey, record) 975 if err != nil { 976 return fmt.Errorf("failed to parse labeldef from record: %w", err) 977 } 978 979 if err := def.Validate(); err != nil { 980 return fmt.Errorf("failed to validate labeldef: %w", err) 981 } 982 983 _, err = db.AddLabelDefinition(ddb, def) 984 if err != nil { 985 return fmt.Errorf("failed to create labeldef: %w", err) 986 } 987 988 return nil 989 990 case jmodels.CommitOperationDelete: 991 if err := db.DeleteLabelDefinition( 992 ddb, 993 orm.FilterEq("did", did), 994 orm.FilterEq("rkey", rkey), 995 ); err != nil { 996 return fmt.Errorf("failed to delete labeldef record: %w", err) 997 } 998 999 return nil 1000 } 1001 1002 return nil 1003} 1004 1005func (i *Ingester) ingestLabelOp(e *jmodels.Event) error { 1006 did := e.Did 1007 rkey := e.Commit.RKey 1008 1009 var err error 1010 1011 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1012 l.Info("ingesting record") 1013 1014 ddb, ok := i.Db.Execer.(*db.DB) 1015 if !ok { 1016 return fmt.Errorf("failed to index label op, invalid db cast") 1017 } 1018 1019 switch e.Commit.Operation { 1020 case jmodels.CommitOperationCreate: 1021 raw := json.RawMessage(e.Commit.Record) 1022 record := tangled.LabelOp{} 1023 err = json.Unmarshal(raw, &record) 1024 if err != nil { 1025 return fmt.Errorf("invalid record: %w", err) 1026 } 1027 1028 subject := syntax.ATURI(record.Subject) 1029 collection := subject.Collection() 1030 1031 var repo *models.Repo 1032 switch collection { 1033 case tangled.RepoIssueNSID: 1034 i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1035 if err != nil || len(i) != 1 { 1036 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1037 } 1038 repo = i[0].Repo 1039 default: 1040 return fmt.Errorf("unsupport label subject: %s", collection) 1041 } 1042 1043 actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1044 if err != nil { 1045 return fmt.Errorf("failed to build label application ctx: %w", err) 1046 } 1047 1048 ops := models.LabelOpsFromRecord(did, rkey, record) 1049 1050 for _, o := range ops { 1051 def, ok := actx.Defs[o.OperandKey] 1052 if !ok { 1053 return fmt.Errorf("failed to find label def for key: %s, expected: %q", o.OperandKey, slices.Collect(maps.Keys(actx.Defs))) 1054 } 1055 1056 // validate permissions: only collaborators can apply labels currently 1057 // 1058 // TODO: introduce a repo:triage permission 1059 ok, err := i.Enforcer.IsPushAllowed(o.Did, repo.Knot, repo.DidSlashRepo()) 1060 if err != nil { 1061 return fmt.Errorf("enforcing permission: %w", err) 1062 } 1063 if !ok { 1064 return fmt.Errorf("unauthorized label operation") 1065 } 1066 1067 if err := def.ValidateOperandValue(&o); err != nil { 1068 return fmt.Errorf("failed to validate labelop: %w", err) 1069 } 1070 } 1071 1072 tx, err := ddb.Begin() 1073 if err != nil { 1074 return err 1075 } 1076 defer tx.Rollback() 1077 1078 for _, o := range ops { 1079 _, err = db.AddLabelOp(tx, &o) 1080 if err != nil { 1081 return fmt.Errorf("failed to add labelop: %w", err) 1082 } 1083 } 1084 1085 if err = tx.Commit(); err != nil { 1086 return err 1087 } 1088 } 1089 1090 return nil 1091}