A vibe-coded tangled fork that supports Pijul.
1package db
2
3import (
4 "context"
5 "slices"
6
7 "github.com/bluesky-social/indigo/atproto/syntax"
8 "tangled.org/core/api/tangled"
9 "tangled.org/core/appview/db"
10 "tangled.org/core/appview/models"
11 "tangled.org/core/appview/notify"
12 "tangled.org/core/idresolver"
13 "tangled.org/core/log"
14 "tangled.org/core/orm"
15 "tangled.org/core/sets"
16)
17
// maxMentions caps how many recipients a single user-mention event may
// target; notifyEvent drops mention events that exceed it.
const maxMentions = 8
21
22type databaseNotifier struct {
23 db *db.DB
24 res *idresolver.Resolver
25}
26
27func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier {
28 return &databaseNotifier{
29 db: database,
30 res: resolver,
31 }
32}
33
34var _ notify.Notifier = &databaseNotifier{}
35
36func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
37 // no-op for now
38}
39
40func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
41 l := log.FromContext(ctx)
42
43 if star.RepoAt.Collection().String() != tangled.RepoNSID {
44 // skip string stars for now
45 return
46 }
47 var err error
48 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt)))
49 if err != nil {
50 l.Error("failed to get repos", "err", err)
51 return
52 }
53
54 actorDid := syntax.DID(star.Did)
55 recipients := sets.Singleton(syntax.DID(repo.Did))
56 eventType := models.NotificationTypeRepoStarred
57 entityType := "repo"
58 entityId := star.RepoAt.String()
59 repoId := &repo.Id
60 var issueId *int64
61 var pullId *int64
62
63 n.notifyEvent(
64 ctx,
65 actorDid,
66 recipients,
67 eventType,
68 entityType,
69 entityId,
70 repoId,
71 issueId,
72 pullId,
73 )
74}
75
76func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {
77 // no-op
78}
79
80func (n *databaseNotifier) NewComment(ctx context.Context, comment *models.Comment) {
81 l := log.FromContext(ctx)
82
83 var (
84 // built the recipients list:
85 // - the owner of the repo
86 // - | if the comment is a reply -> everybody on that thread
87 // | if the comment is a top level -> just the issue owner
88 // - remove mentioned users from the recipients list
89 recipients = sets.New[syntax.DID]()
90 entityType string
91 entityId string
92 repoId *int64
93 issueId *int64
94 pullId *int64
95 )
96
97 subjectDid, err := comment.Subject.Authority().AsDID()
98 if err != nil {
99 l.Error("expected did based at-uri for comment.subject")
100 return
101 }
102 switch comment.Subject.Collection() {
103 case tangled.RepoIssueNSID:
104 issues, err := db.GetIssues(
105 n.db,
106 orm.FilterEq("did", subjectDid),
107 orm.FilterEq("rkey", comment.Subject.RecordKey()),
108 )
109 if err != nil {
110 l.Error("failed to get issues", "err", err)
111 return
112 }
113 if len(issues) == 0 {
114 l.Error("no issue found", "subject", comment.Subject)
115 return
116 }
117 issue := issues[0]
118
119 recipients.Insert(syntax.DID(issue.Repo.Did))
120 if comment.IsReply() {
121 // if this comment is a reply, then notify everybody in that thread
122 parentAtUri := *comment.ReplyTo
123
124 // find the parent thread, and add all DIDs from here to the recipient list
125 for _, t := range issue.CommentList() {
126 if t.Self.AtUri() == parentAtUri {
127 for _, p := range t.Participants() {
128 recipients.Insert(p)
129 }
130 }
131 }
132 } else {
133 // not a reply, notify just the issue author
134 recipients.Insert(syntax.DID(issue.Did))
135 }
136
137 entityType = "issue"
138 entityId = issue.AtUri().String()
139 repoId = &issue.Repo.Id
140 issueId = &issue.Id
141 case tangled.RepoPullNSID:
142 pulls, err := db.GetPulls(
143 n.db,
144 orm.FilterEq("owner_did", subjectDid),
145 orm.FilterEq("rkey", comment.Subject.RecordKey()),
146 )
147 if err != nil {
148 l.Error("NewComment: failed to get pulls", "err", err)
149 return
150 }
151 if len(pulls) == 0 {
152 l.Error("NewComment: no pull found", "aturi", comment.Subject)
153 return
154 }
155 pull := pulls[0]
156
157 pull.Repo, err = db.GetRepo(n.db, orm.FilterEq("at_uri", pull.RepoAt))
158 if err != nil {
159 l.Error("NewComment: failed to get repo", "err", err)
160 return
161 }
162
163 recipients.Insert(syntax.DID(pull.Repo.Did))
164 for _, p := range pull.Participants() {
165 recipients.Insert(syntax.DID(p))
166 }
167
168 entityType = "pull"
169 entityId = pull.AtUri().String()
170 repoId = &pull.Repo.Id
171 p := int64(pull.ID)
172 pullId = &p
173 default:
174 return // no-op
175 }
176
177 for _, m := range comment.Mentions {
178 recipients.Remove(m)
179 }
180
181 n.notifyEvent(
182 ctx,
183 comment.Did,
184 recipients,
185 models.NotificationTypeIssueCommented,
186 entityType,
187 entityId,
188 repoId,
189 issueId,
190 pullId,
191 )
192 n.notifyEvent(
193 ctx,
194 comment.Did,
195 sets.Collect(slices.Values(comment.Mentions)),
196 models.NotificationTypeUserMentioned,
197 entityType,
198 entityId,
199 repoId,
200 issueId,
201 pullId,
202 )
203}
204
205func (n *databaseNotifier) DeleteComment(ctx context.Context, comment *models.Comment) {
206 // no-op
207}
208
209func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
210 l := log.FromContext(ctx)
211
212 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
213 if err != nil {
214 l.Error("failed to fetch collaborators", "err", err)
215 return
216 }
217
218 // build the recipients list
219 // - owner of the repo
220 // - collaborators in the repo
221 // - remove users already mentioned
222 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
223 for _, c := range collaborators {
224 recipients.Insert(c.SubjectDid)
225 }
226 for _, m := range mentions {
227 recipients.Remove(m)
228 }
229
230 actorDid := syntax.DID(issue.Did)
231 entityType := "issue"
232 entityId := issue.AtUri().String()
233 repoId := &issue.Repo.Id
234 issueId := &issue.Id
235 var pullId *int64
236
237 n.notifyEvent(
238 ctx,
239 actorDid,
240 recipients,
241 models.NotificationTypeIssueCreated,
242 entityType,
243 entityId,
244 repoId,
245 issueId,
246 pullId,
247 )
248 n.notifyEvent(
249 ctx,
250 actorDid,
251 sets.Collect(slices.Values(mentions)),
252 models.NotificationTypeUserMentioned,
253 entityType,
254 entityId,
255 repoId,
256 issueId,
257 pullId,
258 )
259}
260
261func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
262 // no-op for now
263}
264
265func (n *databaseNotifier) NewIssueLabelOp(ctx context.Context, issue *models.Issue) {}
266func (n *databaseNotifier) NewPullLabelOp(ctx context.Context, pull *models.Pull) {}
267
268func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
269 actorDid := syntax.DID(follow.UserDid)
270 recipients := sets.Singleton(syntax.DID(follow.SubjectDid))
271 eventType := models.NotificationTypeFollowed
272 entityType := "follow"
273 entityId := follow.UserDid
274 var repoId, issueId, pullId *int64
275
276 n.notifyEvent(
277 ctx,
278 actorDid,
279 recipients,
280 eventType,
281 entityType,
282 entityId,
283 repoId,
284 issueId,
285 pullId,
286 )
287}
288
289func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
290 // no-op
291}
292
293func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
294 l := log.FromContext(ctx)
295
296 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
297 if err != nil {
298 l.Error("failed to get repos", "err", err)
299 return
300 }
301 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
302 if err != nil {
303 l.Error("failed to fetch collaborators", "err", err)
304 return
305 }
306
307 // build the recipients list
308 // - owner of the repo
309 // - collaborators in the repo
310 recipients := sets.Singleton(syntax.DID(repo.Did))
311 for _, c := range collaborators {
312 recipients.Insert(c.SubjectDid)
313 }
314
315 actorDid := syntax.DID(pull.OwnerDid)
316 eventType := models.NotificationTypePullCreated
317 entityType := "pull"
318 entityId := pull.AtUri().String()
319 repoId := &repo.Id
320 var issueId *int64
321 p := int64(pull.ID)
322 pullId := &p
323
324 n.notifyEvent(
325 ctx,
326 actorDid,
327 recipients,
328 eventType,
329 entityType,
330 entityId,
331 repoId,
332 issueId,
333 pullId,
334 )
335}
336
337func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
338 // no-op
339}
340
341func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) {
342 // no-op
343}
344
345func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) {
346 // no-op
347}
348
349func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) {
350 // no-op
351}
352
353func (n *databaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) {
354 // no-op for now; webhooks are handled by the webhook notifier
355}
356
357func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
358 l := log.FromContext(ctx)
359
360 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
361 if err != nil {
362 l.Error("failed to fetch collaborators", "err", err)
363 return
364 }
365
366 // build up the recipients list:
367 // - repo owner
368 // - repo collaborators
369 // - all issue participants
370 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
371 for _, c := range collaborators {
372 recipients.Insert(c.SubjectDid)
373 }
374 for _, p := range issue.Participants() {
375 recipients.Insert(syntax.DID(p))
376 }
377
378 entityType := "issue"
379 entityId := issue.AtUri().String()
380 repoId := &issue.Repo.Id
381 issueId := &issue.Id
382 var pullId *int64
383 var eventType models.NotificationType
384
385 if issue.Open {
386 eventType = models.NotificationTypeIssueReopen
387 } else {
388 eventType = models.NotificationTypeIssueClosed
389 }
390
391 n.notifyEvent(
392 ctx,
393 actor,
394 recipients,
395 eventType,
396 entityType,
397 entityId,
398 repoId,
399 issueId,
400 pullId,
401 )
402}
403
404func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
405 l := log.FromContext(ctx)
406
407 // Get repo details
408 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
409 if err != nil {
410 l.Error("failed to get repos", "err", err)
411 return
412 }
413
414 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
415 if err != nil {
416 l.Error("failed to fetch collaborators", "err", err)
417 return
418 }
419
420 // build up the recipients list:
421 // - repo owner
422 // - all pull participants
423 recipients := sets.Singleton(syntax.DID(repo.Did))
424 for _, c := range collaborators {
425 recipients.Insert(c.SubjectDid)
426 }
427 for _, p := range pull.Participants() {
428 recipients.Insert(syntax.DID(p))
429 }
430
431 entityType := "pull"
432 entityId := pull.AtUri().String()
433 repoId := &repo.Id
434 var issueId *int64
435 var eventType models.NotificationType
436 switch pull.State {
437 case models.PullClosed:
438 eventType = models.NotificationTypePullClosed
439 case models.PullOpen:
440 eventType = models.NotificationTypePullReopen
441 case models.PullMerged:
442 eventType = models.NotificationTypePullMerged
443 default:
444 l.Error("unexpected new PR state", "state", pull.State)
445 return
446 }
447 p := int64(pull.ID)
448 pullId := &p
449
450 n.notifyEvent(
451 ctx,
452 actor,
453 recipients,
454 eventType,
455 entityType,
456 entityId,
457 repoId,
458 issueId,
459 pullId,
460 )
461}
462
463func (n *databaseNotifier) notifyEvent(
464 ctx context.Context,
465 actorDid syntax.DID,
466 recipients sets.Set[syntax.DID],
467 eventType models.NotificationType,
468 entityType string,
469 entityId string,
470 repoId *int64,
471 issueId *int64,
472 pullId *int64,
473) {
474 l := log.FromContext(ctx)
475
476 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody
477 if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions {
478 return
479 }
480
481 recipients.Remove(actorDid)
482
483 prefMap, err := db.GetNotificationPreferences(
484 n.db,
485 orm.FilterIn("user_did", slices.Collect(recipients.All())),
486 )
487 if err != nil {
488 // failed to get prefs for users
489 return
490 }
491
492 // create a transaction for bulk notification storage
493 tx, err := n.db.Begin()
494 if err != nil {
495 // failed to start tx
496 return
497 }
498 defer tx.Rollback()
499
500 // filter based on preferences
501 for recipientDid := range recipients.All() {
502 prefs, ok := prefMap[recipientDid]
503 if !ok {
504 prefs = models.DefaultNotificationPreferences(recipientDid)
505 }
506
507 // skip users who don’t want this type
508 if !prefs.ShouldNotify(eventType) {
509 continue
510 }
511
512 // create notification
513 notif := &models.Notification{
514 RecipientDid: recipientDid.String(),
515 ActorDid: actorDid.String(),
516 Type: eventType,
517 EntityType: entityType,
518 EntityId: entityId,
519 RepoId: repoId,
520 IssueId: issueId,
521 PullId: pullId,
522 }
523
524 if err := db.CreateNotification(tx, notif); err != nil {
525 l.Error("failed to create notification", "recipientDid", recipientDid, "err", err)
526 }
527 }
528
529 if err := tx.Commit(); err != nil {
530 // failed to commit
531 return
532 }
533}