A vibe-coded fork of Tangled that supports Pijul.
1package repo
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "slices"
12 "strings"
13 "time"
14
15 "tangled.org/core/api/tangled"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/appview/notify"
20 "tangled.org/core/appview/oauth"
21 "tangled.org/core/appview/pages"
22 "tangled.org/core/appview/reporesolver"
23 xrpcclient "tangled.org/core/appview/xrpcclient"
24 "tangled.org/core/eventconsumer"
25 "tangled.org/core/idresolver"
26 "tangled.org/core/orm"
27 "tangled.org/core/rbac"
28 "tangled.org/core/tid"
29 "tangled.org/core/xrpc/serviceauth"
30
31 comatproto "github.com/bluesky-social/indigo/api/atproto"
32 atpclient "github.com/bluesky-social/indigo/atproto/client"
33 "github.com/bluesky-social/indigo/atproto/syntax"
34 lexutil "github.com/bluesky-social/indigo/lex/util"
35 securejoin "github.com/cyphar/filepath-securejoin"
36 "github.com/go-chi/chi/v5"
37)
38
39type Repo struct {
40 repoResolver *reporesolver.RepoResolver
41 idResolver *idresolver.Resolver
42 config *config.Config
43 oauth *oauth.OAuth
44 pages *pages.Pages
45 spindlestream *eventconsumer.Consumer
46 db *db.DB
47 enforcer *rbac.Enforcer
48 notifier notify.Notifier
49 logger *slog.Logger
50 serviceAuth *serviceauth.ServiceAuth
51}
52
53func New(
54 oauth *oauth.OAuth,
55 repoResolver *reporesolver.RepoResolver,
56 pages *pages.Pages,
57 spindlestream *eventconsumer.Consumer,
58 idResolver *idresolver.Resolver,
59 db *db.DB,
60 config *config.Config,
61 notifier notify.Notifier,
62 enforcer *rbac.Enforcer,
63 logger *slog.Logger,
64) *Repo {
65 return &Repo{oauth: oauth,
66 repoResolver: repoResolver,
67 pages: pages,
68 idResolver: idResolver,
69 config: config,
70 spindlestream: spindlestream,
71 db: db,
72 notifier: notifier,
73 enforcer: enforcer,
74 logger: logger,
75 }
76}
77
78// modify the spindle configured for this repo
79func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) {
80 user := rp.oauth.GetMultiAccountUser(r)
81 l := rp.logger.With("handler", "EditSpindle")
82 l = l.With("did", user.Active.Did)
83
84 errorId := "operation-error"
85 fail := func(msg string, err error) {
86 l.Error(msg, "err", err)
87 rp.pages.Notice(w, errorId, msg)
88 }
89
90 f, err := rp.repoResolver.Resolve(r)
91 if err != nil {
92 fail("Failed to resolve repo. Try again later", err)
93 return
94 }
95
96 newSpindle := r.FormValue("spindle")
97 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value
98 client, err := rp.oauth.AuthorizedClient(r)
99 if err != nil {
100 fail("Failed to authorize. Try again later.", err)
101 return
102 }
103
104 if !removingSpindle {
105 // ensure that this is a valid spindle for this user
106 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Active.Did)
107 if err != nil {
108 fail("Failed to find spindles. Try again later.", err)
109 return
110 }
111
112 if !slices.Contains(validSpindles, newSpindle) {
113 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles))
114 return
115 }
116 }
117
118 newRepo := *f
119 newRepo.Spindle = newSpindle
120 record := newRepo.AsRecord()
121
122 spindlePtr := &newSpindle
123 if removingSpindle {
124 spindlePtr = nil
125 newRepo.Spindle = ""
126 }
127
128 // optimistic update
129 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr)
130 if err != nil {
131 fail("Failed to update spindle. Try again later.", err)
132 return
133 }
134
135 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
136 if err != nil {
137 fail("Failed to update spindle, no record found on PDS.", err)
138 return
139 }
140 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
141 Collection: tangled.RepoNSID,
142 Repo: newRepo.Did,
143 Rkey: newRepo.Rkey,
144 SwapRecord: ex.Cid,
145 Record: &lexutil.LexiconTypeDecoder{
146 Val: &record,
147 },
148 })
149
150 if err != nil {
151 fail("Failed to update spindle, unable to save to PDS.", err)
152 return
153 }
154
155 if !removingSpindle {
156 // add this spindle to spindle stream
157 rp.spindlestream.AddSource(
158 context.Background(),
159 eventconsumer.NewSpindleSource(newSpindle),
160 )
161 }
162
163 rp.pages.HxRefresh(w)
164}
165
166func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) {
167 user := rp.oauth.GetMultiAccountUser(r)
168 l := rp.logger.With("handler", "AddLabel")
169 l = l.With("did", user.Active.Did)
170
171 f, err := rp.repoResolver.Resolve(r)
172 if err != nil {
173 l.Error("failed to get repo and knot", "err", err)
174 return
175 }
176
177 errorId := "add-label-error"
178 fail := func(msg string, err error) {
179 l.Error(msg, "err", err)
180 rp.pages.Notice(w, errorId, msg)
181 }
182
183 // get form values for label definition
184 name := r.FormValue("name")
185 concreteType := r.FormValue("valueType")
186 valueFormat := r.FormValue("valueFormat")
187 enumValues := r.FormValue("enumValues")
188 scope := r.Form["scope"]
189 color := r.FormValue("color")
190 multiple := r.FormValue("multiple") == "true"
191
192 var variants []string
193 for part := range strings.SplitSeq(enumValues, ",") {
194 if part = strings.TrimSpace(part); part != "" {
195 variants = append(variants, part)
196 }
197 }
198
199 if concreteType == "" {
200 concreteType = "null"
201 }
202
203 format := models.ValueTypeFormatAny
204 if valueFormat == "did" {
205 format = models.ValueTypeFormatDid
206 }
207
208 valueType := models.ValueType{
209 Type: models.ConcreteType(concreteType),
210 Format: format,
211 Enum: variants,
212 }
213
214 label := models.LabelDefinition{
215 Did: user.Active.Did,
216 Rkey: tid.TID(),
217 Name: name,
218 ValueType: valueType,
219 Scope: scope,
220 Color: &color,
221 Multiple: multiple,
222 Created: time.Now(),
223 }
224 if err := label.Validate(); err != nil {
225 fail(err.Error(), err)
226 return
227 }
228
229 // announce this relation into the firehose, store into owners' pds
230 client, err := rp.oauth.AuthorizedClient(r)
231 if err != nil {
232 fail(err.Error(), err)
233 return
234 }
235
236 // emit a labelRecord
237 labelRecord := label.AsRecord()
238 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
239 Collection: tangled.LabelDefinitionNSID,
240 Repo: label.Did,
241 Rkey: label.Rkey,
242 Record: &lexutil.LexiconTypeDecoder{
243 Val: &labelRecord,
244 },
245 })
246 // invalid record
247 if err != nil {
248 fail("Failed to write record to PDS.", err)
249 return
250 }
251
252 aturi := resp.Uri
253 l = l.With("at-uri", aturi)
254 l.Info("wrote label record to PDS")
255
256 // update the repo to subscribe to this label
257 newRepo := *f
258 newRepo.Labels = append(newRepo.Labels, aturi)
259 repoRecord := newRepo.AsRecord()
260
261 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
262 if err != nil {
263 fail("Failed to update labels, no record found on PDS.", err)
264 return
265 }
266 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
267 Collection: tangled.RepoNSID,
268 Repo: newRepo.Did,
269 Rkey: newRepo.Rkey,
270 SwapRecord: ex.Cid,
271 Record: &lexutil.LexiconTypeDecoder{
272 Val: &repoRecord,
273 },
274 })
275 if err != nil {
276 fail("Failed to update labels for repo.", err)
277 return
278 }
279
280 tx, err := rp.db.BeginTx(r.Context(), nil)
281 if err != nil {
282 fail("Failed to add label.", err)
283 return
284 }
285
286 rollback := func() {
287 err1 := tx.Rollback()
288 err2 := rollbackRecord(context.Background(), aturi, client)
289
290 // ignore txn complete errors, this is okay
291 if errors.Is(err1, sql.ErrTxDone) {
292 err1 = nil
293 }
294
295 if errs := errors.Join(err1, err2); errs != nil {
296 l.Error("failed to rollback changes", "errs", errs)
297 return
298 }
299 }
300 defer rollback()
301
302 _, err = db.AddLabelDefinition(tx, &label)
303 if err != nil {
304 fail("Failed to add label.", err)
305 return
306 }
307
308 err = db.SubscribeLabel(tx, &models.RepoLabel{
309 RepoAt: f.RepoAt(),
310 LabelAt: label.AtUri(),
311 })
312
313 err = tx.Commit()
314 if err != nil {
315 fail("Failed to add label.", err)
316 return
317 }
318
319 // clear aturi when everything is successful
320 aturi = ""
321
322 rp.pages.HxRefresh(w)
323}
324
325func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) {
326 user := rp.oauth.GetMultiAccountUser(r)
327 l := rp.logger.With("handler", "DeleteLabel")
328 l = l.With("did", user.Active.Did)
329
330 f, err := rp.repoResolver.Resolve(r)
331 if err != nil {
332 l.Error("failed to get repo and knot", "err", err)
333 return
334 }
335
336 errorId := "label-operation"
337 fail := func(msg string, err error) {
338 l.Error(msg, "err", err)
339 rp.pages.Notice(w, errorId, msg)
340 }
341
342 // get form values
343 labelId := r.FormValue("label-id")
344
345 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId))
346 if err != nil {
347 fail("Failed to find label definition.", err)
348 return
349 }
350
351 client, err := rp.oauth.AuthorizedClient(r)
352 if err != nil {
353 fail(err.Error(), err)
354 return
355 }
356
357 // delete label record from PDS
358 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
359 Collection: tangled.LabelDefinitionNSID,
360 Repo: label.Did,
361 Rkey: label.Rkey,
362 })
363 if err != nil {
364 fail("Failed to delete label record from PDS.", err)
365 return
366 }
367
368 // update repo record to remove the label reference
369 newRepo := *f
370 var updated []string
371 removedAt := label.AtUri().String()
372 for _, l := range newRepo.Labels {
373 if l != removedAt {
374 updated = append(updated, l)
375 }
376 }
377 newRepo.Labels = updated
378 repoRecord := newRepo.AsRecord()
379
380 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
381 if err != nil {
382 fail("Failed to update labels, no record found on PDS.", err)
383 return
384 }
385 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
386 Collection: tangled.RepoNSID,
387 Repo: newRepo.Did,
388 Rkey: newRepo.Rkey,
389 SwapRecord: ex.Cid,
390 Record: &lexutil.LexiconTypeDecoder{
391 Val: &repoRecord,
392 },
393 })
394 if err != nil {
395 fail("Failed to update repo record.", err)
396 return
397 }
398
399 // transaction for DB changes
400 tx, err := rp.db.BeginTx(r.Context(), nil)
401 if err != nil {
402 fail("Failed to delete label.", err)
403 return
404 }
405 defer tx.Rollback()
406
407 err = db.UnsubscribeLabel(
408 tx,
409 orm.FilterEq("repo_at", f.RepoAt()),
410 orm.FilterEq("label_at", removedAt),
411 )
412 if err != nil {
413 fail("Failed to unsubscribe label.", err)
414 return
415 }
416
417 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id))
418 if err != nil {
419 fail("Failed to delete label definition.", err)
420 return
421 }
422
423 err = tx.Commit()
424 if err != nil {
425 fail("Failed to delete label.", err)
426 return
427 }
428
429 // everything succeeded
430 rp.pages.HxRefresh(w)
431}
432
433func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) {
434 user := rp.oauth.GetMultiAccountUser(r)
435 l := rp.logger.With("handler", "SubscribeLabel")
436 l = l.With("did", user.Active.Did)
437
438 f, err := rp.repoResolver.Resolve(r)
439 if err != nil {
440 l.Error("failed to get repo and knot", "err", err)
441 return
442 }
443
444 if err := r.ParseForm(); err != nil {
445 l.Error("invalid form", "err", err)
446 return
447 }
448
449 errorId := "default-label-operation"
450 fail := func(msg string, err error) {
451 l.Error(msg, "err", err)
452 rp.pages.Notice(w, errorId, msg)
453 }
454
455 labelAts := r.Form["label"]
456 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
457 if err != nil {
458 fail("Failed to subscribe to label.", err)
459 return
460 }
461
462 newRepo := *f
463 newRepo.Labels = append(newRepo.Labels, labelAts...)
464
465 // dedup
466 slices.Sort(newRepo.Labels)
467 newRepo.Labels = slices.Compact(newRepo.Labels)
468
469 repoRecord := newRepo.AsRecord()
470
471 client, err := rp.oauth.AuthorizedClient(r)
472 if err != nil {
473 fail(err.Error(), err)
474 return
475 }
476
477 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
478 if err != nil {
479 fail("Failed to update labels, no record found on PDS.", err)
480 return
481 }
482 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
483 Collection: tangled.RepoNSID,
484 Repo: newRepo.Did,
485 Rkey: newRepo.Rkey,
486 SwapRecord: ex.Cid,
487 Record: &lexutil.LexiconTypeDecoder{
488 Val: &repoRecord,
489 },
490 })
491
492 tx, err := rp.db.Begin()
493 if err != nil {
494 fail("Failed to subscribe to label.", err)
495 return
496 }
497 defer tx.Rollback()
498
499 for _, l := range labelAts {
500 err = db.SubscribeLabel(tx, &models.RepoLabel{
501 RepoAt: f.RepoAt(),
502 LabelAt: syntax.ATURI(l),
503 })
504 if err != nil {
505 fail("Failed to subscribe to label.", err)
506 return
507 }
508 }
509
510 if err := tx.Commit(); err != nil {
511 fail("Failed to subscribe to label.", err)
512 return
513 }
514
515 // everything succeeded
516 rp.pages.HxRefresh(w)
517}
518
519func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) {
520 user := rp.oauth.GetMultiAccountUser(r)
521 l := rp.logger.With("handler", "UnsubscribeLabel")
522 l = l.With("did", user.Active.Did)
523
524 f, err := rp.repoResolver.Resolve(r)
525 if err != nil {
526 l.Error("failed to get repo and knot", "err", err)
527 return
528 }
529
530 if err := r.ParseForm(); err != nil {
531 l.Error("invalid form", "err", err)
532 return
533 }
534
535 errorId := "default-label-operation"
536 fail := func(msg string, err error) {
537 l.Error(msg, "err", err)
538 rp.pages.Notice(w, errorId, msg)
539 }
540
541 labelAts := r.Form["label"]
542 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
543 if err != nil {
544 fail("Failed to unsubscribe to label.", err)
545 return
546 }
547
548 // update repo record to remove the label reference
549 newRepo := *f
550 var updated []string
551 for _, l := range newRepo.Labels {
552 if !slices.Contains(labelAts, l) {
553 updated = append(updated, l)
554 }
555 }
556 newRepo.Labels = updated
557 repoRecord := newRepo.AsRecord()
558
559 client, err := rp.oauth.AuthorizedClient(r)
560 if err != nil {
561 fail(err.Error(), err)
562 return
563 }
564
565 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
566 if err != nil {
567 fail("Failed to update labels, no record found on PDS.", err)
568 return
569 }
570 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
571 Collection: tangled.RepoNSID,
572 Repo: newRepo.Did,
573 Rkey: newRepo.Rkey,
574 SwapRecord: ex.Cid,
575 Record: &lexutil.LexiconTypeDecoder{
576 Val: &repoRecord,
577 },
578 })
579
580 err = db.UnsubscribeLabel(
581 rp.db,
582 orm.FilterEq("repo_at", f.RepoAt()),
583 orm.FilterIn("label_at", labelAts),
584 )
585 if err != nil {
586 fail("Failed to unsubscribe label.", err)
587 return
588 }
589
590 // everything succeeded
591 rp.pages.HxRefresh(w)
592}
593
594func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) {
595 l := rp.logger.With("handler", "LabelPanel")
596
597 f, err := rp.repoResolver.Resolve(r)
598 if err != nil {
599 l.Error("failed to get repo and knot", "err", err)
600 return
601 }
602
603 subjectStr := r.FormValue("subject")
604 subject, err := syntax.ParseATURI(subjectStr)
605 if err != nil {
606 l.Error("failed to get repo and knot", "err", err)
607 return
608 }
609
610 labelDefs, err := db.GetLabelDefinitions(
611 rp.db,
612 orm.FilterIn("at_uri", f.Labels),
613 orm.FilterContains("scope", subject.Collection().String()),
614 )
615 if err != nil {
616 l.Error("failed to fetch label defs", "err", err)
617 return
618 }
619
620 defs := make(map[string]*models.LabelDefinition)
621 for _, l := range labelDefs {
622 defs[l.AtUri().String()] = &l
623 }
624
625 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
626 if err != nil {
627 l.Error("failed to build label state", "err", err)
628 return
629 }
630 state := states[subject]
631
632 user := rp.oauth.GetMultiAccountUser(r)
633 rp.pages.LabelPanel(w, pages.LabelPanelParams{
634 LoggedInUser: user,
635 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
636 Defs: defs,
637 Subject: subject.String(),
638 State: state,
639 })
640}
641
642func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) {
643 l := rp.logger.With("handler", "EditLabelPanel")
644
645 f, err := rp.repoResolver.Resolve(r)
646 if err != nil {
647 l.Error("failed to get repo and knot", "err", err)
648 return
649 }
650
651 subjectStr := r.FormValue("subject")
652 subject, err := syntax.ParseATURI(subjectStr)
653 if err != nil {
654 l.Error("failed to get repo and knot", "err", err)
655 return
656 }
657
658 labelDefs, err := db.GetLabelDefinitions(
659 rp.db,
660 orm.FilterIn("at_uri", f.Labels),
661 orm.FilterContains("scope", subject.Collection().String()),
662 )
663 if err != nil {
664 l.Error("failed to fetch labels", "err", err)
665 return
666 }
667
668 defs := make(map[string]*models.LabelDefinition)
669 for _, l := range labelDefs {
670 defs[l.AtUri().String()] = &l
671 }
672
673 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
674 if err != nil {
675 l.Error("failed to build label state", "err", err)
676 return
677 }
678 state := states[subject]
679
680 user := rp.oauth.GetMultiAccountUser(r)
681 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{
682 LoggedInUser: user,
683 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
684 Defs: defs,
685 Subject: subject.String(),
686 State: state,
687 })
688}
689
690func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) {
691 user := rp.oauth.GetMultiAccountUser(r)
692 l := rp.logger.With("handler", "AddCollaborator")
693 l = l.With("did", user.Active.Did)
694
695 f, err := rp.repoResolver.Resolve(r)
696 if err != nil {
697 l.Error("failed to get repo and knot", "err", err)
698 return
699 }
700
701 errorId := "add-collaborator-error"
702 fail := func(msg string, err error) {
703 l.Error(msg, "err", err)
704 rp.pages.Notice(w, errorId, msg)
705 }
706
707 collaborator := r.FormValue("collaborator")
708 if collaborator == "" {
709 fail("Invalid form.", nil)
710 return
711 }
712
713 // remove a single leading `@`, to make @handle work with ResolveIdent
714 collaborator = strings.TrimPrefix(collaborator, "@")
715
716 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator)
717 if err != nil {
718 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err)
719 return
720 }
721
722 if collaboratorIdent.DID.String() == user.Active.Did {
723 fail("You seem to be adding yourself as a collaborator.", nil)
724 return
725 }
726 l = l.With("collaborator", collaboratorIdent.Handle)
727 l = l.With("knot", f.Knot)
728
729 // announce this relation into the firehose, store into owners' pds
730 client, err := rp.oauth.AuthorizedClient(r)
731 if err != nil {
732 fail("Failed to write to PDS.", err)
733 return
734 }
735
736 // emit a record
737 currentUser := rp.oauth.GetMultiAccountUser(r)
738 rkey := tid.TID()
739 createdAt := time.Now()
740 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
741 Collection: tangled.RepoCollaboratorNSID,
742 Repo: currentUser.Active.Did,
743 Rkey: rkey,
744 Record: &lexutil.LexiconTypeDecoder{
745 Val: &tangled.RepoCollaborator{
746 Subject: collaboratorIdent.DID.String(),
747 Repo: string(f.RepoAt()),
748 CreatedAt: createdAt.Format(time.RFC3339),
749 }},
750 })
751 // invalid record
752 if err != nil {
753 fail("Failed to write record to PDS.", err)
754 return
755 }
756
757 aturi := resp.Uri
758 l = l.With("at-uri", aturi)
759 l.Info("wrote record to PDS")
760
761 tx, err := rp.db.BeginTx(r.Context(), nil)
762 if err != nil {
763 fail("Failed to add collaborator.", err)
764 return
765 }
766
767 rollback := func() {
768 err1 := tx.Rollback()
769 err2 := rp.enforcer.E.LoadPolicy()
770 err3 := rollbackRecord(context.Background(), aturi, client)
771
772 // ignore txn complete errors, this is okay
773 if errors.Is(err1, sql.ErrTxDone) {
774 err1 = nil
775 }
776
777 if errs := errors.Join(err1, err2, err3); errs != nil {
778 l.Error("failed to rollback changes", "errs", errs)
779 return
780 }
781 }
782 defer rollback()
783
784 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo())
785 if err != nil {
786 fail("Failed to add collaborator permissions.", err)
787 return
788 }
789
790 err = db.AddCollaborator(tx, models.Collaborator{
791 Did: syntax.DID(currentUser.Active.Did),
792 Rkey: rkey,
793 SubjectDid: collaboratorIdent.DID,
794 RepoAt: f.RepoAt(),
795 Created: createdAt,
796 })
797 if err != nil {
798 fail("Failed to add collaborator.", err)
799 return
800 }
801
802 err = tx.Commit()
803 if err != nil {
804 fail("Failed to add collaborator.", err)
805 return
806 }
807
808 err = rp.enforcer.E.SavePolicy()
809 if err != nil {
810 fail("Failed to update collaborator permissions.", err)
811 return
812 }
813
814 // clear aturi to when everything is successful
815 aturi = ""
816
817 rp.pages.HxRefresh(w)
818}
819
820func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) {
821 user := rp.oauth.GetMultiAccountUser(r)
822 l := rp.logger.With("handler", "DeleteRepo")
823
824 noticeId := "operation-error"
825 f, err := rp.repoResolver.Resolve(r)
826 if err != nil {
827 l.Error("failed to get repo and knot", "err", err)
828 return
829 }
830
831 // remove record from pds
832 atpClient, err := rp.oauth.AuthorizedClient(r)
833 if err != nil {
834 l.Error("failed to get authorized client", "err", err)
835 return
836 }
837 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{
838 Collection: tangled.RepoNSID,
839 Repo: user.Active.Did,
840 Rkey: f.Rkey,
841 })
842 if err != nil {
843 l.Error("failed to delete record", "err", err)
844 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.")
845 return
846 }
847 l.Info("removed repo record", "aturi", f.RepoAt().String())
848
849 client, err := rp.oauth.ServiceClient(
850 r,
851 oauth.WithService(f.Knot),
852 oauth.WithLxm(tangled.RepoDeleteNSID),
853 oauth.WithDev(rp.config.Core.Dev),
854 )
855 if err != nil {
856 l.Error("failed to connect to knot server", "err", err)
857 return
858 }
859
860 err = tangled.RepoDelete(
861 r.Context(),
862 client,
863 &tangled.RepoDelete_Input{
864 Did: f.Did,
865 Name: f.Name,
866 Rkey: f.Rkey,
867 },
868 )
869 if err := xrpcclient.HandleXrpcErr(err); err != nil {
870 rp.pages.Notice(w, noticeId, err.Error())
871 return
872 }
873 l.Info("deleted repo from knot")
874
875 tx, err := rp.db.BeginTx(r.Context(), nil)
876 if err != nil {
877 l.Error("failed to start tx")
878 w.Write(fmt.Append(nil, "failed to add collaborator: ", err))
879 return
880 }
881 defer func() {
882 tx.Rollback()
883 err = rp.enforcer.E.LoadPolicy()
884 if err != nil {
885 l.Error("failed to rollback policies")
886 }
887 }()
888
889 // remove collaborator RBAC
890 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot)
891 if err != nil {
892 rp.pages.Notice(w, noticeId, "Failed to remove collaborators")
893 return
894 }
895 for _, c := range repoCollaborators {
896 did := c[0]
897 rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo())
898 }
899 l.Info("removed collaborators")
900
901 // remove repo RBAC
902 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.DidSlashRepo())
903 if err != nil {
904 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules")
905 return
906 }
907
908 // remove repo from db
909 err = db.RemoveRepo(tx, f.Did, f.Name)
910 if err != nil {
911 rp.pages.Notice(w, noticeId, "Failed to update appview")
912 return
913 }
914 l.Info("removed repo from db")
915
916 err = tx.Commit()
917 if err != nil {
918 l.Error("failed to commit changes", "err", err)
919 http.Error(w, err.Error(), http.StatusInternalServerError)
920 return
921 }
922
923 err = rp.enforcer.E.SavePolicy()
924 if err != nil {
925 l.Error("failed to update ACLs", "err", err)
926 http.Error(w, err.Error(), http.StatusInternalServerError)
927 return
928 }
929
930 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did))
931}
932
933func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) {
934 l := rp.logger.With("handler", "SyncRepoFork")
935
936 ref := chi.URLParam(r, "ref")
937 ref, _ = url.PathUnescape(ref)
938
939 user := rp.oauth.GetMultiAccountUser(r)
940 f, err := rp.repoResolver.Resolve(r)
941 if err != nil {
942 l.Error("failed to resolve source repo", "err", err)
943 return
944 }
945
946 switch r.Method {
947 case http.MethodPost:
948 client, err := rp.oauth.ServiceClient(
949 r,
950 oauth.WithService(f.Knot),
951 oauth.WithLxm(tangled.RepoForkSyncNSID),
952 oauth.WithDev(rp.config.Core.Dev),
953 )
954 if err != nil {
955 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
956 return
957 }
958
959 if f.Source == "" {
960 rp.pages.Notice(w, "repo", "This repository is not a fork.")
961 return
962 }
963
964 err = tangled.RepoForkSync(
965 r.Context(),
966 client,
967 &tangled.RepoForkSync_Input{
968 Did: user.Active.Did,
969 Name: f.Name,
970 Source: f.Source,
971 Branch: ref,
972 },
973 )
974 if err := xrpcclient.HandleXrpcErr(err); err != nil {
975 rp.pages.Notice(w, "repo", err.Error())
976 return
977 }
978
979 rp.pages.HxRefresh(w)
980 return
981 }
982}
983
984func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) {
985 l := rp.logger.With("handler", "ForkRepo")
986
987 user := rp.oauth.GetMultiAccountUser(r)
988 f, err := rp.repoResolver.Resolve(r)
989 if err != nil {
990 l.Error("failed to resolve source repo", "err", err)
991 return
992 }
993
994 switch r.Method {
995 case http.MethodGet:
996 user := rp.oauth.GetMultiAccountUser(r)
997 knots, err := rp.enforcer.GetKnotsForUser(user.Active.Did)
998 if err != nil {
999 rp.pages.Notice(w, "repo", "Invalid user account.")
1000 return
1001 }
1002
1003 rp.pages.ForkRepo(w, pages.ForkRepoParams{
1004 LoggedInUser: user,
1005 Knots: knots,
1006 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1007 })
1008
1009 case http.MethodPost:
1010 l := rp.logger.With("handler", "ForkRepo")
1011
1012 targetKnot := r.FormValue("knot")
1013 if targetKnot == "" {
1014 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
1015 return
1016 }
1017 l = l.With("targetKnot", targetKnot)
1018
1019 ok, err := rp.enforcer.E.Enforce(user.Active.Did, targetKnot, targetKnot, "repo:create")
1020 if err != nil || !ok {
1021 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
1022 return
1023 }
1024
1025 // choose a name for a fork
1026 forkName := r.FormValue("repo_name")
1027 if forkName == "" {
1028 rp.pages.Notice(w, "repo", "Repository name cannot be empty.")
1029 return
1030 }
1031
1032 // this check is *only* to see if the forked repo name already exists
1033 // in the user's account.
1034 existingRepo, err := db.GetRepo(
1035 rp.db,
1036 orm.FilterEq("did", user.Active.Did),
1037 orm.FilterEq("name", forkName),
1038 )
1039 if err != nil {
1040 if !errors.Is(err, sql.ErrNoRows) {
1041 l.Error("error fetching existing repo from db", "err", err)
1042 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.")
1043 return
1044 }
1045 } else if existingRepo != nil {
1046 // repo with this name already exists
1047 rp.pages.Notice(w, "repo", "A repository with this name already exists.")
1048 return
1049 }
1050 l = l.With("forkName", forkName)
1051
1052 uri := "https"
1053 if rp.config.Core.Dev {
1054 uri = "http"
1055 }
1056
1057 forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.Did, f.Name)
1058 l = l.With("cloneUrl", forkSourceUrl)
1059
1060 sourceAt := f.RepoAt().String()
1061
1062 // create an atproto record for this fork
1063 rkey := tid.TID()
1064 repo := &models.Repo{
1065 Did: user.Active.Did,
1066 Name: forkName,
1067 Knot: targetKnot,
1068 Rkey: rkey,
1069 Source: sourceAt,
1070 Description: f.Description,
1071 Created: time.Now(),
1072 Labels: rp.config.Label.DefaultLabelDefs,
1073 }
1074 record := repo.AsRecord()
1075
1076 atpClient, err := rp.oauth.AuthorizedClient(r)
1077 if err != nil {
1078 l.Error("failed to create xrpcclient", "err", err)
1079 rp.pages.Notice(w, "repo", "Failed to fork repository.")
1080 return
1081 }
1082
1083 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1084 Collection: tangled.RepoNSID,
1085 Repo: user.Active.Did,
1086 Rkey: rkey,
1087 Record: &lexutil.LexiconTypeDecoder{
1088 Val: &record,
1089 },
1090 })
1091 if err != nil {
1092 l.Error("failed to write to PDS", "err", err)
1093 rp.pages.Notice(w, "repo", "Failed to announce repository creation.")
1094 return
1095 }
1096
1097 aturi := atresp.Uri
1098 l = l.With("aturi", aturi)
1099 l.Info("wrote to PDS")
1100
1101 tx, err := rp.db.BeginTx(r.Context(), nil)
1102 if err != nil {
1103 l.Info("txn failed", "err", err)
1104 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1105 return
1106 }
1107
1108 // The rollback function reverts a few things on failure:
1109 // - the pending txn
1110 // - the ACLs
1111 // - the atproto record created
1112 rollback := func() {
1113 err1 := tx.Rollback()
1114 err2 := rp.enforcer.E.LoadPolicy()
1115 err3 := rollbackRecord(context.Background(), aturi, atpClient)
1116
1117 // ignore txn complete errors, this is okay
1118 if errors.Is(err1, sql.ErrTxDone) {
1119 err1 = nil
1120 }
1121
1122 if errs := errors.Join(err1, err2, err3); errs != nil {
1123 l.Error("failed to rollback changes", "errs", errs)
1124 return
1125 }
1126 }
1127 defer rollback()
1128
1129 // TODO: this could coordinate better with the knot to recieve a clone status
1130 client, err := rp.oauth.ServiceClient(
1131 r,
1132 oauth.WithService(targetKnot),
1133 oauth.WithLxm(tangled.RepoCreateNSID),
1134 oauth.WithDev(rp.config.Core.Dev),
1135 oauth.WithTimeout(time.Second*20), // big repos take time to clone
1136 )
1137 if err != nil {
1138 l.Error("could not create service client", "err", err)
1139 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1140 return
1141 }
1142
1143 err = tangled.RepoCreate(
1144 r.Context(),
1145 client,
1146 &tangled.RepoCreate_Input{
1147 Rkey: rkey,
1148 Source: &forkSourceUrl,
1149 },
1150 )
1151 if err := xrpcclient.HandleXrpcErr(err); err != nil {
1152 rp.pages.Notice(w, "repo", err.Error())
1153 return
1154 }
1155
1156 err = db.AddRepo(tx, repo)
1157 if err != nil {
1158 l.Error("failed to AddRepo", "err", err)
1159 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1160 return
1161 }
1162
1163 // acls
1164 p, _ := securejoin.SecureJoin(user.Active.Did, forkName)
1165 err = rp.enforcer.AddRepo(user.Active.Did, targetKnot, p)
1166 if err != nil {
1167 l.Error("failed to add ACLs", "err", err)
1168 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.")
1169 return
1170 }
1171
1172 err = tx.Commit()
1173 if err != nil {
1174 l.Error("failed to commit changes", "err", err)
1175 http.Error(w, err.Error(), http.StatusInternalServerError)
1176 return
1177 }
1178
1179 err = rp.enforcer.E.SavePolicy()
1180 if err != nil {
1181 l.Error("failed to update ACLs", "err", err)
1182 http.Error(w, err.Error(), http.StatusInternalServerError)
1183 return
1184 }
1185
1186 // reset the ATURI because the transaction completed successfully
1187 aturi = ""
1188
1189 rp.notifier.NewRepo(r.Context(), repo)
1190 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, forkName))
1191 }
1192}
1193
1194// this is used to rollback changes made to the PDS
1195//
1196// it is a no-op if the provided ATURI is empty
1197func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
1198 if aturi == "" {
1199 return nil
1200 }
1201
1202 parsed := syntax.ATURI(aturi)
1203
1204 collection := parsed.Collection().String()
1205 repo := parsed.Authority().String()
1206 rkey := parsed.RecordKey().String()
1207
1208 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
1209 Collection: collection,
1210 Repo: repo,
1211 Rkey: rkey,
1212 })
1213 return err
1214}