A vibe coded tangled fork which supports pijul.
at c25b9122e5530efbb3f10a095482ce415e1c8712 1224 lines 30 kB view raw
1package repo 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "net/url" 11 "slices" 12 "strings" 13 "time" 14 15 "tangled.org/core/appview/cloudflare" 16 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/notify" 22 "tangled.org/core/appview/oauth" 23 "tangled.org/core/appview/pages" 24 "tangled.org/core/appview/reporesolver" 25 "tangled.org/core/appview/validator" 26 xrpcclient "tangled.org/core/appview/xrpcclient" 27 "tangled.org/core/eventconsumer" 28 "tangled.org/core/idresolver" 29 "tangled.org/core/orm" 30 "tangled.org/core/rbac" 31 "tangled.org/core/tid" 32 "tangled.org/core/xrpc/serviceauth" 33 34 comatproto "github.com/bluesky-social/indigo/api/atproto" 35 atpclient "github.com/bluesky-social/indigo/atproto/client" 36 "github.com/bluesky-social/indigo/atproto/syntax" 37 lexutil "github.com/bluesky-social/indigo/lex/util" 38 securejoin "github.com/cyphar/filepath-securejoin" 39 "github.com/go-chi/chi/v5" 40) 41 42type Repo struct { 43 repoResolver *reporesolver.RepoResolver 44 idResolver *idresolver.Resolver 45 config *config.Config 46 oauth *oauth.OAuth 47 pages *pages.Pages 48 spindlestream *eventconsumer.Consumer 49 db *db.DB 50 enforcer *rbac.Enforcer 51 notifier notify.Notifier 52 logger *slog.Logger 53 serviceAuth *serviceauth.ServiceAuth 54 validator *validator.Validator 55 cfClient *cloudflare.Client 56} 57 58func New( 59 oauth *oauth.OAuth, 60 repoResolver *reporesolver.RepoResolver, 61 pages *pages.Pages, 62 spindlestream *eventconsumer.Consumer, 63 idResolver *idresolver.Resolver, 64 db *db.DB, 65 config *config.Config, 66 notifier notify.Notifier, 67 enforcer *rbac.Enforcer, 68 logger *slog.Logger, 69 validator *validator.Validator, 70 cfClient *cloudflare.Client, 71) *Repo { 72 return &Repo{ 73 oauth: oauth, 74 repoResolver: repoResolver, 75 pages: pages, 76 idResolver: idResolver, 77 config: config, 78 spindlestream: spindlestream, 79 db: db, 80 notifier: notifier, 81 enforcer: enforcer, 82 logger: logger, 83 validator: validator, 84 cfClient: cfClient, 85 } 86} 87 88// modify the spindle configured for this repo 89func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) { 90 user := rp.oauth.GetMultiAccountUser(r) 91 l := rp.logger.With("handler", "EditSpindle") 92 l = l.With("did", user.Active.Did) 93 94 errorId := "operation-error" 95 fail := func(msg string, err error) { 96 l.Error(msg, "err", err) 97 rp.pages.Notice(w, errorId, msg) 98 } 99 100 f, err := rp.repoResolver.Resolve(r) 101 if err != nil { 102 fail("Failed to resolve repo. Try again later", err) 103 return 104 } 105 106 newSpindle := r.FormValue("spindle") 107 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value 108 client, err := rp.oauth.AuthorizedClient(r) 109 if err != nil { 110 fail("Failed to authorize. Try again later.", err) 111 return 112 } 113 114 if !removingSpindle { 115 // ensure that this is a valid spindle for this user 116 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Active.Did) 117 if err != nil { 118 fail("Failed to find spindles. Try again later.", err) 119 return 120 } 121 122 if !slices.Contains(validSpindles, newSpindle) { 123 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles)) 124 return 125 } 126 } 127 128 newRepo := *f 129 newRepo.Spindle = newSpindle 130 record := newRepo.AsRecord() 131 132 spindlePtr := &newSpindle 133 if removingSpindle { 134 spindlePtr = nil 135 newRepo.Spindle = "" 136 } 137 138 // optimistic update 139 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr) 140 if err != nil { 141 fail("Failed to update spindle. Try again later.", err) 142 return 143 } 144 145 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 146 if err != nil { 147 fail("Failed to update spindle, no record found on PDS.", err) 148 return 149 } 150 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 151 Collection: tangled.RepoNSID, 152 Repo: newRepo.Did, 153 Rkey: newRepo.Rkey, 154 SwapRecord: ex.Cid, 155 Record: &lexutil.LexiconTypeDecoder{ 156 Val: &record, 157 }, 158 }) 159 160 if err != nil { 161 fail("Failed to update spindle, unable to save to PDS.", err) 162 return 163 } 164 165 if !removingSpindle { 166 // add this spindle to spindle stream 167 rp.spindlestream.AddSource( 168 context.Background(), 169 eventconsumer.NewSpindleSource(newSpindle), 170 ) 171 } 172 173 rp.pages.HxRefresh(w) 174} 175 176func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) { 177 user := rp.oauth.GetMultiAccountUser(r) 178 l := rp.logger.With("handler", "AddLabel") 179 l = l.With("did", user.Active.Did) 180 181 f, err := rp.repoResolver.Resolve(r) 182 if err != nil { 183 l.Error("failed to get repo and knot", "err", err) 184 return 185 } 186 187 errorId := "add-label-error" 188 fail := func(msg string, err error) { 189 l.Error(msg, "err", err) 190 rp.pages.Notice(w, errorId, msg) 191 } 192 193 // get form values for label definition 194 name := r.FormValue("name") 195 concreteType := r.FormValue("valueType") 196 valueFormat := r.FormValue("valueFormat") 197 enumValues := r.FormValue("enumValues") 198 scope := r.Form["scope"] 199 color := r.FormValue("color") 200 multiple := r.FormValue("multiple") == "true" 201 202 var variants []string 203 for part := range strings.SplitSeq(enumValues, ",") { 204 if part = strings.TrimSpace(part); part != "" { 205 variants = append(variants, part) 206 } 207 } 208 209 if concreteType == "" { 210 concreteType = "null" 211 } 212 213 format := models.ValueTypeFormatAny 214 if valueFormat == "did" { 215 format = models.ValueTypeFormatDid 216 } 217 218 valueType := models.ValueType{ 219 Type: models.ConcreteType(concreteType), 220 Format: format, 221 Enum: variants, 222 } 223 224 label := models.LabelDefinition{ 225 Did: user.Active.Did, 226 Rkey: tid.TID(), 227 Name: name, 228 ValueType: valueType, 229 Scope: scope, 230 Color: &color, 231 Multiple: multiple, 232 Created: time.Now(), 233 } 234 if err := rp.validator.ValidateLabelDefinition(&label); err != nil { 235 fail(err.Error(), err) 236 return 237 } 238 239 // announce this relation into the firehose, store into owners' pds 240 client, err := rp.oauth.AuthorizedClient(r) 241 if err != nil { 242 fail(err.Error(), err) 243 return 244 } 245 246 // emit a labelRecord 247 labelRecord := label.AsRecord() 248 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 249 Collection: tangled.LabelDefinitionNSID, 250 Repo: label.Did, 251 Rkey: label.Rkey, 252 Record: &lexutil.LexiconTypeDecoder{ 253 Val: &labelRecord, 254 }, 255 }) 256 // invalid record 257 if err != nil { 258 fail("Failed to write record to PDS.", err) 259 return 260 } 261 262 aturi := resp.Uri 263 l = l.With("at-uri", aturi) 264 l.Info("wrote label record to PDS") 265 266 // update the repo to subscribe to this label 267 newRepo := *f 268 newRepo.Labels = append(newRepo.Labels, aturi) 269 repoRecord := newRepo.AsRecord() 270 271 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 272 if err != nil { 273 fail("Failed to update labels, no record found on PDS.", err) 274 return 275 } 276 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 277 Collection: tangled.RepoNSID, 278 Repo: newRepo.Did, 279 Rkey: newRepo.Rkey, 280 SwapRecord: ex.Cid, 281 Record: &lexutil.LexiconTypeDecoder{ 282 Val: &repoRecord, 283 }, 284 }) 285 if err != nil { 286 fail("Failed to update labels for repo.", err) 287 return 288 } 289 290 tx, err := rp.db.BeginTx(r.Context(), nil) 291 if err != nil { 292 fail("Failed to add label.", err) 293 return 294 } 295 296 rollback := func() { 297 err1 := tx.Rollback() 298 err2 := rollbackRecord(context.Background(), aturi, client) 299 300 // ignore txn complete errors, this is okay 301 if errors.Is(err1, sql.ErrTxDone) { 302 err1 = nil 303 } 304 305 if errs := errors.Join(err1, err2); errs != nil { 306 l.Error("failed to rollback changes", "errs", errs) 307 return 308 } 309 } 310 defer rollback() 311 312 _, err = db.AddLabelDefinition(tx, &label) 313 if err != nil { 314 fail("Failed to add label.", err) 315 return 316 } 317 318 err = db.SubscribeLabel(tx, &models.RepoLabel{ 319 RepoAt: f.RepoAt(), 320 LabelAt: label.AtUri(), 321 }) 322 323 err = tx.Commit() 324 if err != nil { 325 fail("Failed to add label.", err) 326 return 327 } 328 329 // clear aturi when everything is successful 330 aturi = "" 331 332 rp.pages.HxRefresh(w) 333} 334 335func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) { 336 user := rp.oauth.GetMultiAccountUser(r) 337 l := rp.logger.With("handler", "DeleteLabel") 338 l = l.With("did", user.Active.Did) 339 340 f, err := rp.repoResolver.Resolve(r) 341 if err != nil { 342 l.Error("failed to get repo and knot", "err", err) 343 return 344 } 345 346 errorId := "label-operation" 347 fail := func(msg string, err error) { 348 l.Error(msg, "err", err) 349 rp.pages.Notice(w, errorId, msg) 350 } 351 352 // get form values 353 labelId := r.FormValue("label-id") 354 355 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId)) 356 if err != nil { 357 fail("Failed to find label definition.", err) 358 return 359 } 360 361 client, err := rp.oauth.AuthorizedClient(r) 362 if err != nil { 363 fail(err.Error(), err) 364 return 365 } 366 367 // delete label record from PDS 368 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 369 Collection: tangled.LabelDefinitionNSID, 370 Repo: label.Did, 371 Rkey: label.Rkey, 372 }) 373 if err != nil { 374 fail("Failed to delete label record from PDS.", err) 375 return 376 } 377 378 // update repo record to remove the label reference 379 newRepo := *f 380 var updated []string 381 removedAt := label.AtUri().String() 382 for _, l := range newRepo.Labels { 383 if l != removedAt { 384 updated = append(updated, l) 385 } 386 } 387 newRepo.Labels = updated 388 repoRecord := newRepo.AsRecord() 389 390 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 391 if err != nil { 392 fail("Failed to update labels, no record found on PDS.", err) 393 return 394 } 395 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 396 Collection: tangled.RepoNSID, 397 Repo: newRepo.Did, 398 Rkey: newRepo.Rkey, 399 SwapRecord: ex.Cid, 400 Record: &lexutil.LexiconTypeDecoder{ 401 Val: &repoRecord, 402 }, 403 }) 404 if err != nil { 405 fail("Failed to update repo record.", err) 406 return 407 } 408 409 // transaction for DB changes 410 tx, err := rp.db.BeginTx(r.Context(), nil) 411 if err != nil { 412 fail("Failed to delete label.", err) 413 return 414 } 415 defer tx.Rollback() 416 417 err = db.UnsubscribeLabel( 418 tx, 419 orm.FilterEq("repo_at", f.RepoAt()), 420 orm.FilterEq("label_at", removedAt), 421 ) 422 if err != nil { 423 fail("Failed to unsubscribe label.", err) 424 return 425 } 426 427 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id)) 428 if err != nil { 429 fail("Failed to delete label definition.", err) 430 return 431 } 432 433 err = tx.Commit() 434 if err != nil { 435 fail("Failed to delete label.", err) 436 return 437 } 438 439 // everything succeeded 440 rp.pages.HxRefresh(w) 441} 442 443func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) { 444 user := rp.oauth.GetMultiAccountUser(r) 445 l := rp.logger.With("handler", "SubscribeLabel") 446 l = l.With("did", user.Active.Did) 447 448 f, err := rp.repoResolver.Resolve(r) 449 if err != nil { 450 l.Error("failed to get repo and knot", "err", err) 451 return 452 } 453 454 if err := r.ParseForm(); err != nil { 455 l.Error("invalid form", "err", err) 456 return 457 } 458 459 errorId := "default-label-operation" 460 fail := func(msg string, err error) { 461 l.Error(msg, "err", err) 462 rp.pages.Notice(w, errorId, msg) 463 } 464 465 labelAts := r.Form["label"] 466 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 467 if err != nil { 468 fail("Failed to subscribe to label.", err) 469 return 470 } 471 472 newRepo := *f 473 newRepo.Labels = append(newRepo.Labels, labelAts...) 474 475 // dedup 476 slices.Sort(newRepo.Labels) 477 newRepo.Labels = slices.Compact(newRepo.Labels) 478 479 repoRecord := newRepo.AsRecord() 480 481 client, err := rp.oauth.AuthorizedClient(r) 482 if err != nil { 483 fail(err.Error(), err) 484 return 485 } 486 487 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 488 if err != nil { 489 fail("Failed to update labels, no record found on PDS.", err) 490 return 491 } 492 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 493 Collection: tangled.RepoNSID, 494 Repo: newRepo.Did, 495 Rkey: newRepo.Rkey, 496 SwapRecord: ex.Cid, 497 Record: &lexutil.LexiconTypeDecoder{ 498 Val: &repoRecord, 499 }, 500 }) 501 502 tx, err := rp.db.Begin() 503 if err != nil { 504 fail("Failed to subscribe to label.", err) 505 return 506 } 507 defer tx.Rollback() 508 509 for _, l := range labelAts { 510 err = db.SubscribeLabel(tx, &models.RepoLabel{ 511 RepoAt: f.RepoAt(), 512 LabelAt: syntax.ATURI(l), 513 }) 514 if err != nil { 515 fail("Failed to subscribe to label.", err) 516 return 517 } 518 } 519 520 if err := tx.Commit(); err != nil { 521 fail("Failed to subscribe to label.", err) 522 return 523 } 524 525 // everything succeeded 526 rp.pages.HxRefresh(w) 527} 528 529func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) { 530 user := rp.oauth.GetMultiAccountUser(r) 531 l := rp.logger.With("handler", "UnsubscribeLabel") 532 l = l.With("did", user.Active.Did) 533 534 f, err := rp.repoResolver.Resolve(r) 535 if err != nil { 536 l.Error("failed to get repo and knot", "err", err) 537 return 538 } 539 540 if err := r.ParseForm(); err != nil { 541 l.Error("invalid form", "err", err) 542 return 543 } 544 545 errorId := "default-label-operation" 546 fail := func(msg string, err error) { 547 l.Error(msg, "err", err) 548 rp.pages.Notice(w, errorId, msg) 549 } 550 551 labelAts := r.Form["label"] 552 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 553 if err != nil { 554 fail("Failed to unsubscribe to label.", err) 555 return 556 } 557 558 // update repo record to remove the label reference 559 newRepo := *f 560 var updated []string 561 for _, l := range newRepo.Labels { 562 if !slices.Contains(labelAts, l) { 563 updated = append(updated, l) 564 } 565 } 566 newRepo.Labels = updated 567 repoRecord := newRepo.AsRecord() 568 569 client, err := rp.oauth.AuthorizedClient(r) 570 if err != nil { 571 fail(err.Error(), err) 572 return 573 } 574 575 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 576 if err != nil { 577 fail("Failed to update labels, no record found on PDS.", err) 578 return 579 } 580 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 581 Collection: tangled.RepoNSID, 582 Repo: newRepo.Did, 583 Rkey: newRepo.Rkey, 584 SwapRecord: ex.Cid, 585 Record: &lexutil.LexiconTypeDecoder{ 586 Val: &repoRecord, 587 }, 588 }) 589 590 err = db.UnsubscribeLabel( 591 rp.db, 592 orm.FilterEq("repo_at", f.RepoAt()), 593 orm.FilterIn("label_at", labelAts), 594 ) 595 if err != nil { 596 fail("Failed to unsubscribe label.", err) 597 return 598 } 599 600 // everything succeeded 601 rp.pages.HxRefresh(w) 602} 603 604func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) { 605 l := rp.logger.With("handler", "LabelPanel") 606 607 f, err := rp.repoResolver.Resolve(r) 608 if err != nil { 609 l.Error("failed to get repo and knot", "err", err) 610 return 611 } 612 613 subjectStr := r.FormValue("subject") 614 subject, err := syntax.ParseATURI(subjectStr) 615 if err != nil { 616 l.Error("failed to get repo and knot", "err", err) 617 return 618 } 619 620 labelDefs, err := db.GetLabelDefinitions( 621 rp.db, 622 orm.FilterIn("at_uri", f.Labels), 623 orm.FilterContains("scope", subject.Collection().String()), 624 ) 625 if err != nil { 626 l.Error("failed to fetch label defs", "err", err) 627 return 628 } 629 630 defs := make(map[string]*models.LabelDefinition) 631 for _, l := range labelDefs { 632 defs[l.AtUri().String()] = &l 633 } 634 635 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 636 if err != nil { 637 l.Error("failed to build label state", "err", err) 638 return 639 } 640 state := states[subject] 641 642 user := rp.oauth.GetMultiAccountUser(r) 643 rp.pages.LabelPanel(w, pages.LabelPanelParams{ 644 LoggedInUser: user, 645 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 646 Defs: defs, 647 Subject: subject.String(), 648 State: state, 649 }) 650} 651 652func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) { 653 l := rp.logger.With("handler", "EditLabelPanel") 654 655 f, err := rp.repoResolver.Resolve(r) 656 if err != nil { 657 l.Error("failed to get repo and knot", "err", err) 658 return 659 } 660 661 subjectStr := r.FormValue("subject") 662 subject, err := syntax.ParseATURI(subjectStr) 663 if err != nil { 664 l.Error("failed to get repo and knot", "err", err) 665 return 666 } 667 668 labelDefs, err := db.GetLabelDefinitions( 669 rp.db, 670 orm.FilterIn("at_uri", f.Labels), 671 orm.FilterContains("scope", subject.Collection().String()), 672 ) 673 if err != nil { 674 l.Error("failed to fetch labels", "err", err) 675 return 676 } 677 678 defs := make(map[string]*models.LabelDefinition) 679 for _, l := range labelDefs { 680 defs[l.AtUri().String()] = &l 681 } 682 683 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 684 if err != nil { 685 l.Error("failed to build label state", "err", err) 686 return 687 } 688 state := states[subject] 689 690 user := rp.oauth.GetMultiAccountUser(r) 691 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{ 692 LoggedInUser: user, 693 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 694 Defs: defs, 695 Subject: subject.String(), 696 State: state, 697 }) 698} 699 700func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) { 701 user := rp.oauth.GetMultiAccountUser(r) 702 l := rp.logger.With("handler", "AddCollaborator") 703 l = l.With("did", user.Active.Did) 704 705 f, err := rp.repoResolver.Resolve(r) 706 if err != nil { 707 l.Error("failed to get repo and knot", "err", err) 708 return 709 } 710 711 errorId := "add-collaborator-error" 712 fail := func(msg string, err error) { 713 l.Error(msg, "err", err) 714 rp.pages.Notice(w, errorId, msg) 715 } 716 717 collaborator := r.FormValue("collaborator") 718 if collaborator == "" { 719 fail("Invalid form.", nil) 720 return 721 } 722 723 // remove a single leading `@`, to make @handle work with ResolveIdent 724 collaborator = strings.TrimPrefix(collaborator, "@") 725 726 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator) 727 if err != nil { 728 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err) 729 return 730 } 731 732 if collaboratorIdent.DID.String() == user.Active.Did { 733 fail("You seem to be adding yourself as a collaborator.", nil) 734 return 735 } 736 l = l.With("collaborator", collaboratorIdent.Handle) 737 l = l.With("knot", f.Knot) 738 739 // announce this relation into the firehose, store into owners' pds 740 client, err := rp.oauth.AuthorizedClient(r) 741 if err != nil { 742 fail("Failed to write to PDS.", err) 743 return 744 } 745 746 // emit a record 747 currentUser := rp.oauth.GetMultiAccountUser(r) 748 rkey := tid.TID() 749 createdAt := time.Now() 750 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 751 Collection: tangled.RepoCollaboratorNSID, 752 Repo: currentUser.Active.Did, 753 Rkey: rkey, 754 Record: &lexutil.LexiconTypeDecoder{ 755 Val: &tangled.RepoCollaborator{ 756 Subject: collaboratorIdent.DID.String(), 757 Repo: string(f.RepoAt()), 758 CreatedAt: createdAt.Format(time.RFC3339), 759 }}, 760 }) 761 // invalid record 762 if err != nil { 763 fail("Failed to write record to PDS.", err) 764 return 765 } 766 767 aturi := resp.Uri 768 l = l.With("at-uri", aturi) 769 l.Info("wrote record to PDS") 770 771 tx, err := rp.db.BeginTx(r.Context(), nil) 772 if err != nil { 773 fail("Failed to add collaborator.", err) 774 return 775 } 776 777 rollback := func() { 778 err1 := tx.Rollback() 779 err2 := rp.enforcer.E.LoadPolicy() 780 err3 := rollbackRecord(context.Background(), aturi, client) 781 782 // ignore txn complete errors, this is okay 783 if errors.Is(err1, sql.ErrTxDone) { 784 err1 = nil 785 } 786 787 if errs := errors.Join(err1, err2, err3); errs != nil { 788 l.Error("failed to rollback changes", "errs", errs) 789 return 790 } 791 } 792 defer rollback() 793 794 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo()) 795 if err != nil { 796 fail("Failed to add collaborator permissions.", err) 797 return 798 } 799 800 err = db.AddCollaborator(tx, models.Collaborator{ 801 Did: syntax.DID(currentUser.Active.Did), 802 Rkey: rkey, 803 SubjectDid: collaboratorIdent.DID, 804 RepoAt: f.RepoAt(), 805 Created: createdAt, 806 }) 807 if err != nil { 808 fail("Failed to add collaborator.", err) 809 return 810 } 811 812 err = tx.Commit() 813 if err != nil { 814 fail("Failed to add collaborator.", err) 815 return 816 } 817 818 err = rp.enforcer.E.SavePolicy() 819 if err != nil { 820 fail("Failed to update collaborator permissions.", err) 821 return 822 } 823 824 // clear aturi to when everything is successful 825 aturi = "" 826 827 rp.pages.HxRefresh(w) 828} 829 830func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) { 831 user := rp.oauth.GetMultiAccountUser(r) 832 l := rp.logger.With("handler", "DeleteRepo") 833 834 noticeId := "operation-error" 835 f, err := rp.repoResolver.Resolve(r) 836 if err != nil { 837 l.Error("failed to get repo and knot", "err", err) 838 return 839 } 840 841 // remove record from pds 842 atpClient, err := rp.oauth.AuthorizedClient(r) 843 if err != nil { 844 l.Error("failed to get authorized client", "err", err) 845 return 846 } 847 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{ 848 Collection: tangled.RepoNSID, 849 Repo: user.Active.Did, 850 Rkey: f.Rkey, 851 }) 852 if err != nil { 853 l.Error("failed to delete record", "err", err) 854 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.") 855 return 856 } 857 l.Info("removed repo record", "aturi", f.RepoAt().String()) 858 859 client, err := rp.oauth.ServiceClient( 860 r, 861 oauth.WithService(f.Knot), 862 oauth.WithLxm(tangled.RepoDeleteNSID), 863 oauth.WithDev(rp.config.Core.Dev), 864 ) 865 if err != nil { 866 l.Error("failed to connect to knot server", "err", err) 867 return 868 } 869 870 err = tangled.RepoDelete( 871 r.Context(), 872 client, 873 &tangled.RepoDelete_Input{ 874 Did: f.Did, 875 Name: f.Name, 876 Rkey: f.Rkey, 877 }, 878 ) 879 if err := xrpcclient.HandleXrpcErr(err); err != nil { 880 rp.pages.Notice(w, noticeId, err.Error()) 881 return 882 } 883 l.Info("deleted repo from knot") 884 885 tx, err := rp.db.BeginTx(r.Context(), nil) 886 if err != nil { 887 l.Error("failed to start tx") 888 w.Write(fmt.Append(nil, "failed to add collaborator: ", err)) 889 return 890 } 891 defer func() { 892 tx.Rollback() 893 err = rp.enforcer.E.LoadPolicy() 894 if err != nil { 895 l.Error("failed to rollback policies") 896 } 897 }() 898 899 // remove collaborator RBAC 900 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot) 901 if err != nil { 902 rp.pages.Notice(w, noticeId, "Failed to remove collaborators") 903 return 904 } 905 for _, c := range repoCollaborators { 906 did := c[0] 907 rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo()) 908 } 909 l.Info("removed collaborators") 910 911 // remove repo RBAC 912 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.DidSlashRepo()) 913 if err != nil { 914 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules") 915 return 916 } 917 918 // remove repo from db 919 err = db.RemoveRepo(tx, f.Did, f.Name) 920 if err != nil { 921 rp.pages.Notice(w, noticeId, "Failed to update appview") 922 return 923 } 924 l.Info("removed repo from db") 925 926 err = tx.Commit() 927 if err != nil { 928 l.Error("failed to commit changes", "err", err) 929 http.Error(w, err.Error(), http.StatusInternalServerError) 930 return 931 } 932 933 err = rp.enforcer.E.SavePolicy() 934 if err != nil { 935 l.Error("failed to update ACLs", "err", err) 936 http.Error(w, err.Error(), http.StatusInternalServerError) 937 return 938 } 939 940 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did)) 941} 942 943func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) { 944 l := rp.logger.With("handler", "SyncRepoFork") 945 946 ref := chi.URLParam(r, "ref") 947 ref, _ = url.PathUnescape(ref) 948 949 user := rp.oauth.GetMultiAccountUser(r) 950 f, err := rp.repoResolver.Resolve(r) 951 if err != nil { 952 l.Error("failed to resolve source repo", "err", err) 953 return 954 } 955 956 switch r.Method { 957 case http.MethodPost: 958 client, err := rp.oauth.ServiceClient( 959 r, 960 oauth.WithService(f.Knot), 961 oauth.WithLxm(tangled.RepoForkSyncNSID), 962 oauth.WithDev(rp.config.Core.Dev), 963 ) 964 if err != nil { 965 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 966 return 967 } 968 969 if f.Source == "" { 970 rp.pages.Notice(w, "repo", "This repository is not a fork.") 971 return 972 } 973 974 err = tangled.RepoForkSync( 975 r.Context(), 976 client, 977 &tangled.RepoForkSync_Input{ 978 Did: user.Active.Did, 979 Name: f.Name, 980 Source: f.Source, 981 Branch: ref, 982 }, 983 ) 984 if err := xrpcclient.HandleXrpcErr(err); err != nil { 985 rp.pages.Notice(w, "repo", err.Error()) 986 return 987 } 988 989 rp.pages.HxRefresh(w) 990 return 991 } 992} 993 994func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) { 995 l := rp.logger.With("handler", "ForkRepo") 996 997 user := rp.oauth.GetMultiAccountUser(r) 998 f, err := rp.repoResolver.Resolve(r) 999 if err != nil { 1000 l.Error("failed to resolve source repo", "err", err) 1001 return 1002 } 1003 1004 switch r.Method { 1005 case http.MethodGet: 1006 user := rp.oauth.GetMultiAccountUser(r) 1007 knots, err := rp.enforcer.GetKnotsForUser(user.Active.Did) 1008 if err != nil { 1009 rp.pages.Notice(w, "repo", "Invalid user account.") 1010 return 1011 } 1012 1013 rp.pages.ForkRepo(w, pages.ForkRepoParams{ 1014 LoggedInUser: user, 1015 Knots: knots, 1016 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1017 }) 1018 1019 case http.MethodPost: 1020 l := rp.logger.With("handler", "ForkRepo") 1021 1022 targetKnot := r.FormValue("knot") 1023 if targetKnot == "" { 1024 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.") 1025 return 1026 } 1027 l = l.With("targetKnot", targetKnot) 1028 1029 ok, err := rp.enforcer.E.Enforce(user.Active.Did, targetKnot, targetKnot, "repo:create") 1030 if err != nil || !ok { 1031 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 1032 return 1033 } 1034 1035 // choose a name for a fork 1036 forkName := r.FormValue("repo_name") 1037 if forkName == "" { 1038 rp.pages.Notice(w, "repo", "Repository name cannot be empty.") 1039 return 1040 } 1041 1042 // this check is *only* to see if the forked repo name already exists 1043 // in the user's account. 1044 existingRepo, err := db.GetRepo( 1045 rp.db, 1046 orm.FilterEq("did", user.Active.Did), 1047 orm.FilterEq("name", forkName), 1048 ) 1049 if err != nil { 1050 if !errors.Is(err, sql.ErrNoRows) { 1051 l.Error("error fetching existing repo from db", "err", err) 1052 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.") 1053 return 1054 } 1055 } else if existingRepo != nil { 1056 // repo with this name already exists 1057 rp.pages.Notice(w, "repo", "A repository with this name already exists.") 1058 return 1059 } 1060 l = l.With("forkName", forkName) 1061 1062 uri := "https" 1063 if rp.config.Core.Dev { 1064 uri = "http" 1065 } 1066 1067 forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.Did, f.Name) 1068 l = l.With("cloneUrl", forkSourceUrl) 1069 1070 sourceAt := f.RepoAt().String() 1071 1072 // create an atproto record for this fork 1073 rkey := tid.TID() 1074 repo := &models.Repo{ 1075 Did: user.Active.Did, 1076 Name: forkName, 1077 Knot: targetKnot, 1078 Rkey: rkey, 1079 Source: sourceAt, 1080 Description: f.Description, 1081 Created: time.Now(), 1082 Labels: rp.config.Label.DefaultLabelDefs, 1083 } 1084 record := repo.AsRecord() 1085 1086 atpClient, err := rp.oauth.AuthorizedClient(r) 1087 if err != nil { 1088 l.Error("failed to create xrpcclient", "err", err) 1089 rp.pages.Notice(w, "repo", "Failed to fork repository.") 1090 return 1091 } 1092 1093 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 1094 Collection: tangled.RepoNSID, 1095 Repo: user.Active.Did, 1096 Rkey: rkey, 1097 Record: &lexutil.LexiconTypeDecoder{ 1098 Val: &record, 1099 }, 1100 }) 1101 if err != nil { 1102 l.Error("failed to write to PDS", "err", err) 1103 rp.pages.Notice(w, "repo", "Failed to announce repository creation.") 1104 return 1105 } 1106 1107 aturi := atresp.Uri 1108 l = l.With("aturi", aturi) 1109 l.Info("wrote to PDS") 1110 1111 tx, err := rp.db.BeginTx(r.Context(), nil) 1112 if err != nil { 1113 l.Info("txn failed", "err", err) 1114 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1115 return 1116 } 1117 1118 // The rollback function reverts a few things on failure: 1119 // - the pending txn 1120 // - the ACLs 1121 // - the atproto record created 1122 rollback := func() { 1123 err1 := tx.Rollback() 1124 err2 := rp.enforcer.E.LoadPolicy() 1125 err3 := rollbackRecord(context.Background(), aturi, atpClient) 1126 1127 // ignore txn complete errors, this is okay 1128 if errors.Is(err1, sql.ErrTxDone) { 1129 err1 = nil 1130 } 1131 1132 if errs := errors.Join(err1, err2, err3); errs != nil { 1133 l.Error("failed to rollback changes", "errs", errs) 1134 return 1135 } 1136 } 1137 defer rollback() 1138 1139 // TODO: this could coordinate better with the knot to recieve a clone status 1140 client, err := rp.oauth.ServiceClient( 1141 r, 1142 oauth.WithService(targetKnot), 1143 oauth.WithLxm(tangled.RepoCreateNSID), 1144 oauth.WithDev(rp.config.Core.Dev), 1145 oauth.WithTimeout(time.Second*20), // big repos take time to clone 1146 ) 1147 if err != nil { 1148 l.Error("could not create service client", "err", err) 1149 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1150 return 1151 } 1152 1153 err = tangled.RepoCreate( 1154 r.Context(), 1155 client, 1156 &tangled.RepoCreate_Input{ 1157 Rkey: rkey, 1158 Source: &forkSourceUrl, 1159 }, 1160 ) 1161 if err := xrpcclient.HandleXrpcErr(err); err != nil { 1162 rp.pages.Notice(w, "repo", err.Error()) 1163 return 1164 } 1165 1166 err = db.AddRepo(tx, repo) 1167 if err != nil { 1168 l.Error("failed to AddRepo", "err", err) 1169 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1170 return 1171 } 1172 1173 // acls 1174 p, _ := securejoin.SecureJoin(user.Active.Did, forkName) 1175 err = rp.enforcer.AddRepo(user.Active.Did, targetKnot, p) 1176 if err != nil { 1177 l.Error("failed to add ACLs", "err", err) 1178 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.") 1179 return 1180 } 1181 1182 err = tx.Commit() 1183 if err != nil { 1184 l.Error("failed to commit changes", "err", err) 1185 http.Error(w, err.Error(), http.StatusInternalServerError) 1186 return 1187 } 1188 1189 err = rp.enforcer.E.SavePolicy() 1190 if err != nil { 1191 l.Error("failed to update ACLs", "err", err) 1192 http.Error(w, err.Error(), http.StatusInternalServerError) 1193 return 1194 } 1195 1196 // reset the ATURI because the transaction completed successfully 1197 aturi = "" 1198 1199 rp.notifier.NewRepo(r.Context(), repo) 1200 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, forkName)) 1201 } 1202} 1203 1204// this is used to rollback changes made to the PDS 1205// 1206// it is a no-op if the provided ATURI is empty 1207func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 1208 if aturi == "" { 1209 return nil 1210 } 1211 1212 parsed := syntax.ATURI(aturi) 1213 1214 collection := parsed.Collection().String() 1215 repo := parsed.Authority().String() 1216 rkey := parsed.RecordKey().String() 1217 1218 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 1219 Collection: collection, 1220 Repo: repo, 1221 Rkey: rkey, 1222 }) 1223 return err 1224}