A vibe coded tangled fork which supports pijul.

wip: appview: migrate to tap ingester with partial backfill support

still WIP
DO NOT MERGE

Signed-off-by: Seongmin Lee <git@boltless.me>

+1024 -4
+4
appview/db/artifact.go
··· 11 11 "tangled.org/core/orm" 12 12 ) 13 13 14 + func UpsertArtifact(e Execer, artifact models.Artifact) error { 15 + panic("unimplemented") 16 + } 17 + 14 18 func AddArtifact(e Execer, artifact models.Artifact) error { 15 19 _, err := e.Exec( 16 20 `insert or ignore into artifacts (
+4
appview/db/issues.go
··· 295 295 return GetIssuesPaginated(e, pagination.Page{}, filters...) 296 296 } 297 297 298 + func UpsertIssueComment(tx *sql.Tx, c models.IssueComment) (int64, error) { 299 + panic("unimplemented") 300 + } 301 + 298 302 func AddIssueComment(tx *sql.Tx, c models.IssueComment) (int64, error) { 299 303 result, err := tx.Exec( 300 304 `insert into issue_comments (
+4
appview/db/label.go
··· 13 13 "tangled.org/core/orm" 14 14 ) 15 15 16 + func UpsertLabelDefinition(e Execer, l *models.LabelDefinition) (int64, error) { 17 + panic("unimplemented") 18 + } 19 + 16 20 // no updating type for now 17 21 func AddLabelDefinition(e Execer, l *models.LabelDefinition) (int64, error) { 18 22 result, err := e.Exec(
+5
appview/db/profile.go
··· 229 229 return nil 230 230 } 231 231 232 + func DeleteProfile(e Execer, did syntax.DID) error { 233 + _, err := e.Exec(`delete from profiles where did = ?`, did) 234 + return err 235 + } 236 + 232 237 func GetProfiles(e Execer, filters ...orm.Filter) (map[string]*models.Profile, error) { 233 238 var conditions []string 234 239 var args []any
+4
appview/db/pulls.go
··· 585 585 return pulls, nil 586 586 } 587 587 588 + func UpsertPullComment(tx *sql.Tx, comment *models.PullComment) error { 589 + panic("unimplemented") 590 + } 591 + 588 592 func NewPullComment(tx *sql.Tx, comment *models.PullComment) (int64, error) { 589 593 query := `insert into pull_comments (owner_did, repo_at, submission_id, comment_at, pull_id, body) values (?, ?, ?, ?, ?, ?)` 590 594 res, err := tx.Exec(
+6 -2
appview/db/repos.go
··· 386 386 return err 387 387 } 388 388 389 + func UpsertRepo(tx *sql.Tx, repo *models.Repo) error { 390 + panic("unimplemented") 391 + } 392 + 389 393 func AddRepo(tx *sql.Tx, repo *models.Repo) error { 390 394 _, err := tx.Exec( 391 395 `insert into repos ··· 409 413 return nil 410 414 } 411 415 412 - func RemoveRepo(e Execer, did, name string) error { 413 - _, err := e.Exec(`delete from repos where did = ? and name = ?`, did, name) 416 + func RemoveRepo(e Execer, did syntax.DID, rkey syntax.RecordKey) error { 417 + _, err := e.Exec(`delete from repos where did = ? and rkey = ?`, did, rkey) 414 418 return err 415 419 } 416 420
+4
appview/db/strings.go
··· 11 11 "tangled.org/core/orm" 12 12 ) 13 13 14 + func UpsertString(e Execer, s models.String) error { 15 + panic("unimplemented") 16 + } 17 + 14 18 func AddString(e Execer, s models.String) error { 15 19 _, err := e.Exec( 16 20 `insert into strings (
+808
appview/ingester/ingester.go
··· 1 + package ingester 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "log/slog" 8 + 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + "tangled.org/core/api/tangled" 11 + "tangled.org/core/appview/config" 12 + "tangled.org/core/appview/db" 13 + "tangled.org/core/appview/models" 14 + "tangled.org/core/appview/notify" 15 + "tangled.org/core/log" 16 + "tangled.org/core/orm" 17 + "tangled.org/core/rbac2" 18 + "tangled.org/core/tapc" 19 + ) 20 + 21 + type Ingester struct { 22 + cfg *config.Config 23 + db *db.DB 24 + e *rbac2.Enforcer 25 + notifier notify.Notifier 26 + l *slog.Logger 27 + } 28 + 29 + // TODO: finish with rbac/v1 30 + // TODO: just don't notify state events for now (we need full object) 31 + 32 + func (i *Ingester) ProcessEvent(ctx context.Context, evt tapc.Event) error { 33 + var err error 34 + switch evt.Type { 35 + case tapc.EvtRecord: 36 + revt := evt.Record 37 + ctx = log.IntoContext(ctx, i.l.With("record", revt.AtUri())) 38 + // NOTE: sort by alphabetical order 39 + switch revt.Collection.String() { 40 + case tangled.ActorProfileNSID: 41 + err = i.ingestActorProfile(ctx, revt) 42 + case tangled.FeedReactionNSID: 43 + err = i.ingestFeedReaction(ctx, revt) 44 + case tangled.FeedStarNSID: 45 + err = i.ingestFeedStar(ctx, revt) 46 + case tangled.GraphFollowNSID: 47 + err = i.ingestGraphFollow(ctx, revt) 48 + case tangled.KnotMemberNSID: 49 + err = i.ingestKnotMember(ctx, revt) 50 + case tangled.KnotNSID: 51 + err = i.ingestKnot(ctx, revt) 52 + case tangled.LabelDefinitionNSID: 53 + err = i.ingestLabelDefinition(ctx, revt) 54 + case tangled.LabelOpNSID: 55 + err = i.ingestLabelOp(ctx, revt) 56 + case tangled.PublicKeyNSID: 57 + err = i.ingestPublicKey(ctx, revt) 58 + case tangled.RepoArtifactNSID: 59 + err = i.ingestRepoArtifact(ctx, revt) 60 + case tangled.RepoIssueCommentNSID: 61 + err = i.ingestRepoIssueComment(ctx, revt) 62 + case tangled.RepoIssueNSID: 63 + err = i.ingestRepoIssue(ctx, revt) 64 + case tangled.RepoIssueStateNSID: 65 + err = i.ingestRepoIssueState(ctx, revt) 66 + case tangled.RepoNSID: 67 + err = i.ingestRepo(ctx, revt) 68 + case tangled.RepoPullCommentNSID: 69 + err = i.ingestRepoPullComment(ctx, revt) 70 + case tangled.RepoPullNSID: 71 + err = i.ingestRepoPull(ctx, revt) 72 + case tangled.RepoPullStatusNSID: 73 + err = i.ingestRepoPullStatus(ctx, revt) 74 + case tangled.SpindleMemberNSID: 75 + err = i.ingestSpindleMember(ctx, revt) 76 + case tangled.SpindleNSID: 77 + err = i.ingestSpindle(ctx, revt) 78 + case tangled.StringNSID: 79 + err = i.ingestString(ctx, revt) 80 + } 81 + case tapc.EvtIdentity: 82 + // no-op 83 + } 84 + 85 + if err != nil { 86 + i.l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) 87 + return err 88 + } 89 + return nil 90 + } 91 + 92 + func (i *Ingester) ingestActorProfile(ctx context.Context, evt *tapc.RecordEventData) error { 93 + // ignore invalid rkey 94 + if evt.Rkey.String() != "self" { 95 + return nil 96 + } 97 + 98 + switch evt.Action { 99 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 100 + var record tangled.ActorProfile 101 + if err := json.Unmarshal(evt.Record, &record); err != nil { 102 + return fmt.Errorf("parsing record json: %w", err) 103 + } 104 + 105 + profile, err := models.ProfileFromRecord(evt.Did, record) 106 + if err != nil { 107 + i.l.Warn("ignoring invalid profile record", "err", err) 108 + return nil 109 + } 110 + 111 + tx, err := i.db.BeginTx(ctx, nil) 112 + if err != nil { 113 + return fmt.Errorf("starting transaction: %w", err) 114 + } 115 + defer tx.Rollback() 116 + 117 + if err := db.UpsertProfile(tx, &profile); err != nil { 118 + return fmt.Errorf("upserting profile: %w", err) 119 + } 120 + 121 + if err := tx.Commit(); err != nil { 122 + return fmt.Errorf("commiting profile upsert: %w", err) 123 + } 124 + case tapc.RecordDeleteAction: 125 + if err := db.DeleteProfile(i.db, evt.Did); err != nil { 126 + return fmt.Errorf("deleting profile from db: %w", err) 127 + } 128 + } 129 + return nil 130 + } 131 + 132 + func (i *Ingester) ingestFeedReaction(_ context.Context, evt *tapc.RecordEventData) error { 133 + switch evt.Action { 134 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 135 + var record tangled.FeedReaction 136 + if err := json.Unmarshal(evt.Record, &record); err != nil { 137 + return fmt.Errorf("parsing record json: %w", err) 138 + } 139 + 140 + reaction, err := models.ReactionFromRecord(evt.Did, evt.Rkey, record) 141 + if err != nil { 142 + i.l.Warn("ignoring invalid reaction record", "err", err) 143 + return nil 144 + } 145 + 146 + if err := db.UpsertReaction(i.db, reaction); err != nil { 147 + return fmt.Errorf("upserting reaction record") 148 + } 149 + case tapc.RecordDeleteAction: 150 + if err := db.DeleteReactionByRkey( 151 + i.db, 152 + evt.Did.String(), 153 + evt.Rkey.String(), 154 + ); err != nil { 155 + return fmt.Errorf("deleting reaction from db: %w", err) 156 + } 157 + } 158 + return nil 159 + } 160 + 161 + func (i *Ingester) ingestFeedStar(_ context.Context, evt *tapc.RecordEventData) error { 162 + switch evt.Action { 163 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 164 + var record tangled.FeedStar 165 + if err := json.Unmarshal(evt.Record, &record); err != nil { 166 + return fmt.Errorf("parsing record json: %w", err) 167 + } 168 + 169 + star, err := models.StarFromRecord(evt.Did, evt.Rkey, record) 170 + if err != nil { 171 + i.l.Warn("ignoring invalid star record", "err", err) 172 + return nil 173 + } 174 + 175 + if err := db.UpsertStar(i.db, star); err != nil { 176 + return fmt.Errorf("upserting star record") 177 + } 178 + case tapc.RecordDeleteAction: 179 + if err := db.DeleteStarByRkey( 180 + i.db, 181 + evt.Did.String(), 182 + evt.Rkey.String(), 183 + ); err != nil { 184 + return fmt.Errorf("deleting record from db: %w", err) 185 + } 186 + } 187 + return nil 188 + } 189 + 190 + func (i *Ingester) ingestGraphFollow(_ context.Context, evt *tapc.RecordEventData) error { 191 + switch evt.Action { 192 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 193 + var record tangled.GraphFollow 194 + if err := json.Unmarshal(evt.Record, &record); err != nil { 195 + return fmt.Errorf("parsing record json: %w", err) 196 + } 197 + 198 + follow, err := models.FollowFromRecord(evt.Did, evt.Rkey, record) 199 + if err != nil { 200 + i.l.Warn("ignoring invalid follow record", "err", err) 201 + return nil 202 + } 203 + 204 + if err := db.UpsertFollow(i.db, follow); err != nil { 205 + return fmt.Errorf("upserting follow record") 206 + } 207 + case tapc.RecordDeleteAction: 208 + if err := db.DeleteFollowByRkey( 209 + i.db, 210 + evt.Did.String(), 211 + evt.Rkey.String(), 212 + ); err != nil { 213 + return fmt.Errorf("deleting record from db: %w", err) 214 + } 215 + } 216 + return nil 217 + } 218 + 219 + // TODO: let's just remove the knot.member record 220 + func (i *Ingester) ingestKnotMember(_ context.Context, evt *tapc.RecordEventData) error { 221 + switch evt.Action { 222 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 223 + var record tangled.KnotMember 224 + if err := json.Unmarshal(evt.Record, &record); err != nil { 225 + return fmt.Errorf("parsing record json: %w", err) 226 + } 227 + panic("unimplemented") 228 + case tapc.RecordDeleteAction: 229 + panic("unimplemented") 230 + } 231 + return nil 232 + } 233 + 234 + func (i *Ingester) ingestKnot(ctx context.Context, evt *tapc.RecordEventData) error { 235 + switch evt.Action { 236 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 237 + var record tangled.Knot 238 + if err := json.Unmarshal(evt.Record, &record); err != nil { 239 + return fmt.Errorf("parsing record json: %w", err) 240 + } 241 + 242 + domain := evt.Rkey.String() 243 + 244 + if err := db.AddKnot(i.db, domain, evt.Did.String()); err != nil { 245 + return fmt.Errorf("upserting knot: %w", err) 246 + } 247 + 248 + // TODO: hmmm should we run verification here? 249 + // There can be unverified knot in user profile. 250 + panic("unimplemented") 251 + case tapc.RecordDeleteAction: 252 + domain := evt.Rkey.String() 253 + 254 + // get record from db first 255 + registration, err := func(domain string, did syntax.DID) (models.Registration, error) { 256 + registrations, err := db.GetRegistrations( 257 + i.db, 258 + orm.FilterEq("domain", domain), 259 + orm.FilterEq("did", evt.Did), 260 + ) 261 + if err != nil { 262 + return models.Registration{}, err 263 + } 264 + if len(registrations) != 1 { 265 + return models.Registration{}, fmt.Errorf("got incorret number of registrations: %d, expected 1", len(registrations)) 266 + } 267 + return registrations[0], nil 268 + }(domain, evt.Did) 269 + if err != nil { 270 + return fmt.Errorf("getting registration: %w", err) 271 + } 272 + 273 + tx, err := i.db.BeginTx(ctx, nil) 274 + if err != nil { 275 + return fmt.Errorf("starting transaction: %w", err) 276 + } 277 + defer tx.Rollback() 278 + // TODO: rollback enforcer 279 + 280 + if err := db.DeleteKnot( 281 + tx, 282 + orm.FilterEq("did", evt.Did), 283 + orm.FilterEq("domain", domain), 284 + ); err != nil { 285 + return fmt.Errorf("deleting knot: %w", err) 286 + } 287 + 288 + if registration.Registered != nil { 289 + // TODO: clear from enforcer 290 + panic("unimplemented") 291 + } 292 + 293 + if err := tx.Commit(); err != nil { 294 + return fmt.Errorf("commiting transaction: %w", err) 295 + } 296 + } 297 + return nil 298 + } 299 + 300 + func (i *Ingester) ingestLabelDefinition(_ context.Context, evt *tapc.RecordEventData) error { 301 + switch evt.Action { 302 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 303 + var record tangled.LabelDefinition 304 + if err := json.Unmarshal(evt.Record, &record); err != nil { 305 + return fmt.Errorf("parsing record json: %w", err) 306 + } 307 + 308 + def, err := models.LabelDefinitionFromRecord(evt.Did.String(), evt.Rkey.String(), record) 309 + if err != nil { 310 + return fmt.Errorf("failed to parse labeldef from record: %w", err) 311 + } 312 + 313 + if err := def.Validate(); err != nil { 314 + i.l.Warn("ignoring invalid label def record", "err", err) 315 + return nil 316 + } 317 + 318 + if _, err := db.UpsertLabelDefinition(i.db, def); err != nil { 319 + return fmt.Errorf("upserting label definition") 320 + } 321 + case tapc.RecordDeleteAction: 322 + if err := db.DeleteLabelDefinition( 323 + i.db, 324 + orm.FilterEq("did", evt.Did), 325 + orm.FilterEq("rkey", evt.Rkey), 326 + ); err != nil { 327 + return fmt.Errorf("deleting record from db: %w", err) 328 + } 329 + } 330 + return nil 331 + } 332 + 333 + // TODO: label.op record is not designed to be mutable. should be reimplemented 334 + func (i *Ingester) ingestLabelOp(ctx context.Context, evt *tapc.RecordEventData) error { 335 + switch evt.Action { 336 + case tapc.RecordCreateAction: 337 + var record tangled.LabelOp 338 + if err := json.Unmarshal(evt.Record, &record); err != nil { 339 + return fmt.Errorf("parsing record json: %w", err) 340 + } 341 + 342 + // TODO: 343 + // 1. validate permissions 344 + // 2. labelOp.Subject -> (subject).Repo 345 + // 3. get all label definition for that repo, constructing actx 346 + ops := models.LabelOpsFromRecord(evt.Did.String(), evt.Rkey.String(), record) 347 + for _, o := range ops { 348 + // 4. find label def based on o.OperandKey (AT-URI to the label definition) 349 + // 5. validate labelOp from def 350 + panic("unimplemented") 351 + } 352 + 353 + tx, err := i.db.BeginTx(ctx, nil) 354 + if err != nil { 355 + return fmt.Errorf("starting transaction: %w", err) 356 + } 357 + defer tx.Rollback() 358 + 359 + for _, o := range ops { 360 + _, err := db.AddLabelOp(tx, &o) 361 + if err != nil { 362 + return fmt.Errorf("adding label op: %w", err) 363 + } 364 + } 365 + 366 + if err := tx.Commit(); err != nil { 367 + return fmt.Errorf("commiting transaction: %w", err) 368 + } 369 + case tapc.RecordUpdateAction: 370 + // no-op. we are ignoring update action for label.op records 371 + case tapc.RecordDeleteAction: 372 + // no-op. we are ignoring delete action for label.op records 373 + } 374 + return nil 375 + } 376 + 377 + func (i *Ingester) ingestPublicKey(_ context.Context, evt *tapc.RecordEventData) error { 378 + switch evt.Action { 379 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 380 + var record tangled.PublicKey 381 + if err := json.Unmarshal(evt.Record, &record); err != nil { 382 + return fmt.Errorf("parsing record json: %w", err) 383 + } 384 + 385 + pubKey, err := models.PublicKeyFromRecord(evt.Did, evt.Rkey, record) 386 + if err != nil { 387 + i.l.Warn("ignoring invalid publicKey record", "err", err) 388 + return nil 389 + } 390 + if err := pubKey.Validate(); err != nil { 391 + i.l.Warn("ignoring invalid publicKey record", "err", err) 392 + return nil 393 + } 394 + 395 + if err := db.UpsertPublicKey(i.db, pubKey); err != nil { 396 + return fmt.Errorf("upserting publicKey record") 397 + } 398 + case tapc.RecordDeleteAction: 399 + if err := db.DeletePublicKeyByRkey( 400 + i.db, 401 + evt.Did.String(), 402 + evt.Rkey.String(), 403 + ); err != nil { 404 + return fmt.Errorf("deleting record from db: %w", err) 405 + } 406 + } 407 + return nil 408 + } 409 + 410 + // so this is one of the reasons why we need repo-DID 411 + // and possibly its own PDS. 412 + // for things need permission like repo artifact, even its not collaborative, 413 + // we need to pass the Knot to check the authority. 414 + // We cannot really distribute the arbitrary RBAC rules in meaningful way. 415 + // That's pretty hard and fragile. 416 + // Instead, when any data is belongs to the repo (no matter who created it), it should be owned 417 + // by the repo. I mean the actual data should be. 418 + // If someone removed the original artifact from their PDS, or if their PDS goes down, we lost 419 + // a way to backfill relateed data to construct the repo view. 420 + 421 + func (i *Ingester) ingestRepoArtifact(_ context.Context, evt *tapc.RecordEventData) error { 422 + switch evt.Action { 423 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 424 + var record tangled.RepoArtifact 425 + if err := json.Unmarshal(evt.Record, &record); err != nil { 426 + return fmt.Errorf("parsing record json: %w", err) 427 + } 428 + 429 + artifact, err := models.ArtifactFromRecord(evt.Did, evt.Rkey, record) 430 + if err != nil { 431 + i.l.Warn("ignoring invalid artifact record", "err", err) 432 + return nil 433 + } 434 + 435 + if err := db.UpsertArtifact(i.db, artifact); err != nil { 436 + return fmt.Errorf("upserting artifact: %w", err) 437 + } 438 + case tapc.RecordDeleteAction: 439 + if err := db.DeleteArtifact( 440 + i.db, 441 + orm.FilterEq("did", evt.Did), 442 + orm.FilterEq("rkey", evt.Rkey), 443 + ); err != nil { 444 + return fmt.Errorf("deleting record from db: %w", err) 445 + } 446 + } 447 + return nil 448 + } 449 + 450 + func (i *Ingester) ingestRepoIssueComment(ctx context.Context, evt *tapc.RecordEventData) error { 451 + switch evt.Action { 452 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 453 + var record tangled.RepoIssueComment 454 + if err := json.Unmarshal(evt.Record, &record); err != nil { 455 + return fmt.Errorf("parsing record json: %w", err) 456 + } 457 + 458 + comment, err := models.IssueCommentFromRecord(evt.Did.String(), evt.Rkey.String(), record) 459 + if err != nil { 460 + i.l.Warn("ignoring invalid issue.comment record", "err", err) 461 + return nil 462 + } 463 + 464 + tx, err := i.db.BeginTx(ctx, nil) 465 + if err != nil { 466 + return fmt.Errorf("starting transaction: %w", err) 467 + } 468 + defer tx.Rollback() 469 + 470 + if _, err = db.UpsertIssueComment(tx, *comment); err != nil { 471 + return fmt.Errorf("upserting issue comment: %w", err) 472 + } 473 + 474 + if err := tx.Commit(); err != nil { 475 + return fmt.Errorf("commiting transaction: %w", err) 476 + } 477 + case tapc.RecordDeleteAction: 478 + if err := db.DeleteIssueComments( 479 + i.db, 480 + orm.FilterEq("did", evt.Did), 481 + orm.FilterEq("rkey", evt.Rkey), 482 + ); err != nil { 483 + return fmt.Errorf("deleting issue comment: %w", err) 484 + } 485 + } 486 + return nil 487 + } 488 + 489 + func (i *Ingester) ingestRepoIssue(ctx context.Context, evt *tapc.RecordEventData) error { 490 + switch evt.Action { 491 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 492 + var record tangled.RepoIssue 493 + if err := json.Unmarshal(evt.Record, &record); err != nil { 494 + return fmt.Errorf("parsing record json: %w", err) 495 + } 496 + 497 + issue := models.IssueFromRecord(evt.Did.String(), evt.Rkey.String(), record) 498 + if err := issue.Validate(); err != nil { 499 + i.l.Warn("ignoring invalid issue record", "err", err) 500 + return nil 501 + } 502 + 503 + tx, err := i.db.BeginTx(ctx, nil) 504 + if err != nil { 505 + return fmt.Errorf("starting transaction: %w", err) 506 + } 507 + defer tx.Rollback() 508 + 509 + if err := db.PutIssue(tx, &issue); err != nil { 510 + return fmt.Errorf("upserting issue: %w", err) 511 + } 512 + 513 + if err := tx.Commit(); err != nil { 514 + return fmt.Errorf("commiting issue upsert: %w", err) 515 + } 516 + 517 + if evt.Action == tapc.RecordCreateAction { 518 + i.notifier.NewIssue(ctx, &issue, issue.Mentions) 519 + } 520 + case tapc.RecordDeleteAction: 521 + tx, err := i.db.BeginTx(ctx, nil) 522 + if err != nil { 523 + return fmt.Errorf("starting transaction: %w", err) 524 + } 525 + defer tx.Rollback() 526 + 527 + if err := db.DeleteIssues( 528 + tx, 529 + evt.Did.String(), 530 + evt.Rkey.String(), 531 + ); err != nil { 532 + return fmt.Errorf("deleting issue: %w", err) 533 + } 534 + 535 + if err := tx.Commit(); err != nil { 536 + return fmt.Errorf("commiting issue delete: %w", err) 537 + } 538 + } 539 + return nil 540 + } 541 + 542 + func (i *Ingester) ingestRepoIssueState(_ context.Context, evt *tapc.RecordEventData) error { 543 + switch evt.Action { 544 + case tapc.RecordCreateAction: 545 + var record tangled.RepoIssueState 546 + if err := json.Unmarshal(evt.Record, &record); err != nil { 547 + return fmt.Errorf("parsing record json: %w", err) 548 + } 549 + 550 + // TODO: check permission 551 + 552 + switch record.State { 553 + case tangled.RepoIssueStateOpen: 554 + if err := db.ReopenIssues( 555 + i.db, 556 + orm.FilterEq("at_uri", record.Issue), 557 + ); err != nil { 558 + return fmt.Errorf("opening issue: %w", err) 559 + } 560 + case tangled.RepoIssueStateClosed: 561 + if err := db.CloseIssues( 562 + i.db, 563 + orm.FilterEq("at_uri", record.Issue), 564 + ); err != nil { 565 + return fmt.Errorf("closing issue: %w", err) 566 + } 567 + default: 568 + return nil 569 + } 570 + 571 + // TODO: notify 572 + case tapc.RecordUpdateAction: 573 + // no-op 574 + case tapc.RecordDeleteAction: 575 + // no-op 576 + } 577 + return nil 578 + } 579 + 580 + func (i *Ingester) ingestRepo(ctx context.Context, evt *tapc.RecordEventData) error { 581 + switch evt.Action { 582 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 583 + var record tangled.Repo 584 + if err := json.Unmarshal(evt.Record, &record); err != nil { 585 + return fmt.Errorf("parsing record json: %w", err) 586 + } 587 + 588 + repo, err := models.RepoFromRecord(evt.Did, evt.Rkey, record) 589 + if err != nil { 590 + i.l.Warn("ignoring invalid repo record", "err", err) 591 + return nil 592 + } 593 + 594 + tx, err := i.db.BeginTx(ctx, nil) 595 + if err != nil { 596 + return fmt.Errorf("starting transaction: %w", err) 597 + } 598 + defer tx.Rollback() 599 + 600 + if err := db.UpsertRepo(tx, &repo); err != nil { 601 + return fmt.Errorf("upserting repo: %w", err) 602 + } 603 + 604 + if err := tx.Commit(); err != nil { 605 + return fmt.Errorf("commiting repo upsert: %w", err) 606 + } 607 + case tapc.RecordDeleteAction: 608 + if err := db.RemoveRepo(i.db, evt.Did, evt.Rkey); err != nil { 609 + return fmt.Errorf("deleting repo: %w", err) 610 + } 611 + } 612 + return nil 613 + } 614 + 615 + func (i *Ingester) ingestRepoPullComment(ctx context.Context, evt *tapc.RecordEventData) error { 616 + switch evt.Action { 617 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 618 + var record tangled.RepoPullComment 619 + if err := json.Unmarshal(evt.Record, &record); err != nil { 620 + return fmt.Errorf("parsing record json: %w", err) 621 + } 622 + 623 + comment, err := models.PullCommentFromRecord(evt.Did, evt.Rkey, record) 624 + if err != nil { 625 + i.l.Warn("ignoring invalid issue.comment record", "err", err) 626 + return nil 627 + } 628 + 629 + tx, err := i.db.BeginTx(ctx, nil) 630 + if err != nil { 631 + return fmt.Errorf("starting transaction: %w", err) 632 + } 633 + defer tx.Rollback() 634 + 635 + if err := db.UpsertPullComment(tx, &comment); err != nil { 636 + return fmt.Errorf("upserting pull comment: %w", err) 637 + } 638 + 639 + if err := tx.Commit(); err != nil { 640 + return fmt.Errorf("commiting transaction: %w", err) 641 + } 642 + 643 + if evt.Action == tapc.RecordCreateAction { 644 + i.notifier.NewPullComment(ctx, &comment, comment.Mentions) 645 + } 646 + case tapc.RecordDeleteAction: 647 + if err := db.DeletePullComments( 648 + i.db, 649 + orm.FilterEq("did", evt.Did), 650 + orm.FilterEq("rkey", evt.Rkey), 651 + ); err != nil { 652 + return fmt.Errorf("deleting pull comment: %w", err) 653 + } 654 + } 655 + return nil 656 + } 657 + 658 + func (i *Ingester) ingestRepoPull(_ context.Context, evt *tapc.RecordEventData) error { 659 + switch evt.Action { 660 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 661 + var record tangled.RepoPull 662 + if err := json.Unmarshal(evt.Record, &record); err != nil { 663 + return fmt.Errorf("parsing record json: %w", err) 664 + } 665 + panic("unimplemented") 666 + case tapc.RecordDeleteAction: 667 + panic("unimplemented") 668 + } 669 + return nil 670 + } 671 + 672 + func (i *Ingester) ingestRepoPullStatus(_ context.Context, evt *tapc.RecordEventData) error { 673 + switch evt.Action { 674 + case tapc.RecordCreateAction: 675 + var record tangled.RepoPullStatus 676 + if err := json.Unmarshal(evt.Record, &record); err != nil { 677 + return fmt.Errorf("parsing record json: %w", err) 678 + } 679 + panic("unimplemented") 680 + case tapc.RecordUpdateAction: 681 + // no-op 682 + case tapc.RecordDeleteAction: 683 + // no-op 684 + } 685 + return nil 686 + } 687 + 688 + func (i *Ingester) ingestSpindleMember(_ context.Context, evt *tapc.RecordEventData) error { 689 + switch evt.Action { 690 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 691 + var record tangled.SpindleMember 692 + if err := json.Unmarshal(evt.Record, &record); err != nil { 693 + return fmt.Errorf("parsing record json: %w", err) 694 + } 695 + // TODO: let's just remove the spindle.member record 696 + panic("unimplemented") 697 + case tapc.RecordDeleteAction: 698 + panic("unimplemented") 699 + } 700 + return nil 701 + } 702 + 703 + func (i *Ingester) ingestSpindle(ctx context.Context, evt *tapc.RecordEventData) error { 704 + switch evt.Action { 705 + case tapc.RecordCreateAction: 706 + var record tangled.Spindle 707 + if err := json.Unmarshal(evt.Record, &record); err != nil { 708 + return fmt.Errorf("parsing record json: %w", err) 709 + } 710 + instance := evt.Rkey.String() 711 + 712 + if err := db.AddSpindle(i.db, models.Spindle{ 713 + Owner: evt.Did, 714 + Instance: instance, 715 + }); err != nil { 716 + return fmt.Errorf("adding spindle: %w", err) 717 + } 718 + 719 + panic("unimplemented") 720 + case tapc.RecordDeleteAction: 721 + instance := evt.Rkey.String() 722 + 723 + // get record from db first 724 + spindle, err := func(instance string, did syntax.DID) (models.Spindle, error) { 725 + spindles, err := db.GetSpindles( 726 + i.db, 727 + orm.FilterEq("owner", did), 728 + orm.FilterEq("instance", instance), 729 + ) 730 + if err != nil { 731 + return models.Spindle{}, err 732 + } 733 + if len(spindles) != 1 { 734 + return models.Spindle{}, fmt.Errorf("got incorret number of spindles: %d, expected 1", len(spindles)) 735 + } 736 + return spindles[0], nil 737 + }(instance, evt.Did) 738 + if err != nil { 739 + return fmt.Errorf("getting spindle: %w", err) 740 + } 741 + 742 + tx, err := i.db.BeginTx(ctx, nil) 743 + if err != nil { 744 + return fmt.Errorf("starting transaction: %w", err) 745 + } 746 + defer tx.Rollback() 747 + // TODO: rollback enforcer 748 + 749 + // remove spindle members first 750 + if err := db.RemoveSpindleMember( 751 + tx, 752 + orm.FilterEq("owner", evt.Did), 753 + orm.FilterEq("instance", instance), 754 + ); err != nil { 755 + return fmt.Errorf("deleting spindle members: %w", err) 756 + } 757 + if err := db.DeleteSpindle( 758 + tx, 759 + orm.FilterEq("owner", evt.Did), 760 + orm.FilterEq("instance", instance), 761 + ); err != nil { 762 + return fmt.Errorf("deleting spindle: %w", err) 763 + } 764 + 765 + if spindle.Verified != nil { 766 + // TODO: clear from enforcer 767 + panic("unimplemented") 768 + } 769 + 770 + if err := tx.Commit(); err != nil { 771 + return fmt.Errorf("commiting transaction: %w", err) 772 + } 773 + } 774 + return nil 775 + } 776 + 777 + func (i *Ingester) ingestString(_ context.Context, evt *tapc.RecordEventData) error { 778 + switch evt.Action { 779 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 780 + var record tangled.String 781 + if err := json.Unmarshal(evt.Record, &record); err != nil { 782 + return fmt.Errorf("parsing record json: %w", err) 783 + } 784 + 785 + string := models.StringFromRecord(evt.Did.String(), evt.Rkey.String(), record) 786 + if err := string.Validate(); err != nil { 787 + i.l.Warn("invalid record", "err", err) 788 + return nil 789 + } 790 + 791 + if err := db.UpsertString(i.db, string); err != nil { 792 + return fmt.Errorf("upserting string: %w", err) 793 + } 794 + 795 + return nil 796 + case tapc.RecordDeleteAction: 797 + if err := db.DeleteString( 798 + i.db, 799 + orm.FilterEq("did", evt.Did), 800 + orm.FilterEq("rkey", evt.Rkey), 801 + ); err != nil { 802 + return fmt.Errorf("deleting string: %w", err) 803 + } 804 + 805 + return nil 806 + } 807 + return nil 808 + }
+25 -1
appview/models/artifact.go
··· 25 25 MimeType string 26 26 } 27 27 28 - func (a *Artifact) ArtifactAt() syntax.ATURI { 28 + func (a *Artifact) AtUri() syntax.ATURI { 29 29 return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", a.Did, tangled.RepoArtifactNSID, a.Rkey)) 30 30 } 31 + 32 + func ArtifactFromRecord(did syntax.DID, rkey syntax.RecordKey, record tangled.RepoArtifact) (Artifact, error) { 33 + // validate atproto record 34 + repoAt, err := syntax.ParseATURI(record.Repo) 35 + if err != nil { 36 + return Artifact{}, fmt.Errorf("invalid record %T: %w", record, fmt.Errorf("repo should be valid at-uri: %w", err)) 37 + } 38 + created, err := time.Parse(time.RFC3339, record.CreatedAt) 39 + if err != nil { 40 + return Artifact{}, fmt.Errorf("invalid record %T: %w", record, fmt.Errorf("invalid time format '%s'", record.CreatedAt)) 41 + } 42 + 43 + return Artifact{ 44 + Did: did.String(), 45 + Rkey: rkey.String(), 46 + RepoAt: repoAt, 47 + Tag: plumbing.Hash(record.Tag), 48 + CreatedAt: created, 49 + BlobCid: cid.Cid(record.Artifact.Ref), 50 + Name: record.Name, 51 + Size: uint64(record.Artifact.Size), 52 + MimeType: record.Artifact.MimeType, 53 + }, nil 54 + }
+21
appview/models/follow.go
··· 1 1 package models 2 2 3 3 import ( 4 + "fmt" 4 5 "time" 5 6 7 + "github.com/bluesky-social/indigo/atproto/syntax" 6 8 "tangled.org/core/api/tangled" 7 9 ) 8 10 ··· 18 20 Subject: f.SubjectDid, 19 21 CreatedAt: f.FollowedAt.Format(time.RFC3339), 20 22 } 23 + } 24 + 25 + func FollowFromRecord(did syntax.DID, rkey syntax.RecordKey, record tangled.GraphFollow) (Follow, error) { 26 + subjectDid, err := syntax.ParseDID(record.Subject) 27 + if err != nil { 28 + return Follow{}, fmt.Errorf("subject should be valid did: %w", err) 29 + } 30 + 31 + created, err := time.Parse(time.RFC3339, record.CreatedAt) 32 + if err != nil { 33 + return Follow{}, fmt.Errorf("invalid time format '%s'", record.CreatedAt) 34 + } 35 + 36 + return Follow{ 37 + UserDid: did.String(), 38 + Rkey: rkey.String(), 39 + SubjectDid: subjectDid.String(), 40 + FollowedAt: created, 41 + }, nil 21 42 } 22 43 23 44 type FollowStats struct {
+84
appview/models/profile.go
··· 50 50 return true 51 51 } 52 52 53 + // ProfileFromRecord will validate the atproto record and convert it to [Profile]. 54 + // It can return error for invalid records. 55 + func ProfileFromRecord(did syntax.DID, record tangled.ActorProfile) (Profile, error) { 56 + // validate atproto record 57 + if err := func(record tangled.ActorProfile) error { 58 + // validate description 59 + if record.Description != nil && len(*record.Description) > 256 { 60 + return fmt.Errorf("bio is too long") 61 + } 62 + 63 + // validate links 64 + if len(record.Links) > 5 { 65 + return fmt.Errorf("links cannot be more than 5") 66 + } 67 + 68 + // validate location 69 + if record.Location != nil && len(*record.Location) > 256 { 70 + return fmt.Errorf("location is too long") 71 + } 72 + 73 + // validate pinnedRepositories 74 + if len(record.PinnedRepositories) >= 5 { 75 + return fmt.Errorf("pinnedRepositories cannot be more than 6") 76 + } 77 + for i, v := range record.PinnedRepositories { 78 + if _, err := syntax.ParseATURI(v); err != nil { 79 + return fmt.Errorf("invalid at-uri at pinnedRepositories[%d]: %w", i, err) 80 + } 81 + } 82 + 83 + // validate pronouns 84 + if record.Pronouns != nil && len(*record.Pronouns) > 40 { 85 + return fmt.Errorf("pronouns are too long") 86 + } 87 + 88 + // validate stats 89 + if len(record.Stats) > 2 { 90 + return fmt.Errorf("stats cannot be more than 2") 91 + } 92 + for i, v := range record.Stats { 93 + if VanityStatKind(v).String() == "" { 94 + return fmt.Errorf("unknown stat kind '%s' at stats[%d]", v, i) 95 + } 96 + } 97 + return nil 98 + }(record); err != nil { 99 + return Profile{}, fmt.Errorf("invalid record %T: %w", record, err) 100 + } 101 + 102 + p := Profile{Did: did.String()} 103 + 104 + if record.Description != nil { 105 + p.Description = *record.Description 106 + } 107 + 108 + p.IncludeBluesky = record.Bluesky 109 + 110 + if record.Location != nil { 111 + p.Location = *record.Location 112 + } 113 + 114 + copy(p.Links[:], record.Links) 115 + 116 + for i, s := range record.Stats { 117 + if i >= 2 { 118 + break 119 + } 120 + p.Stats[i].Kind = VanityStatKind(s) 121 + } 122 + 123 + for i, r := range record.PinnedRepositories { 124 + if i >= 6 { 125 + break 126 + } 127 + p.PinnedRepos[i] = syntax.ATURI(r) 128 + } 129 + 130 + if record.Pronouns != nil { 131 + p.Pronouns = *record.Pronouns 132 + } 133 + 134 + return p, nil 135 + } 136 + 53 137 type VanityStatKind string 54 138 55 139 const (
+4
appview/models/pull.go
··· 171 171 return syntax.ATURI(p.CommentAt) 172 172 } 173 173 174 + func PullCommentFromRecord(did syntax.DID, rkey syntax.RecordKey, record tangled.RepoPullComment) (PullComment, error) { 175 + panic("unimplemented") 176 + } 177 + 174 178 func (p *Pull) TotalComments() int { 175 179 total := 0 176 180 for _, s := range p.Submissions {
+26
appview/models/reaction.go
··· 1 1 package models 2 2 3 3 import ( 4 + "fmt" 4 5 "time" 5 6 6 7 "github.com/bluesky-social/indigo/atproto/syntax" ··· 63 64 Reaction: r.Kind.String(), 64 65 CreatedAt: r.Created.Format(time.RFC3339), 65 66 } 67 + } 68 + 69 + func ReactionFromRecord(did syntax.DID, rkey syntax.RecordKey, record tangled.FeedReaction) (Reaction, error) { 70 + subjectAt, err := syntax.ParseATURI(record.Subject) 71 + if err != nil { 72 + return Reaction{}, fmt.Errorf("subject should be valid at-uri: %w", err) 73 + } 74 + 75 + kind, ok := ParseReactionKind(record.Reaction) 76 + if !ok { 77 + return Reaction{}, fmt.Errorf("invalid reaction kind '%s'", record.Reaction) 78 + } 79 + 80 + created, err := time.Parse(time.RFC3339, record.CreatedAt) 81 + if err != nil { 82 + return Reaction{}, fmt.Errorf("invalid time format '%s'", record.CreatedAt) 83 + } 84 + 85 + return Reaction{ 86 + ReactedByDid: did.String(), 87 + Rkey: rkey.String(), 88 + ThreadAt: subjectAt, 89 + Kind: kind, 90 + Created: created, 91 + }, nil 66 92 } 67 93 68 94 type ReactionDisplayData struct {
+4
appview/models/repo.go
··· 30 30 Source string 31 31 } 32 32 33 + func RepoFromRecord(did syntax.DID, rkey syntax.RecordKey, record tangled.Repo) (Repo, error) { 34 + panic("unimplemented") 35 + } 36 + 33 37 func (r *Repo) AsRecord() tangled.Repo { 34 38 var source, spindle, description, website *string 35 39
+20
appview/models/star.go
··· 1 1 package models 2 2 3 3 import ( 4 + "fmt" 4 5 "time" 5 6 6 7 "github.com/bluesky-social/indigo/atproto/syntax" ··· 19 20 Subject: s.RepoAt.String(), 20 21 CreatedAt: s.Created.Format(time.RFC3339), 21 22 } 23 + } 24 + 25 + func StarFromRecord(did syntax.DID, rkey syntax.RecordKey, record tangled.FeedStar) (Star, error) { 26 + subjectAt, err := syntax.ParseATURI(record.Subject) 27 + if err != nil { 28 + return Star{}, fmt.Errorf("subject should be valid at-uri: %w", err) 29 + } 30 + 31 + created, err := time.Parse(time.RFC3339, record.CreatedAt) 32 + if err != nil { 33 + return Star{}, fmt.Errorf("invalid time format '%s'", record.CreatedAt) 34 + } 35 + 36 + return Star{ 37 + Did: did.String(), 38 + Rkey: rkey.String(), 39 + RepoAt: subjectAt, 40 + Created: created, 41 + }, nil 22 42 } 23 43 24 44 // RepoStar is used for reverse mapping to repos
+1 -1
appview/repo/repo.go
··· 912 912 } 913 913 914 914 // remove repo from db 915 - err = db.RemoveRepo(tx, f.Did, f.Name) 915 + err = db.RemoveRepo(tx, syntax.DID(f.Did), syntax.RecordKey(f.Rkey)) 916 916 if err != nil { 917 917 rp.pages.Notice(w, noticeId, "Failed to update appview") 918 918 return