A vibe coded tangled fork which supports pijul.
1package db
2
3import (
4 "context"
5 "slices"
6
7 "github.com/bluesky-social/indigo/atproto/syntax"
8 "tangled.org/core/api/tangled"
9 "tangled.org/core/appview/db"
10 "tangled.org/core/appview/models"
11 "tangled.org/core/appview/notify"
12 "tangled.org/core/idresolver"
13 "tangled.org/core/log"
14 "tangled.org/core/orm"
15 "tangled.org/core/sets"
16)
17
18const (
19 maxMentions = 8
20)
21
22type databaseNotifier struct {
23 db *db.DB
24 res *idresolver.Resolver
25}
26
27func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier {
28 return &databaseNotifier{
29 db: database,
30 res: resolver,
31 }
32}
33
34var _ notify.Notifier = &databaseNotifier{}
35
36func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) {
37 // no-op for now
38}
39
40func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) {
41 l := log.FromContext(ctx)
42
43 if star.RepoAt.Collection().String() != tangled.RepoNSID {
44 // skip string stars for now
45 return
46 }
47 var err error
48 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt)))
49 if err != nil {
50 l.Error("failed to get repos", "err", err)
51 return
52 }
53
54 actorDid := syntax.DID(star.Did)
55 recipients := sets.Singleton(syntax.DID(repo.Did))
56 eventType := models.NotificationTypeRepoStarred
57 entityType := "repo"
58 entityId := star.RepoAt.String()
59 repoId := &repo.Id
60 var issueId *int64
61 var pullId *int64
62
63 n.notifyEvent(
64 ctx,
65 actorDid,
66 recipients,
67 eventType,
68 entityType,
69 entityId,
70 repoId,
71 issueId,
72 pullId,
73 )
74}
75
76func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) {
77 // no-op
78}
79
80func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) {
81 l := log.FromContext(ctx)
82
83 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
84 if err != nil {
85 l.Error("failed to fetch collaborators", "err", err)
86 return
87 }
88
89 // build the recipients list
90 // - owner of the repo
91 // - collaborators in the repo
92 // - remove users already mentioned
93 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
94 for _, c := range collaborators {
95 recipients.Insert(c.SubjectDid)
96 }
97 for _, m := range mentions {
98 recipients.Remove(m)
99 }
100
101 actorDid := syntax.DID(issue.Did)
102 entityType := "issue"
103 entityId := issue.AtUri().String()
104 repoId := &issue.Repo.Id
105 issueId := &issue.Id
106 var pullId *int64
107
108 n.notifyEvent(
109 ctx,
110 actorDid,
111 recipients,
112 models.NotificationTypeIssueCreated,
113 entityType,
114 entityId,
115 repoId,
116 issueId,
117 pullId,
118 )
119 n.notifyEvent(
120 ctx,
121 actorDid,
122 sets.Collect(slices.Values(mentions)),
123 models.NotificationTypeUserMentioned,
124 entityType,
125 entityId,
126 repoId,
127 issueId,
128 pullId,
129 )
130}
131
132func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) {
133 l := log.FromContext(ctx)
134
135 issues, err := db.GetIssues(n.db, orm.FilterEq("at_uri", comment.Subject))
136 if err != nil {
137 l.Error("failed to get issues", "err", err)
138 return
139 }
140 if len(issues) == 0 {
141 l.Error("no issue found for", "err", comment.Subject)
142 return
143 }
144 issue := issues[0]
145
146 // built the recipients list:
147 // - the owner of the repo
148 // - | if the comment is a reply -> everybody on that thread
149 // | if the comment is a top level -> just the issue owner
150 // - remove mentioned users from the recipients list
151 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
152
153 if comment.IsReply() {
154 // if this comment is a reply, then notify everybody in that thread
155 parentAtUri := *comment.ReplyTo
156
157 // find the parent thread, and add all DIDs from here to the recipient list
158 for _, t := range issue.CommentList() {
159 if t.Self.AtUri() == parentAtUri {
160 for _, p := range t.Participants() {
161 recipients.Insert(p)
162 }
163 }
164 }
165 } else {
166 // not a reply, notify just the issue author
167 recipients.Insert(syntax.DID(issue.Did))
168 }
169
170 for _, m := range mentions {
171 recipients.Remove(m)
172 }
173
174 actorDid := syntax.DID(comment.Did)
175 entityType := "issue"
176 entityId := issue.AtUri().String()
177 repoId := &issue.Repo.Id
178 issueId := &issue.Id
179 var pullId *int64
180
181 n.notifyEvent(
182 ctx,
183 actorDid,
184 recipients,
185 models.NotificationTypeIssueCommented,
186 entityType,
187 entityId,
188 repoId,
189 issueId,
190 pullId,
191 )
192 n.notifyEvent(
193 ctx,
194 actorDid,
195 sets.Collect(slices.Values(mentions)),
196 models.NotificationTypeUserMentioned,
197 entityType,
198 entityId,
199 repoId,
200 issueId,
201 pullId,
202 )
203}
204
205func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) {
206 // no-op for now
207}
208
209func (n *databaseNotifier) NewIssueLabelOp(ctx context.Context, issue *models.Issue) {}
210func (n *databaseNotifier) NewPullLabelOp(ctx context.Context, pull *models.Pull) {}
211
212func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) {
213 actorDid := syntax.DID(follow.UserDid)
214 recipients := sets.Singleton(syntax.DID(follow.SubjectDid))
215 eventType := models.NotificationTypeFollowed
216 entityType := "follow"
217 entityId := follow.UserDid
218 var repoId, issueId, pullId *int64
219
220 n.notifyEvent(
221 ctx,
222 actorDid,
223 recipients,
224 eventType,
225 entityType,
226 entityId,
227 repoId,
228 issueId,
229 pullId,
230 )
231}
232
233func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {
234 // no-op
235}
236
237func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {
238 l := log.FromContext(ctx)
239
240 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
241 if err != nil {
242 l.Error("failed to get repos", "err", err)
243 return
244 }
245 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
246 if err != nil {
247 l.Error("failed to fetch collaborators", "err", err)
248 return
249 }
250
251 // build the recipients list
252 // - owner of the repo
253 // - collaborators in the repo
254 recipients := sets.Singleton(syntax.DID(repo.Did))
255 for _, c := range collaborators {
256 recipients.Insert(c.SubjectDid)
257 }
258
259 actorDid := syntax.DID(pull.OwnerDid)
260 eventType := models.NotificationTypePullCreated
261 entityType := "pull"
262 entityId := pull.AtUri().String()
263 repoId := &repo.Id
264 var issueId *int64
265 p := int64(pull.ID)
266 pullId := &p
267
268 n.notifyEvent(
269 ctx,
270 actorDid,
271 recipients,
272 eventType,
273 entityType,
274 entityId,
275 repoId,
276 issueId,
277 pullId,
278 )
279}
280
281func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) {
282 l := log.FromContext(ctx)
283
284 pulls, err := db.GetPulls(n.db,
285 orm.FilterEq("owner_did", comment.Subject.Authority()),
286 orm.FilterEq("rkey", comment.Subject.RecordKey()),
287 )
288 if err != nil {
289 l.Error("failed to get pull", "err", err)
290 return
291 }
292 if len(pulls) == 0 {
293 l.Error("NewPullComment: no pull found", "aturi", comment.Subject)
294 return
295 }
296 pull := pulls[0]
297
298 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", pull.RepoAt))
299 if err != nil {
300 l.Error("failed to get repos", "err", err)
301 return
302 }
303
304 // build up the recipients list:
305 // - repo owner
306 // - all pull participants
307 // - remove those already mentioned
308 recipients := sets.Singleton(syntax.DID(repo.Did))
309 for _, p := range pull.Participants() {
310 recipients.Insert(syntax.DID(p))
311 }
312 for _, m := range mentions {
313 recipients.Remove(m)
314 }
315
316 actorDid := comment.Did
317 eventType := models.NotificationTypePullCommented
318 entityType := "pull"
319 entityId := pull.AtUri().String()
320 repoId := &repo.Id
321 var issueId *int64
322 p := int64(pull.ID)
323 pullId := &p
324
325 n.notifyEvent(
326 ctx,
327 actorDid,
328 recipients,
329 eventType,
330 entityType,
331 entityId,
332 repoId,
333 issueId,
334 pullId,
335 )
336 n.notifyEvent(
337 ctx,
338 actorDid,
339 sets.Collect(slices.Values(mentions)),
340 models.NotificationTypeUserMentioned,
341 entityType,
342 entityId,
343 repoId,
344 issueId,
345 pullId,
346 )
347}
348
349func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) {
350 // no-op
351}
352
353func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) {
354 // no-op
355}
356
357func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) {
358 // no-op
359}
360
361func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) {
362 // no-op
363}
364
365func (n *databaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) {
366 // no-op for now; webhooks are handled by the webhook notifier
367}
368
369func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
370 l := log.FromContext(ctx)
371
372 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt()))
373 if err != nil {
374 l.Error("failed to fetch collaborators", "err", err)
375 return
376 }
377
378 // build up the recipients list:
379 // - repo owner
380 // - repo collaborators
381 // - all issue participants
382 recipients := sets.Singleton(syntax.DID(issue.Repo.Did))
383 for _, c := range collaborators {
384 recipients.Insert(c.SubjectDid)
385 }
386 for _, p := range issue.Participants() {
387 recipients.Insert(syntax.DID(p))
388 }
389
390 entityType := "issue"
391 entityId := issue.AtUri().String()
392 repoId := &issue.Repo.Id
393 issueId := &issue.Id
394 var pullId *int64
395 var eventType models.NotificationType
396
397 if issue.Open {
398 eventType = models.NotificationTypeIssueReopen
399 } else {
400 eventType = models.NotificationTypeIssueClosed
401 }
402
403 n.notifyEvent(
404 ctx,
405 actor,
406 recipients,
407 eventType,
408 entityType,
409 entityId,
410 repoId,
411 issueId,
412 pullId,
413 )
414}
415
416func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
417 l := log.FromContext(ctx)
418
419 // Get repo details
420 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt)))
421 if err != nil {
422 l.Error("failed to get repos", "err", err)
423 return
424 }
425
426 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt()))
427 if err != nil {
428 l.Error("failed to fetch collaborators", "err", err)
429 return
430 }
431
432 // build up the recipients list:
433 // - repo owner
434 // - all pull participants
435 recipients := sets.Singleton(syntax.DID(repo.Did))
436 for _, c := range collaborators {
437 recipients.Insert(c.SubjectDid)
438 }
439 for _, p := range pull.Participants() {
440 recipients.Insert(syntax.DID(p))
441 }
442
443 entityType := "pull"
444 entityId := pull.AtUri().String()
445 repoId := &repo.Id
446 var issueId *int64
447 var eventType models.NotificationType
448 switch pull.State {
449 case models.PullClosed:
450 eventType = models.NotificationTypePullClosed
451 case models.PullOpen:
452 eventType = models.NotificationTypePullReopen
453 case models.PullMerged:
454 eventType = models.NotificationTypePullMerged
455 default:
456 l.Error("unexpected new PR state", "state", pull.State)
457 return
458 }
459 p := int64(pull.ID)
460 pullId := &p
461
462 n.notifyEvent(
463 ctx,
464 actor,
465 recipients,
466 eventType,
467 entityType,
468 entityId,
469 repoId,
470 issueId,
471 pullId,
472 )
473}
474
475func (n *databaseNotifier) notifyEvent(
476 ctx context.Context,
477 actorDid syntax.DID,
478 recipients sets.Set[syntax.DID],
479 eventType models.NotificationType,
480 entityType string,
481 entityId string,
482 repoId *int64,
483 issueId *int64,
484 pullId *int64,
485) {
486 l := log.FromContext(ctx)
487
488 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody
489 if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions {
490 return
491 }
492
493 recipients.Remove(actorDid)
494
495 prefMap, err := db.GetNotificationPreferences(
496 n.db,
497 orm.FilterIn("user_did", slices.Collect(recipients.All())),
498 )
499 if err != nil {
500 // failed to get prefs for users
501 return
502 }
503
504 // create a transaction for bulk notification storage
505 tx, err := n.db.Begin()
506 if err != nil {
507 // failed to start tx
508 return
509 }
510 defer tx.Rollback()
511
512 // filter based on preferences
513 for recipientDid := range recipients.All() {
514 prefs, ok := prefMap[recipientDid]
515 if !ok {
516 prefs = models.DefaultNotificationPreferences(recipientDid)
517 }
518
519 // skip users who don’t want this type
520 if !prefs.ShouldNotify(eventType) {
521 continue
522 }
523
524 // create notification
525 notif := &models.Notification{
526 RecipientDid: recipientDid.String(),
527 ActorDid: actorDid.String(),
528 Type: eventType,
529 EntityType: entityType,
530 EntityId: entityId,
531 RepoId: repoId,
532 IssueId: issueId,
533 PullId: pullId,
534 }
535
536 if err := db.CreateNotification(tx, notif); err != nil {
537 l.Error("failed to create notification", "recipientDid", recipientDid, "err", err)
538 }
539 }
540
541 if err := tx.Commit(); err != nil {
542 // failed to commit
543 return
544 }
545}