A vibe-coded fork of Tangled that supports Pijul.
at 5bf28708dcf8972c724fb0c33fcab1281cbc3f27 632 lines 16 kB view raw
1package db 2 3import ( 4 "database/sql" 5 "fmt" 6 "maps" 7 "slices" 8 "sort" 9 "strings" 10 "time" 11 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "tangled.org/core/appview/models" 14 "tangled.org/core/appview/pagination" 15 "tangled.org/core/orm" 16) 17 18// NewDiscussion creates a new discussion in a Pijul repository 19func NewDiscussion(tx *sql.Tx, discussion *models.Discussion) error { 20 // ensure sequence exists 21 _, err := tx.Exec(` 22 insert or ignore into repo_discussion_seqs (repo_at, next_discussion_id) 23 values (?, 1) 24 `, discussion.RepoAt) 25 if err != nil { 26 return err 27 } 28 29 // get next discussion_id 30 var newDiscussionId int 31 err = tx.QueryRow(` 32 update repo_discussion_seqs 33 set next_discussion_id = next_discussion_id + 1 34 where repo_at = ? 35 returning next_discussion_id - 1 36 `, discussion.RepoAt).Scan(&newDiscussionId) 37 if err != nil { 38 return err 39 } 40 41 // insert new discussion 42 row := tx.QueryRow(` 43 insert into discussions (repo_at, did, rkey, discussion_id, title, body, target_channel, state) 44 values (?, ?, ?, ?, ?, ?, ?, ?) 45 returning id, discussion_id 46 `, discussion.RepoAt, discussion.Did, discussion.Rkey, newDiscussionId, discussion.Title, discussion.Body, discussion.TargetChannel, discussion.State) 47 48 err = row.Scan(&discussion.Id, &discussion.DiscussionId) 49 if err != nil { 50 return fmt.Errorf("scan row: %w", err) 51 } 52 53 return nil 54} 55 56// GetDiscussionsPaginated returns discussions with pagination 57func GetDiscussionsPaginated(e Execer, page pagination.Page, filters ...orm.Filter) ([]models.Discussion, error) { 58 discussionMap := make(map[string]*models.Discussion) // at-uri -> discussion 59 60 var conditions []string 61 var args []any 62 63 for _, filter := range filters { 64 conditions = append(conditions, filter.Condition()) 65 args = append(args, filter.Arg()...) 
66 } 67 68 whereClause := "" 69 if conditions != nil { 70 whereClause = " where " + strings.Join(conditions, " and ") 71 } 72 73 pLower := orm.FilterGte("row_num", page.Offset+1) 74 pUpper := orm.FilterLte("row_num", page.Offset+page.Limit) 75 76 pageClause := "" 77 if page.Limit > 0 { 78 args = append(args, pLower.Arg()...) 79 args = append(args, pUpper.Arg()...) 80 pageClause = " where " + pLower.Condition() + " and " + pUpper.Condition() 81 } 82 83 query := fmt.Sprintf( 84 ` 85 select * from ( 86 select 87 id, 88 did, 89 rkey, 90 repo_at, 91 discussion_id, 92 title, 93 body, 94 target_channel, 95 state, 96 created, 97 edited, 98 row_number() over (order by created desc) as row_num 99 from 100 discussions 101 %s 102 ) ranked_discussions 103 %s 104 `, 105 whereClause, 106 pageClause, 107 ) 108 109 rows, err := e.Query(query, args...) 110 if err != nil { 111 return nil, fmt.Errorf("failed to query discussions table: %w", err) 112 } 113 defer rows.Close() 114 115 for rows.Next() { 116 var discussion models.Discussion 117 var createdAt string 118 var editedAt sql.Null[string] 119 var rowNum int64 120 err := rows.Scan( 121 &discussion.Id, 122 &discussion.Did, 123 &discussion.Rkey, 124 &discussion.RepoAt, 125 &discussion.DiscussionId, 126 &discussion.Title, 127 &discussion.Body, 128 &discussion.TargetChannel, 129 &discussion.State, 130 &createdAt, 131 &editedAt, 132 &rowNum, 133 ) 134 if err != nil { 135 return nil, fmt.Errorf("failed to scan discussion: %w", err) 136 } 137 138 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 139 discussion.Created = t 140 } 141 142 if editedAt.Valid { 143 if t, err := time.Parse(time.RFC3339, editedAt.V); err == nil { 144 discussion.Edited = &t 145 } 146 } 147 148 atUri := discussion.AtUri().String() 149 discussionMap[atUri] = &discussion 150 } 151 152 // collect reverse repos 153 repoAts := make([]string, 0, len(discussionMap)) 154 for _, discussion := range discussionMap { 155 repoAts = append(repoAts, 
string(discussion.RepoAt)) 156 } 157 158 repos, err := GetRepos(e, 0, orm.FilterIn("at_uri", repoAts)) 159 if err != nil { 160 return nil, fmt.Errorf("failed to build repo mappings: %w", err) 161 } 162 163 repoMap := make(map[string]*models.Repo) 164 for i := range repos { 165 repoMap[string(repos[i].RepoAt())] = &repos[i] 166 } 167 168 for discussionAt, d := range discussionMap { 169 if r, ok := repoMap[string(d.RepoAt)]; ok { 170 d.Repo = r 171 } else { 172 // do not show up the discussion if the repo is deleted 173 delete(discussionMap, discussionAt) 174 } 175 } 176 177 // collect patches 178 discussionAts := slices.Collect(maps.Keys(discussionMap)) 179 180 patches, err := GetDiscussionPatches(e, orm.FilterIn("discussion_at", discussionAts)) 181 if err != nil { 182 return nil, fmt.Errorf("failed to query patches: %w", err) 183 } 184 for _, p := range patches { 185 discussionAt := p.DiscussionAt.String() 186 if discussion, ok := discussionMap[discussionAt]; ok { 187 discussion.Patches = append(discussion.Patches, p) 188 } 189 } 190 191 // collect comments 192 comments, err := GetDiscussionComments(e, orm.FilterIn("discussion_at", discussionAts)) 193 if err != nil { 194 return nil, fmt.Errorf("failed to query comments: %w", err) 195 } 196 for i := range comments { 197 discussionAt := comments[i].DiscussionAt 198 if discussion, ok := discussionMap[discussionAt]; ok { 199 discussion.Comments = append(discussion.Comments, comments[i]) 200 } 201 } 202 203 // collect labels for each discussion 204 allLabels, err := GetLabels(e, orm.FilterIn("subject", discussionAts)) 205 if err != nil { 206 return nil, fmt.Errorf("failed to query labels: %w", err) 207 } 208 for discussionAt, labels := range allLabels { 209 if discussion, ok := discussionMap[discussionAt.String()]; ok { 210 discussion.Labels = labels 211 } 212 } 213 214 var discussions []models.Discussion 215 for _, d := range discussionMap { 216 discussions = append(discussions, *d) 217 } 218 219 
sort.Slice(discussions, func(i, j int) bool { 220 return discussions[i].Created.After(discussions[j].Created) 221 }) 222 223 return discussions, nil 224} 225 226// GetDiscussion returns a single discussion by repo and ID 227func GetDiscussion(e Execer, repoAt syntax.ATURI, discussionId int) (*models.Discussion, error) { 228 discussions, err := GetDiscussionsPaginated( 229 e, 230 pagination.Page{}, 231 orm.FilterEq("repo_at", repoAt), 232 orm.FilterEq("discussion_id", discussionId), 233 ) 234 if err != nil { 235 return nil, err 236 } 237 if len(discussions) != 1 { 238 return nil, sql.ErrNoRows 239 } 240 241 return &discussions[0], nil 242} 243 244// GetDiscussions returns discussions matching filters 245func GetDiscussions(e Execer, filters ...orm.Filter) ([]models.Discussion, error) { 246 return GetDiscussionsPaginated(e, pagination.Page{}, filters...) 247} 248 249// AddDiscussionPatch adds a patch to a discussion 250// Anyone can add patches - the key feature of the Nest model 251func AddDiscussionPatch(tx *sql.Tx, patch *models.DiscussionPatch) error { 252 row := tx.QueryRow(` 253 insert into discussion_patches (discussion_at, pushed_by_did, patch_hash, patch) 254 values (?, ?, ?, ?) 255 returning id 256 `, patch.DiscussionAt, patch.PushedByDid, patch.PatchHash, patch.Patch) 257 258 return row.Scan(&patch.Id) 259} 260 261// PatchExists checks if a patch with the given hash already exists in the discussion 262func PatchExists(e Execer, discussionAt syntax.ATURI, patchHash string) (bool, error) { 263 var count int 264 err := e.QueryRow(` 265 select count(1) from discussion_patches 266 where discussion_at = ? and patch_hash = ? 
267 `, discussionAt, patchHash).Scan(&count) 268 if err != nil { 269 return false, err 270 } 271 return count > 0, nil 272} 273 274// RemovePatch marks a patch as removed (soft delete) 275func RemovePatch(e Execer, patchId int64) error { 276 _, err := e.Exec(` 277 update discussion_patches 278 set removed = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 279 where id = ? 280 `, patchId) 281 return err 282} 283 284// ReaddPatch re-adds a previously removed patch 285func ReaddPatch(e Execer, patchId int64) error { 286 _, err := e.Exec(` 287 update discussion_patches 288 set removed = null 289 where id = ? 290 `, patchId) 291 return err 292} 293 294// GetDiscussionPatches returns patches for discussions 295func GetDiscussionPatches(e Execer, filters ...orm.Filter) ([]*models.DiscussionPatch, error) { 296 var conditions []string 297 var args []any 298 for _, filter := range filters { 299 conditions = append(conditions, filter.Condition()) 300 args = append(args, filter.Arg()...) 301 } 302 303 whereClause := "" 304 if conditions != nil { 305 whereClause = " where " + strings.Join(conditions, " and ") 306 } 307 308 query := fmt.Sprintf(` 309 select 310 id, 311 discussion_at, 312 pushed_by_did, 313 patch_hash, 314 patch, 315 added, 316 removed 317 from 318 discussion_patches 319 %s 320 order by added asc 321 `, whereClause) 322 323 rows, err := e.Query(query, args...) 
324 if err != nil { 325 return nil, err 326 } 327 defer rows.Close() 328 329 var patches []*models.DiscussionPatch 330 for rows.Next() { 331 var patch models.DiscussionPatch 332 var addedAt string 333 var removedAt sql.Null[string] 334 err := rows.Scan( 335 &patch.Id, 336 &patch.DiscussionAt, 337 &patch.PushedByDid, 338 &patch.PatchHash, 339 &patch.Patch, 340 &addedAt, 341 &removedAt, 342 ) 343 if err != nil { 344 return nil, err 345 } 346 347 if t, err := time.Parse(time.RFC3339, addedAt); err == nil { 348 patch.Added = t 349 } 350 351 if removedAt.Valid { 352 if t, err := time.Parse(time.RFC3339, removedAt.V); err == nil { 353 patch.Removed = &t 354 } 355 } 356 357 patches = append(patches, &patch) 358 } 359 360 if err = rows.Err(); err != nil { 361 return nil, err 362 } 363 364 return patches, nil 365} 366 367// GetDiscussionPatch returns a single patch by ID 368func GetDiscussionPatch(e Execer, patchId int64) (*models.DiscussionPatch, error) { 369 patches, err := GetDiscussionPatches(e, orm.FilterEq("id", patchId)) 370 if err != nil { 371 return nil, err 372 } 373 if len(patches) != 1 { 374 return nil, sql.ErrNoRows 375 } 376 return patches[0], nil 377} 378 379// AddDiscussionComment adds a comment to a discussion 380func AddDiscussionComment(tx *sql.Tx, c models.DiscussionComment) (int64, error) { 381 result, err := tx.Exec( 382 `insert into discussion_comments ( 383 did, 384 rkey, 385 discussion_at, 386 body, 387 reply_to, 388 created, 389 edited 390 ) 391 values (?, ?, ?, ?, ?, ?, null) 392 on conflict(did, rkey) do update set 393 discussion_at = excluded.discussion_at, 394 body = excluded.body, 395 edited = case 396 when 397 discussion_comments.discussion_at != excluded.discussion_at 398 or discussion_comments.body != excluded.body 399 or discussion_comments.reply_to != excluded.reply_to 400 then ? 
401 else discussion_comments.edited 402 end`, 403 c.Did, 404 c.Rkey, 405 c.DiscussionAt, 406 c.Body, 407 c.ReplyTo, 408 c.Created.Format(time.RFC3339), 409 time.Now().Format(time.RFC3339), 410 ) 411 if err != nil { 412 return 0, err 413 } 414 415 id, err := result.LastInsertId() 416 if err != nil { 417 return 0, err 418 } 419 420 return id, nil 421} 422 423// GetDiscussionComments returns comments for discussions 424func GetDiscussionComments(e Execer, filters ...orm.Filter) ([]models.DiscussionComment, error) { 425 var conditions []string 426 var args []any 427 for _, filter := range filters { 428 conditions = append(conditions, filter.Condition()) 429 args = append(args, filter.Arg()...) 430 } 431 432 whereClause := "" 433 if conditions != nil { 434 whereClause = " where " + strings.Join(conditions, " and ") 435 } 436 437 query := fmt.Sprintf(` 438 select 439 id, 440 did, 441 rkey, 442 discussion_at, 443 reply_to, 444 body, 445 created, 446 edited, 447 deleted 448 from 449 discussion_comments 450 %s 451 order by created asc 452 `, whereClause) 453 454 rows, err := e.Query(query, args...) 
455 if err != nil { 456 return nil, err 457 } 458 defer rows.Close() 459 460 var comments []models.DiscussionComment 461 for rows.Next() { 462 var comment models.DiscussionComment 463 var created string 464 var rkey, edited, deleted, replyTo sql.Null[string] 465 err := rows.Scan( 466 &comment.Id, 467 &comment.Did, 468 &rkey, 469 &comment.DiscussionAt, 470 &replyTo, 471 &comment.Body, 472 &created, 473 &edited, 474 &deleted, 475 ) 476 if err != nil { 477 return nil, err 478 } 479 480 if rkey.Valid { 481 comment.Rkey = rkey.V 482 } 483 484 if t, err := time.Parse(time.RFC3339, created); err == nil { 485 comment.Created = t 486 } 487 488 if edited.Valid { 489 if t, err := time.Parse(time.RFC3339, edited.V); err == nil { 490 comment.Edited = &t 491 } 492 } 493 494 if deleted.Valid { 495 if t, err := time.Parse(time.RFC3339, deleted.V); err == nil { 496 comment.Deleted = &t 497 } 498 } 499 500 if replyTo.Valid { 501 comment.ReplyTo = &replyTo.V 502 } 503 504 comments = append(comments, comment) 505 } 506 507 if err = rows.Err(); err != nil { 508 return nil, err 509 } 510 511 return comments, nil 512} 513 514// DeleteDiscussionComment soft-deletes a comment 515func DeleteDiscussionComment(e Execer, filters ...orm.Filter) error { 516 var conditions []string 517 var args []any 518 for _, filter := range filters { 519 conditions = append(conditions, filter.Condition()) 520 args = append(args, filter.Arg()...) 521 } 522 523 whereClause := "" 524 if conditions != nil { 525 whereClause = " where " + strings.Join(conditions, " and ") 526 } 527 528 query := fmt.Sprintf(`update discussion_comments set body = "", deleted = strftime('%%Y-%%m-%%dT%%H:%%M:%%SZ', 'now') %s`, whereClause) 529 530 _, err := e.Exec(query, args...) 531 return err 532} 533 534// CloseDiscussion closes a discussion 535func CloseDiscussion(e Execer, repoAt syntax.ATURI, discussionId int) error { 536 _, err := e.Exec(` 537 update discussions set state = ? 538 where repo_at = ? and discussion_id = ? 
539 `, models.DiscussionClosed, repoAt, discussionId) 540 return err 541} 542 543// ReopenDiscussion reopens a discussion 544func ReopenDiscussion(e Execer, repoAt syntax.ATURI, discussionId int) error { 545 _, err := e.Exec(` 546 update discussions set state = ? 547 where repo_at = ? and discussion_id = ? 548 `, models.DiscussionOpen, repoAt, discussionId) 549 return err 550} 551 552// MergeDiscussion marks a discussion as merged 553func MergeDiscussion(e Execer, repoAt syntax.ATURI, discussionId int) error { 554 _, err := e.Exec(` 555 update discussions set state = ? 556 where repo_at = ? and discussion_id = ? 557 `, models.DiscussionMerged, repoAt, discussionId) 558 return err 559} 560 561// SetDiscussionState sets the state of a discussion 562func SetDiscussionState(e Execer, repoAt syntax.ATURI, discussionId int, state models.DiscussionState) error { 563 _, err := e.Exec(` 564 update discussions set state = ? 565 where repo_at = ? and discussion_id = ? 566 `, state, repoAt, discussionId) 567 return err 568} 569 570// GetDiscussionCount returns counts of discussions by state 571func GetDiscussionCount(e Execer, repoAt syntax.ATURI) (models.DiscussionCount, error) { 572 row := e.QueryRow(` 573 select 574 count(case when state = ? then 1 end) as open_count, 575 count(case when state = ? then 1 end) as merged_count, 576 count(case when state = ? 
then 1 end) as closed_count 577 from discussions 578 where repo_at = ?`, 579 models.DiscussionOpen, 580 models.DiscussionMerged, 581 models.DiscussionClosed, 582 repoAt, 583 ) 584 585 var count models.DiscussionCount 586 if err := row.Scan(&count.Open, &count.Merged, &count.Closed); err != nil { 587 return models.DiscussionCount{}, err 588 } 589 590 return count, nil 591} 592 593// SubscribeToDiscussion adds a subscription for a user to a discussion 594func SubscribeToDiscussion(e Execer, discussionAt syntax.ATURI, subscriberDid string) error { 595 _, err := e.Exec(` 596 insert or ignore into discussion_subscriptions (discussion_at, subscriber_did) 597 values (?, ?) 598 `, discussionAt, subscriberDid) 599 return err 600} 601 602// UnsubscribeFromDiscussion removes a subscription 603func UnsubscribeFromDiscussion(e Execer, discussionAt syntax.ATURI, subscriberDid string) error { 604 _, err := e.Exec(` 605 delete from discussion_subscriptions 606 where discussion_at = ? and subscriber_did = ? 607 `, discussionAt, subscriberDid) 608 return err 609} 610 611// GetDiscussionSubscribers returns all subscribers for a discussion 612func GetDiscussionSubscribers(e Execer, discussionAt syntax.ATURI) ([]string, error) { 613 rows, err := e.Query(` 614 select subscriber_did from discussion_subscriptions 615 where discussion_at = ? 616 `, discussionAt) 617 if err != nil { 618 return nil, err 619 } 620 defer rows.Close() 621 622 var subscribers []string 623 for rows.Next() { 624 var did string 625 if err := rows.Scan(&did); err != nil { 626 return nil, err 627 } 628 subscribers = append(subscribers, did) 629 } 630 631 return subscribers, rows.Err() 632}