A vibe-coded fork of Tangled that supports Pijul.
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/indexer"
19 "tangled.org/core/appview/mentions"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/notify"
22 dbnotify "tangled.org/core/appview/notify/db"
23 phnotify "tangled.org/core/appview/notify/posthog"
24 "tangled.org/core/appview/oauth"
25 "tangled.org/core/appview/pages"
26 "tangled.org/core/appview/reporesolver"
27 "tangled.org/core/appview/validator"
28 xrpcclient "tangled.org/core/appview/xrpcclient"
29 "tangled.org/core/consts"
30 "tangled.org/core/eventconsumer"
31 "tangled.org/core/idresolver"
32 "tangled.org/core/jetstream"
33 "tangled.org/core/log"
34 tlog "tangled.org/core/log"
35 "tangled.org/core/orm"
36 "tangled.org/core/rbac"
37 "tangled.org/core/tid"
38
39 comatproto "github.com/bluesky-social/indigo/api/atproto"
40 atpclient "github.com/bluesky-social/indigo/atproto/client"
41 "github.com/bluesky-social/indigo/atproto/syntax"
42 lexutil "github.com/bluesky-social/indigo/lex/util"
43 "github.com/bluesky-social/indigo/xrpc"
44 securejoin "github.com/cyphar/filepath-securejoin"
45 "github.com/go-chi/chi/v5"
46 "github.com/posthog/posthog-go"
47)
48
// State bundles the long-lived dependencies that the appview's HTTP handlers
// share. A value is assembled once by Make and released with Close.
//
// Fields the handlers in this file rely on:
//   - db: database handle passed to the db package's query helpers.
//   - notifier: receives events such as NewRepo after a repository is created.
//   - oauth: resolves the logged-in user and builds authorized clients.
//   - enforcer: RBAC checks and ACL bookkeeping.
//   - pages: renders pages and notices.
//   - idResolver: resolves handles to identities.
//   - config, logger: runtime configuration and structured logging.
//
// The remaining fields are constructed in Make and kept here so other code in
// the package can reach them.
type State struct {
	db               *db.DB
	notifier         notify.Notifier
	indexer          *indexer.Indexer
	oauth            *oauth.OAuth
	enforcer         *rbac.Enforcer
	pages            *pages.Pages
	idResolver       *idresolver.Resolver
	mentionsResolver *mentions.Resolver
	posthog          posthog.Client
	jc               *jetstream.JetstreamClient
	config           *config.Config
	repoResolver     *reporesolver.RepoResolver
	knotstream       *eventconsumer.Consumer
	spindlestream    *eventconsumer.Consumer
	logger           *slog.Logger
	validator        *validator.Validator
}
67
68func Make(ctx context.Context, config *config.Config) (*State, error) {
69 logger := tlog.FromContext(ctx)
70
71 d, err := db.Make(ctx, config.Core.DbPath)
72 if err != nil {
73 return nil, fmt.Errorf("failed to create db: %w", err)
74 }
75
76 indexer := indexer.New(log.SubLogger(logger, "indexer"))
77 err = indexer.Init(ctx, d)
78 if err != nil {
79 return nil, fmt.Errorf("failed to create indexer: %w", err)
80 }
81
82 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
83 if err != nil {
84 return nil, fmt.Errorf("failed to create enforcer: %w", err)
85 }
86
87 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
88 if err != nil {
89 logger.Error("failed to create redis resolver", "err", err)
90 res = idresolver.DefaultResolver(config.Plc.PLCURL)
91 }
92
93 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
94 if err != nil {
95 return nil, fmt.Errorf("failed to create posthog client: %w", err)
96 }
97
98 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
99 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
100 if err != nil {
101 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
102 }
103 validator := validator.New(d, res, enforcer)
104
105 repoResolver := reporesolver.New(config, enforcer, d)
106
107 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
108
109 wrapper := db.DbWrapper{Execer: d}
110 jc, err := jetstream.NewJetstreamClient(
111 config.Jetstream.Endpoint,
112 "appview",
113 []string{
114 tangled.GraphFollowNSID,
115 tangled.FeedStarNSID,
116 tangled.PublicKeyNSID,
117 tangled.RepoArtifactNSID,
118 tangled.ActorProfileNSID,
119 tangled.SpindleMemberNSID,
120 tangled.SpindleNSID,
121 tangled.StringNSID,
122 tangled.RepoIssueNSID,
123 tangled.RepoIssueCommentNSID,
124 tangled.LabelDefinitionNSID,
125 tangled.LabelOpNSID,
126 },
127 nil,
128 tlog.SubLogger(logger, "jetstream"),
129 wrapper,
130 false,
131
132 // in-memory filter is inapplicable to appview so
133 // we'll never log dids anyway.
134 false,
135 )
136 if err != nil {
137 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
138 }
139
140 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
141 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
142 }
143
144 ingester := appview.Ingester{
145 Db: wrapper,
146 Enforcer: enforcer,
147 IdResolver: res,
148 Config: config,
149 Logger: log.SubLogger(logger, "ingester"),
150 Validator: validator,
151 }
152 err = jc.StartJetstream(ctx, ingester.Ingest())
153 if err != nil {
154 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
155 }
156
157 var notifiers []notify.Notifier
158
159 // Always add the database notifier
160 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
161
162 // Add other notifiers in production only
163 if !config.Core.Dev {
164 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
165 }
166 notifiers = append(notifiers, indexer)
167
168 // Add webhook notifier
169 notifiers = append(notifiers, notify.NewWebhookNotifier(d))
170
171 notifier := notify.NewMergedNotifier(notifiers)
172 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
173
174 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier)
175 if err != nil {
176 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
177 }
178 knotstream.Start(ctx)
179
180 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
181 if err != nil {
182 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
183 }
184 spindlestream.Start(ctx)
185
186 state := &State{
187 d,
188 notifier,
189 indexer,
190 oauth,
191 enforcer,
192 pages,
193 res,
194 mentionsResolver,
195 posthog,
196 jc,
197 config,
198 repoResolver,
199 knotstream,
200 spindlestream,
201 logger,
202 validator,
203 }
204
205 // fetch initial bluesky posts if configured
206 go fetchBskyPosts(ctx, res, config, d, logger)
207
208 return state, nil
209}
210
211func (s *State) Close() error {
212 // other close up logic goes here
213 return s.db.Close()
214}
215
216func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
217 w.Header().Set("Content-Type", "text/plain")
218 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
219
220 robotsTxt := `# Hello, Tanglers!
221User-agent: *
222Allow: /
223Disallow: /*/*/settings
224Disallow: /settings
225Disallow: /*/*/compare
226Disallow: /*/*/fork
227
228Crawl-delay: 1
229`
230 w.Write([]byte(robotsTxt))
231}
232
233func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
234 user := s.oauth.GetMultiAccountUser(r)
235 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
236 LoggedInUser: user,
237 })
238}
239
240func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
241 user := s.oauth.GetMultiAccountUser(r)
242 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
243 LoggedInUser: user,
244 })
245}
246
247func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
248 user := s.oauth.GetMultiAccountUser(r)
249 s.pages.Brand(w, pages.BrandParams{
250 LoggedInUser: user,
251 })
252}
253
254func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
255 if s.oauth.GetMultiAccountUser(r) != nil {
256 s.Timeline(w, r)
257 return
258 }
259 s.Home(w, r)
260}
261
262func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
263 user := s.oauth.GetMultiAccountUser(r)
264
265 // TODO: set this flag based on the UI
266 filtered := false
267
268 var userDid string
269 if user != nil && user.Active != nil {
270 userDid = user.Active.Did
271 }
272 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered)
273 if err != nil {
274 s.logger.Error("failed to make timeline", "err", err)
275 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
276 }
277
278 repos, err := db.GetTopStarredReposLastWeek(s.db)
279 if err != nil {
280 s.logger.Error("failed to get top starred repos", "err", err)
281 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
282 return
283 }
284
285 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue))
286 if err != nil {
287 // non-fatal
288 }
289
290 s.pages.Timeline(w, pages.TimelineParams{
291 LoggedInUser: user,
292 Timeline: timeline,
293 Repos: repos,
294 GfiLabel: gfiLabel,
295 })
296}
297
298func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
299 user := s.oauth.GetMultiAccountUser(r)
300 if user == nil {
301 return
302 }
303
304 l := s.logger.With("handler", "UpgradeBanner")
305 l = l.With("did", user.Active.Did)
306
307 regs, err := db.GetRegistrations(
308 s.db,
309 orm.FilterEq("did", user.Active.Did),
310 orm.FilterEq("needs_upgrade", 1),
311 )
312 if err != nil {
313 l.Error("non-fatal: failed to get registrations", "err", err)
314 }
315
316 spindles, err := db.GetSpindles(
317 s.db,
318 orm.FilterEq("owner", user.Active.Did),
319 orm.FilterEq("needs_upgrade", 1),
320 )
321 if err != nil {
322 l.Error("non-fatal: failed to get spindles", "err", err)
323 }
324
325 if regs == nil && spindles == nil {
326 return
327 }
328
329 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
330 Registrations: regs,
331 Spindles: spindles,
332 })
333}
334
335func (s *State) Home(w http.ResponseWriter, r *http.Request) {
336 // TODO: set this flag based on the UI
337 filtered := false
338
339 timeline, err := db.MakeTimeline(s.db, 5, "", filtered)
340 if err != nil {
341 s.logger.Error("failed to make timeline", "err", err)
342 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
343 return
344 }
345
346 repos, err := db.GetTopStarredReposLastWeek(s.db)
347 if err != nil {
348 s.logger.Error("failed to get top starred repos", "err", err)
349 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
350 return
351 }
352
353 s.pages.Home(w, pages.TimelineParams{
354 LoggedInUser: nil,
355 Timeline: timeline,
356 Repos: repos,
357 })
358}
359
360func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
361 user := chi.URLParam(r, "user")
362 user = strings.TrimPrefix(user, "@")
363
364 if user == "" {
365 w.WriteHeader(http.StatusBadRequest)
366 return
367 }
368
369 id, err := s.idResolver.ResolveIdent(r.Context(), user)
370 if err != nil {
371 w.WriteHeader(http.StatusInternalServerError)
372 return
373 }
374
375 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
376 if err != nil {
377 s.logger.Error("failed to get public keys", "err", err)
378 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
379 return
380 }
381
382 if len(pubKeys) == 0 {
383 w.WriteHeader(http.StatusNoContent)
384 return
385 }
386
387 for _, k := range pubKeys {
388 key := strings.TrimRight(k.Key, "\n")
389 fmt.Fprintln(w, key)
390 }
391}
392
// validateRepoName reports whether name is acceptable as a repository name.
//
// A valid name is non-empty, consists only of ASCII letters, digits, '-',
// '_' and '.', does not start or end with '.', and contains no "..". It
// returns nil for a valid name and otherwise an error whose message is
// suitable for showing to the user.
func validateRepoName(name string) error {
	// an empty name would otherwise pass every check below
	if name == "" {
		return errors.New("Repository name cannot be empty")
	}

	// check for path traversal attempts
	if name == "." || name == ".." ||
		strings.Contains(name, "/") || strings.Contains(name, "\\") {
		return errors.New("Repository name contains invalid path characters")
	}

	// leading or trailing dots could be used for traversal when normalized;
	// separators are already rejected above, so "./" and "../" cannot
	// occur at this point
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return errors.New("Repository name contains invalid path sequence")
	}

	// then continue with character validation
	for _, char := range name {
		switch {
		case char >= 'a' && char <= 'z',
			char >= 'A' && char <= 'Z',
			char >= '0' && char <= '9',
			char == '-', char == '_', char == '.':
			// allowed
		default:
			return errors.New("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// additional check to prevent multiple sequential dots
	if strings.Contains(name, "..") {
		return errors.New("Repository name cannot contain sequential dots")
	}

	return nil
}
424
// stripGitExt returns name without a single trailing ".git"; a name that does
// not end in ".git" is returned unchanged.
func stripGitExt(name string) string {
	base, _ := strings.CutSuffix(name, ".git")
	return base
}
428
429func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
430 switch r.Method {
431 case http.MethodGet:
432 user := s.oauth.GetMultiAccountUser(r)
433 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
434 if err != nil {
435 s.pages.Notice(w, "repo", "Invalid user account.")
436 return
437 }
438
439 s.pages.NewRepo(w, pages.NewRepoParams{
440 LoggedInUser: user,
441 Knots: knots,
442 })
443
444 case http.MethodPost:
445 l := s.logger.With("handler", "NewRepo")
446
447 user := s.oauth.GetMultiAccountUser(r)
448 l = l.With("did", user.Active.Did)
449
450 // form validation
451 domain := r.FormValue("domain")
452 if domain == "" {
453 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
454 return
455 }
456 l = l.With("knot", domain)
457
458 repoName := r.FormValue("name")
459 if repoName == "" {
460 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
461 return
462 }
463
464 if err := validateRepoName(repoName); err != nil {
465 s.pages.Notice(w, "repo", err.Error())
466 return
467 }
468 repoName = stripGitExt(repoName)
469 l = l.With("repoName", repoName)
470
471 defaultBranch := r.FormValue("branch")
472 if defaultBranch == "" {
473 defaultBranch = "main"
474 }
475 l = l.With("defaultBranch", defaultBranch)
476
477 description := r.FormValue("description")
478 if len([]rune(description)) > 140 {
479 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
480 return
481 }
482
483 // ACL validation
484 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
485 if err != nil || !ok {
486 l.Info("unauthorized")
487 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
488 return
489 }
490
491 // Check for existing repos
492 existingRepo, err := db.GetRepo(
493 s.db,
494 orm.FilterEq("did", user.Active.Did),
495 orm.FilterEq("name", repoName),
496 )
497 if err == nil && existingRepo != nil {
498 l.Info("repo exists")
499 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
500 return
501 }
502
503 // create atproto record for this repo
504 rkey := tid.TID()
505 repo := &models.Repo{
506 Did: user.Active.Did,
507 Name: repoName,
508 Knot: domain,
509 Rkey: rkey,
510 Description: description,
511 Created: time.Now(),
512 Labels: s.config.Label.DefaultLabelDefs,
513 }
514 record := repo.AsRecord()
515
516 atpClient, err := s.oauth.AuthorizedClient(r)
517 if err != nil {
518 l.Info("PDS write failed", "err", err)
519 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
520 return
521 }
522
523 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
524 Collection: tangled.RepoNSID,
525 Repo: user.Active.Did,
526 Rkey: rkey,
527 Record: &lexutil.LexiconTypeDecoder{
528 Val: &record,
529 },
530 })
531 if err != nil {
532 l.Info("PDS write failed", "err", err)
533 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
534 return
535 }
536
537 aturi := atresp.Uri
538 l = l.With("aturi", aturi)
539 l.Info("wrote to PDS")
540
541 tx, err := s.db.BeginTx(r.Context(), nil)
542 if err != nil {
543 l.Info("txn failed", "err", err)
544 s.pages.Notice(w, "repo", "Failed to save repository information.")
545 return
546 }
547
548 // The rollback function reverts a few things on failure:
549 // - the pending txn
550 // - the ACLs
551 // - the atproto record created
552 rollback := func() {
553 err1 := tx.Rollback()
554 err2 := s.enforcer.E.LoadPolicy()
555 err3 := rollbackRecord(context.Background(), aturi, atpClient)
556
557 // ignore txn complete errors, this is okay
558 if errors.Is(err1, sql.ErrTxDone) {
559 err1 = nil
560 }
561
562 if errs := errors.Join(err1, err2, err3); errs != nil {
563 l.Error("failed to rollback changes", "errs", errs)
564 return
565 }
566 }
567 defer rollback()
568
569 client, err := s.oauth.ServiceClient(
570 r,
571 oauth.WithService(domain),
572 oauth.WithLxm(tangled.RepoCreateNSID),
573 oauth.WithDev(s.config.Core.Dev),
574 )
575 if err != nil {
576 l.Error("service auth failed", "err", err)
577 s.pages.Notice(w, "repo", "Failed to reach PDS.")
578 return
579 }
580
581 xe := tangled.RepoCreate(
582 r.Context(),
583 client,
584 &tangled.RepoCreate_Input{
585 Rkey: rkey,
586 },
587 )
588 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
589 l.Error("xrpc error", "xe", xe)
590 s.pages.Notice(w, "repo", err.Error())
591 return
592 }
593
594 err = db.AddRepo(tx, repo)
595 if err != nil {
596 l.Error("db write failed", "err", err)
597 s.pages.Notice(w, "repo", "Failed to save repository information.")
598 return
599 }
600
601 // acls
602 p, _ := securejoin.SecureJoin(user.Active.Did, repoName)
603 err = s.enforcer.AddRepo(user.Active.Did, domain, p)
604 if err != nil {
605 l.Error("acl setup failed", "err", err)
606 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
607 return
608 }
609
610 err = tx.Commit()
611 if err != nil {
612 l.Error("txn commit failed", "err", err)
613 http.Error(w, err.Error(), http.StatusInternalServerError)
614 return
615 }
616
617 err = s.enforcer.E.SavePolicy()
618 if err != nil {
619 l.Error("acl save failed", "err", err)
620 http.Error(w, err.Error(), http.StatusInternalServerError)
621 return
622 }
623
624 // reset the ATURI because the transaction completed successfully
625 aturi = ""
626
627 s.notifier.NewRepo(r.Context(), repo)
628 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
629 }
630}
631
632// this is used to rollback changes made to the PDS
633//
634// it is a no-op if the provided ATURI is empty
635func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
636 if aturi == "" {
637 return nil
638 }
639
640 parsed := syntax.ATURI(aturi)
641
642 collection := parsed.Collection().String()
643 repo := parsed.Authority().String()
644 rkey := parsed.RecordKey().String()
645
646 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
647 Collection: collection,
648 Repo: repo,
649 Rkey: rkey,
650 })
651 return err
652}
653
654func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
655 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
656 if err != nil {
657 return err
658 }
659 // already present
660 if len(defaultLabels) == len(defaults) {
661 return nil
662 }
663
664 labelDefs, err := models.FetchLabelDefs(r, defaults)
665 if err != nil {
666 return err
667 }
668
669 // Insert each label definition to the database
670 for _, labelDef := range labelDefs {
671 _, err = db.AddLabelDefinition(e, &labelDef)
672 if err != nil {
673 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
674 }
675 }
676
677 return nil
678}