A vibe coded tangled fork which supports pijul.
1package repo
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "slices"
12 "strings"
13 "time"
14
15 "tangled.org/core/appview/cloudflare"
16
17 "tangled.org/core/api/tangled"
18 "tangled.org/core/appview/config"
19 "tangled.org/core/appview/db"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/notify"
22 "tangled.org/core/appview/oauth"
23 "tangled.org/core/appview/pages"
24 "tangled.org/core/appview/reporesolver"
25 "tangled.org/core/appview/validator"
26 xrpcclient "tangled.org/core/appview/xrpcclient"
27 "tangled.org/core/eventconsumer"
28 "tangled.org/core/idresolver"
29 "tangled.org/core/orm"
30 "tangled.org/core/rbac"
31 "tangled.org/core/tid"
32 "tangled.org/core/xrpc/serviceauth"
33
34 comatproto "github.com/bluesky-social/indigo/api/atproto"
35 atpclient "github.com/bluesky-social/indigo/atproto/client"
36 "github.com/bluesky-social/indigo/atproto/syntax"
37 lexutil "github.com/bluesky-social/indigo/lex/util"
38 securejoin "github.com/cyphar/filepath-securejoin"
39 "github.com/go-chi/chi/v5"
40)
41
42type Repo struct {
43 repoResolver *reporesolver.RepoResolver
44 idResolver *idresolver.Resolver
45 config *config.Config
46 oauth *oauth.OAuth
47 pages *pages.Pages
48 spindlestream *eventconsumer.Consumer
49 db *db.DB
50 enforcer *rbac.Enforcer
51 notifier notify.Notifier
52 logger *slog.Logger
53 serviceAuth *serviceauth.ServiceAuth
54 validator *validator.Validator
55 cfClient *cloudflare.Client
56}
57
58func New(
59 oauth *oauth.OAuth,
60 repoResolver *reporesolver.RepoResolver,
61 pages *pages.Pages,
62 spindlestream *eventconsumer.Consumer,
63 idResolver *idresolver.Resolver,
64 db *db.DB,
65 config *config.Config,
66 notifier notify.Notifier,
67 enforcer *rbac.Enforcer,
68 logger *slog.Logger,
69 validator *validator.Validator,
70 cfClient *cloudflare.Client,
71) *Repo {
72 return &Repo{
73 oauth: oauth,
74 repoResolver: repoResolver,
75 pages: pages,
76 idResolver: idResolver,
77 config: config,
78 spindlestream: spindlestream,
79 db: db,
80 notifier: notifier,
81 enforcer: enforcer,
82 logger: logger,
83 validator: validator,
84 cfClient: cfClient,
85 }
86}
87
88// modify the spindle configured for this repo
89func (rp *Repo) EditSpindle(w http.ResponseWriter, r *http.Request) {
90 user := rp.oauth.GetMultiAccountUser(r)
91 l := rp.logger.With("handler", "EditSpindle")
92 l = l.With("did", user.Active.Did)
93
94 errorId := "operation-error"
95 fail := func(msg string, err error) {
96 l.Error(msg, "err", err)
97 rp.pages.Notice(w, errorId, msg)
98 }
99
100 f, err := rp.repoResolver.Resolve(r)
101 if err != nil {
102 fail("Failed to resolve repo. Try again later", err)
103 return
104 }
105
106 newSpindle := r.FormValue("spindle")
107 removingSpindle := newSpindle == "[[none]]" // see pages/templates/repo/settings/pipelines.html for more info on why we use this value
108 client, err := rp.oauth.AuthorizedClient(r)
109 if err != nil {
110 fail("Failed to authorize. Try again later.", err)
111 return
112 }
113
114 if !removingSpindle {
115 // ensure that this is a valid spindle for this user
116 validSpindles, err := rp.enforcer.GetSpindlesForUser(user.Active.Did)
117 if err != nil {
118 fail("Failed to find spindles. Try again later.", err)
119 return
120 }
121
122 if !slices.Contains(validSpindles, newSpindle) {
123 fail("Failed to configure spindle.", fmt.Errorf("%s is not a valid spindle: %q", newSpindle, validSpindles))
124 return
125 }
126 }
127
128 newRepo := *f
129 newRepo.Spindle = newSpindle
130 record := newRepo.AsRecord()
131
132 spindlePtr := &newSpindle
133 if removingSpindle {
134 spindlePtr = nil
135 newRepo.Spindle = ""
136 }
137
138 // optimistic update
139 err = db.UpdateSpindle(rp.db, newRepo.RepoAt().String(), spindlePtr)
140 if err != nil {
141 fail("Failed to update spindle. Try again later.", err)
142 return
143 }
144
145 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
146 if err != nil {
147 fail("Failed to update spindle, no record found on PDS.", err)
148 return
149 }
150 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
151 Collection: tangled.RepoNSID,
152 Repo: newRepo.Did,
153 Rkey: newRepo.Rkey,
154 SwapRecord: ex.Cid,
155 Record: &lexutil.LexiconTypeDecoder{
156 Val: &record,
157 },
158 })
159
160 if err != nil {
161 fail("Failed to update spindle, unable to save to PDS.", err)
162 return
163 }
164
165 if !removingSpindle {
166 // add this spindle to spindle stream
167 rp.spindlestream.AddSource(
168 context.Background(),
169 eventconsumer.NewSpindleSource(newSpindle),
170 )
171 }
172
173 rp.pages.HxRefresh(w)
174}
175
176func (rp *Repo) AddLabelDef(w http.ResponseWriter, r *http.Request) {
177 user := rp.oauth.GetMultiAccountUser(r)
178 l := rp.logger.With("handler", "AddLabel")
179 l = l.With("did", user.Active.Did)
180
181 f, err := rp.repoResolver.Resolve(r)
182 if err != nil {
183 l.Error("failed to get repo and knot", "err", err)
184 return
185 }
186
187 errorId := "add-label-error"
188 fail := func(msg string, err error) {
189 l.Error(msg, "err", err)
190 rp.pages.Notice(w, errorId, msg)
191 }
192
193 // get form values for label definition
194 name := r.FormValue("name")
195 concreteType := r.FormValue("valueType")
196 valueFormat := r.FormValue("valueFormat")
197 enumValues := r.FormValue("enumValues")
198 scope := r.Form["scope"]
199 color := r.FormValue("color")
200 multiple := r.FormValue("multiple") == "true"
201
202 var variants []string
203 for part := range strings.SplitSeq(enumValues, ",") {
204 if part = strings.TrimSpace(part); part != "" {
205 variants = append(variants, part)
206 }
207 }
208
209 if concreteType == "" {
210 concreteType = "null"
211 }
212
213 format := models.ValueTypeFormatAny
214 if valueFormat == "did" {
215 format = models.ValueTypeFormatDid
216 }
217
218 valueType := models.ValueType{
219 Type: models.ConcreteType(concreteType),
220 Format: format,
221 Enum: variants,
222 }
223
224 label := models.LabelDefinition{
225 Did: user.Active.Did,
226 Rkey: tid.TID(),
227 Name: name,
228 ValueType: valueType,
229 Scope: scope,
230 Color: &color,
231 Multiple: multiple,
232 Created: time.Now(),
233 }
234 if err := rp.validator.ValidateLabelDefinition(&label); err != nil {
235 fail(err.Error(), err)
236 return
237 }
238
239 // announce this relation into the firehose, store into owners' pds
240 client, err := rp.oauth.AuthorizedClient(r)
241 if err != nil {
242 fail(err.Error(), err)
243 return
244 }
245
246 // emit a labelRecord
247 labelRecord := label.AsRecord()
248 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
249 Collection: tangled.LabelDefinitionNSID,
250 Repo: label.Did,
251 Rkey: label.Rkey,
252 Record: &lexutil.LexiconTypeDecoder{
253 Val: &labelRecord,
254 },
255 })
256 // invalid record
257 if err != nil {
258 fail("Failed to write record to PDS.", err)
259 return
260 }
261
262 aturi := resp.Uri
263 l = l.With("at-uri", aturi)
264 l.Info("wrote label record to PDS")
265
266 // update the repo to subscribe to this label
267 newRepo := *f
268 newRepo.Labels = append(newRepo.Labels, aturi)
269 repoRecord := newRepo.AsRecord()
270
271 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
272 if err != nil {
273 fail("Failed to update labels, no record found on PDS.", err)
274 return
275 }
276 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
277 Collection: tangled.RepoNSID,
278 Repo: newRepo.Did,
279 Rkey: newRepo.Rkey,
280 SwapRecord: ex.Cid,
281 Record: &lexutil.LexiconTypeDecoder{
282 Val: &repoRecord,
283 },
284 })
285 if err != nil {
286 fail("Failed to update labels for repo.", err)
287 return
288 }
289
290 tx, err := rp.db.BeginTx(r.Context(), nil)
291 if err != nil {
292 fail("Failed to add label.", err)
293 return
294 }
295
296 rollback := func() {
297 err1 := tx.Rollback()
298 err2 := rollbackRecord(context.Background(), aturi, client)
299
300 // ignore txn complete errors, this is okay
301 if errors.Is(err1, sql.ErrTxDone) {
302 err1 = nil
303 }
304
305 if errs := errors.Join(err1, err2); errs != nil {
306 l.Error("failed to rollback changes", "errs", errs)
307 return
308 }
309 }
310 defer rollback()
311
312 _, err = db.AddLabelDefinition(tx, &label)
313 if err != nil {
314 fail("Failed to add label.", err)
315 return
316 }
317
318 err = db.SubscribeLabel(tx, &models.RepoLabel{
319 RepoAt: f.RepoAt(),
320 LabelAt: label.AtUri(),
321 })
322
323 err = tx.Commit()
324 if err != nil {
325 fail("Failed to add label.", err)
326 return
327 }
328
329 // clear aturi when everything is successful
330 aturi = ""
331
332 rp.pages.HxRefresh(w)
333}
334
335func (rp *Repo) DeleteLabelDef(w http.ResponseWriter, r *http.Request) {
336 user := rp.oauth.GetMultiAccountUser(r)
337 l := rp.logger.With("handler", "DeleteLabel")
338 l = l.With("did", user.Active.Did)
339
340 f, err := rp.repoResolver.Resolve(r)
341 if err != nil {
342 l.Error("failed to get repo and knot", "err", err)
343 return
344 }
345
346 errorId := "label-operation"
347 fail := func(msg string, err error) {
348 l.Error(msg, "err", err)
349 rp.pages.Notice(w, errorId, msg)
350 }
351
352 // get form values
353 labelId := r.FormValue("label-id")
354
355 label, err := db.GetLabelDefinition(rp.db, orm.FilterEq("id", labelId))
356 if err != nil {
357 fail("Failed to find label definition.", err)
358 return
359 }
360
361 client, err := rp.oauth.AuthorizedClient(r)
362 if err != nil {
363 fail(err.Error(), err)
364 return
365 }
366
367 // delete label record from PDS
368 _, err = comatproto.RepoDeleteRecord(r.Context(), client, &comatproto.RepoDeleteRecord_Input{
369 Collection: tangled.LabelDefinitionNSID,
370 Repo: label.Did,
371 Rkey: label.Rkey,
372 })
373 if err != nil {
374 fail("Failed to delete label record from PDS.", err)
375 return
376 }
377
378 // update repo record to remove the label reference
379 newRepo := *f
380 var updated []string
381 removedAt := label.AtUri().String()
382 for _, l := range newRepo.Labels {
383 if l != removedAt {
384 updated = append(updated, l)
385 }
386 }
387 newRepo.Labels = updated
388 repoRecord := newRepo.AsRecord()
389
390 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, newRepo.Did, newRepo.Rkey)
391 if err != nil {
392 fail("Failed to update labels, no record found on PDS.", err)
393 return
394 }
395 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
396 Collection: tangled.RepoNSID,
397 Repo: newRepo.Did,
398 Rkey: newRepo.Rkey,
399 SwapRecord: ex.Cid,
400 Record: &lexutil.LexiconTypeDecoder{
401 Val: &repoRecord,
402 },
403 })
404 if err != nil {
405 fail("Failed to update repo record.", err)
406 return
407 }
408
409 // transaction for DB changes
410 tx, err := rp.db.BeginTx(r.Context(), nil)
411 if err != nil {
412 fail("Failed to delete label.", err)
413 return
414 }
415 defer tx.Rollback()
416
417 err = db.UnsubscribeLabel(
418 tx,
419 orm.FilterEq("repo_at", f.RepoAt()),
420 orm.FilterEq("label_at", removedAt),
421 )
422 if err != nil {
423 fail("Failed to unsubscribe label.", err)
424 return
425 }
426
427 err = db.DeleteLabelDefinition(tx, orm.FilterEq("id", label.Id))
428 if err != nil {
429 fail("Failed to delete label definition.", err)
430 return
431 }
432
433 err = tx.Commit()
434 if err != nil {
435 fail("Failed to delete label.", err)
436 return
437 }
438
439 // everything succeeded
440 rp.pages.HxRefresh(w)
441}
442
443func (rp *Repo) SubscribeLabel(w http.ResponseWriter, r *http.Request) {
444 user := rp.oauth.GetMultiAccountUser(r)
445 l := rp.logger.With("handler", "SubscribeLabel")
446 l = l.With("did", user.Active.Did)
447
448 f, err := rp.repoResolver.Resolve(r)
449 if err != nil {
450 l.Error("failed to get repo and knot", "err", err)
451 return
452 }
453
454 if err := r.ParseForm(); err != nil {
455 l.Error("invalid form", "err", err)
456 return
457 }
458
459 errorId := "default-label-operation"
460 fail := func(msg string, err error) {
461 l.Error(msg, "err", err)
462 rp.pages.Notice(w, errorId, msg)
463 }
464
465 labelAts := r.Form["label"]
466 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
467 if err != nil {
468 fail("Failed to subscribe to label.", err)
469 return
470 }
471
472 newRepo := *f
473 newRepo.Labels = append(newRepo.Labels, labelAts...)
474
475 // dedup
476 slices.Sort(newRepo.Labels)
477 newRepo.Labels = slices.Compact(newRepo.Labels)
478
479 repoRecord := newRepo.AsRecord()
480
481 client, err := rp.oauth.AuthorizedClient(r)
482 if err != nil {
483 fail(err.Error(), err)
484 return
485 }
486
487 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
488 if err != nil {
489 fail("Failed to update labels, no record found on PDS.", err)
490 return
491 }
492 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
493 Collection: tangled.RepoNSID,
494 Repo: newRepo.Did,
495 Rkey: newRepo.Rkey,
496 SwapRecord: ex.Cid,
497 Record: &lexutil.LexiconTypeDecoder{
498 Val: &repoRecord,
499 },
500 })
501
502 tx, err := rp.db.Begin()
503 if err != nil {
504 fail("Failed to subscribe to label.", err)
505 return
506 }
507 defer tx.Rollback()
508
509 for _, l := range labelAts {
510 err = db.SubscribeLabel(tx, &models.RepoLabel{
511 RepoAt: f.RepoAt(),
512 LabelAt: syntax.ATURI(l),
513 })
514 if err != nil {
515 fail("Failed to subscribe to label.", err)
516 return
517 }
518 }
519
520 if err := tx.Commit(); err != nil {
521 fail("Failed to subscribe to label.", err)
522 return
523 }
524
525 // everything succeeded
526 rp.pages.HxRefresh(w)
527}
528
529func (rp *Repo) UnsubscribeLabel(w http.ResponseWriter, r *http.Request) {
530 user := rp.oauth.GetMultiAccountUser(r)
531 l := rp.logger.With("handler", "UnsubscribeLabel")
532 l = l.With("did", user.Active.Did)
533
534 f, err := rp.repoResolver.Resolve(r)
535 if err != nil {
536 l.Error("failed to get repo and knot", "err", err)
537 return
538 }
539
540 if err := r.ParseForm(); err != nil {
541 l.Error("invalid form", "err", err)
542 return
543 }
544
545 errorId := "default-label-operation"
546 fail := func(msg string, err error) {
547 l.Error(msg, "err", err)
548 rp.pages.Notice(w, errorId, msg)
549 }
550
551 labelAts := r.Form["label"]
552 _, err = db.GetLabelDefinitions(rp.db, orm.FilterIn("at_uri", labelAts))
553 if err != nil {
554 fail("Failed to unsubscribe to label.", err)
555 return
556 }
557
558 // update repo record to remove the label reference
559 newRepo := *f
560 var updated []string
561 for _, l := range newRepo.Labels {
562 if !slices.Contains(labelAts, l) {
563 updated = append(updated, l)
564 }
565 }
566 newRepo.Labels = updated
567 repoRecord := newRepo.AsRecord()
568
569 client, err := rp.oauth.AuthorizedClient(r)
570 if err != nil {
571 fail(err.Error(), err)
572 return
573 }
574
575 ex, err := comatproto.RepoGetRecord(r.Context(), client, "", tangled.RepoNSID, f.Did, f.Rkey)
576 if err != nil {
577 fail("Failed to update labels, no record found on PDS.", err)
578 return
579 }
580 _, err = comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
581 Collection: tangled.RepoNSID,
582 Repo: newRepo.Did,
583 Rkey: newRepo.Rkey,
584 SwapRecord: ex.Cid,
585 Record: &lexutil.LexiconTypeDecoder{
586 Val: &repoRecord,
587 },
588 })
589
590 err = db.UnsubscribeLabel(
591 rp.db,
592 orm.FilterEq("repo_at", f.RepoAt()),
593 orm.FilterIn("label_at", labelAts),
594 )
595 if err != nil {
596 fail("Failed to unsubscribe label.", err)
597 return
598 }
599
600 // everything succeeded
601 rp.pages.HxRefresh(w)
602}
603
604func (rp *Repo) LabelPanel(w http.ResponseWriter, r *http.Request) {
605 l := rp.logger.With("handler", "LabelPanel")
606
607 f, err := rp.repoResolver.Resolve(r)
608 if err != nil {
609 l.Error("failed to get repo and knot", "err", err)
610 return
611 }
612
613 subjectStr := r.FormValue("subject")
614 subject, err := syntax.ParseATURI(subjectStr)
615 if err != nil {
616 l.Error("failed to get repo and knot", "err", err)
617 return
618 }
619
620 labelDefs, err := db.GetLabelDefinitions(
621 rp.db,
622 orm.FilterIn("at_uri", f.Labels),
623 orm.FilterContains("scope", subject.Collection().String()),
624 )
625 if err != nil {
626 l.Error("failed to fetch label defs", "err", err)
627 return
628 }
629
630 defs := make(map[string]*models.LabelDefinition)
631 for _, l := range labelDefs {
632 defs[l.AtUri().String()] = &l
633 }
634
635 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
636 if err != nil {
637 l.Error("failed to build label state", "err", err)
638 return
639 }
640 state := states[subject]
641
642 user := rp.oauth.GetMultiAccountUser(r)
643 rp.pages.LabelPanel(w, pages.LabelPanelParams{
644 LoggedInUser: user,
645 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
646 Defs: defs,
647 Subject: subject.String(),
648 State: state,
649 })
650}
651
652func (rp *Repo) EditLabelPanel(w http.ResponseWriter, r *http.Request) {
653 l := rp.logger.With("handler", "EditLabelPanel")
654
655 f, err := rp.repoResolver.Resolve(r)
656 if err != nil {
657 l.Error("failed to get repo and knot", "err", err)
658 return
659 }
660
661 subjectStr := r.FormValue("subject")
662 subject, err := syntax.ParseATURI(subjectStr)
663 if err != nil {
664 l.Error("failed to get repo and knot", "err", err)
665 return
666 }
667
668 labelDefs, err := db.GetLabelDefinitions(
669 rp.db,
670 orm.FilterIn("at_uri", f.Labels),
671 orm.FilterContains("scope", subject.Collection().String()),
672 )
673 if err != nil {
674 l.Error("failed to fetch labels", "err", err)
675 return
676 }
677
678 defs := make(map[string]*models.LabelDefinition)
679 for _, l := range labelDefs {
680 defs[l.AtUri().String()] = &l
681 }
682
683 states, err := db.GetLabels(rp.db, orm.FilterEq("subject", subject))
684 if err != nil {
685 l.Error("failed to build label state", "err", err)
686 return
687 }
688 state := states[subject]
689
690 user := rp.oauth.GetMultiAccountUser(r)
691 rp.pages.EditLabelPanel(w, pages.EditLabelPanelParams{
692 LoggedInUser: user,
693 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
694 Defs: defs,
695 Subject: subject.String(),
696 State: state,
697 })
698}
699
700func (rp *Repo) AddCollaborator(w http.ResponseWriter, r *http.Request) {
701 user := rp.oauth.GetMultiAccountUser(r)
702 l := rp.logger.With("handler", "AddCollaborator")
703 l = l.With("did", user.Active.Did)
704
705 f, err := rp.repoResolver.Resolve(r)
706 if err != nil {
707 l.Error("failed to get repo and knot", "err", err)
708 return
709 }
710
711 errorId := "add-collaborator-error"
712 fail := func(msg string, err error) {
713 l.Error(msg, "err", err)
714 rp.pages.Notice(w, errorId, msg)
715 }
716
717 collaborator := r.FormValue("collaborator")
718 if collaborator == "" {
719 fail("Invalid form.", nil)
720 return
721 }
722
723 // remove a single leading `@`, to make @handle work with ResolveIdent
724 collaborator = strings.TrimPrefix(collaborator, "@")
725
726 collaboratorIdent, err := rp.idResolver.ResolveIdent(r.Context(), collaborator)
727 if err != nil {
728 fail(fmt.Sprintf("'%s' is not a valid DID/handle.", collaborator), err)
729 return
730 }
731
732 if collaboratorIdent.DID.String() == user.Active.Did {
733 fail("You seem to be adding yourself as a collaborator.", nil)
734 return
735 }
736 l = l.With("collaborator", collaboratorIdent.Handle)
737 l = l.With("knot", f.Knot)
738
739 // announce this relation into the firehose, store into owners' pds
740 client, err := rp.oauth.AuthorizedClient(r)
741 if err != nil {
742 fail("Failed to write to PDS.", err)
743 return
744 }
745
746 // emit a record
747 currentUser := rp.oauth.GetMultiAccountUser(r)
748 rkey := tid.TID()
749 createdAt := time.Now()
750 resp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{
751 Collection: tangled.RepoCollaboratorNSID,
752 Repo: currentUser.Active.Did,
753 Rkey: rkey,
754 Record: &lexutil.LexiconTypeDecoder{
755 Val: &tangled.RepoCollaborator{
756 Subject: collaboratorIdent.DID.String(),
757 Repo: string(f.RepoAt()),
758 CreatedAt: createdAt.Format(time.RFC3339),
759 }},
760 })
761 // invalid record
762 if err != nil {
763 fail("Failed to write record to PDS.", err)
764 return
765 }
766
767 aturi := resp.Uri
768 l = l.With("at-uri", aturi)
769 l.Info("wrote record to PDS")
770
771 tx, err := rp.db.BeginTx(r.Context(), nil)
772 if err != nil {
773 fail("Failed to add collaborator.", err)
774 return
775 }
776
777 rollback := func() {
778 err1 := tx.Rollback()
779 err2 := rp.enforcer.E.LoadPolicy()
780 err3 := rollbackRecord(context.Background(), aturi, client)
781
782 // ignore txn complete errors, this is okay
783 if errors.Is(err1, sql.ErrTxDone) {
784 err1 = nil
785 }
786
787 if errs := errors.Join(err1, err2, err3); errs != nil {
788 l.Error("failed to rollback changes", "errs", errs)
789 return
790 }
791 }
792 defer rollback()
793
794 err = rp.enforcer.AddCollaborator(collaboratorIdent.DID.String(), f.Knot, f.DidSlashRepo())
795 if err != nil {
796 fail("Failed to add collaborator permissions.", err)
797 return
798 }
799 err = db.AddCollaborator(tx, models.Collaborator{
800 Did: syntax.DID(currentUser.Active.Did),
801 Rkey: rkey,
802 SubjectDid: collaboratorIdent.DID,
803 RepoAt: f.RepoAt(),
804 Created: createdAt,
805 })
806 if err != nil {
807 fail("Failed to add collaborator.", err)
808 return
809 }
810
811 err = tx.Commit()
812 if err != nil {
813 fail("Failed to add collaborator.", err)
814 return
815 }
816
817 err = rp.enforcer.E.SavePolicy()
818 if err != nil {
819 fail("Failed to update collaborator permissions.", err)
820 return
821 }
822
823 // clear aturi to when everything is successful
824 aturi = ""
825
826 rp.pages.HxRefresh(w)
827}
828
829func (rp *Repo) DeleteRepo(w http.ResponseWriter, r *http.Request) {
830 user := rp.oauth.GetMultiAccountUser(r)
831 l := rp.logger.With("handler", "DeleteRepo")
832
833 noticeId := "operation-error"
834 f, err := rp.repoResolver.Resolve(r)
835 if err != nil {
836 l.Error("failed to get repo and knot", "err", err)
837 return
838 }
839
840 // remove record from pds
841 atpClient, err := rp.oauth.AuthorizedClient(r)
842 if err != nil {
843 l.Error("failed to get authorized client", "err", err)
844 return
845 }
846 _, err = comatproto.RepoDeleteRecord(r.Context(), atpClient, &comatproto.RepoDeleteRecord_Input{
847 Collection: tangled.RepoNSID,
848 Repo: user.Active.Did,
849 Rkey: f.Rkey,
850 })
851 if err != nil {
852 l.Error("failed to delete record", "err", err)
853 rp.pages.Notice(w, noticeId, "Failed to delete repository from PDS.")
854 return
855 }
856 l.Info("removed repo record", "aturi", f.RepoAt().String())
857
858 client, err := rp.oauth.ServiceClient(
859 r,
860 oauth.WithService(f.Knot),
861 oauth.WithLxm(tangled.RepoDeleteNSID),
862 oauth.WithDev(rp.config.Core.Dev),
863 )
864 if err != nil {
865 l.Error("failed to connect to knot server", "err", err)
866 return
867 }
868
869 err = tangled.RepoDelete(
870 r.Context(),
871 client,
872 &tangled.RepoDelete_Input{
873 Did: f.Did,
874 Name: f.Name,
875 Rkey: f.Rkey,
876 },
877 )
878 if err := xrpcclient.HandleXrpcErr(err); err != nil {
879 rp.pages.Notice(w, noticeId, err.Error())
880 return
881 }
882 l.Info("deleted repo from knot")
883
884 tx, err := rp.db.BeginTx(r.Context(), nil)
885 if err != nil {
886 l.Error("failed to start tx")
887 w.Write(fmt.Append(nil, "failed to add collaborator: ", err))
888 return
889 }
890 defer func() {
891 tx.Rollback()
892 err = rp.enforcer.E.LoadPolicy()
893 if err != nil {
894 l.Error("failed to rollback policies")
895 }
896 }()
897
898 // remove collaborator RBAC
899 repoCollaborators, err := rp.enforcer.E.GetImplicitUsersForResourceByDomain(f.DidSlashRepo(), f.Knot)
900 if err != nil {
901 rp.pages.Notice(w, noticeId, "Failed to remove collaborators")
902 return
903 }
904 for _, c := range repoCollaborators {
905 did := c[0]
906 rp.enforcer.RemoveCollaborator(did, f.Knot, f.DidSlashRepo())
907 }
908 l.Info("removed collaborators")
909
910 // remove repo RBAC
911 err = rp.enforcer.RemoveRepo(f.Did, f.Knot, f.DidSlashRepo())
912 if err != nil {
913 rp.pages.Notice(w, noticeId, "Failed to update RBAC rules")
914 return
915 }
916 // remove repo from db
917 err = db.RemoveRepo(tx, f.Did, f.Name)
918 if err != nil {
919 rp.pages.Notice(w, noticeId, "Failed to update appview")
920 return
921 }
922 l.Info("removed repo from db")
923
924 err = tx.Commit()
925 if err != nil {
926 l.Error("failed to commit changes", "err", err)
927 http.Error(w, err.Error(), http.StatusInternalServerError)
928 return
929 }
930
931 err = rp.enforcer.E.SavePolicy()
932 if err != nil {
933 l.Error("failed to update ACLs", "err", err)
934 http.Error(w, err.Error(), http.StatusInternalServerError)
935 return
936 }
937
938 rp.pages.HxRedirect(w, fmt.Sprintf("/%s", f.Did))
939}
940
941func (rp *Repo) SyncRepoFork(w http.ResponseWriter, r *http.Request) {
942 l := rp.logger.With("handler", "SyncRepoFork")
943
944 ref := chi.URLParam(r, "ref")
945 ref, _ = url.PathUnescape(ref)
946
947 user := rp.oauth.GetMultiAccountUser(r)
948 f, err := rp.repoResolver.Resolve(r)
949 if err != nil {
950 l.Error("failed to resolve source repo", "err", err)
951 return
952 }
953
954 switch r.Method {
955 case http.MethodPost:
956 client, err := rp.oauth.ServiceClient(
957 r,
958 oauth.WithService(f.Knot),
959 oauth.WithLxm(tangled.RepoForkSyncNSID),
960 oauth.WithDev(rp.config.Core.Dev),
961 )
962 if err != nil {
963 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
964 return
965 }
966
967 if f.Source == "" {
968 rp.pages.Notice(w, "repo", "This repository is not a fork.")
969 return
970 }
971
972 err = tangled.RepoForkSync(
973 r.Context(),
974 client,
975 &tangled.RepoForkSync_Input{
976 Did: user.Active.Did,
977 Name: f.Name,
978 Source: f.Source,
979 Branch: ref,
980 },
981 )
982 if err := xrpcclient.HandleXrpcErr(err); err != nil {
983 rp.pages.Notice(w, "repo", err.Error())
984 return
985 }
986
987 rp.pages.HxRefresh(w)
988 return
989 }
990}
991
992func (rp *Repo) ForkRepo(w http.ResponseWriter, r *http.Request) {
993 l := rp.logger.With("handler", "ForkRepo")
994
995 user := rp.oauth.GetMultiAccountUser(r)
996 f, err := rp.repoResolver.Resolve(r)
997 if err != nil {
998 l.Error("failed to resolve source repo", "err", err)
999 return
1000 }
1001
1002 switch r.Method {
1003 case http.MethodGet:
1004 user := rp.oauth.GetMultiAccountUser(r)
1005 knots, err := rp.enforcer.GetKnotsForUser(user.Active.Did)
1006 if err != nil {
1007 rp.pages.Notice(w, "repo", "Invalid user account.")
1008 return
1009 }
1010
1011 rp.pages.ForkRepo(w, pages.ForkRepoParams{
1012 LoggedInUser: user,
1013 Knots: knots,
1014 RepoInfo: rp.repoResolver.GetRepoInfo(r, user),
1015 })
1016
1017 case http.MethodPost:
1018 l := rp.logger.With("handler", "ForkRepo")
1019
1020 targetKnot := r.FormValue("knot")
1021 if targetKnot == "" {
1022 rp.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
1023 return
1024 }
1025 l = l.With("targetKnot", targetKnot)
1026
1027 ok, err := rp.enforcer.E.Enforce(user.Active.Did, targetKnot, targetKnot, "repo:create")
1028 if err != nil || !ok {
1029 rp.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
1030 return
1031 }
1032
1033 // choose a name for a fork
1034 forkName := r.FormValue("repo_name")
1035 if forkName == "" {
1036 rp.pages.Notice(w, "repo", "Repository name cannot be empty.")
1037 return
1038 }
1039
1040 // this check is *only* to see if the forked repo name already exists
1041 // in the user's account.
1042 existingRepo, err := db.GetRepo(
1043 rp.db,
1044 orm.FilterEq("did", user.Active.Did),
1045 orm.FilterEq("name", forkName),
1046 )
1047 if err != nil {
1048 if !errors.Is(err, sql.ErrNoRows) {
1049 l.Error("error fetching existing repo from db", "err", err)
1050 rp.pages.Notice(w, "repo", "Failed to fork this repository. Try again later.")
1051 return
1052 }
1053 } else if existingRepo != nil {
1054 // repo with this name already exists
1055 rp.pages.Notice(w, "repo", "A repository with this name already exists.")
1056 return
1057 }
1058 l = l.With("forkName", forkName)
1059
1060 uri := "https"
1061 if rp.config.Core.Dev {
1062 uri = "http"
1063 }
1064
1065 forkSourceUrl := fmt.Sprintf("%s://%s/%s/%s", uri, f.Knot, f.Did, f.Name)
1066 l = l.With("cloneUrl", forkSourceUrl)
1067
1068 sourceAt := f.RepoAt().String()
1069
1070 // create an atproto record for this fork
1071 rkey := tid.TID()
1072 repo := &models.Repo{
1073 Did: user.Active.Did,
1074 Name: forkName,
1075 Knot: targetKnot,
1076 Rkey: rkey,
1077 Source: sourceAt,
1078 Description: f.Description,
1079 Created: time.Now(),
1080 Labels: rp.config.Label.DefaultLabelDefs,
1081 Vcs: f.Vcs,
1082 }
1083 record := repo.AsRecord()
1084
1085 atpClient, err := rp.oauth.AuthorizedClient(r)
1086 if err != nil {
1087 l.Error("failed to create xrpcclient", "err", err)
1088 rp.pages.Notice(w, "repo", "Failed to fork repository.")
1089 return
1090 }
1091
1092 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
1093 Collection: tangled.RepoNSID,
1094 Repo: user.Active.Did,
1095 Rkey: rkey,
1096 Record: &lexutil.LexiconTypeDecoder{
1097 Val: &record,
1098 },
1099 })
1100 if err != nil {
1101 l.Error("failed to write to PDS", "err", err)
1102 rp.pages.Notice(w, "repo", "Failed to announce repository creation.")
1103 return
1104 }
1105
1106 aturi := atresp.Uri
1107 l = l.With("aturi", aturi)
1108 l.Info("wrote to PDS")
1109
1110 tx, err := rp.db.BeginTx(r.Context(), nil)
1111 if err != nil {
1112 l.Info("txn failed", "err", err)
1113 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1114 return
1115 }
1116
1117 // The rollback function reverts a few things on failure:
1118 // - the pending txn
1119 // - the ACLs
1120 // - the atproto record created
1121 rollback := func() {
1122 err1 := tx.Rollback()
1123 err2 := rp.enforcer.E.LoadPolicy()
1124 err3 := rollbackRecord(context.Background(), aturi, atpClient)
1125
1126 // ignore txn complete errors, this is okay
1127 if errors.Is(err1, sql.ErrTxDone) {
1128 err1 = nil
1129 }
1130
1131 if errs := errors.Join(err1, err2, err3); errs != nil {
1132 l.Error("failed to rollback changes", "errs", errs)
1133 return
1134 }
1135 }
1136 defer rollback()
1137
1138 // TODO: this could coordinate better with the knot to recieve a clone status
1139 client, err := rp.oauth.ServiceClient(
1140 r,
1141 oauth.WithService(targetKnot),
1142 oauth.WithLxm(tangled.RepoCreateNSID),
1143 oauth.WithDev(rp.config.Core.Dev),
1144 oauth.WithTimeout(time.Second*20), // big repos take time to clone
1145 )
1146 if err != nil {
1147 l.Error("could not create service client", "err", err)
1148 rp.pages.Notice(w, "repo", "Failed to connect to knot server.")
1149 return
1150 }
1151
1152 err = tangled.RepoCreate(
1153 r.Context(),
1154 client,
1155 &tangled.RepoCreate_Input{
1156 Rkey: rkey,
1157 Source: &forkSourceUrl,
1158 Vcs: &f.Vcs,
1159 },
1160 )
1161 if err := xrpcclient.HandleXrpcErr(err); err != nil {
1162 rp.pages.Notice(w, "repo", err.Error())
1163 return
1164 }
1165
1166 err = db.AddRepo(tx, repo)
1167 if err != nil {
1168 l.Error("failed to AddRepo", "err", err)
1169 rp.pages.Notice(w, "repo", "Failed to save repository information.")
1170 return
1171 }
1172
1173 // acls
1174 p, _ := securejoin.SecureJoin(user.Active.Did, forkName)
1175 err = rp.enforcer.AddRepo(user.Active.Did, targetKnot, p)
1176 if err != nil {
1177 l.Error("failed to add ACLs", "err", err)
1178 rp.pages.Notice(w, "repo", "Failed to set up repository permissions.")
1179 return
1180 }
1181 err = tx.Commit()
1182 if err != nil {
1183 l.Error("failed to commit changes", "err", err)
1184 http.Error(w, err.Error(), http.StatusInternalServerError)
1185 return
1186 }
1187
1188 err = rp.enforcer.E.SavePolicy()
1189 if err != nil {
1190 l.Error("failed to update ACLs", "err", err)
1191 http.Error(w, err.Error(), http.StatusInternalServerError)
1192 return
1193 }
1194
1195 // reset the ATURI because the transaction completed successfully
1196 aturi = ""
1197
1198 rp.notifier.NewRepo(r.Context(), repo)
1199 rp.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, forkName))
1200 }
1201}
1202
1203// this is used to rollback changes made to the PDS
1204//
1205// it is a no-op if the provided ATURI is empty
1206func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
1207 if aturi == "" {
1208 return nil
1209 }
1210
1211 parsed := syntax.ATURI(aturi)
1212
1213 collection := parsed.Collection().String()
1214 repo := parsed.Authority().String()
1215 rkey := parsed.RecordKey().String()
1216
1217 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
1218 Collection: collection,
1219 Repo: repo,
1220 Rkey: rkey,
1221 })
1222 return err
1223}