A vibe-coded Tangled fork that supports Pijul.
1package repo
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "slices"
12 "strings"
13 "time"
14
15 "tangled.org/core/appview/cloudflare"
16
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/notify"
22 "tangled.org/core/appview/oauth"
23 "tangled.org/core/appview/pages"
24 "tangled.org/core/appview/reporesolver"
25 "tangled.org/core/appview/validator"
26 xrpcclient "tangled.org/core/appview/xrpcclient"
27 "tangled.org/core/eventconsumer"
28 "tangled.org/core/idresolver"
29 "tangled.org/core/orm"
30 "tangled.org/core/rbac"
31 "tangled.org/core/tid"
32 "tangled.org/core/xrpc/serviceauth"
33
34 comatproto "github.com/bluesky-social/indigo/api/atproto"
35 atpclient "github.com/bluesky-social/indigo/atproto/client"
36 "github.com/bluesky-social/indigo/atproto/syntax"
37 lexutil "github.com/bluesky-social/indigo/lex/util"
38 securejoin "github.com/cyphar/filepath-securejoin"
39 "github.com/go-chi/chi/v5"
40)
41
42type Repo struct {
43 repoResolver *reporesolver.RepoResolver
44 idResolver *idresolver.Resolver
45 config *config.Config
46 oauth *oauth.OAuth
47 pages *pages.Pages
48 spindlestream *eventconsumer.Consumer
49 db *db.DB
50 enforcer *rbac.Enforcer
51 notifier notify.Notifier
52 logger *slog.Logger
53 serviceAuth *serviceauth.ServiceAuth
54 validator *validator.Validator
55 cfClient *cloudflare.Client
56}
57
58func New(
59 oauth *oauth.OAuth,
60 repoResolver *reporesolver.RepoResolver,
61 pages *pages.Pages,
62 spindlestream *eventconsumer.Consumer,
63 idResolver *idresolver.Resolver,
64 db *db.DB,
65 config *config.Config,
66 notifier notify.Notifier,
67 enforcer *rbac.Enforcer,
68 logger *slog.Logger,
69 validator *validator.Validator,
70 cfClient *cloudflare.Client,
71) *Repo {
72 return &Repo{
73 oauth: oauth,
74 repoResolver: repoResolver,
75 pages: pages,
76 idResolver: idResolver,
77 config: config,
78 spindlestream: spindlestream,
79 db: db,
80 notifier: notifier,
81 enforcer: enforcer,
82 logger: logger,
83 validator: validator,
84 cfClient: cfClient,
85 }
86}
87
88// modify the spindle configured for this repo
89func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) {
90 user := rp.oauth.GetMultiAccountUser(r)
91 l := rp.logger.With("handler", "EditSpindle")
92 l = l.With("did", user.Active.Did)
93
94 errorId := "operation-error"
95 fail := func(msg string, err error) {
96 l.Error(msg, "err", err)
97 rp.pages.Notice(w, errorId, msg)
98 }
99
100 f, err := rp.repoResolver.Resolve(r)
101 if err != nil {
102 fail("Failed to resolve repo. Try again later", err)
103 return
104 }
105
106 newSpindle := r.FormValue("spindle")
107 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value
108 client, err := rp.oauth.AuthorizedClient(r)
109 if err != nil {
110 fail("Failed to authorize. Try again later.", err)
111 return
112 }
113
114 if !removingSpindle {
115 // ensure that this is a valid spindle for this user
116 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Active.Did)
117 if err != nil {
118 fail("Failed to find spindles. Try again later.", err)
119 return
120 }
121
122 if !slices.Contains(validSpindles, newSpindle) {
123 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles))
124 return
125 }
126 }
127
128 newRepo := *f
129 newRepo.Spindle = newSpindle
130 record := newRepo.AsRecord()
131
132 spindlePtr := &newSpindle
133 if removingSpindle {
134 spindlePtr = nil
135 newRepo.Spindle = ""
136 }
137
138 // optimistic update
139 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr)
140 if err != nil {
141 fail("Failed to update spindle. Try again later.", err)
142 return
143 }
144
145 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
146 if err != nil {
147 fail("Failed to update spindle, no record found on PDS.", err)
148 return
149 }
150 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
151 Collection: tangled.RepoNSID,
152 Repo: newRepo.Did,
153 Rkey: newRepo.Rkey,
154 SwapRecord: ex.Cid,
155 Record: &lexutil.LexiconTypeDecoder{
156 Val: &record,
157 },
158 })
159
160 if err != nil {
161 fail("Failed to update spindle, unable to save to PDS.", err)
162 return
163 }
164
165 if !removingSpindle {
166 // add this spindle to spindle stream
167 rp.spindlestream.AddSource(
168 context.Background(),
169 eventconsumer.NewSpindleSource(newSpindle),
170 )
171 }
172
173 rp.pages.HxRefresh(w)
174}
175
176func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) {
177 user := rp.oauth.GetMultiAccountUser(r)
178 l := rp.logger.With("handler", "AddLabel")
179 l = l.With("did", user.Active.Did)
180
181 f, err := rp.repoResolver.Resolve(r)
182 if err != nil {
183 l.Error("failed to get repo and knot", "err", err)
184 return
185 }
186
187 errorId := "add-label-error"
188 fail := func(msg string, err error) {
189 l.Error(msg, "err", err)
190 rp.pages.Notice(w, errorId, msg)
191 }
192
193 // get form values for label definition
194 name := r.FormValue("name")
195 concreteType := r.FormValue("valueType")
196 valueFormat := r.FormValue("valueFormat")
197 enumValues := r.FormValue("enumValues")
198 scope := r.Form["scope"]
199 color := r.FormValue("color")
200 multiple := r.FormValue("multiple") == "true"
201
202 var variants []string
203 for part := range strings.SplitSeq(enumValues, ",") {
204 if part = strings.TrimSpace(part); part != "" {
205 variants = append(variants, part)
206 }
207 }
208
209 if concreteType == "" {
210 concreteType = "null"
211 }
212
213 format := models.ValueTypeFormatAny
214 if valueFormat == "did" {
215 format = models.ValueTypeFormatDid
216 }
217
218 valueType := models.ValueType{
219 Type: models.ConcreteType(concreteType),
220 Format: format,
221 Enum: variants,
222 }
223
224 label := models.LabelDefinition{
225 Did: user.Active.Did,
226 Rkey: tid.TID(),
227 Name: name,
228 ValueType: valueType,
229 Scope: scope,
230 Color: &color,
231 Multiple: multiple,
232 Created: time.Now(),
233 }
234 if err := rp.validator.ValidateLabelDefinition(&label); err != nil {
235 fail(err.Error(), err)
236 return
237 }
238
239 // announce this relation into the firehose, store into owners' pds
240 client, err := rp.oauth.AuthorizedClient(r)
241 if err != nil {
242 fail(err.Error(), err)
243 return
244 }
245
246 // emit a labelRecord
247 labelRecord := label.AsRecord()
248 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
249 Collection: tangled.LabelDefinitionNSID,
250 Repo: label.Did,
251 Rkey: label.Rkey,
252 Record: &lexutil.LexiconTypeDecoder{
253 Val: &labelRecord,
254 },
255 })
256 // invalid record
257 if err != nil {
258 fail("Failed to write record to PDS.", err)
259 return
260 }
261
262 aturi := resp.Uri
263 l = l.With("at-uri", aturi)
264 l.Info("wrote label record to PDS")
265
266 // update the repo to subscribe to this label
267 newRepo := *f
268 newRepo.Labels = append(newRepo.Labels, aturi)
269 repoRecord := newRepo.AsRecord()
270
271 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
272 if err != nil {
273 fail("Failed to update labels, no record found on PDS.", err)
274 return
275 }
276 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
277 Collection: tangled.RepoNSID,
278 Repo: newRepo.Did,
279 Rkey: newRepo.Rkey,
280 SwapRecord: ex.Cid,
281 Record: &lexutil.LexiconTypeDecoder{
282 Val: &repoRecord,
283 },
284 })
285 if err != nil {
286 fail("Failed to update labels for repo.", err)
287 return
288 }
289
290 tx, err := rp.db.BeginTx(r.Context(), nil)
291 if err != nil {
292 fail("Failed to add label.", err)
293 return
294 }
295
296 rollback := func() {
297 err1 := tx.Rollback()
298 err2 := rollbackRecord(context.Background(), aturi, client)
299
300 // ignore txn complete errors, this is okay
301 if errors.Is(err1, sql.ErrTxDone) {
302 err1 = nil
303 }
304
305 if errs := errors.Join(err1, err2); errs != nil {
306 l.Error("failed to rollback changes", "errs", errs)
307 return
308 }
309 }
310 defer rollback()
311
312 _, err = db.AddLabelDefinition(tx, &label)
313 if err != nil {
314 fail("Failed to add label.", err)
315 return
316 }
317
318 err = db.SubscribeLabel(tx, &models.RepoLabel{
319 RepoAt: f.RepoAt(),
320 LabelAt: label.AtUri(),
321 })
322
323 err = tx.Commit()
324 if err != nil {
325 fail("Failed to add label.", err)
326 return
327 }
328
329 // clear aturi when everything is successful
330 aturi = ""
331
332 rp.pages.HxRefresh(w)
333}
334
335func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) {
336 user := rp.oauth.GetMultiAccountUser(r)
337 l := rp.logger.With("handler", "DeleteLabel")
338 l = l.With("did", user.Active.Did)
339
340 f, err := rp.repoResolver.Resolve(r)
341 if err != nil {
342 l.Error("failed to get repo and knot", "err", err)
343 return
344 }
345
346 errorId := "label-operation"
347 fail := func(msg string, err error) {
348 l.Error(msg, "err", err)
349 rp.pages.Notice(w, errorId, msg)
350 }
351
352 // get form values
353 labelId := r.FormValue("label-id")
354
355 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId))
356 if err != nil {
357 fail("Failed to find label definition.", err)
358 return
359 }
360
361 client, err := rp.oauth.AuthorizedClient(r)
362 if err != nil {
363 fail(err.Error(), err)
364 return
365 }
366
367 // delete label record from PDS
368 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
369 Collection: tangled.LabelDefinitionNSID,
370 Repo: label.Did,
371 Rkey: label.Rkey,
372 })
373 if err != nil {
374 fail("Failed to delete label record from PDS.", err)
375 return
376 }
377
378 // update repo record to remove the label reference
379 newRepo := *f
380 var updated []string
381 removedAt := label.AtUri().String()
382 for _, l := range newRepo.Labels {
383 if l != removedAt {
384 updated = append(updated, l)
385 }
386 }
387 newRepo.Labels = updated
388 repoRecord := newRepo.AsRecord()
389
390 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
391 if err != nil {
392 fail("Failed to update labels, no record found on PDS.", err)
393 return
394 }
395 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
396 Collection: tangled.RepoNSID,
397 Repo: newRepo.Did,
398 Rkey: newRepo.Rkey,
399 SwapRecord: ex.Cid,
400 Record: &lexutil.LexiconTypeDecoder{
401 Val: &repoRecord,
402 },
403 })
404 if err != nil {
405 fail("Failed to update repo record.", err)
406 return
407 }
408
409 // transaction for DB changes
410 tx, err := rp.db.BeginTx(r.Context(), nil)
411 if err != nil {
412 fail("Failed to delete label.", err)
413 return
414 }
415 defer tx.Rollback()
416
417 err = db.UnsubscribeLabel(
418 tx,
419 orm.FilterEq("repo_at", f.RepoAt()),
420 orm.FilterEq("label_at", removedAt),
421 )
422 if err != nil {
423 fail("Failed to unsubscribe label.", err)
424 return
425 }
426
427 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id))
428 if err != nil {
429 fail("Failed to delete label definition.", err)
430 return
431 }
432
433 err = tx.Commit()
434 if err != nil {
435 fail("Failed to delete label.", err)
436 return
437 }
438
439 // everything succeeded
440 rp.pages.HxRefresh(w)
441}
442
443func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) {
444 user := rp.oauth.GetMultiAccountUser(r)
445 l := rp.logger.With("handler", "SubscribeLabel")
446 l = l.With("did", user.Active.Did)
447
448 f, err := rp.repoResolver.Resolve(r)
449 if err != nil {
450 l.Error("failed to get repo and knot", "err", err)
451 return
452 }
453
454 if err := r.ParseForm(); err != nil {
455 l.Error("invalid form", "err", err)
456 return
457 }
458
459 errorId := "default-label-operation"
460 fail := func(msg string, err error) {
461 l.Error(msg, "err", err)
462 rp.pages.Notice(w, errorId, msg)
463 }
464
465 labelAts := r.Form["label"]
466 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
467 if err != nil {
468 fail("Failed to subscribe to label.", err)
469 return
470 }
471
472 newRepo := *f
473 newRepo.Labels = append(newRepo.Labels, labelAts...)
474
475 // dedup
476 slices.Sort(newRepo.Labels)
477 newRepo.Labels = slices.Compact(newRepo.Labels)
478
479 repoRecord := newRepo.AsRecord()
480
481 client, err := rp.oauth.AuthorizedClient(r)
482 if err != nil {
483 fail(err.Error(), err)
484 return
485 }
486
487 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
488 if err != nil {
489 fail("Failed to update labels, no record found on PDS.", err)
490 return
491 }
492 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
493 Collection: tangled.RepoNSID,
494 Repo: newRepo.Did,
495 Rkey: newRepo.Rkey,
496 SwapRecord: ex.Cid,
497 Record: &lexutil.LexiconTypeDecoder{
498 Val: &repoRecord,
499 },
500 })
501
502 tx, err := rp.db.Begin()
503 if err != nil {
504 fail("Failed to subscribe to label.", err)
505 return
506 }
507 defer tx.Rollback()
508
509 for _, l := range labelAts {
510 err = db.SubscribeLabel(tx, &models.RepoLabel{
511 RepoAt: f.RepoAt(),
512 LabelAt: syntax.ATURI(l),
513 })
514 if err != nil {
515 fail("Failed to subscribe to label.", err)
516 return
517 }
518 }
519
520 if err := tx.Commit(); err != nil {
521 fail("Failed to subscribe to label.", err)
522 return
523 }
524
525 // everything succeeded
526 rp.pages.HxRefresh(w)
527}
528
529func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) {
530 user := rp.oauth.GetMultiAccountUser(r)
531 l := rp.logger.With("handler", "UnsubscribeLabel")
532 l = l.With("did", user.Active.Did)
533
534 f, err := rp.repoResolver.Resolve(r)
535 if err != nil {
536 l.Error("failed to get repo and knot", "err", err)
537 return
538 }
539
540 if err := r.ParseForm(); err != nil {
541 l.Error("invalid form", "err", err)
542 return
543 }
544
545 errorId := "default-label-operation"
546 fail := func(msg string, err error) {
547 l.Error(msg, "err", err)
548 rp.pages.Notice(w, errorId, msg)
549 }
550
551 labelAts := r.Form["label"]
552 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
553 if err != nil {
554 fail("Failed to unsubscribe to label.", err)
555 return
556 }
557
558 // update repo record to remove the label reference
559 newRepo := *f
560 var updated []string
561 for _, l := range newRepo.Labels {
562 if !slices.Contains(labelAts, l) {
563 updated = append(updated, l)
564 }
565 }
566 newRepo.Labels = updated
567 repoRecord := newRepo.AsRecord()
568
569 client, err := rp.oauth.AuthorizedClient(r)
570 if err != nil {
571 fail(err.Error(), err)
572 return
573 }
574
575 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
576 if err != nil {
577 fail("Failed to update labels, no record found on PDS.", err)
578 return
579 }
580 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
581 Collection: tangled.RepoNSID,
582 Repo: newRepo.Did,
583 Rkey: newRepo.Rkey,
584 SwapRecord: ex.Cid,
585 Record: &lexutil.LexiconTypeDecoder{
586 Val: &repoRecord,
587 },
588 })
589
590 err = db.UnsubscribeLabel(
591 rp.db,
592 orm.FilterEq("repo_at", f.RepoAt()),
593 orm.FilterIn("label_at", labelAts),
594 )
595 if err != nil {
596 fail("Failed to unsubscribe label.", err)
597 return
598 }
599
600 // everything succeeded
601 rp.pages.HxRefresh(w)
602}
603
604func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) {
605 l := rp.logger.With("handler", "LabelPanel")
606
607 f, err := rp.repoResolver.Resolve(r)
608 if err != nil {
609 l.Error("failed to get repo and knot", "err", err)
610 return
611 }
612
613 subjectStr := r.FormValue("subject")
614 subject, err := syntax.ParseATURI(subjectStr)
615 if err != nil {
616 l.Error("failed to get repo and knot", "err", err)
617 return
618 }
619
620 labelDefs, err := db.GetLabelDefinitions(
621 rp.db,
622 orm.FilterIn("at_uri", f.Labels),
623 orm.FilterContains("scope", subject.Collection().String()),
624 )
625 if err != nil {
626 l.Error("failed to fetch label defs", "err", err)
627 return
628 }
629
630 defs := make(map[string]*models.LabelDefinition)
631 for _, l := range labelDefs {
632 defs[l.AtUri().String()] = &l
633 }
634
635 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
636 if err != nil {
637 l.Error("failed to build label state", "err", err)
638 return
639 }
640 state := states[subject]
641
642 user := rp.oauth.GetMultiAccountUser(r)
643 rp.pages.LabelPanel(w, pages.LabelPanelParams{
644 LoggedInUser: user,
645 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
646 Defs: defs,
647 Subject: subject.String(),
648 State: state,
649 })
650}
651
652func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) {
653 l := rp.logger.With("handler", "EditLabelPanel")
654
655 f, err := rp.repoResolver.Resolve(r)
656 if err != nil {
657 l.Error("failed to get repo and knot", "err", err)
658 return
659 }
660
661 subjectStr := r.FormValue("subject")
662 subject, err := syntax.ParseATURI(subjectStr)
663 if err != nil {
664 l.Error("failed to get repo and knot", "err", err)
665 return
666 }
667
668 labelDefs, err := db.GetLabelDefinitions(
669 rp.db,
670 orm.FilterIn("at_uri", f.Labels),
671 orm.FilterContains("scope", subject.Collection().String()),
672 )
673 if err != nil {
674 l.Error("failed to fetch labels", "err", err)
675 return
676 }
677
678 defs := make(map[string]*models.LabelDefinition)
679 for _, l := range labelDefs {
680 defs[l.AtUri().String()] = &l
681 }
682
683 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
684 if err != nil {
685 l.Error("failed to build label state", "err", err)
686 return
687 }
688 state := states[subject]
689
690 user := rp.oauth.GetMultiAccountUser(r)
691 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{
692 LoggedInUser: user,
693 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
694 Defs: defs,
695 Subject: subject.String(),
696 State: state,
697 })
698}
699
700func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) {
701 user := rp.oauth.GetMultiAccountUser(r)
702 l := rp.logger.With("handler", "AddCollaborator")
703 l = l.With("did", user.Active.Did)
704
705 f, err := rp.repoResolver.Resolve(r)
706 if err != nil {
707 l.Error("failed to get repo and knot", "err", err)
708 return
709 }
710
711 errorId := "add-collaborator-error"
712 fail := func(msg string, err error) {
713 l.Error(msg, "err", err)
714 rp.pages.Notice(w, errorId, msg)
715 }
716
717 collaborator := r.FormValue("collaborator")
718 if collaborator == "" {
719 fail("Invalid form.", nil)
720 return
721 }
722
723 // remove a single leading `@`, to make @handle work with ResolveIdent
724 collaborator = strings.TrimPrefix(collaborator, "@")
725
726 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator)
727 if err != nil {
728 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err)
729 return
730 }
731
732 if collaboratorIdent.DID.String() == user.Active.Did {
733 fail("You seem to be adding yourself as a collaborator.", nil)
734 return
735 }
736 l = l.With("collaborator", collaboratorIdent.Handle)
737 l = l.With("knot", f.Knot)
738
739 // announce this relation into the firehose, store into owners' pds
740 client, err := rp.oauth.AuthorizedClient(r)
741 if err != nil {
742 fail("Failed to write to PDS.", err)
743 return
744 }
745
746 // emit a record
747 currentUser := rp.oauth.GetMultiAccountUser(r)
748 rkey := tid.TID()
749 createdAt := time.Now()
750 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
751 Collection: tangled.RepoCollaboratorNSID,
752 Repo: currentUser.Active.Did,
753 Rkey: rkey,
754 Record: &lexutil.LexiconTypeDecoder{
755 Val: &tangled.RepoCollaborator{
756 Subject: collaboratorIdent.DID.String(),
757 Repo: string(f.RepoAt()),
758 CreatedAt: createdAt.Format(time.RFC3339),
759 }},
760 })
761 // invalid record
762 if err != nil {
763 fail("Failed to write record to PDS.", err)
764 return
765 }
766
767 aturi := resp.Uri
768 l = l.With("at-uri", aturi)
769 l.Info("wrote record to PDS")
770
771 tx, err := rp.db.BeginTx(r.Context(), nil)
772 if err != nil {
773 fail("Failed to add collaborator.", err)
774 return
775 }
776
777 rollback := func() {
778 err1 := tx.Rollback()
779 err2 := rp.enforcer.E.LoadPolicy()
780 err3 := rollbackRecord(context.Background(), aturi, client)
781
782 // ignore txn complete errors, this is okay
783 if errors.Is(err1, sql.ErrTxDone) {
784 err1 = nil
785 }
786
787 if errs := errors.Join(err1, err2, err3); errs != nil {
788 l.Error("failed to rollback changes", "errs", errs)
789 return
790 }
791 }
792 defer rollback()
793
794 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo())
795 if err != nil {
796 fail("Failed to add collaborator permissions.", err)
797 return
798 }
799
800 err = db.AddCollaborator(tx, models.Collaborator{
801 Did: syntax.DID(currentUser.Active.Did),
802 Rkey: rkey,
803 SubjectDid: collaboratorIdent.DID,
804 RepoAt: f.RepoAt(),
805 Created: createdAt,
806 })
807 if err != nil {
808 fail("Failed to add collaborator.", err)
809 return
810 }
811
812 err = tx.Commit()
813 if err != nil {
814 fail("Failed to add collaborator.", err)
815 return
816 }
817
818 err = rp.enforcer.E.SavePolicy()
819 if err != nil {
820 fail("Failed to update collaborator permissions.", err)
821 return
822 }
823
824 // clear aturi to when everything is successful
825 aturi = ""
826
827 rp.pages.HxRefresh(w)
828}
829
830func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) {
831 user := rp.oauth.GetMultiAccountUser(r)
832 l := rp.logger.With("handler", "DeleteRepo")
833
834 noticeId := "operation-error"
835 f, err := rp.repoResolver.Resolve(r)
836 if err != nil {
837 l.Error("failed to get repo and knot", "err", err)
838 return
839 }
840
841 // remove record from pds
842 atpClient, err := rp.oauth.AuthorizedClient(r)
843 if err != nil {
844 l.Error("failed to get authorized client", "err", err)
845 return
846 }
847 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{
848 Collection: tangled.RepoNSID,
849 Repo: user.Active.Did,
850 Rkey: f.Rkey,
851 })
852 if err != nil {
853 l.Error("failed to delete record", "err", err)
854 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.")
855 return
856 }
857 l.Info("removed repo record", "aturi", f.RepoAt().String())
858
859 client, err := rp.oauth.ServiceClient(
860 r,
861 oauth.WithService(f.Knot),
862 oauth.WithLxm(tangled.RepoDeleteNSID),
863 oauth.WithDev(rp.config.Core.Dev),
864 )
865 if err != nil {
866 l.Error("failed to connect to knot server", "err", err)
867 return
868 }
869
870 err = tangled.RepoDelete(
871 r.Context(),
872 client,
873 &tangled.RepoDelete_Input{
874 Did: f.Did,
875 Name: f.Name,
876 Rkey: f.Rkey,
877 },
878 )
879 if err := xrpcclient.HandleXrpcErr(err); err != nil {
880 rp.pages.Notice(w, noticeId, err.Error())
881 return
882 }
883 l.Info("deleted repo from knot")
884
885 tx, err := rp.db.BeginTx(r.Context(), nil)
886 if err != nil {
887 l.Error("failed to start tx")
888 w.Write(fmt.Append(nil, "failed to add collaborator: ", err))
889 return
890 }
891 defer func() {
892 tx.Rollback()
893 err = rp.enforcer.E.LoadPolicy()
894 if err != nil {
895 l.Error("failed to rollback policies")
896 }
897 }()
898
899 // remove collaborator RBAC
900 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot)
901 if err != nil {
902 rp.pages.Notice(w, noticeId, "Failed to remove collaborators")
903 return
904 }
905 for _, c := range repoCollaborators {
906 did := c[0]
907 rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo())
908 }
909 l.Info("removed collaborators")
910
911 // remove repo RBAC
912 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.DidSlashRepo())
913 if err != nil {
914 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules")
915 return
916 }
917
918 // remove repo from db
919 err = db.RemoveRepo(tx, f.Did, f.Name)
920 if err != nil {
921 rp.pages.Notice(w, noticeId, "Failed to update appview")
922 return
923 }
924 l.Info("removed repo from db")
925
926 err = tx.Commit()
927 if err != nil {
928 l.Error("failed to commit changes", "err", err)
929 http.Error(w, err.Error(), http.StatusInternalServerError)
930 return
931 }
932
933 err = rp.enforcer.E.SavePolicy()
934 if err != nil {
935 l.Error("failed to update ACLs", "err", err)
936 http.Error(w, err.Error(), http.StatusInternalServerError)
937 return
938 }
939
940 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did))
941}
942
943func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) {
944 l := rp.logger.With("handler", "SyncRepoFork")
945
946 ref := chi.URLParam(r, "ref")
947 ref, _ = url.PathUnescape(ref)
948
949 user := rp.oauth.GetMultiAccountUser(r)
950 f, err := rp.repoResolver.Resolve(r)
951 if err != nil {
952 l.Error("failed to resolve source repo", "err", err)
953 return
954 }
955
956 switch r.Method {
957 case http.MethodPost:
958 client, err := rp.oauth.ServiceClient(
959 r,
960 oauth.WithService(f.Knot),
961 oauth.WithLxm(tangled.RepoForkSyncNSID),
962 oauth.WithDev(rp.config.Core.Dev),
963 )
964 if err != nil {
965 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
966 return
967 }
968
969 if f.Source == "" {
970 rp.pages.Notice(w, "repo", "This repository is not a fork.")
971 return
972 }
973
974 err = tangled.RepoForkSync(
975 r.Context(),
976 client,
977 &tangled.RepoForkSync_Input{
978 Did: user.Active.Did,
979 Name: f.Name,
980 Source: f.Source,
981 Branch: ref,
982 },
983 )
984 if err := xrpcclient.HandleXrpcErr(err); err != nil {
985 rp.pages.Notice(w, "repo", err.Error())
986 return
987 }
988
989 rp.pages.HxRefresh(w)
990 return
991 }
992}
993
994func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) {
995 l := rp.logger.With("handler", "ForkRepo")
996
997 user := rp.oauth.GetMultiAccountUser(r)
998 f, err := rp.repoResolver.Resolve(r)
999 if err != nil {
1000 l.Error("failed to resolve source repo", "err", err)
1001 return
1002 }
1003
1004 switch r.Method {
1005 case http.MethodGet:
1006 user := rp.oauth.GetMultiAccountUser(r)
1007 knots, err := rp.enforcer.GetKnotsForUser(user.Active.Did)
1008 if err != nil {
1009 rp.pages.Notice(w, "repo", "Invalid user account.")
1010 return
1011 }
1012
1013 rp.pages.ForkRepo(w, pages.ForkRepoParams{
1014 LoggedInUser: user,
1015 Knots: knots,
1016 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1017 })
1018
1019 case http.MethodPost:
1020 l := rp.logger.With("handler", "ForkRepo")
1021
1022 targetKnot := r.FormValue("knot")
1023 if targetKnot == "" {
1024 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
1025 return
1026 }
1027 l = l.With("targetKnot", targetKnot)
1028
1029 ok, err := rp.enforcer.E.Enforce(user.Active.Did, targetKnot, targetKnot, "repo:create")
1030 if err != nil || !ok {
1031 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
1032 return
1033 }
1034
1035 // choose a name for a fork
1036 forkName := r.FormValue("repo_name")
1037 if forkName == "" {
1038 rp.pages.Notice(w, "repo", "Repository name cannot be empty.")
1039 return
1040 }
1041
1042 // this check is *only* to see if the forked repo name already exists
1043 // in the user's account.
1044 existingRepo, err := db.GetRepo(
1045 rp.db,
1046 orm.FilterEq("did", user.Active.Did),
1047 orm.FilterEq("name", forkName),
1048 )
1049 if err != nil {
1050 if !errors.Is(err, sql.ErrNoRows) {
1051 l.Error("error fetching existing repo from db", "err", err)
1052 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.")
1053 return
1054 }
1055 } else if existingRepo != nil {
1056 // repo with this name already exists
1057 rp.pages.Notice(w, "repo", "A repository with this name already exists.")
1058 return
1059 }
1060 l = l.With("forkName", forkName)
1061
1062 uri := "https"
1063 if rp.config.Core.Dev {
1064 uri = "http"
1065 }
1066
1067 forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.Did, f.Name)
1068 l = l.With("cloneUrl", forkSourceUrl)
1069
1070 sourceAt := f.RepoAt().String()
1071
1072 // create an atproto record for this fork
1073 rkey := tid.TID()
1074 repo := &models.Repo{
1075 Did: user.Active.Did,
1076 Name: forkName,
1077 Knot: targetKnot,
1078 Rkey: rkey,
1079 Source: sourceAt,
1080 Description: f.Description,
1081 Created: time.Now(),
1082 Labels: rp.config.Label.DefaultLabelDefs,
1083 }
1084 record := repo.AsRecord()
1085
1086 atpClient, err := rp.oauth.AuthorizedClient(r)
1087 if err != nil {
1088 l.Error("failed to create xrpcclient", "err", err)
1089 rp.pages.Notice(w, "repo", "Failed to fork repository.")
1090 return
1091 }
1092
1093 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1094 Collection: tangled.RepoNSID,
1095 Repo: user.Active.Did,
1096 Rkey: rkey,
1097 Record: &lexutil.LexiconTypeDecoder{
1098 Val: &record,
1099 },
1100 })
1101 if err != nil {
1102 l.Error("failed to write to PDS", "err", err)
1103 rp.pages.Notice(w, "repo", "Failed to announce repository creation.")
1104 return
1105 }
1106
1107 aturi := atresp.Uri
1108 l = l.With("aturi", aturi)
1109 l.Info("wrote to PDS")
1110
1111 tx, err := rp.db.BeginTx(r.Context(), nil)
1112 if err != nil {
1113 l.Info("txn failed", "err", err)
1114 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1115 return
1116 }
1117
1118 // The rollback function reverts a few things on failure:
1119 // - the pending txn
1120 // - the ACLs
1121 // - the atproto record created
1122 rollback := func() {
1123 err1 := tx.Rollback()
1124 err2 := rp.enforcer.E.LoadPolicy()
1125 err3 := rollbackRecord(context.Background(), aturi, atpClient)
1126
1127 // ignore txn complete errors, this is okay
1128 if errors.Is(err1, sql.ErrTxDone) {
1129 err1 = nil
1130 }
1131
1132 if errs := errors.Join(err1, err2, err3); errs != nil {
1133 l.Error("failed to rollback changes", "errs", errs)
1134 return
1135 }
1136 }
1137 defer rollback()
1138
1139 // TODO: this could coordinate better with the knot to recieve a clone status
1140 client, err := rp.oauth.ServiceClient(
1141 r,
1142 oauth.WithService(targetKnot),
1143 oauth.WithLxm(tangled.RepoCreateNSID),
1144 oauth.WithDev(rp.config.Core.Dev),
1145 oauth.WithTimeout(time.Second*20), // big repos take time to clone
1146 )
1147 if err != nil {
1148 l.Error("could not create service client", "err", err)
1149 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1150 return
1151 }
1152
1153 err = tangled.RepoCreate(
1154 r.Context(),
1155 client,
1156 &tangled.RepoCreate_Input{
1157 Rkey: rkey,
1158 Source: &forkSourceUrl,
1159 },
1160 )
1161 if err := xrpcclient.HandleXrpcErr(err); err != nil {
1162 rp.pages.Notice(w, "repo", err.Error())
1163 return
1164 }
1165
1166 err = db.AddRepo(tx, repo)
1167 if err != nil {
1168 l.Error("failed to AddRepo", "err", err)
1169 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1170 return
1171 }
1172
1173 // acls
1174 p, _ := securejoin.SecureJoin(user.Active.Did, forkName)
1175 err = rp.enforcer.AddRepo(user.Active.Did, targetKnot, p)
1176 if err != nil {
1177 l.Error("failed to add ACLs", "err", err)
1178 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.")
1179 return
1180 }
1181
1182 err = tx.Commit()
1183 if err != nil {
1184 l.Error("failed to commit changes", "err", err)
1185 http.Error(w, err.Error(), http.StatusInternalServerError)
1186 return
1187 }
1188
1189 err = rp.enforcer.E.SavePolicy()
1190 if err != nil {
1191 l.Error("failed to update ACLs", "err", err)
1192 http.Error(w, err.Error(), http.StatusInternalServerError)
1193 return
1194 }
1195
1196 // reset the ATURI because the transaction completed successfully
1197 aturi = ""
1198
1199 rp.notifier.NewRepo(r.Context(), repo)
1200 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, forkName))
1201 }
1202}
1203
1204// this is used to rollback changes made to the PDS
1205//
1206// it is a no-op if the provided ATURI is empty
1207func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
1208 if aturi == "" {
1209 return nil
1210 }
1211
1212 parsed := syntax.ATURI(aturi)
1213
1214 collection := parsed.Collection().String()
1215 repo := parsed.Authority().String()
1216 rkey := parsed.RecordKey().String()
1217
1218 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
1219 Collection: collection,
1220 Repo: repo,
1221 Rkey: rkey,
1222 })
1223 return err
1224}