A vibe coded tangled fork which supports pijul.
1package repo
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "slices"
12 "strings"
13 "time"
14
15 "tangled.org/core/appview/cloudflare"
16
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/notify"
22 "tangled.org/core/appview/oauth"
23 "tangled.org/core/appview/pages"
24 "tangled.org/core/appview/reporesolver"
25 xrpcclient "tangled.org/core/appview/xrpcclient"
26 "tangled.org/core/eventconsumer"
27 "tangled.org/core/idresolver"
28 "tangled.org/core/orm"
29 "tangled.org/core/rbac"
30 "tangled.org/core/tid"
31 "tangled.org/core/xrpc/serviceauth"
32
33 comatproto "github.com/bluesky-social/indigo/api/atproto"
34 atpclient "github.com/bluesky-social/indigo/atproto/client"
35 "github.com/bluesky-social/indigo/atproto/syntax"
36 lexutil "github.com/bluesky-social/indigo/lex/util"
37 securejoin "github.com/cyphar/filepath-securejoin"
38 "github.com/go-chi/chi/v5"
39)
40
41type Repo struct {
42 repoResolver *reporesolver.RepoResolver
43 idResolver *idresolver.Resolver
44 config *config.Config
45 oauth *oauth.OAuth
46 pages *pages.Pages
47 spindlestream *eventconsumer.Consumer
48 db *db.DB
49 enforcer *rbac.Enforcer
50 notifier notify.Notifier
51 logger *slog.Logger
52 serviceAuth *serviceauth.ServiceAuth
53 cfClient *cloudflare.Client
54}
55
56func New(
57 oauth *oauth.OAuth,
58 repoResolver *reporesolver.RepoResolver,
59 pages *pages.Pages,
60 spindlestream *eventconsumer.Consumer,
61 idResolver *idresolver.Resolver,
62 db *db.DB,
63 config *config.Config,
64 notifier notify.Notifier,
65 enforcer *rbac.Enforcer,
66 logger *slog.Logger,
67 cfClient *cloudflare.Client,
68) *Repo {
69 return &Repo{
70 oauth: oauth,
71 repoResolver: repoResolver,
72 pages: pages,
73 idResolver: idResolver,
74 config: config,
75 spindlestream: spindlestream,
76 db: db,
77 notifier: notifier,
78 enforcer: enforcer,
79 logger: logger,
80 cfClient: cfClient,
81 }
82}
83
84// modify the spindle configured for this repo
85func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) {
86 user := rp.oauth.GetMultiAccountUser(r)
87 l := rp.logger.With("handler", "EditSpindle")
88 l = l.With("did", user.Active.Did)
89
90 errorId := "operation-error"
91 fail := func(msg string, err error) {
92 l.Error(msg, "err", err)
93 rp.pages.Notice(w, errorId, msg)
94 }
95
96 f, err := rp.repoResolver.Resolve(r)
97 if err != nil {
98 fail("Failed to resolve repo. Try again later", err)
99 return
100 }
101
102 newSpindle := r.FormValue("spindle")
103 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value
104 client, err := rp.oauth.AuthorizedClient(r)
105 if err != nil {
106 fail("Failed to authorize. Try again later.", err)
107 return
108 }
109
110 if !removingSpindle {
111 // ensure that this is a valid spindle for this user
112 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Active.Did)
113 if err != nil {
114 fail("Failed to find spindles. Try again later.", err)
115 return
116 }
117
118 if !slices.Contains(validSpindles, newSpindle) {
119 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles))
120 return
121 }
122 }
123
124 newRepo := *f
125 newRepo.Spindle = newSpindle
126 record := newRepo.AsRecord()
127
128 spindlePtr := &newSpindle
129 if removingSpindle {
130 spindlePtr = nil
131 newRepo.Spindle = ""
132 }
133
134 // optimistic update
135 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr)
136 if err != nil {
137 fail("Failed to update spindle. Try again later.", err)
138 return
139 }
140
141 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
142 if err != nil {
143 fail("Failed to update spindle, no record found on PDS.", err)
144 return
145 }
146 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
147 Collection: tangled.RepoNSID,
148 Repo: newRepo.Did,
149 Rkey: newRepo.Rkey,
150 SwapRecord: ex.Cid,
151 Record: &lexutil.LexiconTypeDecoder{
152 Val: &record,
153 },
154 })
155
156 if err != nil {
157 fail("Failed to update spindle, unable to save to PDS.", err)
158 return
159 }
160
161 if !removingSpindle {
162 // add this spindle to spindle stream
163 rp.spindlestream.AddSource(
164 context.Background(),
165 eventconsumer.NewSpindleSource(newSpindle),
166 )
167 }
168
169 rp.pages.HxRefresh(w)
170}
171
172func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) {
173 user := rp.oauth.GetMultiAccountUser(r)
174 l := rp.logger.With("handler", "AddLabel")
175 l = l.With("did", user.Active.Did)
176
177 f, err := rp.repoResolver.Resolve(r)
178 if err != nil {
179 l.Error("failed to get repo and knot", "err", err)
180 return
181 }
182
183 errorId := "add-label-error"
184 fail := func(msg string, err error) {
185 l.Error(msg, "err", err)
186 rp.pages.Notice(w, errorId, msg)
187 }
188
189 // get form values for label definition
190 name := r.FormValue("name")
191 concreteType := r.FormValue("valueType")
192 valueFormat := r.FormValue("valueFormat")
193 enumValues := r.FormValue("enumValues")
194 scope := r.Form["scope"]
195 color := r.FormValue("color")
196 multiple := r.FormValue("multiple") == "true"
197
198 var variants []string
199 for part := range strings.SplitSeq(enumValues, ",") {
200 if part = strings.TrimSpace(part); part != "" {
201 variants = append(variants, part)
202 }
203 }
204
205 if concreteType == "" {
206 concreteType = "null"
207 }
208
209 format := models.ValueTypeFormatAny
210 if valueFormat == "did" {
211 format = models.ValueTypeFormatDid
212 }
213
214 valueType := models.ValueType{
215 Type: models.ConcreteType(concreteType),
216 Format: format,
217 Enum: variants,
218 }
219
220 label := models.LabelDefinition{
221 Did: user.Active.Did,
222 Rkey: tid.TID(),
223 Name: name,
224 ValueType: valueType,
225 Scope: scope,
226 Color: &color,
227 Multiple: multiple,
228 Created: time.Now(),
229 }
230 if err := label.Validate(); err != nil {
231 fail(err.Error(), err)
232 return
233 }
234
235 // announce this relation into the firehose, store into owners' pds
236 client, err := rp.oauth.AuthorizedClient(r)
237 if err != nil {
238 fail(err.Error(), err)
239 return
240 }
241
242 // emit a labelRecord
243 labelRecord := label.AsRecord()
244 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
245 Collection: tangled.LabelDefinitionNSID,
246 Repo: label.Did,
247 Rkey: label.Rkey,
248 Record: &lexutil.LexiconTypeDecoder{
249 Val: &labelRecord,
250 },
251 })
252 // invalid record
253 if err != nil {
254 fail("Failed to write record to PDS.", err)
255 return
256 }
257
258 aturi := resp.Uri
259 l = l.With("at-uri", aturi)
260 l.Info("wrote label record to PDS")
261
262 // update the repo to subscribe to this label
263 newRepo := *f
264 newRepo.Labels = append(newRepo.Labels, aturi)
265 repoRecord := newRepo.AsRecord()
266
267 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
268 if err != nil {
269 fail("Failed to update labels, no record found on PDS.", err)
270 return
271 }
272 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
273 Collection: tangled.RepoNSID,
274 Repo: newRepo.Did,
275 Rkey: newRepo.Rkey,
276 SwapRecord: ex.Cid,
277 Record: &lexutil.LexiconTypeDecoder{
278 Val: &repoRecord,
279 },
280 })
281 if err != nil {
282 fail("Failed to update labels for repo.", err)
283 return
284 }
285
286 tx, err := rp.db.BeginTx(r.Context(), nil)
287 if err != nil {
288 fail("Failed to add label.", err)
289 return
290 }
291
292 rollback := func() {
293 err1 := tx.Rollback()
294 err2 := rollbackRecord(context.Background(), aturi, client)
295
296 // ignore txn complete errors, this is okay
297 if errors.Is(err1, sql.ErrTxDone) {
298 err1 = nil
299 }
300
301 if errs := errors.Join(err1, err2); errs != nil {
302 l.Error("failed to rollback changes", "errs", errs)
303 return
304 }
305 }
306 defer rollback()
307
308 _, err = db.AddLabelDefinition(tx, &label)
309 if err != nil {
310 fail("Failed to add label.", err)
311 return
312 }
313
314 err = db.SubscribeLabel(tx, &models.RepoLabel{
315 RepoAt: f.RepoAt(),
316 LabelAt: label.AtUri(),
317 })
318
319 err = tx.Commit()
320 if err != nil {
321 fail("Failed to add label.", err)
322 return
323 }
324
325 // clear aturi when everything is successful
326 aturi = ""
327
328 rp.pages.HxRefresh(w)
329}
330
331func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) {
332 user := rp.oauth.GetMultiAccountUser(r)
333 l := rp.logger.With("handler", "DeleteLabel")
334 l = l.With("did", user.Active.Did)
335
336 f, err := rp.repoResolver.Resolve(r)
337 if err != nil {
338 l.Error("failed to get repo and knot", "err", err)
339 return
340 }
341
342 errorId := "label-operation"
343 fail := func(msg string, err error) {
344 l.Error(msg, "err", err)
345 rp.pages.Notice(w, errorId, msg)
346 }
347
348 // get form values
349 labelId := r.FormValue("label-id")
350
351 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId))
352 if err != nil {
353 fail("Failed to find label definition.", err)
354 return
355 }
356
357 client, err := rp.oauth.AuthorizedClient(r)
358 if err != nil {
359 fail(err.Error(), err)
360 return
361 }
362
363 // delete label record from PDS
364 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
365 Collection: tangled.LabelDefinitionNSID,
366 Repo: label.Did,
367 Rkey: label.Rkey,
368 })
369 if err != nil {
370 fail("Failed to delete label record from PDS.", err)
371 return
372 }
373
374 // update repo record to remove the label reference
375 newRepo := *f
376 var updated []string
377 removedAt := label.AtUri().String()
378 for _, l := range newRepo.Labels {
379 if l != removedAt {
380 updated = append(updated, l)
381 }
382 }
383 newRepo.Labels = updated
384 repoRecord := newRepo.AsRecord()
385
386 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
387 if err != nil {
388 fail("Failed to update labels, no record found on PDS.", err)
389 return
390 }
391 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
392 Collection: tangled.RepoNSID,
393 Repo: newRepo.Did,
394 Rkey: newRepo.Rkey,
395 SwapRecord: ex.Cid,
396 Record: &lexutil.LexiconTypeDecoder{
397 Val: &repoRecord,
398 },
399 })
400 if err != nil {
401 fail("Failed to update repo record.", err)
402 return
403 }
404
405 // transaction for DB changes
406 tx, err := rp.db.BeginTx(r.Context(), nil)
407 if err != nil {
408 fail("Failed to delete label.", err)
409 return
410 }
411 defer tx.Rollback()
412
413 err = db.UnsubscribeLabel(
414 tx,
415 orm.FilterEq("repo_at", f.RepoAt()),
416 orm.FilterEq("label_at", removedAt),
417 )
418 if err != nil {
419 fail("Failed to unsubscribe label.", err)
420 return
421 }
422
423 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id))
424 if err != nil {
425 fail("Failed to delete label definition.", err)
426 return
427 }
428
429 err = tx.Commit()
430 if err != nil {
431 fail("Failed to delete label.", err)
432 return
433 }
434
435 // everything succeeded
436 rp.pages.HxRefresh(w)
437}
438
439func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) {
440 user := rp.oauth.GetMultiAccountUser(r)
441 l := rp.logger.With("handler", "SubscribeLabel")
442 l = l.With("did", user.Active.Did)
443
444 f, err := rp.repoResolver.Resolve(r)
445 if err != nil {
446 l.Error("failed to get repo and knot", "err", err)
447 return
448 }
449
450 if err := r.ParseForm(); err != nil {
451 l.Error("invalid form", "err", err)
452 return
453 }
454
455 errorId := "default-label-operation"
456 fail := func(msg string, err error) {
457 l.Error(msg, "err", err)
458 rp.pages.Notice(w, errorId, msg)
459 }
460
461 labelAts := r.Form["label"]
462 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
463 if err != nil {
464 fail("Failed to subscribe to label.", err)
465 return
466 }
467
468 newRepo := *f
469 newRepo.Labels = append(newRepo.Labels, labelAts...)
470
471 // dedup
472 slices.Sort(newRepo.Labels)
473 newRepo.Labels = slices.Compact(newRepo.Labels)
474
475 repoRecord := newRepo.AsRecord()
476
477 client, err := rp.oauth.AuthorizedClient(r)
478 if err != nil {
479 fail(err.Error(), err)
480 return
481 }
482
483 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
484 if err != nil {
485 fail("Failed to update labels, no record found on PDS.", err)
486 return
487 }
488 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
489 Collection: tangled.RepoNSID,
490 Repo: newRepo.Did,
491 Rkey: newRepo.Rkey,
492 SwapRecord: ex.Cid,
493 Record: &lexutil.LexiconTypeDecoder{
494 Val: &repoRecord,
495 },
496 })
497
498 tx, err := rp.db.Begin()
499 if err != nil {
500 fail("Failed to subscribe to label.", err)
501 return
502 }
503 defer tx.Rollback()
504
505 for _, l := range labelAts {
506 err = db.SubscribeLabel(tx, &models.RepoLabel{
507 RepoAt: f.RepoAt(),
508 LabelAt: syntax.ATURI(l),
509 })
510 if err != nil {
511 fail("Failed to subscribe to label.", err)
512 return
513 }
514 }
515
516 if err := tx.Commit(); err != nil {
517 fail("Failed to subscribe to label.", err)
518 return
519 }
520
521 // everything succeeded
522 rp.pages.HxRefresh(w)
523}
524
525func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) {
526 user := rp.oauth.GetMultiAccountUser(r)
527 l := rp.logger.With("handler", "UnsubscribeLabel")
528 l = l.With("did", user.Active.Did)
529
530 f, err := rp.repoResolver.Resolve(r)
531 if err != nil {
532 l.Error("failed to get repo and knot", "err", err)
533 return
534 }
535
536 if err := r.ParseForm(); err != nil {
537 l.Error("invalid form", "err", err)
538 return
539 }
540
541 errorId := "default-label-operation"
542 fail := func(msg string, err error) {
543 l.Error(msg, "err", err)
544 rp.pages.Notice(w, errorId, msg)
545 }
546
547 labelAts := r.Form["label"]
548 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
549 if err != nil {
550 fail("Failed to unsubscribe to label.", err)
551 return
552 }
553
554 // update repo record to remove the label reference
555 newRepo := *f
556 var updated []string
557 for _, l := range newRepo.Labels {
558 if !slices.Contains(labelAts, l) {
559 updated = append(updated, l)
560 }
561 }
562 newRepo.Labels = updated
563 repoRecord := newRepo.AsRecord()
564
565 client, err := rp.oauth.AuthorizedClient(r)
566 if err != nil {
567 fail(err.Error(), err)
568 return
569 }
570
571 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
572 if err != nil {
573 fail("Failed to update labels, no record found on PDS.", err)
574 return
575 }
576 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
577 Collection: tangled.RepoNSID,
578 Repo: newRepo.Did,
579 Rkey: newRepo.Rkey,
580 SwapRecord: ex.Cid,
581 Record: &lexutil.LexiconTypeDecoder{
582 Val: &repoRecord,
583 },
584 })
585
586 err = db.UnsubscribeLabel(
587 rp.db,
588 orm.FilterEq("repo_at", f.RepoAt()),
589 orm.FilterIn("label_at", labelAts),
590 )
591 if err != nil {
592 fail("Failed to unsubscribe label.", err)
593 return
594 }
595
596 // everything succeeded
597 rp.pages.HxRefresh(w)
598}
599
600func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) {
601 l := rp.logger.With("handler", "LabelPanel")
602
603 f, err := rp.repoResolver.Resolve(r)
604 if err != nil {
605 l.Error("failed to get repo and knot", "err", err)
606 return
607 }
608
609 subjectStr := r.FormValue("subject")
610 subject, err := syntax.ParseATURI(subjectStr)
611 if err != nil {
612 l.Error("failed to get repo and knot", "err", err)
613 return
614 }
615
616 labelDefs, err := db.GetLabelDefinitions(
617 rp.db,
618 orm.FilterIn("at_uri", f.Labels),
619 orm.FilterContains("scope", subject.Collection().String()),
620 )
621 if err != nil {
622 l.Error("failed to fetch label defs", "err", err)
623 return
624 }
625
626 defs := make(map[string]*models.LabelDefinition)
627 for _, l := range labelDefs {
628 defs[l.AtUri().String()] = &l
629 }
630
631 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
632 if err != nil {
633 l.Error("failed to build label state", "err", err)
634 return
635 }
636 state := states[subject]
637
638 user := rp.oauth.GetMultiAccountUser(r)
639 rp.pages.LabelPanel(w, pages.LabelPanelParams{
640 LoggedInUser: user,
641 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
642 Defs: defs,
643 Subject: subject.String(),
644 State: state,
645 })
646}
647
648func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) {
649 l := rp.logger.With("handler", "EditLabelPanel")
650
651 f, err := rp.repoResolver.Resolve(r)
652 if err != nil {
653 l.Error("failed to get repo and knot", "err", err)
654 return
655 }
656
657 subjectStr := r.FormValue("subject")
658 subject, err := syntax.ParseATURI(subjectStr)
659 if err != nil {
660 l.Error("failed to get repo and knot", "err", err)
661 return
662 }
663
664 labelDefs, err := db.GetLabelDefinitions(
665 rp.db,
666 orm.FilterIn("at_uri", f.Labels),
667 orm.FilterContains("scope", subject.Collection().String()),
668 )
669 if err != nil {
670 l.Error("failed to fetch labels", "err", err)
671 return
672 }
673
674 defs := make(map[string]*models.LabelDefinition)
675 for _, l := range labelDefs {
676 defs[l.AtUri().String()] = &l
677 }
678
679 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
680 if err != nil {
681 l.Error("failed to build label state", "err", err)
682 return
683 }
684 state := states[subject]
685
686 user := rp.oauth.GetMultiAccountUser(r)
687 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{
688 LoggedInUser: user,
689 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
690 Defs: defs,
691 Subject: subject.String(),
692 State: state,
693 })
694}
695
696func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) {
697 user := rp.oauth.GetMultiAccountUser(r)
698 l := rp.logger.With("handler", "AddCollaborator")
699 l = l.With("did", user.Active.Did)
700
701 f, err := rp.repoResolver.Resolve(r)
702 if err != nil {
703 l.Error("failed to get repo and knot", "err", err)
704 return
705 }
706
707 errorId := "add-collaborator-error"
708 fail := func(msg string, err error) {
709 l.Error(msg, "err", err)
710 rp.pages.Notice(w, errorId, msg)
711 }
712
713 collaborator := r.FormValue("collaborator")
714 if collaborator == "" {
715 fail("Invalid form.", nil)
716 return
717 }
718
719 // remove a single leading `@`, to make @handle work with ResolveIdent
720 collaborator = strings.TrimPrefix(collaborator, "@")
721
722 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator)
723 if err != nil {
724 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err)
725 return
726 }
727
728 if collaboratorIdent.DID.String() == user.Active.Did {
729 fail("You seem to be adding yourself as a collaborator.", nil)
730 return
731 }
732 l = l.With("collaborator", collaboratorIdent.Handle)
733 l = l.With("knot", f.Knot)
734
735 // announce this relation into the firehose, store into owners' pds
736 client, err := rp.oauth.AuthorizedClient(r)
737 if err != nil {
738 fail("Failed to write to PDS.", err)
739 return
740 }
741
742 // emit a record
743 currentUser := rp.oauth.GetMultiAccountUser(r)
744 rkey := tid.TID()
745 createdAt := time.Now()
746 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
747 Collection: tangled.RepoCollaboratorNSID,
748 Repo: currentUser.Active.Did,
749 Rkey: rkey,
750 Record: &lexutil.LexiconTypeDecoder{
751 Val: &tangled.RepoCollaborator{
752 Subject: collaboratorIdent.DID.String(),
753 Repo: string(f.RepoAt()),
754 CreatedAt: createdAt.Format(time.RFC3339),
755 }},
756 })
757 // invalid record
758 if err != nil {
759 fail("Failed to write record to PDS.", err)
760 return
761 }
762
763 aturi := resp.Uri
764 l = l.With("at-uri", aturi)
765 l.Info("wrote record to PDS")
766
767 tx, err := rp.db.BeginTx(r.Context(), nil)
768 if err != nil {
769 fail("Failed to add collaborator.", err)
770 return
771 }
772
773 rollback := func() {
774 err1 := tx.Rollback()
775 err2 := rp.enforcer.E.LoadPolicy()
776 err3 := rollbackRecord(context.Background(), aturi, client)
777
778 // ignore txn complete errors, this is okay
779 if errors.Is(err1, sql.ErrTxDone) {
780 err1 = nil
781 }
782
783 if errs := errors.Join(err1, err2, err3); errs != nil {
784 l.Error("failed to rollback changes", "errs", errs)
785 return
786 }
787 }
788 defer rollback()
789
790 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo())
791 if err != nil {
792 fail("Failed to add collaborator permissions.", err)
793 return
794 }
795
796 err = db.AddCollaborator(tx, models.Collaborator{
797 Did: syntax.DID(currentUser.Active.Did),
798 Rkey: rkey,
799 SubjectDid: collaboratorIdent.DID,
800 RepoAt: f.RepoAt(),
801 Created: createdAt,
802 })
803 if err != nil {
804 fail("Failed to add collaborator.", err)
805 return
806 }
807
808 err = tx.Commit()
809 if err != nil {
810 fail("Failed to add collaborator.", err)
811 return
812 }
813
814 err = rp.enforcer.E.SavePolicy()
815 if err != nil {
816 fail("Failed to update collaborator permissions.", err)
817 return
818 }
819
820 // clear aturi to when everything is successful
821 aturi = ""
822
823 rp.pages.HxRefresh(w)
824}
825
826func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) {
827 user := rp.oauth.GetMultiAccountUser(r)
828 l := rp.logger.With("handler", "DeleteRepo")
829
830 noticeId := "operation-error"
831 f, err := rp.repoResolver.Resolve(r)
832 if err != nil {
833 l.Error("failed to get repo and knot", "err", err)
834 return
835 }
836
837 // remove record from pds
838 atpClient, err := rp.oauth.AuthorizedClient(r)
839 if err != nil {
840 l.Error("failed to get authorized client", "err", err)
841 return
842 }
843 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{
844 Collection: tangled.RepoNSID,
845 Repo: user.Active.Did,
846 Rkey: f.Rkey,
847 })
848 if err != nil {
849 l.Error("failed to delete record", "err", err)
850 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.")
851 return
852 }
853 l.Info("removed repo record", "aturi", f.RepoAt().String())
854
855 client, err := rp.oauth.ServiceClient(
856 r,
857 oauth.WithService(f.Knot),
858 oauth.WithLxm(tangled.RepoDeleteNSID),
859 oauth.WithDev(rp.config.Core.Dev),
860 )
861 if err != nil {
862 l.Error("failed to connect to knot server", "err", err)
863 return
864 }
865
866 err = tangled.RepoDelete(
867 r.Context(),
868 client,
869 &tangled.RepoDelete_Input{
870 Did: f.Did,
871 Name: f.Name,
872 Rkey: f.Rkey,
873 },
874 )
875 if err := xrpcclient.HandleXrpcErr(err); err != nil {
876 rp.pages.Notice(w, noticeId, err.Error())
877 return
878 }
879 l.Info("deleted repo from knot")
880
881 tx, err := rp.db.BeginTx(r.Context(), nil)
882 if err != nil {
883 l.Error("failed to start tx")
884 w.Write(fmt.Append(nil, "failed to add collaborator: ", err))
885 return
886 }
887 defer func() {
888 tx.Rollback()
889 err = rp.enforcer.E.LoadPolicy()
890 if err != nil {
891 l.Error("failed to rollback policies")
892 }
893 }()
894
895 // remove collaborator RBAC
896 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot)
897 if err != nil {
898 rp.pages.Notice(w, noticeId, "Failed to remove collaborators")
899 return
900 }
901 for _, c := range repoCollaborators {
902 did := c[0]
903 rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo())
904 }
905 l.Info("removed collaborators")
906
907 // remove repo RBAC
908 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.DidSlashRepo())
909 if err != nil {
910 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules")
911 return
912 }
913
914 // remove repo from db
915 err = db.RemoveRepo(tx, f.Did, f.Name)
916 if err != nil {
917 rp.pages.Notice(w, noticeId, "Failed to update appview")
918 return
919 }
920 l.Info("removed repo from db")
921
922 err = tx.Commit()
923 if err != nil {
924 l.Error("failed to commit changes", "err", err)
925 http.Error(w, err.Error(), http.StatusInternalServerError)
926 return
927 }
928
929 err = rp.enforcer.E.SavePolicy()
930 if err != nil {
931 l.Error("failed to update ACLs", "err", err)
932 http.Error(w, err.Error(), http.StatusInternalServerError)
933 return
934 }
935
936 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did))
937}
938
939func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) {
940 l := rp.logger.With("handler", "SyncRepoFork")
941
942 ref := chi.URLParam(r, "ref")
943 ref, _ = url.PathUnescape(ref)
944
945 user := rp.oauth.GetMultiAccountUser(r)
946 f, err := rp.repoResolver.Resolve(r)
947 if err != nil {
948 l.Error("failed to resolve source repo", "err", err)
949 return
950 }
951
952 switch r.Method {
953 case http.MethodPost:
954 client, err := rp.oauth.ServiceClient(
955 r,
956 oauth.WithService(f.Knot),
957 oauth.WithLxm(tangled.RepoForkSyncNSID),
958 oauth.WithDev(rp.config.Core.Dev),
959 )
960 if err != nil {
961 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
962 return
963 }
964
965 if f.Source == "" {
966 rp.pages.Notice(w, "repo", "This repository is not a fork.")
967 return
968 }
969
970 err = tangled.RepoForkSync(
971 r.Context(),
972 client,
973 &tangled.RepoForkSync_Input{
974 Did: user.Active.Did,
975 Name: f.Name,
976 Source: f.Source,
977 Branch: ref,
978 },
979 )
980 if err := xrpcclient.HandleXrpcErr(err); err != nil {
981 rp.pages.Notice(w, "repo", err.Error())
982 return
983 }
984
985 rp.pages.HxRefresh(w)
986 return
987 }
988}
989
990func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) {
991 l := rp.logger.With("handler", "ForkRepo")
992
993 user := rp.oauth.GetMultiAccountUser(r)
994 f, err := rp.repoResolver.Resolve(r)
995 if err != nil {
996 l.Error("failed to resolve source repo", "err", err)
997 return
998 }
999
1000 switch r.Method {
1001 case http.MethodGet:
1002 user := rp.oauth.GetMultiAccountUser(r)
1003 knots, err := rp.enforcer.GetKnotsForUser(user.Active.Did)
1004 if err != nil {
1005 rp.pages.Notice(w, "repo", "Invalid user account.")
1006 return
1007 }
1008
1009 rp.pages.ForkRepo(w, pages.ForkRepoParams{
1010 LoggedInUser: user,
1011 Knots: knots,
1012 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1013 })
1014
1015 case http.MethodPost:
1016 l := rp.logger.With("handler", "ForkRepo")
1017
1018 targetKnot := r.FormValue("knot")
1019 if targetKnot == "" {
1020 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
1021 return
1022 }
1023 l = l.With("targetKnot", targetKnot)
1024
1025 ok, err := rp.enforcer.E.Enforce(user.Active.Did, targetKnot, targetKnot, "repo:create")
1026 if err != nil || !ok {
1027 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
1028 return
1029 }
1030
1031 // choose a name for a fork
1032 forkName := r.FormValue("repo_name")
1033 if forkName == "" {
1034 rp.pages.Notice(w, "repo", "Repository name cannot be empty.")
1035 return
1036 }
1037
1038 // this check is *only* to see if the forked repo name already exists
1039 // in the user's account.
1040 existingRepo, err := db.GetRepo(
1041 rp.db,
1042 orm.FilterEq("did", user.Active.Did),
1043 orm.FilterEq("name", forkName),
1044 )
1045 if err != nil {
1046 if !errors.Is(err, sql.ErrNoRows) {
1047 l.Error("error fetching existing repo from db", "err", err)
1048 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.")
1049 return
1050 }
1051 } else if existingRepo != nil {
1052 // repo with this name already exists
1053 rp.pages.Notice(w, "repo", "A repository with this name already exists.")
1054 return
1055 }
1056 l = l.With("forkName", forkName)
1057
1058 uri := "https"
1059 if rp.config.Core.Dev {
1060 uri = "http"
1061 }
1062
1063 forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.Did, f.Name)
1064 l = l.With("cloneUrl", forkSourceUrl)
1065
1066 sourceAt := f.RepoAt().String()
1067
1068 // create an atproto record for this fork
1069 rkey := tid.TID()
1070 repo := &models.Repo{
1071 Did: user.Active.Did,
1072 Name: forkName,
1073 Knot: targetKnot,
1074 Rkey: rkey,
1075 Source: sourceAt,
1076 Description: f.Description,
1077 Created: time.Now(),
1078 Labels: rp.config.Label.DefaultLabelDefs,
1079 }
1080 record := repo.AsRecord()
1081
1082 atpClient, err := rp.oauth.AuthorizedClient(r)
1083 if err != nil {
1084 l.Error("failed to create xrpcclient", "err", err)
1085 rp.pages.Notice(w, "repo", "Failed to fork repository.")
1086 return
1087 }
1088
1089 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1090 Collection: tangled.RepoNSID,
1091 Repo: user.Active.Did,
1092 Rkey: rkey,
1093 Record: &lexutil.LexiconTypeDecoder{
1094 Val: &record,
1095 },
1096 })
1097 if err != nil {
1098 l.Error("failed to write to PDS", "err", err)
1099 rp.pages.Notice(w, "repo", "Failed to announce repository creation.")
1100 return
1101 }
1102
1103 aturi := atresp.Uri
1104 l = l.With("aturi", aturi)
1105 l.Info("wrote to PDS")
1106
1107 tx, err := rp.db.BeginTx(r.Context(), nil)
1108 if err != nil {
1109 l.Info("txn failed", "err", err)
1110 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1111 return
1112 }
1113
1114 // The rollback function reverts a few things on failure:
1115 // - the pending txn
1116 // - the ACLs
1117 // - the atproto record created
1118 rollback := func() {
1119 err1 := tx.Rollback()
1120 err2 := rp.enforcer.E.LoadPolicy()
1121 err3 := rollbackRecord(context.Background(), aturi, atpClient)
1122
1123 // ignore txn complete errors, this is okay
1124 if errors.Is(err1, sql.ErrTxDone) {
1125 err1 = nil
1126 }
1127
1128 if errs := errors.Join(err1, err2, err3); errs != nil {
1129 l.Error("failed to rollback changes", "errs", errs)
1130 return
1131 }
1132 }
1133 defer rollback()
1134
1135 // TODO: this could coordinate better with the knot to recieve a clone status
1136 client, err := rp.oauth.ServiceClient(
1137 r,
1138 oauth.WithService(targetKnot),
1139 oauth.WithLxm(tangled.RepoCreateNSID),
1140 oauth.WithDev(rp.config.Core.Dev),
1141 oauth.WithTimeout(time.Second*20), // big repos take time to clone
1142 )
1143 if err != nil {
1144 l.Error("could not create service client", "err", err)
1145 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1146 return
1147 }
1148
1149 err = tangled.RepoCreate(
1150 r.Context(),
1151 client,
1152 &tangled.RepoCreate_Input{
1153 Rkey: rkey,
1154 Source: &forkSourceUrl,
1155 },
1156 )
1157 if err := xrpcclient.HandleXrpcErr(err); err != nil {
1158 rp.pages.Notice(w, "repo", err.Error())
1159 return
1160 }
1161
1162 err = db.AddRepo(tx, repo)
1163 if err != nil {
1164 l.Error("failed to AddRepo", "err", err)
1165 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1166 return
1167 }
1168
1169 // acls
1170 p, _ := securejoin.SecureJoin(user.Active.Did, forkName)
1171 err = rp.enforcer.AddRepo(user.Active.Did, targetKnot, p)
1172 if err != nil {
1173 l.Error("failed to add ACLs", "err", err)
1174 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.")
1175 return
1176 }
1177
1178 err = tx.Commit()
1179 if err != nil {
1180 l.Error("failed to commit changes", "err", err)
1181 http.Error(w, err.Error(), http.StatusInternalServerError)
1182 return
1183 }
1184
1185 err = rp.enforcer.E.SavePolicy()
1186 if err != nil {
1187 l.Error("failed to update ACLs", "err", err)
1188 http.Error(w, err.Error(), http.StatusInternalServerError)
1189 return
1190 }
1191
1192 // reset the ATURI because the transaction completed successfully
1193 aturi = ""
1194
1195 rp.notifier.NewRepo(r.Context(), repo)
1196 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, forkName))
1197 }
1198}
1199
1200// this is used to rollback changes made to the PDS
1201//
1202// it is a no-op if the provided ATURI is empty
1203func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
1204 if aturi == "" {
1205 return nil
1206 }
1207
1208 parsed := syntax.ATURI(aturi)
1209
1210 collection := parsed.Collection().String()
1211 repo := parsed.Authority().String()
1212 rkey := parsed.RecordKey().String()
1213
1214 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
1215 Collection: collection,
1216 Repo: repo,
1217 Rkey: rkey,
1218 })
1219 return err
1220}