package ingester

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"

	"github.com/bluesky-social/indigo/atproto/syntax"
	"tangled.org/core/api/tangled"
	"tangled.org/core/appview/config"
	"tangled.org/core/appview/db"
	"tangled.org/core/appview/models"
	"tangled.org/core/appview/notify"
	"tangled.org/core/log"
	"tangled.org/core/orm"
	"tangled.org/core/rbac2"
	"tangled.org/core/tapc"
)

// Ingester applies tap events to the appview database. It holds the
// configuration, database handle, RBAC enforcer, notifier and logger that the
// per-collection handlers in this file use.
type Ingester struct {
	cfg      *config.Config
	db       *db.DB
	e        *rbac2.Enforcer
	notifier notify.Notifier
	l        *slog.Logger
}

// TODO: finish with rbac/v1
// TODO: just don't notify state events for now (we need full object)

// ProcessEvent dispatches a single tap event to the handler for its
// collection.
//
// Record events are routed by collection NSID; collections without a case
// below are ignored and nil is returned. Identity events are a no-op. When a
// handler fails, the error is logged together with the event ID and returned
// unchanged to the caller.
func (i *Ingester) ProcessEvent(ctx context.Context, evt tapc.Event) error {
	var err error
	switch evt.Type {
	case tapc.EvtRecord:
		revt := evt.Record
		// Attach a logger carrying the record's AT-URI to the context that is
		// handed to the handlers.
		ctx = log.IntoContext(ctx, i.l.With("record", revt.AtUri()))

		// NOTE: sort by alphabetical order
		switch revt.Collection.String() {
		case tangled.ActorProfileNSID:
			err = i.ingestActorProfile(ctx, revt)
		case tangled.FeedReactionNSID:
			err = i.ingestFeedReaction(ctx, revt)
		case tangled.FeedStarNSID:
			err = i.ingestFeedStar(ctx, revt)
		case tangled.GraphFollowNSID:
			err = i.ingestGraphFollow(ctx, revt)
		case tangled.KnotMemberNSID:
			err = i.ingestKnotMember(ctx, revt)
		case tangled.KnotNSID:
			err = i.ingestKnot(ctx, revt)
		case tangled.LabelDefinitionNSID:
			err = i.ingestLabelDefinition(ctx, revt)
		case tangled.LabelOpNSID:
			err = i.ingestLabelOp(ctx, revt)
		case tangled.PublicKeyNSID:
			err = i.ingestPublicKey(ctx, revt)
		case tangled.RepoArtifactNSID:
			err = i.ingestRepoArtifact(ctx, revt)
		case tangled.RepoIssueCommentNSID:
			err = i.ingestRepoIssueComment(ctx, revt)
		case tangled.RepoIssueNSID:
			err = i.ingestRepoIssue(ctx, revt)
		case tangled.RepoIssueStateNSID:
			err = i.ingestRepoIssueState(ctx, revt)
		case tangled.RepoNSID:
			err = i.ingestRepo(ctx, revt)
		case tangled.RepoPullCommentNSID:
			err = i.ingestRepoPullComment(ctx, revt)
		case tangled.RepoPullNSID:
			err = i.ingestRepoPull(ctx, revt)
		case tangled.RepoPullStatusNSID:
			err = i.ingestRepoPullStatus(ctx, revt)
		case tangled.SpindleMemberNSID:
			err = i.ingestSpindleMember(ctx, revt)
		case tangled.SpindleNSID:
			err = i.ingestSpindle(ctx, revt)
		case tangled.StringNSID:
			err = i.ingestString(ctx, revt)
		}
	case tapc.EvtIdentity:
		// no-op
	}

	if err != nil {
		i.l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err)
		return err
	}
	return nil
}

// ingestActorProfile upserts the actor profile on create/update and deletes
// it on delete. Events whose rkey is not "self" are ignored. A record that
// cannot be converted into a profile is logged and skipped (nil is returned).
func (i *Ingester) ingestActorProfile(ctx context.Context, evt *tapc.RecordEventData) error {
	// ignore invalid rkey
	if evt.Rkey.String() != "self" {
		return nil
	}

	switch evt.Action {
	case tapc.RecordCreateAction, tapc.RecordUpdateAction:
		var record tangled.ActorProfile
		if err := json.Unmarshal(evt.Record, &record); err != nil {
			return fmt.Errorf("parsing record json: %w", err)
		}

		profile, err := models.ProfileFromRecord(evt.Did, record)
		if err != nil {
			i.l.Warn("ignoring invalid profile record", "err", err)
			return nil
		}

		tx, err := i.db.BeginTx(ctx, nil)
		if err != nil {
			return fmt.Errorf("starting transaction: %w", err)
		}
		defer tx.Rollback()

		if err := db.UpsertProfile(tx, &profile); err != nil {
			return fmt.Errorf("upserting profile: %w", err)
		}
		if err := tx.Commit(); err != nil {
			return fmt.Errorf("commiting profile upsert: %w", err)
		}

	case tapc.RecordDeleteAction:
		if err := db.DeleteProfile(i.db, evt.Did); err != nil {
			return fmt.Errorf("deleting profile from db: %w", err)
		}
	}
	return nil
}

// ingestFeedReaction upserts a reaction on create/update and deletes it by
// (did, rkey) on delete. Records that cannot be converted into a reaction are
// logged and skipped.
func (i *Ingester) ingestFeedReaction(_ context.Context, evt *tapc.RecordEventData) error {
	switch evt.Action {
	case tapc.RecordCreateAction, tapc.RecordUpdateAction:
		var record tangled.FeedReaction
		if err := json.Unmarshal(evt.Record, &record); err != nil {
			return fmt.Errorf("parsing record json: %w", err)
		}

		reaction, err := models.ReactionFromRecord(evt.Did, evt.Rkey, record)
		if err != nil {
			i.l.Warn("ignoring invalid reaction record", "err", err)
			return nil
		}

		// Wrap the cause so the caller's log shows why the upsert failed.
		if err := db.UpsertReaction(i.db, reaction); err != nil {
			return fmt.Errorf("upserting reaction record: %w", err)
		}

	case tapc.RecordDeleteAction:
		if err := db.DeleteReactionByRkey(
			i.db,
			evt.Did.String(),
			evt.Rkey.String(),
		); err != nil {
			return fmt.Errorf("deleting reaction from db: %w", err)
		}
	}
	return nil
}

// ingestFeedStar upserts a star on create/update and deletes it by
// (did, rkey) on delete. Records that cannot be converted into a star are
// logged and skipped.
func (i *Ingester) ingestFeedStar(_ context.Context, evt *tapc.RecordEventData) error {
	switch evt.Action {
	case tapc.RecordCreateAction, tapc.RecordUpdateAction:
		var record tangled.FeedStar
		if err := json.Unmarshal(evt.Record, &record); err != nil {
			return fmt.Errorf("parsing record json: %w", err)
		}

		star, err := models.StarFromRecord(evt.Did, evt.Rkey, record)
		if err != nil {
			i.l.Warn("ignoring invalid star record", "err", err)
			return nil
		}

		// Wrap the cause so the caller's log shows why the upsert failed.
		if err := db.UpsertStar(i.db, star); err != nil {
			return fmt.Errorf("upserting star record: %w", err)
		}

	case tapc.RecordDeleteAction:
		if err := db.DeleteStarByRkey(
			i.db,
			evt.Did.String(),
			evt.Rkey.String(),
		); err != nil {
			return fmt.Errorf("deleting record from db: %w", err)
		}
	}
	return nil
}

// ingestGraphFollow upserts a follow on create/update and deletes it by
// (did, rkey) on delete. Records that cannot be converted into a follow are
// logged and skipped.
func (i *Ingester) ingestGraphFollow(_ context.Context, evt *tapc.RecordEventData) error {
	switch evt.Action {
	case tapc.RecordCreateAction, tapc.RecordUpdateAction:
		var record tangled.GraphFollow
		if err := json.Unmarshal(evt.Record, &record); err != nil {
			return fmt.Errorf("parsing record json: %w", err)
		}

		follow, err := models.FollowFromRecord(evt.Did, evt.Rkey, record)
		if err != nil {
			i.l.Warn("ignoring invalid follow record", "err", err)
			return nil
		}

		// Wrap the cause so the caller's log shows why the upsert failed.
		if err := db.UpsertFollow(i.db, follow); err != nil {
			return fmt.Errorf("upserting follow record: %w", err)
		}

	case tapc.RecordDeleteAction:
		if err := db.DeleteFollowByRkey(
			i.db,
			evt.Did.String(),
			evt.Rkey.String(),
		); err != nil {
			return fmt.Errorf("deleting record from db: %w", err)
		}
	}
	return nil
}

// ingestKnotMember is not implemented yet: create/update decodes the record
// and then panics, and delete panics immediately.
//
// TODO: let's just remove the knot.member record
func (i *Ingester) ingestKnotMember(_ context.Context, evt *tapc.RecordEventData) error {
	switch evt.Action {
	case tapc.RecordCreateAction, tapc.RecordUpdateAction:
		var record tangled.KnotMember
		if err := json.Unmarshal(evt.Record, &record); err != nil {
			return fmt.Errorf("parsing record json: %w", err)
		}
		panic("unimplemented")

	case tapc.RecordDeleteAction:
		panic("unimplemented")
	}
	return nil
}

// ingestKnot handles knot records; the record key is used as the knot domain.
//
// Create/update decodes the record, calls db.AddKnot for (domain, did) and
// then panics because the remainder is unimplemented. Delete looks up the
// single matching registration, deletes the knot inside a transaction and
// commits; it panics before committing when the registration has a non-nil
// Registered value, because clearing the enforcer is unimplemented.
func (i *Ingester) ingestKnot(ctx context.Context, evt *tapc.RecordEventData) error {
	switch evt.Action {
	case tapc.RecordCreateAction, tapc.RecordUpdateAction:
		var record tangled.Knot
		if err := json.Unmarshal(evt.Record, &record); err != nil {
			return fmt.Errorf("parsing record json: %w", err)
		}

		domain := evt.Rkey.String()
		if err := db.AddKnot(i.db, domain, evt.Did.String()); err != nil {
			return fmt.Errorf("upserting knot: %w", err)
		}

		// TODO: hmmm should we run verification here?
		// There can be unverified knot in user profile.
		panic("unimplemented")

	case tapc.RecordDeleteAction:
		domain := evt.Rkey.String()

		// get record from db first
		registration, err := func(domain string, did syntax.DID) (models.Registration, error) {
			registrations, err := db.GetRegistrations(
				i.db,
				orm.FilterEq("domain", domain),
				orm.FilterEq("did", did),
			)
			if err != nil {
				return models.Registration{}, err
			}
			// Exactly one registration must match (domain, did).
			if len(registrations) != 1 {
				return models.Registration{}, fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations))
			}
			return registrations[0], nil
		}(domain, evt.Did)
		if err != nil {
			return fmt.Errorf("getting registration: %w", err)
		}

		tx, err := i.db.BeginTx(ctx, nil)
		if err != nil {
			return fmt.Errorf("starting transaction: %w", err)
		}
		defer tx.Rollback()
		// TODO: rollback enforcer

		if err := db.DeleteKnot(
			tx,
			orm.FilterEq("did", evt.Did),
			orm.FilterEq("domain", domain),
		); err != nil {
			return fmt.Errorf("deleting knot: %w", err)
		}

		if registration.Registered != nil {
			// TODO: clear from enforcer
			panic("unimplemented")
		}

		if err := tx.Commit(); err != nil {
			return fmt.Errorf("commiting transaction: %w", err)
		}
	}
	return nil
}

// ingestLabelDefinition upserts a label definition on create/update and
// deletes it by (did, rkey) on delete. A record that fails conversion is
// returned as an error; a definition that fails Validate is logged and
// skipped.
func (i *Ingester) ingestLabelDefinition(_ context.Context, evt *tapc.RecordEventData) error {
	switch evt.Action {
	case tapc.RecordCreateAction, tapc.RecordUpdateAction:
		var record tangled.LabelDefinition
		if err := json.Unmarshal(evt.Record, &record); err != nil {
			return fmt.Errorf("parsing record json: %w", err)
		}

		def, err := models.LabelDefinitionFromRecord(evt.Did.String(), evt.Rkey.String(), record)
		if err != nil {
			return fmt.Errorf("failed to parse labeldef from record: %w", err)
		}
		if err := def.Validate(); err != nil {
			i.l.Warn("ignoring invalid label def record", "err", err)
			return nil
		}

		// Wrap the cause so the caller's log shows why the upsert failed.
		if _, err := db.UpsertLabelDefinition(i.db, def); err != nil {
			return fmt.Errorf("upserting label definition: %w", err)
		}

	case tapc.RecordDeleteAction:
		if err := db.DeleteLabelDefinition(
			i.db,
			orm.FilterEq("did", evt.Did),
			orm.FilterEq("rkey", evt.Rkey),
		); err != nil {
			return fmt.Errorf("deleting record from db: %w", err)
		}
	}
	return nil
}

// ingestLabelOp handles label.op records. Only create is processed; update
// and delete are deliberately ignored.
//
// On create the record is expanded into label ops. Validation is not
// implemented, so the handler panics as soon as there is at least one op;
// with zero ops it opens a transaction and commits it without inserting
// anything.
//
// TODO: label.op record is not designed to be mutable. should be reimplemented
func (i *Ingester) ingestLabelOp(ctx context.Context, evt *tapc.RecordEventData) error {
	switch evt.Action {
	case tapc.RecordCreateAction:
		var record tangled.LabelOp
		if err := json.Unmarshal(evt.Record, &record); err != nil {
			return fmt.Errorf("parsing record json: %w", err)
		}

		// TODO:
		// 1. validate permissions
		// 2. labelOp.Subject -> (subject).Repo
		// 3. get all label definition for that repo, constructing actx

		ops := models.LabelOpsFromRecord(evt.Did.String(), evt.Rkey.String(), record)
		// The loop variable is omitted on purpose: an unused range variable
		// does not compile. Bring it back (as `o`) when implementing steps 4-5.
		for range ops {
			// 4. find label def based on o.OperandKey (AT-URI to the label definition)
			// 5. validate labelOp from def
			panic("unimplemented")
		}

		tx, err := i.db.BeginTx(ctx, nil)
		if err != nil {
			return fmt.Errorf("starting transaction: %w", err)
		}
		defer tx.Rollback()

		for _, o := range ops {
			if _, err := db.AddLabelOp(tx, &o); err != nil {
				return fmt.Errorf("adding label op: %w", err)
			}
		}

		if err := tx.Commit(); err != nil {
			return fmt.Errorf("commiting transaction: %w", err)
		}

	case tapc.RecordUpdateAction:
		// no-op. we are ignoring update action for label.op records
	case tapc.RecordDeleteAction:
		// no-op. we are ignoring delete action for label.op records
	}
	return nil
}

// ingestPublicKey upserts a public key on create/update and deletes it by
// (did, rkey) on delete. Records that cannot be converted, or that fail
// Validate, are logged and skipped.
func (i *Ingester) ingestPublicKey(_ context.Context, evt *tapc.RecordEventData) error {
	switch evt.Action {
	case tapc.RecordCreateAction, tapc.RecordUpdateAction:
		var record tangled.PublicKey
		if err := json.Unmarshal(evt.Record, &record); err != nil {
			return fmt.Errorf("parsing record json: %w", err)
		}

		pubKey, err := models.PublicKeyFromRecord(evt.Did, evt.Rkey, record)
		if err != nil {
			i.l.Warn("ignoring invalid publicKey record", "err", err)
			return nil
		}
		if err := pubKey.Validate(); err != nil {
			i.l.Warn("ignoring invalid publicKey record", "err", err)
			return nil
		}

		// Wrap the cause so the caller's log shows why the upsert failed.
		if err := db.UpsertPublicKey(i.db, pubKey); err != nil {
			return fmt.Errorf("upserting publicKey record: %w", err)
		}

	case tapc.RecordDeleteAction:
		if err := db.DeletePublicKeyByRkey(
			i.db,
			evt.Did.String(),
			evt.Rkey.String(),
		); err != nil {
			return fmt.Errorf("deleting record from db: %w", err)
		}
	}
	return nil
}

// So this is one of the reasons why we need repo-DID
// and possibly its own PDS.
// For things that need permission, like repo artifacts, even if it's not
// collaborative, we need to pass the Knot to check the authority.
// We cannot really distribute arbitrary RBAC rules in a meaningful way.
// That's pretty hard and fragile.
// Instead, when any data belongs to the repo (no matter who created it), it
// should be owned by the repo. I mean the actual data should be.
// If someone removed the original artifact from their PDS, or if their PDS
// goes down, we lose a way to backfill related data to construct the repo view.
func (i *Ingester) ingestRepoArtifact(_ context.Context, evt *tapc.RecordEventData) error { switch evt.Action { case tapc.RecordCreateAction, tapc.RecordUpdateAction: var record tangled.RepoArtifact if err := json.Unmarshal(evt.Record, &record); err != nil { return fmt.Errorf("parsing record json: %w", err) } artifact, err := models.ArtifactFromRecord(evt.Did, evt.Rkey, record) if err != nil { i.l.Warn("ignoring invalid artifact record", "err", err) return nil } if err := db.UpsertArtifact(i.db, artifact); err != nil { return fmt.Errorf("upserting artifact: %w", err) } case tapc.RecordDeleteAction: if err := db.DeleteArtifact( i.db, orm.FilterEq("did", evt.Did), orm.FilterEq("rkey", evt.Rkey), ); err != nil { return fmt.Errorf("deleting record from db: %w", err) } } return nil } func (i *Ingester) ingestRepoIssueComment(ctx context.Context, evt *tapc.RecordEventData) error { switch evt.Action { case tapc.RecordCreateAction, tapc.RecordUpdateAction: var record tangled.RepoIssueComment if err := json.Unmarshal(evt.Record, &record); err != nil { return fmt.Errorf("parsing record json: %w", err) } comment, err := models.IssueCommentFromRecord(evt.Did.String(), evt.Rkey.String(), record) if err != nil { i.l.Warn("ignoring invalid issue.comment record", "err", err) return nil } tx, err := i.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("starting transaction: %w", err) } defer tx.Rollback() if _, err = db.UpsertIssueComment(tx, *comment); err != nil { return fmt.Errorf("upserting issue comment: %w", err) } if err := tx.Commit(); err != nil { return fmt.Errorf("commiting transaction: %w", err) } case tapc.RecordDeleteAction: if err := db.DeleteIssueComments( i.db, orm.FilterEq("did", evt.Did), orm.FilterEq("rkey", evt.Rkey), ); err != nil { return fmt.Errorf("deleting issue comment: %w", err) } } return nil } func (i *Ingester) ingestRepoIssue(ctx context.Context, evt *tapc.RecordEventData) error { switch evt.Action { case tapc.RecordCreateAction, 
tapc.RecordUpdateAction: var record tangled.RepoIssue if err := json.Unmarshal(evt.Record, &record); err != nil { return fmt.Errorf("parsing record json: %w", err) } issue := models.IssueFromRecord(evt.Did.String(), evt.Rkey.String(), record) if err := issue.Validate(); err != nil { i.l.Warn("ignoring invalid issue record", "err", err) return nil } tx, err := i.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("starting transaction: %w", err) } defer tx.Rollback() if err := db.PutIssue(tx, &issue); err != nil { return fmt.Errorf("upserting issue: %w", err) } if err := tx.Commit(); err != nil { return fmt.Errorf("commiting issue upsert: %w", err) } if evt.Action == tapc.RecordCreateAction { i.notifier.NewIssue(ctx, &issue, issue.Mentions) } case tapc.RecordDeleteAction: tx, err := i.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("starting transaction: %w", err) } defer tx.Rollback() if err := db.DeleteIssues( tx, evt.Did.String(), evt.Rkey.String(), ); err != nil { return fmt.Errorf("deleting issue: %w", err) } if err := tx.Commit(); err != nil { return fmt.Errorf("commiting issue delete: %w", err) } } return nil } func (i *Ingester) ingestRepoIssueState(_ context.Context, evt *tapc.RecordEventData) error { switch evt.Action { case tapc.RecordCreateAction: var record tangled.RepoIssueState if err := json.Unmarshal(evt.Record, &record); err != nil { return fmt.Errorf("parsing record json: %w", err) } // TODO: check permission switch record.State { case tangled.RepoIssueStateOpen: if err := db.ReopenIssues( i.db, orm.FilterEq("at_uri", record.Issue), ); err != nil { return fmt.Errorf("opening issue: %w", err) } case tangled.RepoIssueStateClosed: if err := db.CloseIssues( i.db, orm.FilterEq("at_uri", record.Issue), ); err != nil { return fmt.Errorf("closing issue: %w", err) } default: return nil } // TODO: notify case tapc.RecordUpdateAction: // no-op case tapc.RecordDeleteAction: // no-op } return nil } func (i *Ingester) ingestRepo(ctx context.Context, 
evt *tapc.RecordEventData) error { switch evt.Action { case tapc.RecordCreateAction, tapc.RecordUpdateAction: var record tangled.Repo if err := json.Unmarshal(evt.Record, &record); err != nil { return fmt.Errorf("parsing record json: %w", err) } repo, err := models.RepoFromRecord(evt.Did, evt.Rkey, record) if err != nil { i.l.Warn("ignoring invalid repo record", "err", err) return nil } tx, err := i.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("starting transaction: %w", err) } defer tx.Rollback() if err := db.UpsertRepo(tx, &repo); err != nil { return fmt.Errorf("upserting repo: %w", err) } if err := tx.Commit(); err != nil { return fmt.Errorf("commiting repo upsert: %w", err) } case tapc.RecordDeleteAction: if err := db.RemoveRepo(i.db, evt.Did, evt.Rkey); err != nil { return fmt.Errorf("deleting repo: %w", err) } } return nil } func (i *Ingester) ingestRepoPullComment(ctx context.Context, evt *tapc.RecordEventData) error { switch evt.Action { case tapc.RecordCreateAction, tapc.RecordUpdateAction: var record tangled.RepoPullComment if err := json.Unmarshal(evt.Record, &record); err != nil { return fmt.Errorf("parsing record json: %w", err) } comment, err := models.PullCommentFromRecord(evt.Did, evt.Rkey, record) if err != nil { i.l.Warn("ignoring invalid issue.comment record", "err", err) return nil } tx, err := i.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("starting transaction: %w", err) } defer tx.Rollback() if err := db.UpsertPullComment(tx, &comment); err != nil { return fmt.Errorf("upserting pull comment: %w", err) } if err := tx.Commit(); err != nil { return fmt.Errorf("commiting transaction: %w", err) } if evt.Action == tapc.RecordCreateAction { i.notifier.NewPullComment(ctx, &comment, comment.Mentions) } case tapc.RecordDeleteAction: if err := db.DeletePullComments( i.db, orm.FilterEq("did", evt.Did), orm.FilterEq("rkey", evt.Rkey), ); err != nil { return fmt.Errorf("deleting pull comment: %w", err) } } return nil } func (i 
*Ingester) ingestRepoPull(_ context.Context, evt *tapc.RecordEventData) error { switch evt.Action { case tapc.RecordCreateAction, tapc.RecordUpdateAction: var record tangled.RepoPull if err := json.Unmarshal(evt.Record, &record); err != nil { return fmt.Errorf("parsing record json: %w", err) } panic("unimplemented") case tapc.RecordDeleteAction: panic("unimplemented") } return nil } func (i *Ingester) ingestRepoPullStatus(_ context.Context, evt *tapc.RecordEventData) error { switch evt.Action { case tapc.RecordCreateAction: var record tangled.RepoPullStatus if err := json.Unmarshal(evt.Record, &record); err != nil { return fmt.Errorf("parsing record json: %w", err) } panic("unimplemented") case tapc.RecordUpdateAction: // no-op case tapc.RecordDeleteAction: // no-op } return nil } func (i *Ingester) ingestSpindleMember(_ context.Context, evt *tapc.RecordEventData) error { switch evt.Action { case tapc.RecordCreateAction, tapc.RecordUpdateAction: var record tangled.SpindleMember if err := json.Unmarshal(evt.Record, &record); err != nil { return fmt.Errorf("parsing record json: %w", err) } // TODO: let's just remove the spindle.member record panic("unimplemented") case tapc.RecordDeleteAction: panic("unimplemented") } return nil } func (i *Ingester) ingestSpindle(ctx context.Context, evt *tapc.RecordEventData) error { switch evt.Action { case tapc.RecordCreateAction: var record tangled.Spindle if err := json.Unmarshal(evt.Record, &record); err != nil { return fmt.Errorf("parsing record json: %w", err) } instance := evt.Rkey.String() if err := db.AddSpindle(i.db, models.Spindle{ Owner: evt.Did, Instance: instance, }); err != nil { return fmt.Errorf("adding spindle: %w", err) } panic("unimplemented") case tapc.RecordDeleteAction: instance := evt.Rkey.String() // get record from db first spindle, err := func(instance string, did syntax.DID) (models.Spindle, error) { spindles, err := db.GetSpindles( i.db, orm.FilterEq("owner", did), orm.FilterEq("instance", instance), ) 
if err != nil { return models.Spindle{}, err } if len(spindles) != 1 { return models.Spindle{}, fmt.Errorf("got incorret number of spindles: %d, expected 1", len(spindles)) } return spindles[0], nil }(instance, evt.Did) if err != nil { return fmt.Errorf("getting spindle: %w", err) } tx, err := i.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("starting transaction: %w", err) } defer tx.Rollback() // TODO: rollback enforcer // remove spindle members first if err := db.RemoveSpindleMember( tx, orm.FilterEq("owner", evt.Did), orm.FilterEq("instance", instance), ); err != nil { return fmt.Errorf("deleting spindle members: %w", err) } if err := db.DeleteSpindle( tx, orm.FilterEq("owner", evt.Did), orm.FilterEq("instance", instance), ); err != nil { return fmt.Errorf("deleting spindle: %w", err) } if spindle.Verified != nil { // TODO: clear from enforcer panic("unimplemented") } if err := tx.Commit(); err != nil { return fmt.Errorf("commiting transaction: %w", err) } } return nil } func (i *Ingester) ingestString(_ context.Context, evt *tapc.RecordEventData) error { switch evt.Action { case tapc.RecordCreateAction, tapc.RecordUpdateAction: var record tangled.String if err := json.Unmarshal(evt.Record, &record); err != nil { return fmt.Errorf("parsing record json: %w", err) } string := models.StringFromRecord(evt.Did.String(), evt.Rkey.String(), record) if err := string.Validate(); err != nil { i.l.Warn("invalid record", "err", err) return nil } if err := db.UpsertString(i.db, string); err != nil { return fmt.Errorf("upserting string: %w", err) } return nil case tapc.RecordDeleteAction: if err := db.DeleteString( i.db, orm.FilterEq("did", evt.Did), orm.FilterEq("rkey", evt.Rkey), ); err != nil { return fmt.Errorf("deleting string: %w", err) } return nil } return nil }