A vibe-coded Tangled fork that supports Pijul.
at f4a09133a636451c495fa96c98b99379b7692497 1214 lines 30 kB view raw
1package repo 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "net/url" 11 "slices" 12 "strings" 13 "time" 14 15 "tangled.org/core/api/tangled" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/appview/notify" 20 "tangled.org/core/appview/oauth" 21 "tangled.org/core/appview/pages" 22 "tangled.org/core/appview/reporesolver" 23 xrpcclient "tangled.org/core/appview/xrpcclient" 24 "tangled.org/core/eventconsumer" 25 "tangled.org/core/idresolver" 26 "tangled.org/core/orm" 27 "tangled.org/core/rbac" 28 "tangled.org/core/tid" 29 "tangled.org/core/xrpc/serviceauth" 30 31 comatproto "github.com/bluesky-social/indigo/api/atproto" 32 atpclient "github.com/bluesky-social/indigo/atproto/client" 33 "github.com/bluesky-social/indigo/atproto/syntax" 34 lexutil "github.com/bluesky-social/indigo/lex/util" 35 securejoin "github.com/cyphar/filepath-securejoin" 36 "github.com/go-chi/chi/v5" 37) 38 39type Repo struct { 40 repoResolver *reporesolver.RepoResolver 41 idResolver *idresolver.Resolver 42 config *config.Config 43 oauth *oauth.OAuth 44 pages *pages.Pages 45 spindlestream *eventconsumer.Consumer 46 db *db.DB 47 enforcer *rbac.Enforcer 48 notifier notify.Notifier 49 logger *slog.Logger 50 serviceAuth *serviceauth.ServiceAuth 51} 52 53func New( 54 oauth *oauth.OAuth, 55 repoResolver *reporesolver.RepoResolver, 56 pages *pages.Pages, 57 spindlestream *eventconsumer.Consumer, 58 idResolver *idresolver.Resolver, 59 db *db.DB, 60 config *config.Config, 61 notifier notify.Notifier, 62 enforcer *rbac.Enforcer, 63 logger *slog.Logger, 64) *Repo { 65 return &Repo{oauth: oauth, 66 repoResolver: repoResolver, 67 pages: pages, 68 idResolver: idResolver, 69 config: config, 70 spindlestream: spindlestream, 71 db: db, 72 notifier: notifier, 73 enforcer: enforcer, 74 logger: logger, 75 } 76} 77 78// modify the spindle configured for this repo 79func (rp *Repo) EditSpindle(w 
http.ResponseWriter, r *http.Request) { 80 user := rp.oauth.GetMultiAccountUser(r) 81 l := rp.logger.With("handler", "EditSpindle") 82 l = l.With("did", user.Active.Did) 83 84 errorId := "operation-error" 85 fail := func(msg string, err error) { 86 l.Error(msg, "err", err) 87 rp.pages.Notice(w, errorId, msg) 88 } 89 90 f, err := rp.repoResolver.Resolve(r) 91 if err != nil { 92 fail("Failed to resolve repo. Try again later", err) 93 return 94 } 95 96 newSpindle := r.FormValue("spindle") 97 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value 98 client, err := rp.oauth.AuthorizedClient(r) 99 if err != nil { 100 fail("Failed to authorize. Try again later.", err) 101 return 102 } 103 104 if !removingSpindle { 105 // ensure that this is a valid spindle for this user 106 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Active.Did) 107 if err != nil { 108 fail("Failed to find spindles. Try again later.", err) 109 return 110 } 111 112 if !slices.Contains(validSpindles, newSpindle) { 113 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles)) 114 return 115 } 116 } 117 118 newRepo := *f 119 newRepo.Spindle = newSpindle 120 record := newRepo.AsRecord() 121 122 spindlePtr := &newSpindle 123 if removingSpindle { 124 spindlePtr = nil 125 newRepo.Spindle = "" 126 } 127 128 // optimistic update 129 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr) 130 if err != nil { 131 fail("Failed to update spindle. 
Try again later.", err) 132 return 133 } 134 135 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 136 if err != nil { 137 fail("Failed to update spindle, no record found on PDS.", err) 138 return 139 } 140 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 141 Collection: tangled.RepoNSID, 142 Repo: newRepo.Did, 143 Rkey: newRepo.Rkey, 144 SwapRecord: ex.Cid, 145 Record: &lexutil.LexiconTypeDecoder{ 146 Val: &record, 147 }, 148 }) 149 150 if err != nil { 151 fail("Failed to update spindle, unable to save to PDS.", err) 152 return 153 } 154 155 if !removingSpindle { 156 // add this spindle to spindle stream 157 rp.spindlestream.AddSource( 158 context.Background(), 159 eventconsumer.NewSpindleSource(newSpindle), 160 ) 161 } 162 163 rp.pages.HxRefresh(w) 164} 165 166func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) { 167 user := rp.oauth.GetMultiAccountUser(r) 168 l := rp.logger.With("handler", "AddLabel") 169 l = l.With("did", user.Active.Did) 170 171 f, err := rp.repoResolver.Resolve(r) 172 if err != nil { 173 l.Error("failed to get repo and knot", "err", err) 174 return 175 } 176 177 errorId := "add-label-error" 178 fail := func(msg string, err error) { 179 l.Error(msg, "err", err) 180 rp.pages.Notice(w, errorId, msg) 181 } 182 183 // get form values for label definition 184 name := r.FormValue("name") 185 concreteType := r.FormValue("valueType") 186 valueFormat := r.FormValue("valueFormat") 187 enumValues := r.FormValue("enumValues") 188 scope := r.Form["scope"] 189 color := r.FormValue("color") 190 multiple := r.FormValue("multiple") == "true" 191 192 var variants []string 193 for part := range strings.SplitSeq(enumValues, ",") { 194 if part = strings.TrimSpace(part); part != "" { 195 variants = append(variants, part) 196 } 197 } 198 199 if concreteType == "" { 200 concreteType = "null" 201 } 202 203 format := models.ValueTypeFormatAny 204 if 
valueFormat == "did" { 205 format = models.ValueTypeFormatDid 206 } 207 208 valueType := models.ValueType{ 209 Type: models.ConcreteType(concreteType), 210 Format: format, 211 Enum: variants, 212 } 213 214 label := models.LabelDefinition{ 215 Did: user.Active.Did, 216 Rkey: tid.TID(), 217 Name: name, 218 ValueType: valueType, 219 Scope: scope, 220 Color: &color, 221 Multiple: multiple, 222 Created: time.Now(), 223 } 224 if err := label.Validate(); err != nil { 225 fail(err.Error(), err) 226 return 227 } 228 229 // announce this relation into the firehose, store into owners' pds 230 client, err := rp.oauth.AuthorizedClient(r) 231 if err != nil { 232 fail(err.Error(), err) 233 return 234 } 235 236 // emit a labelRecord 237 labelRecord := label.AsRecord() 238 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 239 Collection: tangled.LabelDefinitionNSID, 240 Repo: label.Did, 241 Rkey: label.Rkey, 242 Record: &lexutil.LexiconTypeDecoder{ 243 Val: &labelRecord, 244 }, 245 }) 246 // invalid record 247 if err != nil { 248 fail("Failed to write record to PDS.", err) 249 return 250 } 251 252 aturi := resp.Uri 253 l = l.With("at-uri", aturi) 254 l.Info("wrote label record to PDS") 255 256 // update the repo to subscribe to this label 257 newRepo := *f 258 newRepo.Labels = append(newRepo.Labels, aturi) 259 repoRecord := newRepo.AsRecord() 260 261 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 262 if err != nil { 263 fail("Failed to update labels, no record found on PDS.", err) 264 return 265 } 266 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 267 Collection: tangled.RepoNSID, 268 Repo: newRepo.Did, 269 Rkey: newRepo.Rkey, 270 SwapRecord: ex.Cid, 271 Record: &lexutil.LexiconTypeDecoder{ 272 Val: &repoRecord, 273 }, 274 }) 275 if err != nil { 276 fail("Failed to update labels for repo.", err) 277 return 278 } 279 280 tx, err := 
rp.db.BeginTx(r.Context(), nil) 281 if err != nil { 282 fail("Failed to add label.", err) 283 return 284 } 285 286 rollback := func() { 287 err1 := tx.Rollback() 288 err2 := rollbackRecord(context.Background(), aturi, client) 289 290 // ignore txn complete errors, this is okay 291 if errors.Is(err1, sql.ErrTxDone) { 292 err1 = nil 293 } 294 295 if errs := errors.Join(err1, err2); errs != nil { 296 l.Error("failed to rollback changes", "errs", errs) 297 return 298 } 299 } 300 defer rollback() 301 302 _, err = db.AddLabelDefinition(tx, &label) 303 if err != nil { 304 fail("Failed to add label.", err) 305 return 306 } 307 308 err = db.SubscribeLabel(tx, &models.RepoLabel{ 309 RepoAt: f.RepoAt(), 310 LabelAt: label.AtUri(), 311 }) 312 313 err = tx.Commit() 314 if err != nil { 315 fail("Failed to add label.", err) 316 return 317 } 318 319 // clear aturi when everything is successful 320 aturi = "" 321 322 rp.pages.HxRefresh(w) 323} 324 325func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) { 326 user := rp.oauth.GetMultiAccountUser(r) 327 l := rp.logger.With("handler", "DeleteLabel") 328 l = l.With("did", user.Active.Did) 329 330 f, err := rp.repoResolver.Resolve(r) 331 if err != nil { 332 l.Error("failed to get repo and knot", "err", err) 333 return 334 } 335 336 errorId := "label-operation" 337 fail := func(msg string, err error) { 338 l.Error(msg, "err", err) 339 rp.pages.Notice(w, errorId, msg) 340 } 341 342 // get form values 343 labelId := r.FormValue("label-id") 344 345 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId)) 346 if err != nil { 347 fail("Failed to find label definition.", err) 348 return 349 } 350 351 client, err := rp.oauth.AuthorizedClient(r) 352 if err != nil { 353 fail(err.Error(), err) 354 return 355 } 356 357 // delete label record from PDS 358 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{ 359 Collection: tangled.LabelDefinitionNSID, 360 Repo: label.Did, 361 
Rkey: label.Rkey, 362 }) 363 if err != nil { 364 fail("Failed to delete label record from PDS.", err) 365 return 366 } 367 368 // update repo record to remove the label reference 369 newRepo := *f 370 var updated []string 371 removedAt := label.AtUri().String() 372 for _, l := range newRepo.Labels { 373 if l != removedAt { 374 updated = append(updated, l) 375 } 376 } 377 newRepo.Labels = updated 378 repoRecord := newRepo.AsRecord() 379 380 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey) 381 if err != nil { 382 fail("Failed to update labels, no record found on PDS.", err) 383 return 384 } 385 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 386 Collection: tangled.RepoNSID, 387 Repo: newRepo.Did, 388 Rkey: newRepo.Rkey, 389 SwapRecord: ex.Cid, 390 Record: &lexutil.LexiconTypeDecoder{ 391 Val: &repoRecord, 392 }, 393 }) 394 if err != nil { 395 fail("Failed to update repo record.", err) 396 return 397 } 398 399 // transaction for DB changes 400 tx, err := rp.db.BeginTx(r.Context(), nil) 401 if err != nil { 402 fail("Failed to delete label.", err) 403 return 404 } 405 defer tx.Rollback() 406 407 err = db.UnsubscribeLabel( 408 tx, 409 orm.FilterEq("repo_at", f.RepoAt()), 410 orm.FilterEq("label_at", removedAt), 411 ) 412 if err != nil { 413 fail("Failed to unsubscribe label.", err) 414 return 415 } 416 417 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id)) 418 if err != nil { 419 fail("Failed to delete label definition.", err) 420 return 421 } 422 423 err = tx.Commit() 424 if err != nil { 425 fail("Failed to delete label.", err) 426 return 427 } 428 429 // everything succeeded 430 rp.pages.HxRefresh(w) 431} 432 433func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) { 434 user := rp.oauth.GetMultiAccountUser(r) 435 l := rp.logger.With("handler", "SubscribeLabel") 436 l = l.With("did", user.Active.Did) 437 438 f, err := 
rp.repoResolver.Resolve(r) 439 if err != nil { 440 l.Error("failed to get repo and knot", "err", err) 441 return 442 } 443 444 if err := r.ParseForm(); err != nil { 445 l.Error("invalid form", "err", err) 446 return 447 } 448 449 errorId := "default-label-operation" 450 fail := func(msg string, err error) { 451 l.Error(msg, "err", err) 452 rp.pages.Notice(w, errorId, msg) 453 } 454 455 labelAts := r.Form["label"] 456 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 457 if err != nil { 458 fail("Failed to subscribe to label.", err) 459 return 460 } 461 462 newRepo := *f 463 newRepo.Labels = append(newRepo.Labels, labelAts...) 464 465 // dedup 466 slices.Sort(newRepo.Labels) 467 newRepo.Labels = slices.Compact(newRepo.Labels) 468 469 repoRecord := newRepo.AsRecord() 470 471 client, err := rp.oauth.AuthorizedClient(r) 472 if err != nil { 473 fail(err.Error(), err) 474 return 475 } 476 477 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 478 if err != nil { 479 fail("Failed to update labels, no record found on PDS.", err) 480 return 481 } 482 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 483 Collection: tangled.RepoNSID, 484 Repo: newRepo.Did, 485 Rkey: newRepo.Rkey, 486 SwapRecord: ex.Cid, 487 Record: &lexutil.LexiconTypeDecoder{ 488 Val: &repoRecord, 489 }, 490 }) 491 492 tx, err := rp.db.Begin() 493 if err != nil { 494 fail("Failed to subscribe to label.", err) 495 return 496 } 497 defer tx.Rollback() 498 499 for _, l := range labelAts { 500 err = db.SubscribeLabel(tx, &models.RepoLabel{ 501 RepoAt: f.RepoAt(), 502 LabelAt: syntax.ATURI(l), 503 }) 504 if err != nil { 505 fail("Failed to subscribe to label.", err) 506 return 507 } 508 } 509 510 if err := tx.Commit(); err != nil { 511 fail("Failed to subscribe to label.", err) 512 return 513 } 514 515 // everything succeeded 516 rp.pages.HxRefresh(w) 517} 518 519func (rp *Repo) UnsubscribeLabel(w 
http.ResponseWriter, r *http.Request) { 520 user := rp.oauth.GetMultiAccountUser(r) 521 l := rp.logger.With("handler", "UnsubscribeLabel") 522 l = l.With("did", user.Active.Did) 523 524 f, err := rp.repoResolver.Resolve(r) 525 if err != nil { 526 l.Error("failed to get repo and knot", "err", err) 527 return 528 } 529 530 if err := r.ParseForm(); err != nil { 531 l.Error("invalid form", "err", err) 532 return 533 } 534 535 errorId := "default-label-operation" 536 fail := func(msg string, err error) { 537 l.Error(msg, "err", err) 538 rp.pages.Notice(w, errorId, msg) 539 } 540 541 labelAts := r.Form["label"] 542 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts)) 543 if err != nil { 544 fail("Failed to unsubscribe to label.", err) 545 return 546 } 547 548 // update repo record to remove the label reference 549 newRepo := *f 550 var updated []string 551 for _, l := range newRepo.Labels { 552 if !slices.Contains(labelAts, l) { 553 updated = append(updated, l) 554 } 555 } 556 newRepo.Labels = updated 557 repoRecord := newRepo.AsRecord() 558 559 client, err := rp.oauth.AuthorizedClient(r) 560 if err != nil { 561 fail(err.Error(), err) 562 return 563 } 564 565 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey) 566 if err != nil { 567 fail("Failed to update labels, no record found on PDS.", err) 568 return 569 } 570 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 571 Collection: tangled.RepoNSID, 572 Repo: newRepo.Did, 573 Rkey: newRepo.Rkey, 574 SwapRecord: ex.Cid, 575 Record: &lexutil.LexiconTypeDecoder{ 576 Val: &repoRecord, 577 }, 578 }) 579 580 err = db.UnsubscribeLabel( 581 rp.db, 582 orm.FilterEq("repo_at", f.RepoAt()), 583 orm.FilterIn("label_at", labelAts), 584 ) 585 if err != nil { 586 fail("Failed to unsubscribe label.", err) 587 return 588 } 589 590 // everything succeeded 591 rp.pages.HxRefresh(w) 592} 593 594func (rp *Repo) LabelPanel(w http.ResponseWriter, 
r *http.Request) { 595 l := rp.logger.With("handler", "LabelPanel") 596 597 f, err := rp.repoResolver.Resolve(r) 598 if err != nil { 599 l.Error("failed to get repo and knot", "err", err) 600 return 601 } 602 603 subjectStr := r.FormValue("subject") 604 subject, err := syntax.ParseATURI(subjectStr) 605 if err != nil { 606 l.Error("failed to get repo and knot", "err", err) 607 return 608 } 609 610 labelDefs, err := db.GetLabelDefinitions( 611 rp.db, 612 orm.FilterIn("at_uri", f.Labels), 613 orm.FilterContains("scope", subject.Collection().String()), 614 ) 615 if err != nil { 616 l.Error("failed to fetch label defs", "err", err) 617 return 618 } 619 620 defs := make(map[string]*models.LabelDefinition) 621 for _, l := range labelDefs { 622 defs[l.AtUri().String()] = &l 623 } 624 625 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 626 if err != nil { 627 l.Error("failed to build label state", "err", err) 628 return 629 } 630 state := states[subject] 631 632 user := rp.oauth.GetMultiAccountUser(r) 633 rp.pages.LabelPanel(w, pages.LabelPanelParams{ 634 LoggedInUser: user, 635 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 636 Defs: defs, 637 Subject: subject.String(), 638 State: state, 639 }) 640} 641 642func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) { 643 l := rp.logger.With("handler", "EditLabelPanel") 644 645 f, err := rp.repoResolver.Resolve(r) 646 if err != nil { 647 l.Error("failed to get repo and knot", "err", err) 648 return 649 } 650 651 subjectStr := r.FormValue("subject") 652 subject, err := syntax.ParseATURI(subjectStr) 653 if err != nil { 654 l.Error("failed to get repo and knot", "err", err) 655 return 656 } 657 658 labelDefs, err := db.GetLabelDefinitions( 659 rp.db, 660 orm.FilterIn("at_uri", f.Labels), 661 orm.FilterContains("scope", subject.Collection().String()), 662 ) 663 if err != nil { 664 l.Error("failed to fetch labels", "err", err) 665 return 666 } 667 668 defs := 
make(map[string]*models.LabelDefinition) 669 for _, l := range labelDefs { 670 defs[l.AtUri().String()] = &l 671 } 672 673 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject)) 674 if err != nil { 675 l.Error("failed to build label state", "err", err) 676 return 677 } 678 state := states[subject] 679 680 user := rp.oauth.GetMultiAccountUser(r) 681 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{ 682 LoggedInUser: user, 683 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 684 Defs: defs, 685 Subject: subject.String(), 686 State: state, 687 }) 688} 689 690func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) { 691 user := rp.oauth.GetMultiAccountUser(r) 692 l := rp.logger.With("handler", "AddCollaborator") 693 l = l.With("did", user.Active.Did) 694 695 f, err := rp.repoResolver.Resolve(r) 696 if err != nil { 697 l.Error("failed to get repo and knot", "err", err) 698 return 699 } 700 701 errorId := "add-collaborator-error" 702 fail := func(msg string, err error) { 703 l.Error(msg, "err", err) 704 rp.pages.Notice(w, errorId, msg) 705 } 706 707 collaborator := r.FormValue("collaborator") 708 if collaborator == "" { 709 fail("Invalid form.", nil) 710 return 711 } 712 713 // remove a single leading `@`, to make @handle work with ResolveIdent 714 collaborator = strings.TrimPrefix(collaborator, "@") 715 716 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator) 717 if err != nil { 718 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err) 719 return 720 } 721 722 if collaboratorIdent.DID.String() == user.Active.Did { 723 fail("You seem to be adding yourself as a collaborator.", nil) 724 return 725 } 726 l = l.With("collaborator", collaboratorIdent.Handle) 727 l = l.With("knot", f.Knot) 728 729 // announce this relation into the firehose, store into owners' pds 730 client, err := rp.oauth.AuthorizedClient(r) 731 if err != nil { 732 fail("Failed to write to PDS.", err) 733 return 734 } 735 736 // 
emit a record 737 currentUser := rp.oauth.GetMultiAccountUser(r) 738 rkey := tid.TID() 739 createdAt := time.Now() 740 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 741 Collection: tangled.RepoCollaboratorNSID, 742 Repo: currentUser.Active.Did, 743 Rkey: rkey, 744 Record: &lexutil.LexiconTypeDecoder{ 745 Val: &tangled.RepoCollaborator{ 746 Subject: collaboratorIdent.DID.String(), 747 Repo: string(f.RepoAt()), 748 CreatedAt: createdAt.Format(time.RFC3339), 749 }}, 750 }) 751 // invalid record 752 if err != nil { 753 fail("Failed to write record to PDS.", err) 754 return 755 } 756 757 aturi := resp.Uri 758 l = l.With("at-uri", aturi) 759 l.Info("wrote record to PDS") 760 761 tx, err := rp.db.BeginTx(r.Context(), nil) 762 if err != nil { 763 fail("Failed to add collaborator.", err) 764 return 765 } 766 767 rollback := func() { 768 err1 := tx.Rollback() 769 err2 := rp.enforcer.E.LoadPolicy() 770 err3 := rollbackRecord(context.Background(), aturi, client) 771 772 // ignore txn complete errors, this is okay 773 if errors.Is(err1, sql.ErrTxDone) { 774 err1 = nil 775 } 776 777 if errs := errors.Join(err1, err2, err3); errs != nil { 778 l.Error("failed to rollback changes", "errs", errs) 779 return 780 } 781 } 782 defer rollback() 783 784 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo()) 785 if err != nil { 786 fail("Failed to add collaborator permissions.", err) 787 return 788 } 789 790 err = db.AddCollaborator(tx, models.Collaborator{ 791 Did: syntax.DID(currentUser.Active.Did), 792 Rkey: rkey, 793 SubjectDid: collaboratorIdent.DID, 794 RepoAt: f.RepoAt(), 795 Created: createdAt, 796 }) 797 if err != nil { 798 fail("Failed to add collaborator.", err) 799 return 800 } 801 802 err = tx.Commit() 803 if err != nil { 804 fail("Failed to add collaborator.", err) 805 return 806 } 807 808 err = rp.enforcer.E.SavePolicy() 809 if err != nil { 810 fail("Failed to update collaborator 
permissions.", err) 811 return 812 } 813 814 // clear aturi to when everything is successful 815 aturi = "" 816 817 rp.pages.HxRefresh(w) 818} 819 820func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) { 821 user := rp.oauth.GetMultiAccountUser(r) 822 l := rp.logger.With("handler", "DeleteRepo") 823 824 noticeId := "operation-error" 825 f, err := rp.repoResolver.Resolve(r) 826 if err != nil { 827 l.Error("failed to get repo and knot", "err", err) 828 return 829 } 830 831 // remove record from pds 832 atpClient, err := rp.oauth.AuthorizedClient(r) 833 if err != nil { 834 l.Error("failed to get authorized client", "err", err) 835 return 836 } 837 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{ 838 Collection: tangled.RepoNSID, 839 Repo: user.Active.Did, 840 Rkey: f.Rkey, 841 }) 842 if err != nil { 843 l.Error("failed to delete record", "err", err) 844 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.") 845 return 846 } 847 l.Info("removed repo record", "aturi", f.RepoAt().String()) 848 849 client, err := rp.oauth.ServiceClient( 850 r, 851 oauth.WithService(f.Knot), 852 oauth.WithLxm(tangled.RepoDeleteNSID), 853 oauth.WithDev(rp.config.Core.Dev), 854 ) 855 if err != nil { 856 l.Error("failed to connect to knot server", "err", err) 857 return 858 } 859 860 err = tangled.RepoDelete( 861 r.Context(), 862 client, 863 &tangled.RepoDelete_Input{ 864 Did: f.Did, 865 Name: f.Name, 866 Rkey: f.Rkey, 867 }, 868 ) 869 if err := xrpcclient.HandleXrpcErr(err); err != nil { 870 rp.pages.Notice(w, noticeId, err.Error()) 871 return 872 } 873 l.Info("deleted repo from knot") 874 875 tx, err := rp.db.BeginTx(r.Context(), nil) 876 if err != nil { 877 l.Error("failed to start tx") 878 w.Write(fmt.Append(nil, "failed to add collaborator: ", err)) 879 return 880 } 881 defer func() { 882 tx.Rollback() 883 err = rp.enforcer.E.LoadPolicy() 884 if err != nil { 885 l.Error("failed to rollback policies") 886 } 887 
}() 888 889 // remove collaborator RBAC 890 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot) 891 if err != nil { 892 rp.pages.Notice(w, noticeId, "Failed to remove collaborators") 893 return 894 } 895 for _, c := range repoCollaborators { 896 did := c[0] 897 rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo()) 898 } 899 l.Info("removed collaborators") 900 901 // remove repo RBAC 902 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.DidSlashRepo()) 903 if err != nil { 904 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules") 905 return 906 } 907 908 // remove repo from db 909 err = db.RemoveRepo(tx, f.Did, f.Name) 910 if err != nil { 911 rp.pages.Notice(w, noticeId, "Failed to update appview") 912 return 913 } 914 l.Info("removed repo from db") 915 916 err = tx.Commit() 917 if err != nil { 918 l.Error("failed to commit changes", "err", err) 919 http.Error(w, err.Error(), http.StatusInternalServerError) 920 return 921 } 922 923 err = rp.enforcer.E.SavePolicy() 924 if err != nil { 925 l.Error("failed to update ACLs", "err", err) 926 http.Error(w, err.Error(), http.StatusInternalServerError) 927 return 928 } 929 930 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did)) 931} 932 933func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) { 934 l := rp.logger.With("handler", "SyncRepoFork") 935 936 ref := chi.URLParam(r, "ref") 937 ref, _ = url.PathUnescape(ref) 938 939 user := rp.oauth.GetMultiAccountUser(r) 940 f, err := rp.repoResolver.Resolve(r) 941 if err != nil { 942 l.Error("failed to resolve source repo", "err", err) 943 return 944 } 945 946 switch r.Method { 947 case http.MethodPost: 948 client, err := rp.oauth.ServiceClient( 949 r, 950 oauth.WithService(f.Knot), 951 oauth.WithLxm(tangled.RepoForkSyncNSID), 952 oauth.WithDev(rp.config.Core.Dev), 953 ) 954 if err != nil { 955 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 956 return 957 } 958 959 if f.Source == "" { 960 
rp.pages.Notice(w, "repo", "This repository is not a fork.") 961 return 962 } 963 964 err = tangled.RepoForkSync( 965 r.Context(), 966 client, 967 &tangled.RepoForkSync_Input{ 968 Did: user.Active.Did, 969 Name: f.Name, 970 Source: f.Source, 971 Branch: ref, 972 }, 973 ) 974 if err := xrpcclient.HandleXrpcErr(err); err != nil { 975 rp.pages.Notice(w, "repo", err.Error()) 976 return 977 } 978 979 rp.pages.HxRefresh(w) 980 return 981 } 982} 983 984func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) { 985 l := rp.logger.With("handler", "ForkRepo") 986 987 user := rp.oauth.GetMultiAccountUser(r) 988 f, err := rp.repoResolver.Resolve(r) 989 if err != nil { 990 l.Error("failed to resolve source repo", "err", err) 991 return 992 } 993 994 switch r.Method { 995 case http.MethodGet: 996 user := rp.oauth.GetMultiAccountUser(r) 997 knots, err := rp.enforcer.GetKnotsForUser(user.Active.Did) 998 if err != nil { 999 rp.pages.Notice(w, "repo", "Invalid user account.") 1000 return 1001 } 1002 1003 rp.pages.ForkRepo(w, pages.ForkRepoParams{ 1004 LoggedInUser: user, 1005 Knots: knots, 1006 RepoInfo: rp.repoResolver.GetRepoInfo(r, user), 1007 }) 1008 1009 case http.MethodPost: 1010 l := rp.logger.With("handler", "ForkRepo") 1011 1012 targetKnot := r.FormValue("knot") 1013 if targetKnot == "" { 1014 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.") 1015 return 1016 } 1017 l = l.With("targetKnot", targetKnot) 1018 1019 ok, err := rp.enforcer.E.Enforce(user.Active.Did, targetKnot, targetKnot, "repo:create") 1020 if err != nil || !ok { 1021 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 1022 return 1023 } 1024 1025 // choose a name for a fork 1026 forkName := r.FormValue("repo_name") 1027 if forkName == "" { 1028 rp.pages.Notice(w, "repo", "Repository name cannot be empty.") 1029 return 1030 } 1031 1032 // this check is *only* to see if the forked repo name already exists 1033 // in the user's account. 
1034 existingRepo, err := db.GetRepo( 1035 rp.db, 1036 orm.FilterEq("did", user.Active.Did), 1037 orm.FilterEq("name", forkName), 1038 ) 1039 if err != nil { 1040 if !errors.Is(err, sql.ErrNoRows) { 1041 l.Error("error fetching existing repo from db", "err", err) 1042 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.") 1043 return 1044 } 1045 } else if existingRepo != nil { 1046 // repo with this name already exists 1047 rp.pages.Notice(w, "repo", "A repository with this name already exists.") 1048 return 1049 } 1050 l = l.With("forkName", forkName) 1051 1052 uri := "https" 1053 if rp.config.Core.Dev { 1054 uri = "http" 1055 } 1056 1057 forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.Did, f.Name) 1058 l = l.With("cloneUrl", forkSourceUrl) 1059 1060 sourceAt := f.RepoAt().String() 1061 1062 // create an atproto record for this fork 1063 rkey := tid.TID() 1064 repo := &models.Repo{ 1065 Did: user.Active.Did, 1066 Name: forkName, 1067 Knot: targetKnot, 1068 Rkey: rkey, 1069 Source: sourceAt, 1070 Description: f.Description, 1071 Created: time.Now(), 1072 Labels: rp.config.Label.DefaultLabelDefs, 1073 } 1074 record := repo.AsRecord() 1075 1076 atpClient, err := rp.oauth.AuthorizedClient(r) 1077 if err != nil { 1078 l.Error("failed to create xrpcclient", "err", err) 1079 rp.pages.Notice(w, "repo", "Failed to fork repository.") 1080 return 1081 } 1082 1083 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 1084 Collection: tangled.RepoNSID, 1085 Repo: user.Active.Did, 1086 Rkey: rkey, 1087 Record: &lexutil.LexiconTypeDecoder{ 1088 Val: &record, 1089 }, 1090 }) 1091 if err != nil { 1092 l.Error("failed to write to PDS", "err", err) 1093 rp.pages.Notice(w, "repo", "Failed to announce repository creation.") 1094 return 1095 } 1096 1097 aturi := atresp.Uri 1098 l = l.With("aturi", aturi) 1099 l.Info("wrote to PDS") 1100 1101 tx, err := rp.db.BeginTx(r.Context(), nil) 1102 if err != nil { 
1103 l.Info("txn failed", "err", err) 1104 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1105 return 1106 } 1107 1108 // The rollback function reverts a few things on failure: 1109 // - the pending txn 1110 // - the ACLs 1111 // - the atproto record created 1112 rollback := func() { 1113 err1 := tx.Rollback() 1114 err2 := rp.enforcer.E.LoadPolicy() 1115 err3 := rollbackRecord(context.Background(), aturi, atpClient) 1116 1117 // ignore txn complete errors, this is okay 1118 if errors.Is(err1, sql.ErrTxDone) { 1119 err1 = nil 1120 } 1121 1122 if errs := errors.Join(err1, err2, err3); errs != nil { 1123 l.Error("failed to rollback changes", "errs", errs) 1124 return 1125 } 1126 } 1127 defer rollback() 1128 1129 // TODO: this could coordinate better with the knot to recieve a clone status 1130 client, err := rp.oauth.ServiceClient( 1131 r, 1132 oauth.WithService(targetKnot), 1133 oauth.WithLxm(tangled.RepoCreateNSID), 1134 oauth.WithDev(rp.config.Core.Dev), 1135 oauth.WithTimeout(time.Second*20), // big repos take time to clone 1136 ) 1137 if err != nil { 1138 l.Error("could not create service client", "err", err) 1139 rp.pages.Notice(w, "repo", "Failed to connect to knot server.") 1140 return 1141 } 1142 1143 err = tangled.RepoCreate( 1144 r.Context(), 1145 client, 1146 &tangled.RepoCreate_Input{ 1147 Rkey: rkey, 1148 Source: &forkSourceUrl, 1149 }, 1150 ) 1151 if err := xrpcclient.HandleXrpcErr(err); err != nil { 1152 rp.pages.Notice(w, "repo", err.Error()) 1153 return 1154 } 1155 1156 err = db.AddRepo(tx, repo) 1157 if err != nil { 1158 l.Error("failed to AddRepo", "err", err) 1159 rp.pages.Notice(w, "repo", "Failed to save repository information.") 1160 return 1161 } 1162 1163 // acls 1164 p, _ := securejoin.SecureJoin(user.Active.Did, forkName) 1165 err = rp.enforcer.AddRepo(user.Active.Did, targetKnot, p) 1166 if err != nil { 1167 l.Error("failed to add ACLs", "err", err) 1168 rp.pages.Notice(w, "repo", "Failed to set up repository 
permissions.") 1169 return 1170 } 1171 1172 err = tx.Commit() 1173 if err != nil { 1174 l.Error("failed to commit changes", "err", err) 1175 http.Error(w, err.Error(), http.StatusInternalServerError) 1176 return 1177 } 1178 1179 err = rp.enforcer.E.SavePolicy() 1180 if err != nil { 1181 l.Error("failed to update ACLs", "err", err) 1182 http.Error(w, err.Error(), http.StatusInternalServerError) 1183 return 1184 } 1185 1186 // reset the ATURI because the transaction completed successfully 1187 aturi = "" 1188 1189 rp.notifier.NewRepo(r.Context(), repo) 1190 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, forkName)) 1191 } 1192} 1193 1194// this is used to rollback changes made to the PDS 1195// 1196// it is a no-op if the provided ATURI is empty 1197func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 1198 if aturi == "" { 1199 return nil 1200 } 1201 1202 parsed := syntax.ATURI(aturi) 1203 1204 collection := parsed.Collection().String() 1205 repo := parsed.Authority().String() 1206 rkey := parsed.RecordKey().String() 1207 1208 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 1209 Collection: collection, 1210 Repo: repo, 1211 Rkey: rkey, 1212 }) 1213 return err 1214}