A vibe coded tangled fork which supports pijul.
at 309533109d6d109a9ef9661eb9583908c61115a2 545 lines 12 kB view raw
1package db 2 3import ( 4 "context" 5 "slices" 6 7 "github.com/bluesky-social/indigo/atproto/syntax" 8 "tangled.org/core/api/tangled" 9 "tangled.org/core/appview/db" 10 "tangled.org/core/appview/models" 11 "tangled.org/core/appview/notify" 12 "tangled.org/core/idresolver" 13 "tangled.org/core/log" 14 "tangled.org/core/orm" 15 "tangled.org/core/sets" 16) 17 18const ( 19 maxMentions = 8 20) 21 22type databaseNotifier struct { 23 db *db.DB 24 res *idresolver.Resolver 25} 26 27func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier { 28 return &databaseNotifier{ 29 db: database, 30 res: resolver, 31 } 32} 33 34var _ notify.Notifier = &databaseNotifier{} 35 36func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) { 37 // no-op for now 38} 39 40func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) { 41 l := log.FromContext(ctx) 42 43 if star.RepoAt.Collection().String() != tangled.RepoNSID { 44 // skip string stars for now 45 return 46 } 47 var err error 48 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt))) 49 if err != nil { 50 l.Error("failed to get repos", "err", err) 51 return 52 } 53 54 actorDid := syntax.DID(star.Did) 55 recipients := sets.Singleton(syntax.DID(repo.Did)) 56 eventType := models.NotificationTypeRepoStarred 57 entityType := "repo" 58 entityId := star.RepoAt.String() 59 repoId := &repo.Id 60 var issueId *int64 61 var pullId *int64 62 63 n.notifyEvent( 64 ctx, 65 actorDid, 66 recipients, 67 eventType, 68 entityType, 69 entityId, 70 repoId, 71 issueId, 72 pullId, 73 ) 74} 75 76func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) { 77 // no-op 78} 79 80func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 81 l := log.FromContext(ctx) 82 83 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 84 if err != nil { 85 l.Error("failed to fetch collaborators", "err", err) 86 return 87 } 88 89 // build the recipients list 90 // - owner of the repo 91 // - collaborators in the repo 92 // - remove users already mentioned 93 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 94 for _, c := range collaborators { 95 recipients.Insert(c.SubjectDid) 96 } 97 for _, m := range mentions { 98 recipients.Remove(m) 99 } 100 101 actorDid := syntax.DID(issue.Did) 102 entityType := "issue" 103 entityId := issue.AtUri().String() 104 repoId := &issue.Repo.Id 105 issueId := &issue.Id 106 var pullId *int64 107 108 n.notifyEvent( 109 ctx, 110 actorDid, 111 recipients, 112 models.NotificationTypeIssueCreated, 113 entityType, 114 entityId, 115 repoId, 116 issueId, 117 pullId, 118 ) 119 n.notifyEvent( 120 ctx, 121 actorDid, 122 sets.Collect(slices.Values(mentions)), 123 models.NotificationTypeUserMentioned, 124 entityType, 125 entityId, 126 repoId, 127 issueId, 128 pullId, 129 ) 130} 131 132func (n *databaseNotifier) NewIssueComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 133 l := log.FromContext(ctx) 134 135 issues, err := db.GetIssues(n.db, orm.FilterEq("at_uri", comment.Subject)) 136 if err != nil { 137 l.Error("failed to get issues", "err", err) 138 return 139 } 140 if len(issues) == 0 { 141 l.Error("no issue found for", "err", comment.Subject) 142 return 143 } 144 issue := issues[0] 145 146 // built the recipients list: 147 // - the owner of the repo 148 // - | if the comment is a reply -> everybody on that thread 149 // | if the comment is a top level -> just the issue owner 150 // - remove mentioned users from the recipients list 151 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 152 153 if comment.IsReply() { 154 // if this comment is a reply, then notify everybody in that thread 155 parentAtUri := *comment.ReplyTo 156 157 // find the parent thread, and add all DIDs from here to the recipient list 158 for _, t := range issue.CommentList() { 159 if t.Self.AtUri() == parentAtUri { 160 for _, p := range t.Participants() { 161 recipients.Insert(p) 162 } 163 } 164 } 165 } else { 166 // not a reply, notify just the issue author 167 recipients.Insert(syntax.DID(issue.Did)) 168 } 169 170 for _, m := range mentions { 171 recipients.Remove(m) 172 } 173 174 actorDid := syntax.DID(comment.Did) 175 entityType := "issue" 176 entityId := issue.AtUri().String() 177 repoId := &issue.Repo.Id 178 issueId := &issue.Id 179 var pullId *int64 180 181 n.notifyEvent( 182 ctx, 183 actorDid, 184 recipients, 185 models.NotificationTypeIssueCommented, 186 entityType, 187 entityId, 188 repoId, 189 issueId, 190 pullId, 191 ) 192 n.notifyEvent( 193 ctx, 194 actorDid, 195 sets.Collect(slices.Values(mentions)), 196 models.NotificationTypeUserMentioned, 197 entityType, 198 entityId, 199 repoId, 200 issueId, 201 pullId, 202 ) 203} 204 205func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) { 206 // no-op for now 207} 208 209func (n *databaseNotifier) NewIssueLabelOp(ctx context.Context, issue *models.Issue) {} 210func (n *databaseNotifier) NewPullLabelOp(ctx context.Context, pull *models.Pull) {} 211 212func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) { 213 actorDid := syntax.DID(follow.UserDid) 214 recipients := sets.Singleton(syntax.DID(follow.SubjectDid)) 215 eventType := models.NotificationTypeFollowed 216 entityType := "follow" 217 entityId := follow.UserDid 218 var repoId, issueId, pullId *int64 219 220 n.notifyEvent( 221 ctx, 222 actorDid, 223 recipients, 224 eventType, 225 entityType, 226 entityId, 227 repoId, 228 issueId, 229 pullId, 230 ) 231} 232 233func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) { 234 // no-op 235} 236 237func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) { 238 l := log.FromContext(ctx) 239 240 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt))) 241 if err != nil { 242 l.Error("failed to get repos", "err", err) 243 return 244 } 245 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt())) 246 if err != nil { 247 l.Error("failed to fetch collaborators", "err", err) 248 return 249 } 250 251 // build the recipients list 252 // - owner of the repo 253 // - collaborators in the repo 254 recipients := sets.Singleton(syntax.DID(repo.Did)) 255 for _, c := range collaborators { 256 recipients.Insert(c.SubjectDid) 257 } 258 259 actorDid := syntax.DID(pull.OwnerDid) 260 eventType := models.NotificationTypePullCreated 261 entityType := "pull" 262 entityId := pull.AtUri().String() 263 repoId := &repo.Id 264 var issueId *int64 265 p := int64(pull.ID) 266 pullId := &p 267 268 n.notifyEvent( 269 ctx, 270 actorDid, 271 recipients, 272 eventType, 273 entityType, 274 entityId, 275 repoId, 276 issueId, 277 pullId, 278 ) 279} 280 281func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 282 l := log.FromContext(ctx) 283 284 pulls, err := db.GetPulls(n.db, 285 orm.FilterEq("owner_did", comment.Subject.Authority()), 286 orm.FilterEq("rkey", comment.Subject.RecordKey()), 287 ) 288 if err != nil { 289 l.Error("failed to get pull", "err", err) 290 return 291 } 292 if len(pulls) == 0 { 293 l.Error("NewPullComment: no pull found", "aturi", comment.Subject) 294 return 295 } 296 pull := pulls[0] 297 298 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", pull.RepoAt)) 299 if err != nil { 300 l.Error("failed to get repos", "err", err) 301 return 302 } 303 304 // build up the recipients list: 305 // - repo owner 306 // - all pull participants 307 // - remove those already mentioned 308 recipients := sets.Singleton(syntax.DID(repo.Did)) 309 for _, p := range pull.Participants() { 310 recipients.Insert(syntax.DID(p)) 311 } 312 for _, m := range mentions { 313 recipients.Remove(m) 314 } 315 316 actorDid := comment.Did 317 eventType := models.NotificationTypePullCommented 318 entityType := "pull" 319 entityId := pull.AtUri().String() 320 repoId := &repo.Id 321 var issueId *int64 322 p := int64(pull.ID) 323 pullId := &p 324 325 n.notifyEvent( 326 ctx, 327 actorDid, 328 recipients, 329 eventType, 330 entityType, 331 entityId, 332 repoId, 333 issueId, 334 pullId, 335 ) 336 n.notifyEvent( 337 ctx, 338 actorDid, 339 sets.Collect(slices.Values(mentions)), 340 models.NotificationTypeUserMentioned, 341 entityType, 342 entityId, 343 repoId, 344 issueId, 345 pullId, 346 ) 347} 348 349func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 350 // no-op 351} 352 353func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) { 354 // no-op 355} 356 357func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) { 358 // no-op 359} 360 361func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) { 362 // no-op 363} 364 365func (n *databaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 366 // no-op for now; webhooks are handled by the webhook notifier 367} 368 369func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { 370 l := log.FromContext(ctx) 371 372 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 373 if err != nil { 374 l.Error("failed to fetch collaborators", "err", err) 375 return 376 } 377 378 // build up the recipients list: 379 // - repo owner 380 // - repo collaborators 381 // - all issue participants 382 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 383 for _, c := range collaborators { 384 recipients.Insert(c.SubjectDid) 385 } 386 for _, p := range issue.Participants() { 387 recipients.Insert(syntax.DID(p)) 388 } 389 390 entityType := "issue" 391 entityId := issue.AtUri().String() 392 repoId := &issue.Repo.Id 393 issueId := &issue.Id 394 var pullId *int64 395 var eventType models.NotificationType 396 397 if issue.Open { 398 eventType = models.NotificationTypeIssueReopen 399 } else { 400 eventType = models.NotificationTypeIssueClosed 401 } 402 403 n.notifyEvent( 404 ctx, 405 actor, 406 recipients, 407 eventType, 408 entityType, 409 entityId, 410 repoId, 411 issueId, 412 pullId, 413 ) 414} 415 416func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 417 l := log.FromContext(ctx) 418 419 // Get repo details 420 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt))) 421 if err != nil { 422 l.Error("failed to get repos", "err", err) 423 return 424 } 425 426 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt())) 427 if err != nil { 428 l.Error("failed to fetch collaborators", "err", err) 429 return 430 } 431 432 // build up the recipients list: 433 // - repo owner 434 // - all pull participants 435 recipients := sets.Singleton(syntax.DID(repo.Did)) 436 for _, c := range collaborators { 437 recipients.Insert(c.SubjectDid) 438 } 439 for _, p := range pull.Participants() { 440 recipients.Insert(syntax.DID(p)) 441 } 442 443 entityType := "pull" 444 entityId := pull.AtUri().String() 445 repoId := &repo.Id 446 var issueId *int64 447 var eventType models.NotificationType 448 switch pull.State { 449 case models.PullClosed: 450 eventType = models.NotificationTypePullClosed 451 case models.PullOpen: 452 eventType = models.NotificationTypePullReopen 453 case models.PullMerged: 454 eventType = models.NotificationTypePullMerged 455 default: 456 l.Error("unexpected new PR state", "state", pull.State) 457 return 458 } 459 p := int64(pull.ID) 460 pullId := &p 461 462 n.notifyEvent( 463 ctx, 464 actor, 465 recipients, 466 eventType, 467 entityType, 468 entityId, 469 repoId, 470 issueId, 471 pullId, 472 ) 473} 474 475func (n *databaseNotifier) notifyEvent( 476 ctx context.Context, 477 actorDid syntax.DID, 478 recipients sets.Set[syntax.DID], 479 eventType models.NotificationType, 480 entityType string, 481 entityId string, 482 repoId *int64, 483 issueId *int64, 484 pullId *int64, 485) { 486 l := log.FromContext(ctx) 487 488 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody 489 if eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions { 490 return 491 } 492 493 recipients.Remove(actorDid) 494 495 prefMap, err := db.GetNotificationPreferences( 496 n.db, 497 orm.FilterIn("user_did", slices.Collect(recipients.All())), 498 ) 499 if err != nil { 500 // failed to get prefs for users 501 return 502 } 503 504 // create a transaction for bulk notification storage 505 tx, err := n.db.Begin() 506 if err != nil { 507 // failed to start tx 508 return 509 } 510 defer tx.Rollback() 511 512 // filter based on preferences 513 for recipientDid := range recipients.All() { 514 prefs, ok := prefMap[recipientDid] 515 if !ok { 516 prefs = models.DefaultNotificationPreferences(recipientDid) 517 } 518 519 // skip users who don’t want this type 520 if !prefs.ShouldNotify(eventType) { 521 continue 522 } 523 524 // create notification 525 notif := &models.Notification{ 526 RecipientDid: recipientDid.String(), 527 ActorDid: actorDid.String(), 528 Type: eventType, 529 EntityType: entityType, 530 EntityId: entityId, 531 RepoId: repoId, 532 IssueId: issueId, 533 PullId: pullId, 534 } 535 536 if err := db.CreateNotification(tx, notif); err != nil { 537 l.Error("failed to create notification", "recipientDid", recipientDid, "err", err) 538 } 539 } 540 541 if err := tx.Commit(); err != nil { 542 // failed to commit 543 return 544 } 545}