A vibe coded tangled fork which supports pijul.
at master 1223 lines 30 kB view raw
1package repo 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "net/url" 11 "slices" 12 "strings" 13 "time" 14 15 "tangled.org/core/appview/cloudflare" 16 17 "tangled.org/core/api/tangled" 18 "tangled.org/core/appview/config" 19 "tangled.org/core/appview/db" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/notify" 22 "tangled.org/core/appview/oauth" 23 "tangled.org/core/appview/pages" 24 "tangled.org/core/appview/reporesolver" 25 "tangled.org/core/appview/validator" 26 xrpcclient "tangled.org/core/appview/xrpcclient" 27 "tangled.org/core/eventconsumer" 28 "tangled.org/core/idresolver" 29 "tangled.org/core/orm" 30 "tangled.org/core/rbac" 31 "tangled.org/core/tid" 32 "tangled.org/core/xrpc/serviceauth" 33 34 comatproto "github.com/bluesky-social/indigo/api/atproto" 35 atpclient "github.com/bluesky-social/indigo/atproto/client" 36 "github.com/bluesky-social/indigo/atproto/syntax" 37 lexutil "github.com/bluesky-social/indigo/lex/util" 38 securejoin "github.com/cyphar/filepath-securejoin" 39 "github.com/go-chi/chi/v5" 40) 41 42type Repo struct { 43 repoResolver *reporesolver.RepoResolver 44 idResolver *idresolver.Resolver 45 config *config.Config 46 oauth *oauth.OAuth 47 pages *pages.Pages 48 spindlestream *eventconsumer.Consumer 49 db *db.DB 50 enforcer *rbac.Enforcer 51 notifier notify.Notifier 52 logger *slog.Logger 53 serviceAuth *serviceauth.ServiceAuth 54 validator *validator.Validator 55 cfClient *cloudflare.Client 56} 57 58func New( 59 oauth *oauth.OAuth, 60 repoResolver *reporesolver.RepoResolver, 61 pages *pages.Pages, 62 spindlestream *eventconsumer.Consumer, 63 idResolver *idresolver.Resolver, 64 db *db.DB, 65 config *config.Config, 66 notifier notify.Notifier, 67 enforcer *rbac.Enforcer, 68 logger *slog.Logger, 69 validator *validator.Validator, 70 cfClient *cloudflare.Client, 71) *Repo { 72 return &Repo{ 73 oauth: oauth, 74 repoResolver: repoResolver, 75 pages: pages, 76 idResolver: idResolver, 77 config: config, 78 spindlestream: spindlestream, 79 db: db, 80 notifier: notifier, 81 enforcer: enforcer, 82 logger: logger, 83 validator: validator, 84 cfClient: cfClient, 85 } 86} 87 88// modify the spindle configured for this repo 89func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) { 90 user := rp.oauth.GetMultiAccountUser(r) 91 l := rp.logger.With("handler", "EditSpindle") 92 l = l.With("did", user.Active.Did) 93 94 errorId := "operation-error" 95 fail := func(msg string, err error) { 96 l.Error(msg, "err", err) 97 rp.pages.Notice(w, errorId, msg) 98 } 99 100 f, err := rp.repoResolver.Resolve(r) 101 if err != nil { 102 fail("Failed to resolve repo. Try again later", err) 103 return 104 } 105 106 newSpindle := r.FormValue("spindle") 107 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value 108 client, err := rp.oauth.AuthorizedClient(r) 109 if err != nil { 110 fail("Failed to authorize. Try again later.", err) 111 return 112 } 113 114 if !removingSpindle { 115 // ensure that this is a valid spindle for this user 116 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Active.Did) 117 if err != nil { 118 fail("Failed to find spindles. Try again later.", err) 119 return 120 } 121 122 if !slices.Contains(validSpindles, newSpindle) { 123 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles)) 124 return 125 } 126 } 127 128 newRepo := *f 129 newRepo.Spindle = newSpindle 130 record := newRepo.AsRecord() 131 132 spindlePtr := &newSpindle 133 if removingSpindle { 134 spindlePtr = nil 135 newRepo.Spindle = "" 136 } 137 138 // optimistic update 139 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr) 140 if err != nil { 141 fail("Failed to update spindle. Try again later.", err) 142 return 143 } 144 145 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 146 if err != nil { 147 fail("Failed to update spindle, no record found on PDS.", err) 148 return 149 } 150 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 151 Collection: tangled.RepoNSID, 152 Repo: newRepo.Did, 153 Rkey: newRepo.Rkey, 154 SwapRecord: ex.Cid, 155 Record: &lexutil.LexiconTypeDecoder{ 156 Val: &record, 157 }, 158 }) 159 160 if err != nil { 161 fail("Failed to update spindle, unable to save to PDS.", err) 162 return 163 } 164 165 if !removingSpindle { 166 // add this spindle to spindle stream 167 rp.spindlestream.AddSource( 168 context.Background(), 169 eventconsumer.NewSpindleSource(newSpindle), 170 ) 171 } 172 173 rp.pages.HxRefresh(w) 174} 175 176func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) { 177 user := rp.oauth.GetMultiAccountUser(r) 178 l := rp.logger.With("handler", "AddLabel") 179 l = l.With("did", user.Active.Did) 180 181 f, err := rp.repoResolver.Resolve(r) 182 if err != nil { 183 l.Error("failed to get repo and knot", "err", err) 184 return 185 } 186 187 errorId := "add-label-error" 188 fail := func(msg string, err error) { 189 l.Error(msg, "err", err) 190 rp.pages.Notice(w, errorId, msg) 191 } 192 193 // get form values for label definition 194 name := r.FormValue("name") 195 concreteType := r.FormValue("valueType") 196 valueFormat := r.FormValue("valueFormat") 197 enumValues := r.FormValue("enumValues") 198 scope := r.Form["scope"] 199 color := r.FormValue("color") 200 multiple := r.FormValue("multiple") == "true" 201 202 var variants []string 203 for part := range strings.SplitSeq(enumValues, ",") { 204 if part = strings.TrimSpace(part); part != "" { 205 variants = append(variants, part) 206 } 207 } 208 209 if concreteType == "" { 210 concreteType = "null" 211 } 212 213 format := models.ValueTypeFormatAny 214 if valueFormat == "did" { 215 format = models.ValueTypeFormatDid 216 } 217 218 valueType := models.ValueType{ 219 Type: models.ConcreteType(concreteType), 220 Format: format, 221 Enum: variants, 222 } 223 224 label := models.LabelDefinition{ 225 Did: user.Active.Did, 226 Rkey: tid.TID(), 227 Name: name, 228 ValueType: valueType, 229 Scope: scope, 230 Color: &color, 231 Multiple: multiple, 232 Created: time.Now(), 233 } 234 if err := rp.validator.ValidateLabelDefinition(&label); err != nil { 235 fail(err.Error(), err) 236 return 237 } 238 239 // announce this relation into the firehose, store into owners' pds 240 client, err := rp.oauth.AuthorizedClient(r) 241 if err != nil { 242 fail(err.Error(), err) 243 return 244 } 245 246 // emit a labelRecord 247 labelRecord := label.AsRecord() 248 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 249 Collection: tangled.LabelDefinitionNSID, 250 Repo: label.Did, 251 Rkey: label.Rkey, 252 Record: &lexutil.LexiconTypeDecoder{ 253 Val: &labelRecord, 254 }, 255 }) 256 // invalid record 257 if err != nil { 258 fail("Failed to write record to PDS.", err) 259 return 260 } 261 262 aturi := resp.Uri 263 l = l.With("at-uri", aturi) 264 l.Info("wrote label record to PDS") 265 266 // update the repo to subscribe to this label 267 newRepo := *f 268 newRepo.Labels = append(newRepo.Labels, aturi) 269 repoRecord := newRepo.AsRecord() 270 271 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 272 if err != nil { 273 fail("Failed to update labels, no record found on PDS.", err) 274 return 275 } 276 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 277 Collection: tangled.RepoNSID, 278 Repo: newRepo.Did, 279 Rkey: newRepo.Rkey, 280 SwapRecord: ex.Cid, 281 Record: &lexutil.LexiconTypeDecoder{ 282 Val: &repoRecord, 283 }, 284 }) 285 if err != nil { 286 fail("Failed to update labels for repo.", err) 287 return 288 } 289 290 tx, err := rp.db.BeginTx(r.Context(), nil) 291 if err != nil { 292 fail("Failed to add label.", err) 293 return 294 } 295 296 rollback := func() { 297 err1 := tx.Rollback() 298 err2 := rollbackRecord(context.Background(), aturi, client) 299 300 // ignore txn complete errors, this is okay 301 if errors.Is(err1, sql.ErrTxDone) { 302 err1 = nil 303 } 304 305 if errs := errors.Join(err1, err2); errs != nil { 306 l.Error("failed to rollback changes", "errs", errs) 307 return 308 } 309 } 310 defer rollback() 311 312 _, err = db.AddLabelDefinition(tx, &label) 313 if err != nil { 314 fail("Failed to add label.", err) 315 return 316 } 317 318 err = db.SubscribeLabel(tx, &models.RepoLabel{ 319 RepoAt: f.RepoAt(), 320 LabelAt: label.AtUri(), 321 }) 322 323 err = tx.Commit() 324 if err != nil { 325 fail("Failed to add label.", err) 326 return 327 } 328 329 // clear aturi when everything is successful 330 aturi = "" 331 332 rp.pages.HxRefresh(w) 333} 334 335func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) { 336 user := rp.oauth.GetMultiAccountUser(r) 337 l := rp.logger.With("handler", "DeleteLabel") 338 l = l.With("did", user.Active.Did) 339 340 f, err := rp.repoResolver.Resolve(r) 341 if err != nil { 342 l.Error("failed to get repo and knot", "err", err) 343 return 344 } 345 346 errorId := "label-operation" 347 fail := func(msg string, err error) { 348 l.Error(msg, "err", err) 349 rp.pages.Notice(w, errorId, msg) 350 } 351 352 // get form values 353 labelId := r.FormValue("label-id") 354 355 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId)) 356 if err != nil { 357 fail("Failed to find label definition.", err) 358 return 359 } 360 361 client, err := rp.oauth.AuthorizedClient(r) 362 if err != nil { 363 fail(err.Error(), err) 364 return 365 } 366 367 // delete label record from PDS 368 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 369 Collection: tangled.LabelDefinitionNSID, 370 Repo: label.Did, 371 Rkey: label.Rkey, 372 }) 373 if err != nil { 374 fail("Failed to delete label record from PDS.", err) 375 return 376 } 377 378 // update repo record to remove the label reference 379 newRepo := *f 380 var updated []string 381 removedAt := label.AtUri().String() 382 for _, l := range newRepo.Labels { 383 if l != removedAt { 384 updated = append(updated, l) 385 } 386 } 387 newRepo.Labels = updated 388 repoRecord := newRepo.AsRecord() 389 390 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 391 if err != nil { 392 fail("Failed to update labels, no record found on PDS.", err) 393 return 394 } 395 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 396 Collection: tangled.RepoNSID, 397 Repo: newRepo.Did, 398 Rkey: newRepo.Rkey, 399 SwapRecord: ex.Cid, 400 Record: &lexutil.LexiconTypeDecoder{ 401 Val: &repoRecord, 402 }, 403 }) 404 if err != nil { 405 fail("Failed to update repo record.", err) 406 return 407 } 408 409 // transaction for DB changes 410 tx, err := rp.db.BeginTx(r.Context(), nil) 411 if err != nil { 412 fail("Failed to delete label.", err) 413 return 414 } 415 defer tx.Rollback() 416 417 err = db.UnsubscribeLabel( 418 tx, 419 orm.FilterEq("repo_at", f.RepoAt()), 420 orm.FilterEq("label_at", removedAt), 421 ) 422 if err != nil { 423 fail("Failed to unsubscribe label.", err) 424 return 425 } 426 427 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id)) 428 if err != nil { 429 fail("Failed to delete label definition.", err) 430 return 431 } 432 433 err = tx.Commit() 434 if err != nil { 435 fail("Failed to delete label.", err) 436 return 437 } 438 439 // everything succeeded 440 rp.pages.HxRefresh(w) 441} 442 443func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) { 444 user := rp.oauth.GetMultiAccountUser(r) 445 l := rp.logger.With("handler", "SubscribeLabel") 446 l = l.With("did", user.Active.Did) 447 448 f, err := rp.repoResolver.Resolve(r) 449 if err != nil { 450 l.Error("failed to get repo and knot", "err", err) 451 return 452 } 453 454 if err := r.ParseForm(); err != nil { 455 l.Error("invalid form", "err", err) 456 return 457 } 458 459 errorId := "default-label-operation" 460 fail := func(msg string, err error) { 461 l.Error(msg, "err", err) 462 rp.pages.Notice(w, errorId, msg) 463 } 464 465 labelAts := r.Form["label"] 466 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 467 if err != nil { 468 fail("Failed to subscribe to label.", err) 469 return 470 } 471 472 newRepo := *f 473 newRepo.Labels = append(newRepo.Labels, labelAts...) 474 475 // dedup 476 slices.Sort(newRepo.Labels) 477 newRepo.Labels = slices.Compact(newRepo.Labels) 478 479 repoRecord := newRepo.AsRecord() 480 481 client, err := rp.oauth.AuthorizedClient(r) 482 if err != nil { 483 fail(err.Error(), err) 484 return 485 } 486 487 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 488 if err != nil { 489 fail("Failed to update labels, no record found on PDS.", err) 490 return 491 } 492 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 493 Collection: tangled.RepoNSID, 494 Repo: newRepo.Did, 495 Rkey: newRepo.Rkey, 496 SwapRecord: ex.Cid, 497 Record: &lexutil.LexiconTypeDecoder{ 498 Val: &repoRecord, 499 }, 500 }) 501 502 tx, err := rp.db.Begin() 503 if err != nil { 504 fail("Failed to subscribe to label.", err) 505 return 506 } 507 defer tx.Rollback() 508 509 for _, l := range labelAts { 510 err = db.SubscribeLabel(tx, &models.RepoLabel{ 511 RepoAt: f.RepoAt(), 512 LabelAt: syntax.ATURI(l), 513 }) 514 if err != nil { 515 fail("Failed to subscribe to label.", err) 516 return 517 } 518 } 519 520 if err := tx.Commit(); err != nil { 521 fail("Failed to subscribe to label.", err) 522 return 523 } 524 525 // everything succeeded 526 rp.pages.HxRefresh(w) 527} 528 529func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) { 530 user := rp.oauth.GetMultiAccountUser(r) 531 l := rp.logger.With("handler", "UnsubscribeLabel") 532 l = l.With("did", user.Active.Did) 533 534 f, err := rp.repoResolver.Resolve(r) 535 if err != nil { 536 l.Error("failed to get repo and knot", "err", err) 537 return 538 } 539 540 if err := r.ParseForm(); err != nil { 541 l.Error("invalid form", "err", err) 542 return 543 } 544 545 errorId := "default-label-operation" 546 fail := func(msg string, err error) { 547 l.Error(msg, "err", err) 548 rp.pages.Notice(w, errorId, msg) 549 } 550 551 labelAts := r.Form["label"] 552 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 553 if err != nil { 554 fail("Failed to unsubscribe to label.", err) 555 return 556 } 557 558 // update repo record to remove the label reference 559 newRepo := *f 560 var updated []string 561 for _, l := range newRepo.Labels { 562 if !slices.Contains(labelAts, l) { 563 updated = append(updated, l) 564 } 565 } 566 newRepo.Labels = updated 567 repoRecord := newRepo.AsRecord() 568 569 client, err := rp.oauth.AuthorizedClient(r) 570 if err != nil { 571 fail(err.Error(), err) 572 return 573 } 574 575 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 576 if err != nil { 577 fail("Failed to update labels, no record found on PDS.", err) 578 return 579 } 580 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 581 Collection: tangled.RepoNSID, 582 Repo: newRepo.Did, 583 Rkey: newRepo.Rkey, 584 SwapRecord: ex.Cid, 585 Record: &lexutil.LexiconTypeDecoder{ 586 Val: &repoRecord, 587 }, 588 }) 589 590 err = db.UnsubscribeLabel( 591 rp.db, 592 orm.FilterEq("repo_at", f.RepoAt()), 593 orm.FilterIn("label_at", labelAts), 594 ) 595 if err != nil { 596 fail("Failed to unsubscribe label.", err) 597 return 598 } 599 600 // everything succeeded 601 rp.pages.HxRefresh(w) 602} 603 604func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) { 605 l := rp.logger.With("handler", "LabelPanel") 606 607 f, err := rp.repoResolver.Resolve(r) 608 if err != nil { 609 l.Error("failed to get repo and knot", "err", err) 610 return 611 } 612 613 subjectStr := r.FormValue("subject") 614 subject, err := syntax.ParseATURI(subjectStr) 615 if err != nil { 616 l.Error("failed to get repo and knot", "err", err) 617 return 618 } 619 620 labelDefs, err := db.GetLabelDefinitions( 621 rp.db, 622 orm.FilterIn("at_uri", f.Labels), 623 orm.FilterContains("scope", subject.Collection().String()), 624 ) 625 if err != nil { 626 l.Error("failed to fetch label defs", "err", err) 627 return 628 } 629 630 defs := make(map[string]*models.LabelDefinition) 631 for _, l := range labelDefs { 632 defs[l.AtUri().String()] = &l 633 } 634 635 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 636 if err != nil { 637 l.Error("failed to build label state", "err", err) 638 return 639 } 640 state := states[subject] 641 642 user := rp.oauth.GetMultiAccountUser(r) 643 rp.pages.LabelPanel(w, pages.LabelPanelParams{ 644 LoggedInUser: user, 645 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 646 Defs: defs, 647 Subject: subject.String(), 648 State: state, 649 }) 650} 651 652func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) { 653 l := rp.logger.With("handler", "EditLabelPanel") 654 655 f, err := rp.repoResolver.Resolve(r) 656 if err != nil { 657 l.Error("failed to get repo and knot", "err", err) 658 return 659 } 660 661 subjectStr := r.FormValue("subject") 662 subject, err := syntax.ParseATURI(subjectStr) 663 if err != nil { 664 l.Error("failed to get repo and knot", "err", err) 665 return 666 } 667 668 labelDefs, err := db.GetLabelDefinitions( 669 rp.db, 670 orm.FilterIn("at_uri", f.Labels), 671 orm.FilterContains("scope", subject.Collection().String()), 672 ) 673 if err != nil { 674 l.Error("failed to fetch labels", "err", err) 675 return 676 } 677 678 defs := make(map[string]*models.LabelDefinition) 679 for _, l := range labelDefs { 680 defs[l.AtUri().String()] = &l 681 } 682 683 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 684 if err != nil { 685 l.Error("failed to build label state", "err", err) 686 return 687 } 688 state := states[subject] 689 690 user := rp.oauth.GetMultiAccountUser(r) 691 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{ 692 LoggedInUser: user, 693 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 694 Defs: defs, 695 Subject: subject.String(), 696 State: state, 697 }) 698} 699 700func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) { 701 user := rp.oauth.GetMultiAccountUser(r) 702 l := rp.logger.With("handler", "AddCollaborator") 703 l = l.With("did", user.Active.Did) 704 705 f, err := rp.repoResolver.Resolve(r) 706 if err != nil { 707 l.Error("failed to get repo and knot", "err", err) 708 return 709 } 710 711 errorId := "add-collaborator-error" 712 fail := func(msg string, err error) { 713 l.Error(msg, "err", err) 714 rp.pages.Notice(w, errorId, msg) 715 } 716 717 collaborator := r.FormValue("collaborator") 718 if collaborator == "" { 719 fail("Invalid form.", nil) 720 return 721 } 722 723 // remove a single leading `@`, to make @handle work with ResolveIdent 724 collaborator = strings.TrimPrefix(collaborator, "@") 725 726 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator) 727 if err != nil { 728 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err) 729 return 730 } 731 732 if collaboratorIdent.DID.String() == user.Active.Did { 733 fail("You seem to be adding yourself as a collaborator.", nil) 734 return 735 } 736 l = l.With("collaborator", collaboratorIdent.Handle) 737 l = l.With("knot", f.Knot) 738 739 // announce this relation into the firehose, store into owners' pds 740 client, err := rp.oauth.AuthorizedClient(r) 741 if err != nil { 742 fail("Failed to write to PDS.", err) 743 return 744 } 745 746 // emit a record 747 currentUser := rp.oauth.GetMultiAccountUser(r) 748 rkey := tid.TID() 749 createdAt := time.Now() 750 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 751 Collection: tangled.RepoCollaboratorNSID, 752 Repo: currentUser.Active.Did, 753 Rkey: rkey, 754 Record: &lexutil.LexiconTypeDecoder{ 755 Val: &tangled.RepoCollaborator{ 756 Subject: collaboratorIdent.DID.String(), 757 Repo: string(f.RepoAt()), 758 CreatedAt: createdAt.Format(time.RFC3339), 759 }}, 760 }) 761 // invalid record 762 if err != nil { 763 fail("Failed to write record to PDS.", err) 764 return 765 } 766 767 aturi := resp.Uri 768 l = l.With("at-uri", aturi) 769 l.Info("wrote record to PDS") 770 771 tx, err := rp.db.BeginTx(r.Context(), nil) 772 if err != nil { 773 fail("Failed to add collaborator.", err) 774 return 775 } 776 777 rollback := func() { 778 err1 := tx.Rollback() 779 err2 := rp.enforcer.E.LoadPolicy() 780 err3 := rollbackRecord(context.Background(), aturi, client) 781 782 // ignore txn complete errors, this is okay 783 if errors.Is(err1, sql.ErrTxDone) { 784 err1 = nil 785 } 786 787 if errs := errors.Join(err1, err2, err3); errs != nil { 788 l.Error("failed to rollback changes", "errs", errs) 789 return 790 } 791 } 792 defer rollback() 793 794 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo()) 795 if err != nil { 796 fail("Failed to add collaborator permissions.", err) 797 return 798 } 799 err = db.AddCollaborator(tx, models.Collaborator{ 800 Did: syntax.DID(currentUser.Active.Did), 801 Rkey: rkey, 802 SubjectDid: collaboratorIdent.DID, 803 RepoAt: f.RepoAt(), 804 Created: createdAt, 805 }) 806 if err != nil { 807 fail("Failed to add collaborator.", err) 808 return 809 } 810 811 err = tx.Commit() 812 if err != nil { 813 fail("Failed to add collaborator.", err) 814 return 815 } 816 817 err = rp.enforcer.E.SavePolicy() 818 if err != nil { 819 fail("Failed to update collaborator permissions.", err) 820 return 821 } 822 823 // clear aturi to when everything is successful 824 aturi = "" 825 826 rp.pages.HxRefresh(w) 827} 828 829func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) { 830 user := rp.oauth.GetMultiAccountUser(r) 831 l := rp.logger.With("handler", "DeleteRepo") 832 833 noticeId := "operation-error" 834 f, err := rp.repoResolver.Resolve(r) 835 if err != nil { 836 l.Error("failed to get repo and knot", "err", err) 837 return 838 } 839 840 // remove record from pds 841 atpClient, err := rp.oauth.AuthorizedClient(r) 842 if err != nil { 843 l.Error("failed to get authorized client", "err", err) 844 return 845 } 846 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{ 847 Collection: tangled.RepoNSID, 848 Repo: user.Active.Did, 849 Rkey: f.Rkey, 850 }) 851 if err != nil { 852 l.Error("failed to delete record", "err", err) 853 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.") 854 return 855 } 856 l.Info("removed repo record", "aturi", f.RepoAt().String()) 857 858 client, err := rp.oauth.ServiceClient( 859 r, 860 oauth.WithService(f.Knot), 861 oauth.WithLxm(tangled.RepoDeleteNSID), 862 oauth.WithDev(rp.config.Core.Dev), 863 ) 864 if err != nil { 865 l.Error("failed to connect to knot server", "err", err) 866 return 867 } 868 869 err = tangled.RepoDelete( 870 r.Context(), 871 client, 872 &tangled.RepoDelete_Input{ 873 Did: f.Did, 874 Name: f.Name, 875 Rkey: f.Rkey, 876 }, 877 ) 878 if err := xrpcclient.HandleXrpcErr(err); err != nil { 879 rp.pages.Notice(w, noticeId, err.Error()) 880 return 881 } 882 l.Info("deleted repo from knot") 883 884 tx, err := rp.db.BeginTx(r.Context(), nil) 885 if err != nil { 886 l.Error("failed to start tx") 887 w.Write(fmt.Append(nil, "failed to add collaborator: ", err)) 888 return 889 } 890 defer func() { 891 tx.Rollback() 892 err = rp.enforcer.E.LoadPolicy() 893 if err != nil { 894 l.Error("failed to rollback policies") 895 } 896 }() 897 898 // remove collaborator RBAC 899 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot) 900 if err != nil { 901 rp.pages.Notice(w, noticeId, "Failed to remove collaborators") 902 return 903 } 904 for _, c := range repoCollaborators { 905 did := c[0] 906 rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo()) 907 } 908 l.Info("removed collaborators") 909 910 // remove repo RBAC 911 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.DidSlashRepo()) 912 if err != nil { 913 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules") 914 return 915 } 916 // remove repo from db 917 err = db.RemoveRepo(tx, f.Did, f.Name) 918 if err != nil { 919 rp.pages.Notice(w, noticeId, "Failed to update appview") 920 return 921 } 922 l.Info("removed repo from db") 923 924 err = tx.Commit() 925 if err != nil { 926 l.Error("failed to commit changes", "err", err) 927 http.Error(w, err.Error(), http.StatusInternalServerError) 928 return 929 } 930 931 err = rp.enforcer.E.SavePolicy() 932 if err != nil { 933 l.Error("failed to update ACLs", "err", err) 934 http.Error(w, err.Error(), http.StatusInternalServerError) 935 return 936 } 937 938 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did)) 939} 940 941func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) { 942 l := rp.logger.With("handler", "SyncRepoFork") 943 944 ref := chi.URLParam(r, "ref") 945 ref, _ = url.PathUnescape(ref) 946 947 user := rp.oauth.GetMultiAccountUser(r) 948 f, err := rp.repoResolver.Resolve(r) 949 if err != nil { 950 l.Error("failed to resolve source repo", "err", err) 951 return 952 } 953 954 switch r.Method { 955 case http.MethodPost: 956 client, err := rp.oauth.ServiceClient( 957 r, 958 oauth.WithService(f.Knot), 959 oauth.WithLxm(tangled.RepoForkSyncNSID), 960 oauth.WithDev(rp.config.Core.Dev), 961 ) 962 if err != nil { 963 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 964 return 965 } 966 967 if f.Source == "" { 968 rp.pages.Notice(w, "repo", "This repository is not a fork.") 969 return 970 } 971 972 err = tangled.RepoForkSync( 973 r.Context(), 974 client, 975 &tangled.RepoForkSync_Input{ 976 Did: user.Active.Did, 977 Name: f.Name, 978 Source: f.Source, 979 Branch: ref, 980 }, 981 ) 982 if err := xrpcclient.HandleXrpcErr(err); err != nil { 983 rp.pages.Notice(w, "repo", err.Error()) 984 return 985 } 986 987 rp.pages.HxRefresh(w) 988 return 989 } 990} 991 992func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) { 993 l := rp.logger.With("handler", "ForkRepo") 994 995 user := rp.oauth.GetMultiAccountUser(r) 996 f, err := rp.repoResolver.Resolve(r) 997 if err != nil { 998 l.Error("failed to resolve source repo", "err", err) 999 return 1000 } 1001 1002 switch r.Method { 1003 case http.MethodGet: 1004 user := rp.oauth.GetMultiAccountUser(r) 1005 knots, err := rp.enforcer.GetKnotsForUser(user.Active.Did) 1006 if err != nil { 1007 rp.pages.Notice(w, "repo", "Invalid user account.") 1008 return 1009 } 1010 1011 rp.pages.ForkRepo(w, pages.ForkRepoParams{ 1012 LoggedInUser: user, 1013 Knots: knots, 1014 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1015 }) 1016 1017 case http.MethodPost: 1018 l := rp.logger.With("handler", "ForkRepo") 1019 1020 targetKnot := r.FormValue("knot") 1021 if targetKnot == "" { 1022 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.") 1023 return 1024 } 1025 l = l.With("targetKnot", targetKnot) 1026 1027 ok, err := rp.enforcer.E.Enforce(user.Active.Did, targetKnot, targetKnot, "repo:create") 1028 if err != nil || !ok { 1029 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 1030 return 1031 } 1032 1033 // choose a name for a fork 1034 forkName := r.FormValue("repo_name") 1035 if forkName == "" { 1036 rp.pages.Notice(w, "repo", "Repository name cannot be empty.") 1037 return 1038 } 1039 1040 // this check is *only* to see if the forked repo name already exists 1041 // in the user's account. 1042 existingRepo, err := db.GetRepo( 1043 rp.db, 1044 orm.FilterEq("did", user.Active.Did), 1045 orm.FilterEq("name", forkName), 1046 ) 1047 if err != nil { 1048 if !errors.Is(err, sql.ErrNoRows) { 1049 l.Error("error fetching existing repo from db", "err", err) 1050 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.") 1051 return 1052 } 1053 } else if existingRepo != nil { 1054 // repo with this name already exists 1055 rp.pages.Notice(w, "repo", "A repository with this name already exists.") 1056 return 1057 } 1058 l = l.With("forkName", forkName) 1059 1060 uri := "https" 1061 if rp.config.Core.Dev { 1062 uri = "http" 1063 } 1064 1065 forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.Did, f.Name) 1066 l = l.With("cloneUrl", forkSourceUrl) 1067 1068 sourceAt := f.RepoAt().String() 1069 1070 // create an atproto record for this fork 1071 rkey := tid.TID() 1072 repo := &models.Repo{ 1073 Did: user.Active.Did, 1074 Name: forkName, 1075 Knot: targetKnot, 1076 Rkey: rkey, 1077 Source: sourceAt, 1078 Description: f.Description, 1079 Created: time.Now(), 1080 Labels: rp.config.Label.DefaultLabelDefs, 1081 Vcs: f.Vcs, 1082 } 1083 record := repo.AsRecord() 1084 1085 atpClient, err := rp.oauth.AuthorizedClient(r) 1086 if err != nil { 1087 l.Error("failed to create xrpcclient", "err", err) 1088 rp.pages.Notice(w, "repo", "Failed to fork repository.") 1089 return 1090 } 1091 1092 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 1093 Collection: tangled.RepoNSID, 1094 Repo: user.Active.Did, 1095 Rkey: rkey, 1096 Record: &lexutil.LexiconTypeDecoder{ 1097 Val: &record, 1098 }, 1099 }) 1100 if err != nil { 1101 l.Error("failed to write to PDS", "err", err) 1102 rp.pages.Notice(w, "repo", "Failed to announce repository creation.") 1103 return 1104 } 1105 1106 aturi := atresp.Uri 1107 l = l.With("aturi", aturi) 1108 l.Info("wrote to PDS") 1109 1110 tx, err := rp.db.BeginTx(r.Context(), nil) 1111 if err != nil { 1112 l.Info("txn failed", "err", err) 1113 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1114 return 1115 } 1116 1117 // The rollback function reverts a few things on failure: 1118 // - the pending txn 1119 // - the ACLs 1120 // - the atproto record created 1121 rollback := func() { 1122 err1 := tx.Rollback() 1123 err2 := rp.enforcer.E.LoadPolicy() 1124 err3 := rollbackRecord(context.Background(), aturi, atpClient) 1125 1126 // ignore txn complete errors, this is okay 1127 if errors.Is(err1, sql.ErrTxDone) { 1128 err1 = nil 1129 } 1130 1131 if errs := errors.Join(err1, err2, err3); errs != nil { 1132 l.Error("failed to rollback changes", "errs", errs) 1133 return 1134 } 1135 } 1136 defer rollback() 1137 1138 // TODO: this could coordinate better with the knot to recieve a clone status 1139 client, err := rp.oauth.ServiceClient( 1140 r, 1141 oauth.WithService(targetKnot), 1142 oauth.WithLxm(tangled.RepoCreateNSID), 1143 oauth.WithDev(rp.config.Core.Dev), 1144 oauth.WithTimeout(time.Second*20), // big repos take time to clone 1145 ) 1146 if err != nil { 1147 l.Error("could not create service client", "err", err) 1148 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1149 return 1150 } 1151 1152 err = tangled.RepoCreate( 1153 r.Context(), 1154 client, 1155 &tangled.RepoCreate_Input{ 1156 Rkey: rkey, 1157 Source: &forkSourceUrl, 1158 Vcs: &f.Vcs, 1159 }, 1160 ) 1161 if err := xrpcclient.HandleXrpcErr(err); err != nil { 1162 rp.pages.Notice(w, "repo", err.Error()) 1163 return 1164 } 1165 1166 err = db.AddRepo(tx, repo) 1167 if err != nil { 1168 l.Error("failed to AddRepo", "err", err) 1169 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1170 return 1171 } 1172 1173 // acls 1174 p, _ := securejoin.SecureJoin(user.Active.Did, forkName) 1175 err = rp.enforcer.AddRepo(user.Active.Did, targetKnot, p) 1176 if err != nil { 1177 l.Error("failed to add ACLs", "err", err) 1178 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.") 1179 return 1180 } 1181 err = tx.Commit() 1182 if err != nil { 1183 l.Error("failed to commit changes", "err", err) 1184 http.Error(w, err.Error(), http.StatusInternalServerError) 1185 return 1186 } 1187 1188 err = rp.enforcer.E.SavePolicy() 1189 if err != nil { 1190 l.Error("failed to update ACLs", "err", err) 1191 http.Error(w, err.Error(), http.StatusInternalServerError) 1192 return 1193 } 1194 1195 // reset the ATURI because the transaction completed successfully 1196 aturi = "" 1197 1198 rp.notifier.NewRepo(r.Context(), repo) 1199 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, forkName)) 1200 } 1201} 1202 1203// this is used to rollback changes made to the PDS 1204// 1205// it is a no-op if the provided ATURI is empty 1206func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 1207 if aturi == "" { 1208 return nil 1209 } 1210 1211 parsed := syntax.ATURI(aturi) 1212 1213 collection := parsed.Collection().String() 1214 repo := parsed.Authority().String() 1215 rkey := parsed.RecordKey().String() 1216 1217 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 1218 Collection: collection, 1219 Repo: repo, 1220 Rkey: rkey, 1221 }) 1222 return err 1223}