A vibe coded tangled fork which supports pijul.
1package db
2
3import (
4 "database/sql"
5 "fmt"
6 "maps"
7 "slices"
8 "sort"
9 "strings"
10 "time"
11
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "tangled.org/core/appview/models"
14 "tangled.org/core/appview/pagination"
15 "tangled.org/core/orm"
16)
17
18// NewDiscussion creates a new discussion in a Pijul repository
19func NewDiscussion(tx *sql.Tx, discussion *models.Discussion) error {
20 // ensure sequence exists
21 _, err := tx.Exec(`
22 insert or ignore into repo_discussion_seqs (repo_at, next_discussion_id)
23 values (?, 1)
24 `, discussion.RepoAt)
25 if err != nil {
26 return err
27 }
28
29 // get next discussion_id
30 var newDiscussionId int
31 err = tx.QueryRow(`
32 update repo_discussion_seqs
33 set next_discussion_id = next_discussion_id + 1
34 where repo_at = ?
35 returning next_discussion_id - 1
36 `, discussion.RepoAt).Scan(&newDiscussionId)
37 if err != nil {
38 return err
39 }
40
41 // insert new discussion
42 row := tx.QueryRow(`
43 insert into discussions (repo_at, did, rkey, discussion_id, title, body, target_channel, state)
44 values (?, ?, ?, ?, ?, ?, ?, ?)
45 returning id, discussion_id
46 `, discussion.RepoAt, discussion.Did, discussion.Rkey, newDiscussionId, discussion.Title, discussion.Body, discussion.TargetChannel, discussion.State)
47
48 err = row.Scan(&discussion.Id, &discussion.DiscussionId)
49 if err != nil {
50 return fmt.Errorf("scan row: %w", err)
51 }
52
53 return nil
54}
55
56// GetDiscussionsPaginated returns discussions with pagination
57func GetDiscussionsPaginated(e Execer, page pagination.Page, filters ...orm.Filter) ([]models.Discussion, error) {
58 discussionMap := make(map[string]*models.Discussion) // at-uri -> discussion
59
60 var conditions []string
61 var args []any
62
63 for _, filter := range filters {
64 conditions = append(conditions, filter.Condition())
65 args = append(args, filter.Arg()...)
66 }
67
68 whereClause := ""
69 if conditions != nil {
70 whereClause = " where " + strings.Join(conditions, " and ")
71 }
72
73 pLower := orm.FilterGte("row_num", page.Offset+1)
74 pUpper := orm.FilterLte("row_num", page.Offset+page.Limit)
75
76 pageClause := ""
77 if page.Limit > 0 {
78 args = append(args, pLower.Arg()...)
79 args = append(args, pUpper.Arg()...)
80 pageClause = " where " + pLower.Condition() + " and " + pUpper.Condition()
81 }
82
83 query := fmt.Sprintf(
84 `
85 select * from (
86 select
87 id,
88 did,
89 rkey,
90 repo_at,
91 discussion_id,
92 title,
93 body,
94 target_channel,
95 state,
96 created,
97 edited,
98 row_number() over (order by created desc) as row_num
99 from
100 discussions
101 %s
102 ) ranked_discussions
103 %s
104 `,
105 whereClause,
106 pageClause,
107 )
108
109 rows, err := e.Query(query, args...)
110 if err != nil {
111 return nil, fmt.Errorf("failed to query discussions table: %w", err)
112 }
113 defer rows.Close()
114
115 for rows.Next() {
116 var discussion models.Discussion
117 var createdAt string
118 var editedAt sql.Null[string]
119 var rowNum int64
120 err := rows.Scan(
121 &discussion.Id,
122 &discussion.Did,
123 &discussion.Rkey,
124 &discussion.RepoAt,
125 &discussion.DiscussionId,
126 &discussion.Title,
127 &discussion.Body,
128 &discussion.TargetChannel,
129 &discussion.State,
130 &createdAt,
131 &editedAt,
132 &rowNum,
133 )
134 if err != nil {
135 return nil, fmt.Errorf("failed to scan discussion: %w", err)
136 }
137
138 if t, err := time.Parse(time.RFC3339, createdAt); err == nil {
139 discussion.Created = t
140 }
141
142 if editedAt.Valid {
143 if t, err := time.Parse(time.RFC3339, editedAt.V); err == nil {
144 discussion.Edited = &t
145 }
146 }
147
148 atUri := discussion.AtUri().String()
149 discussionMap[atUri] = &discussion
150 }
151
152 // collect reverse repos
153 repoAts := make([]string, 0, len(discussionMap))
154 for _, discussion := range discussionMap {
155 repoAts = append(repoAts, string(discussion.RepoAt))
156 }
157
158 repos, err := GetRepos(e, 0, orm.FilterIn("at_uri", repoAts))
159 if err != nil {
160 return nil, fmt.Errorf("failed to build repo mappings: %w", err)
161 }
162
163 repoMap := make(map[string]*models.Repo)
164 for i := range repos {
165 repoMap[string(repos[i].RepoAt())] = &repos[i]
166 }
167
168 for discussionAt, d := range discussionMap {
169 if r, ok := repoMap[string(d.RepoAt)]; ok {
170 d.Repo = r
171 } else {
172 // do not show up the discussion if the repo is deleted
173 delete(discussionMap, discussionAt)
174 }
175 }
176
177 // collect patches
178 discussionAts := slices.Collect(maps.Keys(discussionMap))
179
180 patches, err := GetDiscussionPatches(e, orm.FilterIn("discussion_at", discussionAts))
181 if err != nil {
182 return nil, fmt.Errorf("failed to query patches: %w", err)
183 }
184 for _, p := range patches {
185 discussionAt := p.DiscussionAt.String()
186 if discussion, ok := discussionMap[discussionAt]; ok {
187 discussion.Patches = append(discussion.Patches, p)
188 }
189 }
190
191 // collect comments
192 comments, err := GetDiscussionComments(e, orm.FilterIn("discussion_at", discussionAts))
193 if err != nil {
194 return nil, fmt.Errorf("failed to query comments: %w", err)
195 }
196 for i := range comments {
197 discussionAt := comments[i].DiscussionAt
198 if discussion, ok := discussionMap[discussionAt]; ok {
199 discussion.Comments = append(discussion.Comments, comments[i])
200 }
201 }
202
203 // collect labels for each discussion
204 allLabels, err := GetLabels(e, orm.FilterIn("subject", discussionAts))
205 if err != nil {
206 return nil, fmt.Errorf("failed to query labels: %w", err)
207 }
208 for discussionAt, labels := range allLabels {
209 if discussion, ok := discussionMap[discussionAt.String()]; ok {
210 discussion.Labels = labels
211 }
212 }
213
214 var discussions []models.Discussion
215 for _, d := range discussionMap {
216 discussions = append(discussions, *d)
217 }
218
219 sort.Slice(discussions, func(i, j int) bool {
220 return discussions[i].Created.After(discussions[j].Created)
221 })
222
223 return discussions, nil
224}
225
226// GetDiscussion returns a single discussion by repo and ID
227func GetDiscussion(e Execer, repoAt syntax.ATURI, discussionId int) (*models.Discussion, error) {
228 discussions, err := GetDiscussionsPaginated(
229 e,
230 pagination.Page{},
231 orm.FilterEq("repo_at", repoAt),
232 orm.FilterEq("discussion_id", discussionId),
233 )
234 if err != nil {
235 return nil, err
236 }
237 if len(discussions) != 1 {
238 return nil, sql.ErrNoRows
239 }
240
241 return &discussions[0], nil
242}
243
244// GetDiscussions returns discussions matching filters
245func GetDiscussions(e Execer, filters ...orm.Filter) ([]models.Discussion, error) {
246 return GetDiscussionsPaginated(e, pagination.Page{}, filters...)
247}
248
249// AddDiscussionPatch adds a patch to a discussion
250// Anyone can add patches - the key feature of the Nest model
251func AddDiscussionPatch(tx *sql.Tx, patch *models.DiscussionPatch) error {
252 row := tx.QueryRow(`
253 insert into discussion_patches (discussion_at, pushed_by_did, patch_hash, patch)
254 values (?, ?, ?, ?)
255 returning id
256 `, patch.DiscussionAt, patch.PushedByDid, patch.PatchHash, patch.Patch)
257
258 return row.Scan(&patch.Id)
259}
260
261// PatchExists checks if a patch with the given hash already exists in the discussion
262func PatchExists(e Execer, discussionAt syntax.ATURI, patchHash string) (bool, error) {
263 var count int
264 err := e.QueryRow(`
265 select count(1) from discussion_patches
266 where discussion_at = ? and patch_hash = ?
267 `, discussionAt, patchHash).Scan(&count)
268 if err != nil {
269 return false, err
270 }
271 return count > 0, nil
272}
273
274// RemovePatch marks a patch as removed (soft delete)
275func RemovePatch(e Execer, patchId int64) error {
276 _, err := e.Exec(`
277 update discussion_patches
278 set removed = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
279 where id = ?
280 `, patchId)
281 return err
282}
283
284// ReaddPatch re-adds a previously removed patch
285func ReaddPatch(e Execer, patchId int64) error {
286 _, err := e.Exec(`
287 update discussion_patches
288 set removed = null
289 where id = ?
290 `, patchId)
291 return err
292}
293
294// GetDiscussionPatches returns patches for discussions
295func GetDiscussionPatches(e Execer, filters ...orm.Filter) ([]*models.DiscussionPatch, error) {
296 var conditions []string
297 var args []any
298 for _, filter := range filters {
299 conditions = append(conditions, filter.Condition())
300 args = append(args, filter.Arg()...)
301 }
302
303 whereClause := ""
304 if conditions != nil {
305 whereClause = " where " + strings.Join(conditions, " and ")
306 }
307
308 query := fmt.Sprintf(`
309 select
310 id,
311 discussion_at,
312 pushed_by_did,
313 patch_hash,
314 patch,
315 added,
316 removed
317 from
318 discussion_patches
319 %s
320 order by added asc
321 `, whereClause)
322
323 rows, err := e.Query(query, args...)
324 if err != nil {
325 return nil, err
326 }
327 defer rows.Close()
328
329 var patches []*models.DiscussionPatch
330 for rows.Next() {
331 var patch models.DiscussionPatch
332 var addedAt string
333 var removedAt sql.Null[string]
334 err := rows.Scan(
335 &patch.Id,
336 &patch.DiscussionAt,
337 &patch.PushedByDid,
338 &patch.PatchHash,
339 &patch.Patch,
340 &addedAt,
341 &removedAt,
342 )
343 if err != nil {
344 return nil, err
345 }
346
347 if t, err := time.Parse(time.RFC3339, addedAt); err == nil {
348 patch.Added = t
349 }
350
351 if removedAt.Valid {
352 if t, err := time.Parse(time.RFC3339, removedAt.V); err == nil {
353 patch.Removed = &t
354 }
355 }
356
357 patches = append(patches, &patch)
358 }
359
360 if err = rows.Err(); err != nil {
361 return nil, err
362 }
363
364 return patches, nil
365}
366
367// GetDiscussionPatch returns a single patch by ID
368func GetDiscussionPatch(e Execer, patchId int64) (*models.DiscussionPatch, error) {
369 patches, err := GetDiscussionPatches(e, orm.FilterEq("id", patchId))
370 if err != nil {
371 return nil, err
372 }
373 if len(patches) != 1 {
374 return nil, sql.ErrNoRows
375 }
376 return patches[0], nil
377}
378
379// AddDiscussionComment adds a comment to a discussion
380func AddDiscussionComment(tx *sql.Tx, c models.DiscussionComment) (int64, error) {
381 result, err := tx.Exec(
382 `insert into discussion_comments (
383 did,
384 rkey,
385 discussion_at,
386 body,
387 reply_to,
388 created,
389 edited
390 )
391 values (?, ?, ?, ?, ?, ?, null)
392 on conflict(did, rkey) do update set
393 discussion_at = excluded.discussion_at,
394 body = excluded.body,
395 edited = case
396 when
397 discussion_comments.discussion_at != excluded.discussion_at
398 or discussion_comments.body != excluded.body
399 or discussion_comments.reply_to != excluded.reply_to
400 then ?
401 else discussion_comments.edited
402 end`,
403 c.Did,
404 c.Rkey,
405 c.DiscussionAt,
406 c.Body,
407 c.ReplyTo,
408 c.Created.Format(time.RFC3339),
409 time.Now().Format(time.RFC3339),
410 )
411 if err != nil {
412 return 0, err
413 }
414
415 id, err := result.LastInsertId()
416 if err != nil {
417 return 0, err
418 }
419
420 return id, nil
421}
422
423// GetDiscussionComments returns comments for discussions
424func GetDiscussionComments(e Execer, filters ...orm.Filter) ([]models.DiscussionComment, error) {
425 var conditions []string
426 var args []any
427 for _, filter := range filters {
428 conditions = append(conditions, filter.Condition())
429 args = append(args, filter.Arg()...)
430 }
431
432 whereClause := ""
433 if conditions != nil {
434 whereClause = " where " + strings.Join(conditions, " and ")
435 }
436
437 query := fmt.Sprintf(`
438 select
439 id,
440 did,
441 rkey,
442 discussion_at,
443 reply_to,
444 body,
445 created,
446 edited,
447 deleted
448 from
449 discussion_comments
450 %s
451 order by created asc
452 `, whereClause)
453
454 rows, err := e.Query(query, args...)
455 if err != nil {
456 return nil, err
457 }
458 defer rows.Close()
459
460 var comments []models.DiscussionComment
461 for rows.Next() {
462 var comment models.DiscussionComment
463 var created string
464 var rkey, edited, deleted, replyTo sql.Null[string]
465 err := rows.Scan(
466 &comment.Id,
467 &comment.Did,
468 &rkey,
469 &comment.DiscussionAt,
470 &replyTo,
471 &comment.Body,
472 &created,
473 &edited,
474 &deleted,
475 )
476 if err != nil {
477 return nil, err
478 }
479
480 if rkey.Valid {
481 comment.Rkey = rkey.V
482 }
483
484 if t, err := time.Parse(time.RFC3339, created); err == nil {
485 comment.Created = t
486 }
487
488 if edited.Valid {
489 if t, err := time.Parse(time.RFC3339, edited.V); err == nil {
490 comment.Edited = &t
491 }
492 }
493
494 if deleted.Valid {
495 if t, err := time.Parse(time.RFC3339, deleted.V); err == nil {
496 comment.Deleted = &t
497 }
498 }
499
500 if replyTo.Valid {
501 comment.ReplyTo = &replyTo.V
502 }
503
504 comments = append(comments, comment)
505 }
506
507 if err = rows.Err(); err != nil {
508 return nil, err
509 }
510
511 return comments, nil
512}
513
514// DeleteDiscussionComment soft-deletes a comment
515func DeleteDiscussionComment(e Execer, filters ...orm.Filter) error {
516 var conditions []string
517 var args []any
518 for _, filter := range filters {
519 conditions = append(conditions, filter.Condition())
520 args = append(args, filter.Arg()...)
521 }
522
523 whereClause := ""
524 if conditions != nil {
525 whereClause = " where " + strings.Join(conditions, " and ")
526 }
527
528 query := fmt.Sprintf(`update discussion_comments set body = "", deleted = strftime('%%Y-%%m-%%dT%%H:%%M:%%SZ', 'now') %s`, whereClause)
529
530 _, err := e.Exec(query, args...)
531 return err
532}
533
534// CloseDiscussion closes a discussion
535func CloseDiscussion(e Execer, repoAt syntax.ATURI, discussionId int) error {
536 _, err := e.Exec(`
537 update discussions set state = ?
538 where repo_at = ? and discussion_id = ?
539 `, models.DiscussionClosed, repoAt, discussionId)
540 return err
541}
542
543// ReopenDiscussion reopens a discussion
544func ReopenDiscussion(e Execer, repoAt syntax.ATURI, discussionId int) error {
545 _, err := e.Exec(`
546 update discussions set state = ?
547 where repo_at = ? and discussion_id = ?
548 `, models.DiscussionOpen, repoAt, discussionId)
549 return err
550}
551
552// MergeDiscussion marks a discussion as merged
553func MergeDiscussion(e Execer, repoAt syntax.ATURI, discussionId int) error {
554 _, err := e.Exec(`
555 update discussions set state = ?
556 where repo_at = ? and discussion_id = ?
557 `, models.DiscussionMerged, repoAt, discussionId)
558 return err
559}
560
561// SetDiscussionState sets the state of a discussion
562func SetDiscussionState(e Execer, repoAt syntax.ATURI, discussionId int, state models.DiscussionState) error {
563 _, err := e.Exec(`
564 update discussions set state = ?
565 where repo_at = ? and discussion_id = ?
566 `, state, repoAt, discussionId)
567 return err
568}
569
570// GetDiscussionCount returns counts of discussions by state
571func GetDiscussionCount(e Execer, repoAt syntax.ATURI) (models.DiscussionCount, error) {
572 row := e.QueryRow(`
573 select
574 count(case when state = ? then 1 end) as open_count,
575 count(case when state = ? then 1 end) as merged_count,
576 count(case when state = ? then 1 end) as closed_count
577 from discussions
578 where repo_at = ?`,
579 models.DiscussionOpen,
580 models.DiscussionMerged,
581 models.DiscussionClosed,
582 repoAt,
583 )
584
585 var count models.DiscussionCount
586 if err := row.Scan(&count.Open, &count.Merged, &count.Closed); err != nil {
587 return models.DiscussionCount{}, err
588 }
589
590 return count, nil
591}
592
593// SubscribeToDiscussion adds a subscription for a user to a discussion
594func SubscribeToDiscussion(e Execer, discussionAt syntax.ATURI, subscriberDid string) error {
595 _, err := e.Exec(`
596 insert or ignore into discussion_subscriptions (discussion_at, subscriber_did)
597 values (?, ?)
598 `, discussionAt, subscriberDid)
599 return err
600}
601
602// UnsubscribeFromDiscussion removes a subscription
603func UnsubscribeFromDiscussion(e Execer, discussionAt syntax.ATURI, subscriberDid string) error {
604 _, err := e.Exec(`
605 delete from discussion_subscriptions
606 where discussion_at = ? and subscriber_did = ?
607 `, discussionAt, subscriberDid)
608 return err
609}
610
611// GetDiscussionSubscribers returns all subscribers for a discussion
612func GetDiscussionSubscribers(e Execer, discussionAt syntax.ATURI) ([]string, error) {
613 rows, err := e.Query(`
614 select subscriber_did from discussion_subscriptions
615 where discussion_at = ?
616 `, discussionAt)
617 if err != nil {
618 return nil, err
619 }
620 defer rows.Close()
621
622 var subscribers []string
623 for rows.Next() {
624 var did string
625 if err := rows.Scan(&did); err != nil {
626 return nil, err
627 }
628 subscribers = append(subscribers, did)
629 }
630
631 return subscribers, rows.Err()
632}