A vibe-coded fork of Tangled that supports Pijul.
at sl/wnrvrwyvrlzo 533 lines 12 kB view raw
1package db 2 3import ( 4 "context" 5 "slices" 6 7 "github.com/bluesky-social/indigo/atproto/syntax" 8 "tangled.org/core/api/tangled" 9 "tangled.org/core/appview/db" 10 "tangled.org/core/appview/models" 11 "tangled.org/core/appview/notify" 12 "tangled.org/core/idresolver" 13 "tangled.org/core/log" 14 "tangled.org/core/orm" 15 "tangled.org/core/sets" 16) 17 18const ( 19 maxMentions = 8 20) 21 22type databaseNotifier struct { 23 db *db.DB 24 res *idresolver.Resolver 25} 26 27func NewDatabaseNotifier(database *db.DB, resolver *idresolver.Resolver) notify.Notifier { 28 return &databaseNotifier{ 29 db: database, 30 res: resolver, 31 } 32} 33 34var _ notify.Notifier = &databaseNotifier{} 35 36func (n *databaseNotifier) NewRepo(ctx context.Context, repo *models.Repo) { 37 // no-op for now 38} 39 40func (n *databaseNotifier) NewStar(ctx context.Context, star *models.Star) { 41 l := log.FromContext(ctx) 42 43 if star.RepoAt.Collection().String() != tangled.RepoNSID { 44 // skip string stars for now 45 return 46 } 47 var err error 48 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(star.RepoAt))) 49 if err != nil { 50 l.Error("failed to get repos", "err", err) 51 return 52 } 53 54 actorDid := syntax.DID(star.Did) 55 recipients := sets.Singleton(syntax.DID(repo.Did)) 56 eventType := models.NotificationTypeRepoStarred 57 entityType := "repo" 58 entityId := star.RepoAt.String() 59 repoId := &repo.Id 60 var issueId *int64 61 var pullId *int64 62 63 n.notifyEvent( 64 ctx, 65 actorDid, 66 recipients, 67 eventType, 68 entityType, 69 entityId, 70 repoId, 71 issueId, 72 pullId, 73 ) 74} 75 76func (n *databaseNotifier) DeleteStar(ctx context.Context, star *models.Star) { 77 // no-op 78} 79 80func (n *databaseNotifier) NewComment(ctx context.Context, comment *models.Comment) { 81 l := log.FromContext(ctx) 82 83 var ( 84 // built the recipients list: 85 // - the owner of the repo 86 // - | if the comment is a reply -> everybody on that thread 87 // | if the comment is a top 
level -> just the issue owner 88 // - remove mentioned users from the recipients list 89 recipients = sets.New[syntax.DID]() 90 entityType string 91 entityId string 92 repoId *int64 93 issueId *int64 94 pullId *int64 95 ) 96 97 subjectDid, err := comment.Subject.Authority().AsDID() 98 if err != nil { 99 l.Error("expected did based at-uri for comment.subject") 100 return 101 } 102 switch comment.Subject.Collection() { 103 case tangled.RepoIssueNSID: 104 issues, err := db.GetIssues( 105 n.db, 106 orm.FilterEq("did", subjectDid), 107 orm.FilterEq("rkey", comment.Subject.RecordKey()), 108 ) 109 if err != nil { 110 l.Error("failed to get issues", "err", err) 111 return 112 } 113 if len(issues) == 0 { 114 l.Error("no issue found", "subject", comment.Subject) 115 return 116 } 117 issue := issues[0] 118 119 recipients.Insert(syntax.DID(issue.Repo.Did)) 120 if comment.IsReply() { 121 // if this comment is a reply, then notify everybody in that thread 122 parentAtUri := *comment.ReplyTo 123 124 // find the parent thread, and add all DIDs from here to the recipient list 125 for _, t := range issue.CommentList() { 126 if t.Self.AtUri() == parentAtUri { 127 for _, p := range t.Participants() { 128 recipients.Insert(p) 129 } 130 } 131 } 132 } else { 133 // not a reply, notify just the issue author 134 recipients.Insert(syntax.DID(issue.Did)) 135 } 136 137 entityType = "issue" 138 entityId = issue.AtUri().String() 139 repoId = &issue.Repo.Id 140 issueId = &issue.Id 141 case tangled.RepoPullNSID: 142 pulls, err := db.GetPulls( 143 n.db, 144 orm.FilterEq("owner_did", subjectDid), 145 orm.FilterEq("rkey", comment.Subject.RecordKey()), 146 ) 147 if err != nil { 148 l.Error("NewComment: failed to get pulls", "err", err) 149 return 150 } 151 if len(pulls) == 0 { 152 l.Error("NewComment: no pull found", "aturi", comment.Subject) 153 return 154 } 155 pull := pulls[0] 156 157 pull.Repo, err = db.GetRepo(n.db, orm.FilterEq("at_uri", pull.RepoAt)) 158 if err != nil { 159 
l.Error("NewComment: failed to get repo", "err", err) 160 return 161 } 162 163 recipients.Insert(syntax.DID(pull.Repo.Did)) 164 for _, p := range pull.Participants() { 165 recipients.Insert(syntax.DID(p)) 166 } 167 168 entityType = "pull" 169 entityId = pull.AtUri().String() 170 repoId = &pull.Repo.Id 171 p := int64(pull.ID) 172 pullId = &p 173 default: 174 return // no-op 175 } 176 177 for _, m := range comment.Mentions { 178 recipients.Remove(m) 179 } 180 181 n.notifyEvent( 182 ctx, 183 comment.Did, 184 recipients, 185 models.NotificationTypeIssueCommented, 186 entityType, 187 entityId, 188 repoId, 189 issueId, 190 pullId, 191 ) 192 n.notifyEvent( 193 ctx, 194 comment.Did, 195 sets.Collect(slices.Values(comment.Mentions)), 196 models.NotificationTypeUserMentioned, 197 entityType, 198 entityId, 199 repoId, 200 issueId, 201 pullId, 202 ) 203} 204 205func (n *databaseNotifier) DeleteComment(ctx context.Context, comment *models.Comment) { 206 // no-op 207} 208 209func (n *databaseNotifier) NewIssue(ctx context.Context, issue *models.Issue, mentions []syntax.DID) { 210 l := log.FromContext(ctx) 211 212 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 213 if err != nil { 214 l.Error("failed to fetch collaborators", "err", err) 215 return 216 } 217 218 // build the recipients list 219 // - owner of the repo 220 // - collaborators in the repo 221 // - remove users already mentioned 222 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 223 for _, c := range collaborators { 224 recipients.Insert(c.SubjectDid) 225 } 226 for _, m := range mentions { 227 recipients.Remove(m) 228 } 229 230 actorDid := syntax.DID(issue.Did) 231 entityType := "issue" 232 entityId := issue.AtUri().String() 233 repoId := &issue.Repo.Id 234 issueId := &issue.Id 235 var pullId *int64 236 237 n.notifyEvent( 238 ctx, 239 actorDid, 240 recipients, 241 models.NotificationTypeIssueCreated, 242 entityType, 243 entityId, 244 repoId, 245 issueId, 246 
pullId, 247 ) 248 n.notifyEvent( 249 ctx, 250 actorDid, 251 sets.Collect(slices.Values(mentions)), 252 models.NotificationTypeUserMentioned, 253 entityType, 254 entityId, 255 repoId, 256 issueId, 257 pullId, 258 ) 259} 260 261func (n *databaseNotifier) DeleteIssue(ctx context.Context, issue *models.Issue) { 262 // no-op for now 263} 264 265func (n *databaseNotifier) NewIssueLabelOp(ctx context.Context, issue *models.Issue) {} 266func (n *databaseNotifier) NewPullLabelOp(ctx context.Context, pull *models.Pull) {} 267 268func (n *databaseNotifier) NewFollow(ctx context.Context, follow *models.Follow) { 269 actorDid := syntax.DID(follow.UserDid) 270 recipients := sets.Singleton(syntax.DID(follow.SubjectDid)) 271 eventType := models.NotificationTypeFollowed 272 entityType := "follow" 273 entityId := follow.UserDid 274 var repoId, issueId, pullId *int64 275 276 n.notifyEvent( 277 ctx, 278 actorDid, 279 recipients, 280 eventType, 281 entityType, 282 entityId, 283 repoId, 284 issueId, 285 pullId, 286 ) 287} 288 289func (n *databaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) { 290 // no-op 291} 292 293func (n *databaseNotifier) NewPull(ctx context.Context, pull *models.Pull) { 294 l := log.FromContext(ctx) 295 296 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt))) 297 if err != nil { 298 l.Error("failed to get repos", "err", err) 299 return 300 } 301 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt())) 302 if err != nil { 303 l.Error("failed to fetch collaborators", "err", err) 304 return 305 } 306 307 // build the recipients list 308 // - owner of the repo 309 // - collaborators in the repo 310 recipients := sets.Singleton(syntax.DID(repo.Did)) 311 for _, c := range collaborators { 312 recipients.Insert(c.SubjectDid) 313 } 314 315 actorDid := syntax.DID(pull.OwnerDid) 316 eventType := models.NotificationTypePullCreated 317 entityType := "pull" 318 entityId := pull.AtUri().String() 319 repoId 
:= &repo.Id 320 var issueId *int64 321 p := int64(pull.ID) 322 pullId := &p 323 324 n.notifyEvent( 325 ctx, 326 actorDid, 327 recipients, 328 eventType, 329 entityType, 330 entityId, 331 repoId, 332 issueId, 333 pullId, 334 ) 335} 336 337func (n *databaseNotifier) UpdateProfile(ctx context.Context, profile *models.Profile) { 338 // no-op 339} 340 341func (n *databaseNotifier) DeleteString(ctx context.Context, did, rkey string) { 342 // no-op 343} 344 345func (n *databaseNotifier) EditString(ctx context.Context, string *models.String) { 346 // no-op 347} 348 349func (n *databaseNotifier) NewString(ctx context.Context, string *models.String) { 350 // no-op 351} 352 353func (n *databaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 354 // no-op for now; webhooks are handled by the webhook notifier 355} 356 357func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { 358 l := log.FromContext(ctx) 359 360 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", issue.Repo.RepoAt())) 361 if err != nil { 362 l.Error("failed to fetch collaborators", "err", err) 363 return 364 } 365 366 // build up the recipients list: 367 // - repo owner 368 // - repo collaborators 369 // - all issue participants 370 recipients := sets.Singleton(syntax.DID(issue.Repo.Did)) 371 for _, c := range collaborators { 372 recipients.Insert(c.SubjectDid) 373 } 374 for _, p := range issue.Participants() { 375 recipients.Insert(syntax.DID(p)) 376 } 377 378 entityType := "issue" 379 entityId := issue.AtUri().String() 380 repoId := &issue.Repo.Id 381 issueId := &issue.Id 382 var pullId *int64 383 var eventType models.NotificationType 384 385 if issue.Open { 386 eventType = models.NotificationTypeIssueReopen 387 } else { 388 eventType = models.NotificationTypeIssueClosed 389 } 390 391 n.notifyEvent( 392 ctx, 393 actor, 394 recipients, 395 eventType, 396 entityType, 397 entityId, 398 repoId, 
399 issueId, 400 pullId, 401 ) 402} 403 404func (n *databaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { 405 l := log.FromContext(ctx) 406 407 // Get repo details 408 repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", string(pull.RepoAt))) 409 if err != nil { 410 l.Error("failed to get repos", "err", err) 411 return 412 } 413 414 collaborators, err := db.GetCollaborators(n.db, orm.FilterEq("repo_at", repo.RepoAt())) 415 if err != nil { 416 l.Error("failed to fetch collaborators", "err", err) 417 return 418 } 419 420 // build up the recipients list: 421 // - repo owner 422 // - all pull participants 423 recipients := sets.Singleton(syntax.DID(repo.Did)) 424 for _, c := range collaborators { 425 recipients.Insert(c.SubjectDid) 426 } 427 for _, p := range pull.Participants() { 428 recipients.Insert(syntax.DID(p)) 429 } 430 431 entityType := "pull" 432 entityId := pull.AtUri().String() 433 repoId := &repo.Id 434 var issueId *int64 435 var eventType models.NotificationType 436 switch pull.State { 437 case models.PullClosed: 438 eventType = models.NotificationTypePullClosed 439 case models.PullOpen: 440 eventType = models.NotificationTypePullReopen 441 case models.PullMerged: 442 eventType = models.NotificationTypePullMerged 443 default: 444 l.Error("unexpected new PR state", "state", pull.State) 445 return 446 } 447 p := int64(pull.ID) 448 pullId := &p 449 450 n.notifyEvent( 451 ctx, 452 actor, 453 recipients, 454 eventType, 455 entityType, 456 entityId, 457 repoId, 458 issueId, 459 pullId, 460 ) 461} 462 463func (n *databaseNotifier) notifyEvent( 464 ctx context.Context, 465 actorDid syntax.DID, 466 recipients sets.Set[syntax.DID], 467 eventType models.NotificationType, 468 entityType string, 469 entityId string, 470 repoId *int64, 471 issueId *int64, 472 pullId *int64, 473) { 474 l := log.FromContext(ctx) 475 476 // if the user is attempting to mention >maxMentions users, this is probably spam, do not mention anybody 477 if 
eventType == models.NotificationTypeUserMentioned && recipients.Len() > maxMentions { 478 return 479 } 480 481 recipients.Remove(actorDid) 482 483 prefMap, err := db.GetNotificationPreferences( 484 n.db, 485 orm.FilterIn("user_did", slices.Collect(recipients.All())), 486 ) 487 if err != nil { 488 // failed to get prefs for users 489 return 490 } 491 492 // create a transaction for bulk notification storage 493 tx, err := n.db.Begin() 494 if err != nil { 495 // failed to start tx 496 return 497 } 498 defer tx.Rollback() 499 500 // filter based on preferences 501 for recipientDid := range recipients.All() { 502 prefs, ok := prefMap[recipientDid] 503 if !ok { 504 prefs = models.DefaultNotificationPreferences(recipientDid) 505 } 506 507 // skip users who don’t want this type 508 if !prefs.ShouldNotify(eventType) { 509 continue 510 } 511 512 // create notification 513 notif := &models.Notification{ 514 RecipientDid: recipientDid.String(), 515 ActorDid: actorDid.String(), 516 Type: eventType, 517 EntityType: entityType, 518 EntityId: entityId, 519 RepoId: repoId, 520 IssueId: issueId, 521 PullId: pullId, 522 } 523 524 if err := db.CreateNotification(tx, notif); err != nil { 525 l.Error("failed to create notification", "recipientDid", recipientDid, "err", err) 526 } 527 } 528 529 if err := tx.Commit(); err != nil { 530 // failed to commit 531 return 532 } 533}