A vibe-coded Tangled fork that supports Pijul.
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/config"
16 "tangled.org/core/appview/db"
17 "tangled.org/core/appview/indexer"
18 "tangled.org/core/appview/mentions"
19 "tangled.org/core/appview/models"
20 "tangled.org/core/appview/notify"
21 dbnotify "tangled.org/core/appview/notify/db"
22 phnotify "tangled.org/core/appview/notify/posthog"
23 "tangled.org/core/appview/oauth"
24 "tangled.org/core/appview/pages"
25 "tangled.org/core/appview/reporesolver"
26 xrpcclient "tangled.org/core/appview/xrpcclient"
27 "tangled.org/core/eventconsumer"
28 "tangled.org/core/idresolver"
29 "tangled.org/core/jetstream"
30 "tangled.org/core/log"
31 tlog "tangled.org/core/log"
32 "tangled.org/core/orm"
33 "tangled.org/core/rbac"
34 "tangled.org/core/tid"
35
36 comatproto "github.com/bluesky-social/indigo/api/atproto"
37 atpclient "github.com/bluesky-social/indigo/atproto/client"
38 "github.com/bluesky-social/indigo/atproto/syntax"
39 lexutil "github.com/bluesky-social/indigo/lex/util"
40 securejoin "github.com/cyphar/filepath-securejoin"
41 "github.com/go-chi/chi/v5"
42 "github.com/posthog/posthog-go"
43)
44
45type State struct {
46 db *db.DB
47 notifier notify.Notifier
48 indexer *indexer.Indexer
49 oauth *oauth.OAuth
50 enforcer *rbac.Enforcer
51 pages *pages.Pages
52 idResolver *idresolver.Resolver
53 mentionsResolver *mentions.Resolver
54 posthog posthog.Client
55 jc *jetstream.JetstreamClient
56 config *config.Config
57 repoResolver *reporesolver.RepoResolver
58 knotstream *eventconsumer.Consumer
59 spindlestream *eventconsumer.Consumer
60 logger *slog.Logger
61}
62
63func Make(ctx context.Context, config *config.Config) (*State, error) {
64 logger := tlog.FromContext(ctx)
65
66 d, err := db.Make(ctx, config.Core.DbPath)
67 if err != nil {
68 return nil, fmt.Errorf("failed to create db: %w", err)
69 }
70
71 indexer := indexer.New(log.SubLogger(logger, "indexer"))
72 err = indexer.Init(ctx, d)
73 if err != nil {
74 return nil, fmt.Errorf("failed to create indexer: %w", err)
75 }
76
77 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
78 if err != nil {
79 return nil, fmt.Errorf("failed to create enforcer: %w", err)
80 }
81
82 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
83 if err != nil {
84 logger.Error("failed to create redis resolver", "err", err)
85 res = idresolver.DefaultResolver(config.Plc.PLCURL)
86 }
87
88 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
89 if err != nil {
90 return nil, fmt.Errorf("failed to create posthog client: %w", err)
91 }
92
93 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
94 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
95 if err != nil {
96 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
97 }
98
99 repoResolver := reporesolver.New(config, enforcer, d)
100
101 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
102
103 wrapper := db.DbWrapper{Execer: d}
104 jc, err := jetstream.NewJetstreamClient(
105 config.Jetstream.Endpoint,
106 "appview",
107 []string{
108 tangled.GraphFollowNSID,
109 tangled.FeedStarNSID,
110 tangled.PublicKeyNSID,
111 tangled.RepoArtifactNSID,
112 tangled.ActorProfileNSID,
113 tangled.SpindleMemberNSID,
114 tangled.SpindleNSID,
115 tangled.StringNSID,
116 tangled.RepoIssueNSID,
117 tangled.RepoIssueCommentNSID,
118 tangled.LabelDefinitionNSID,
119 tangled.LabelOpNSID,
120 },
121 nil,
122 tlog.SubLogger(logger, "jetstream"),
123 wrapper,
124 false,
125
126 // in-memory filter is inapplicable to appview so
127 // we'll never log dids anyway.
128 false,
129 )
130 if err != nil {
131 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
132 }
133
134 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
135 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
136 }
137
138 ingester := appview.Ingester{
139 Db: wrapper,
140 Enforcer: enforcer,
141 IdResolver: res,
142 Config: config,
143 Logger: log.SubLogger(logger, "ingester"),
144 }
145 err = jc.StartJetstream(ctx, ingester.Ingest())
146 if err != nil {
147 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
148 }
149
150 var notifiers []notify.Notifier
151
152 // Always add the database notifier
153 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
154
155 // Add other notifiers in production only
156 if !config.Core.Dev {
157 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
158 }
159 notifiers = append(notifiers, indexer)
160
161 // Add webhook notifier
162 notifiers = append(notifiers, notify.NewWebhookNotifier(d))
163
164 notifier := notify.NewMergedNotifier(notifiers)
165 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
166
167 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier)
168 if err != nil {
169 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
170 }
171 knotstream.Start(ctx)
172
173 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
174 if err != nil {
175 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
176 }
177 spindlestream.Start(ctx)
178
179 state := &State{
180 d,
181 notifier,
182 indexer,
183 oauth,
184 enforcer,
185 pages,
186 res,
187 mentionsResolver,
188 posthog,
189 jc,
190 config,
191 repoResolver,
192 knotstream,
193 spindlestream,
194 logger,
195 }
196
197 return state, nil
198}
199
200func (s *State) Close() error {
201 // other close up logic goes here
202 return s.db.Close()
203}
204
205func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
206 w.Header().Set("Content-Type", "text/plain")
207 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
208
209 robotsTxt := `# Hello, Tanglers!
210User-agent: *
211Allow: /
212Disallow: /*/*/settings
213Disallow: /settings
214Disallow: /*/*/compare
215Disallow: /*/*/fork
216
217Crawl-delay: 1
218`
219 w.Write([]byte(robotsTxt))
220}
221
222func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
223 user := s.oauth.GetMultiAccountUser(r)
224 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
225 LoggedInUser: user,
226 })
227}
228
229func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
230 user := s.oauth.GetMultiAccountUser(r)
231 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
232 LoggedInUser: user,
233 })
234}
235
236func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
237 user := s.oauth.GetMultiAccountUser(r)
238 s.pages.Brand(w, pages.BrandParams{
239 LoggedInUser: user,
240 })
241}
242
243func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
244 if s.oauth.GetMultiAccountUser(r) != nil {
245 s.Timeline(w, r)
246 return
247 }
248 s.Home(w, r)
249}
250
251func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
252 user := s.oauth.GetMultiAccountUser(r)
253
254 // TODO: set this flag based on the UI
255 filtered := false
256
257 var userDid string
258 if user != nil && user.Active != nil {
259 userDid = user.Active.Did
260 }
261 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered)
262 if err != nil {
263 s.logger.Error("failed to make timeline", "err", err)
264 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
265 }
266
267 repos, err := db.GetTopStarredReposLastWeek(s.db)
268 if err != nil {
269 s.logger.Error("failed to get top starred repos", "err", err)
270 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
271 return
272 }
273
274 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue))
275 if err != nil {
276 // non-fatal
277 }
278
279 s.pages.Timeline(w, pages.TimelineParams{
280 LoggedInUser: user,
281 Timeline: timeline,
282 Repos: repos,
283 GfiLabel: gfiLabel,
284 })
285}
286
287func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
288 user := s.oauth.GetMultiAccountUser(r)
289 if user == nil {
290 return
291 }
292
293 l := s.logger.With("handler", "UpgradeBanner")
294 l = l.With("did", user.Active.Did)
295
296 regs, err := db.GetRegistrations(
297 s.db,
298 orm.FilterEq("did", user.Active.Did),
299 orm.FilterEq("needs_upgrade", 1),
300 )
301 if err != nil {
302 l.Error("non-fatal: failed to get registrations", "err", err)
303 }
304
305 spindles, err := db.GetSpindles(
306 s.db,
307 orm.FilterEq("owner", user.Active.Did),
308 orm.FilterEq("needs_upgrade", 1),
309 )
310 if err != nil {
311 l.Error("non-fatal: failed to get spindles", "err", err)
312 }
313
314 if regs == nil && spindles == nil {
315 return
316 }
317
318 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
319 Registrations: regs,
320 Spindles: spindles,
321 })
322}
323
324func (s *State) Home(w http.ResponseWriter, r *http.Request) {
325 // TODO: set this flag based on the UI
326 filtered := false
327
328 timeline, err := db.MakeTimeline(s.db, 5, "", filtered)
329 if err != nil {
330 s.logger.Error("failed to make timeline", "err", err)
331 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
332 return
333 }
334
335 repos, err := db.GetTopStarredReposLastWeek(s.db)
336 if err != nil {
337 s.logger.Error("failed to get top starred repos", "err", err)
338 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
339 return
340 }
341
342 s.pages.Home(w, pages.TimelineParams{
343 LoggedInUser: nil,
344 Timeline: timeline,
345 Repos: repos,
346 })
347}
348
349func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
350 user := chi.URLParam(r, "user")
351 user = strings.TrimPrefix(user, "@")
352
353 if user == "" {
354 w.WriteHeader(http.StatusBadRequest)
355 return
356 }
357
358 id, err := s.idResolver.ResolveIdent(r.Context(), user)
359 if err != nil {
360 w.WriteHeader(http.StatusInternalServerError)
361 return
362 }
363
364 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
365 if err != nil {
366 s.logger.Error("failed to get public keys", "err", err)
367 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
368 return
369 }
370
371 if len(pubKeys) == 0 {
372 w.WriteHeader(http.StatusNoContent)
373 return
374 }
375
376 for _, k := range pubKeys {
377 key := strings.TrimRight(k.Key, "\n")
378 fmt.Fprintln(w, key)
379 }
380}
381
// validateRepoName reports whether name is acceptable as a repository name.
//
// Checks run in this order, and the first failure decides the error text:
//  1. the name is "." or "..", or contains a forward or backward slash
//  2. the name starts or ends with a period
//  3. the name contains a character outside ASCII letters, digits, '-', '_'
//     and '.'
//  4. the name contains two consecutive periods
//
// The empty string passes every check; callers must reject it themselves.
func validateRepoName(name string) error {
	// Reports whether r may appear in a repository name.
	allowed := func(r rune) bool {
		switch {
		case 'a' <= r && r <= 'z', 'A' <= r && r <= 'Z', '0' <= r && r <= '9':
			return true
		}
		return strings.ContainsRune("-_.", r)
	}

	switch {
	case name == ".", name == "..", strings.ContainsAny(name, `/\`):
		// path traversal attempts
		return fmt.Errorf("Repository name contains invalid path characters")

	case strings.HasPrefix(name, "."), strings.HasSuffix(name, "."):
		// leading or trailing dots could enable traversal once normalized
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	for _, r := range name {
		if !allowed(r) {
			return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	return nil
}
413
// stripGitExt returns name without a single trailing ".git"; names that do not
// end in ".git" are returned unchanged.
func stripGitExt(name string) string {
	const ext = ".git"
	if !strings.HasSuffix(name, ext) {
		return name
	}
	return name[:len(name)-len(ext)]
}
417
418func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
419 switch r.Method {
420 case http.MethodGet:
421 user := s.oauth.GetMultiAccountUser(r)
422 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
423 if err != nil {
424 s.pages.Notice(w, "repo", "Invalid user account.")
425 return
426 }
427
428 s.pages.NewRepo(w, pages.NewRepoParams{
429 LoggedInUser: user,
430 Knots: knots,
431 })
432
433 case http.MethodPost:
434 l := s.logger.With("handler", "NewRepo")
435
436 user := s.oauth.GetMultiAccountUser(r)
437 l = l.With("did", user.Active.Did)
438
439 // form validation
440 domain := r.FormValue("domain")
441 if domain == "" {
442 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
443 return
444 }
445 l = l.With("knot", domain)
446
447 repoName := r.FormValue("name")
448 if repoName == "" {
449 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
450 return
451 }
452
453 if err := validateRepoName(repoName); err != nil {
454 s.pages.Notice(w, "repo", err.Error())
455 return
456 }
457 repoName = stripGitExt(repoName)
458 l = l.With("repoName", repoName)
459
460 defaultBranch := r.FormValue("branch")
461 if defaultBranch == "" {
462 defaultBranch = "main"
463 }
464 l = l.With("defaultBranch", defaultBranch)
465
466 description := r.FormValue("description")
467 if len([]rune(description)) > 140 {
468 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
469 return
470 }
471
472 // ACL validation
473 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
474 if err != nil || !ok {
475 l.Info("unauthorized")
476 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
477 return
478 }
479
480 // Check for existing repos
481 existingRepo, err := db.GetRepo(
482 s.db,
483 orm.FilterEq("did", user.Active.Did),
484 orm.FilterEq("name", repoName),
485 )
486 if err == nil && existingRepo != nil {
487 l.Info("repo exists")
488 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
489 return
490 }
491
492 // create atproto record for this repo
493 rkey := tid.TID()
494 repo := &models.Repo{
495 Did: user.Active.Did,
496 Name: repoName,
497 Knot: domain,
498 Rkey: rkey,
499 Description: description,
500 Created: time.Now(),
501 Labels: s.config.Label.DefaultLabelDefs,
502 }
503 record := repo.AsRecord()
504
505 atpClient, err := s.oauth.AuthorizedClient(r)
506 if err != nil {
507 l.Info("PDS write failed", "err", err)
508 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
509 return
510 }
511
512 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
513 Collection: tangled.RepoNSID,
514 Repo: user.Active.Did,
515 Rkey: rkey,
516 Record: &lexutil.LexiconTypeDecoder{
517 Val: &record,
518 },
519 })
520 if err != nil {
521 l.Info("PDS write failed", "err", err)
522 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
523 return
524 }
525
526 aturi := atresp.Uri
527 l = l.With("aturi", aturi)
528 l.Info("wrote to PDS")
529
530 tx, err := s.db.BeginTx(r.Context(), nil)
531 if err != nil {
532 l.Info("txn failed", "err", err)
533 s.pages.Notice(w, "repo", "Failed to save repository information.")
534 return
535 }
536
537 // The rollback function reverts a few things on failure:
538 // - the pending txn
539 // - the ACLs
540 // - the atproto record created
541 rollback := func() {
542 err1 := tx.Rollback()
543 err2 := s.enforcer.E.LoadPolicy()
544 err3 := rollbackRecord(context.Background(), aturi, atpClient)
545
546 // ignore txn complete errors, this is okay
547 if errors.Is(err1, sql.ErrTxDone) {
548 err1 = nil
549 }
550
551 if errs := errors.Join(err1, err2, err3); errs != nil {
552 l.Error("failed to rollback changes", "errs", errs)
553 return
554 }
555 }
556 defer rollback()
557
558 client, err := s.oauth.ServiceClient(
559 r,
560 oauth.WithService(domain),
561 oauth.WithLxm(tangled.RepoCreateNSID),
562 oauth.WithDev(s.config.Core.Dev),
563 )
564 if err != nil {
565 l.Error("service auth failed", "err", err)
566 s.pages.Notice(w, "repo", "Failed to reach PDS.")
567 return
568 }
569
570 xe := tangled.RepoCreate(
571 r.Context(),
572 client,
573 &tangled.RepoCreate_Input{
574 Rkey: rkey,
575 },
576 )
577 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
578 l.Error("xrpc error", "xe", xe)
579 s.pages.Notice(w, "repo", err.Error())
580 return
581 }
582
583 err = db.AddRepo(tx, repo)
584 if err != nil {
585 l.Error("db write failed", "err", err)
586 s.pages.Notice(w, "repo", "Failed to save repository information.")
587 return
588 }
589
590 // acls
591 p, _ := securejoin.SecureJoin(user.Active.Did, repoName)
592 err = s.enforcer.AddRepo(user.Active.Did, domain, p)
593 if err != nil {
594 l.Error("acl setup failed", "err", err)
595 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
596 return
597 }
598
599 err = tx.Commit()
600 if err != nil {
601 l.Error("txn commit failed", "err", err)
602 http.Error(w, err.Error(), http.StatusInternalServerError)
603 return
604 }
605
606 err = s.enforcer.E.SavePolicy()
607 if err != nil {
608 l.Error("acl save failed", "err", err)
609 http.Error(w, err.Error(), http.StatusInternalServerError)
610 return
611 }
612
613 // reset the ATURI because the transaction completed successfully
614 aturi = ""
615
616 s.notifier.NewRepo(r.Context(), repo)
617 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
618 }
619}
620
621// this is used to rollback changes made to the PDS
622//
623// it is a no-op if the provided ATURI is empty
624func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
625 if aturi == "" {
626 return nil
627 }
628
629 parsed := syntax.ATURI(aturi)
630
631 collection := parsed.Collection().String()
632 repo := parsed.Authority().String()
633 rkey := parsed.RecordKey().String()
634
635 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
636 Collection: collection,
637 Repo: repo,
638 Rkey: rkey,
639 })
640 return err
641}
642
643func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
644 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
645 if err != nil {
646 return err
647 }
648 // already present
649 if len(defaultLabels) == len(defaults) {
650 return nil
651 }
652
653 labelDefs, err := models.FetchLabelDefs(r, defaults)
654 if err != nil {
655 return err
656 }
657
658 // Insert each label definition to the database
659 for _, labelDef := range labelDefs {
660 _, err = db.AddLabelDefinition(e, &labelDef)
661 if err != nil {
662 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
663 }
664 }
665
666 return nil
667}