A vibe-coded fork of Tangled that supports Pijul.
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/config"
16 "tangled.org/core/appview/db"
17 "tangled.org/core/appview/indexer"
18 "tangled.org/core/appview/mentions"
19 "tangled.org/core/appview/models"
20 "tangled.org/core/appview/notify"
21 dbnotify "tangled.org/core/appview/notify/db"
22 phnotify "tangled.org/core/appview/notify/posthog"
23 "tangled.org/core/appview/oauth"
24 "tangled.org/core/appview/pages"
25 "tangled.org/core/appview/reporesolver"
26 xrpcclient "tangled.org/core/appview/xrpcclient"
27 "tangled.org/core/eventconsumer"
28 "tangled.org/core/idresolver"
29 "tangled.org/core/jetstream"
30 "tangled.org/core/log"
31 tlog "tangled.org/core/log"
32 "tangled.org/core/orm"
33 "tangled.org/core/rbac"
34 "tangled.org/core/tid"
35
36 comatproto "github.com/bluesky-social/indigo/api/atproto"
37 atpclient "github.com/bluesky-social/indigo/atproto/client"
38 "github.com/bluesky-social/indigo/atproto/syntax"
39 lexutil "github.com/bluesky-social/indigo/lex/util"
40 securejoin "github.com/cyphar/filepath-securejoin"
41 "github.com/go-chi/chi/v5"
42 "github.com/posthog/posthog-go"
43)
44
45type State struct {
46 db *db.DB
47 notifier notify.Notifier
48 indexer *indexer.Indexer
49 oauth *oauth.OAuth
50 enforcer *rbac.Enforcer
51 pages *pages.Pages
52 idResolver *idresolver.Resolver
53 mentionsResolver *mentions.Resolver
54 posthog posthog.Client
55 jc *jetstream.JetstreamClient
56 config *config.Config
57 repoResolver *reporesolver.RepoResolver
58 knotstream *eventconsumer.Consumer
59 spindlestream *eventconsumer.Consumer
60 logger *slog.Logger
61}
62
63func Make(ctx context.Context, config *config.Config) (*State, error) {
64 logger := tlog.FromContext(ctx)
65
66 d, err := db.Make(ctx, config.Core.DbPath)
67 if err != nil {
68 return nil, fmt.Errorf("failed to create db: %w", err)
69 }
70
71 indexer := indexer.New(log.SubLogger(logger, "indexer"))
72 err = indexer.Init(ctx, d)
73 if err != nil {
74 return nil, fmt.Errorf("failed to create indexer: %w", err)
75 }
76
77 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
78 if err != nil {
79 return nil, fmt.Errorf("failed to create enforcer: %w", err)
80 }
81
82 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
83 if err != nil {
84 logger.Error("failed to create redis resolver", "err", err)
85 res = idresolver.DefaultResolver(config.Plc.PLCURL)
86 }
87
88 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
89 if err != nil {
90 return nil, fmt.Errorf("failed to create posthog client: %w", err)
91 }
92
93 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
94 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
95 if err != nil {
96 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
97 }
98
99 repoResolver := reporesolver.New(config, enforcer, d)
100
101 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
102
103 wrapper := db.DbWrapper{Execer: d}
104 jc, err := jetstream.NewJetstreamClient(
105 config.Jetstream.Endpoint,
106 "appview",
107 []string{
108 tangled.GraphFollowNSID,
109 tangled.FeedStarNSID,
110 tangled.PublicKeyNSID,
111 tangled.RepoArtifactNSID,
112 tangled.ActorProfileNSID,
113 tangled.SpindleMemberNSID,
114 tangled.SpindleNSID,
115 tangled.StringNSID,
116 tangled.RepoIssueNSID,
117 tangled.RepoIssueCommentNSID,
118 tangled.LabelDefinitionNSID,
119 tangled.LabelOpNSID,
120 },
121 nil,
122 tlog.SubLogger(logger, "jetstream"),
123 wrapper,
124 false,
125
126 // in-memory filter is inapplicalble to appview so
127 // we'll never log dids anyway.
128 false,
129 )
130 if err != nil {
131 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
132 }
133
134 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
135 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
136 }
137
138 ingester := appview.Ingester{
139 Db: wrapper,
140 Enforcer: enforcer,
141 IdResolver: res,
142 Config: config,
143 Logger: log.SubLogger(logger, "ingester"),
144 }
145 err = jc.StartJetstream(ctx, ingester.Ingest())
146 if err != nil {
147 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
148 }
149
150 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog)
151 if err != nil {
152 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
153 }
154 knotstream.Start(ctx)
155
156 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
157 if err != nil {
158 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
159 }
160 spindlestream.Start(ctx)
161
162 var notifiers []notify.Notifier
163
164 // Always add the database notifier
165 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
166
167 // Add other notifiers in production only
168 if !config.Core.Dev {
169 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
170 }
171 notifiers = append(notifiers, indexer)
172 notifier := notify.NewMergedNotifier(notifiers, tlog.SubLogger(logger, "notify"))
173
174 state := &State{
175 d,
176 notifier,
177 indexer,
178 oauth,
179 enforcer,
180 pages,
181 res,
182 mentionsResolver,
183 posthog,
184 jc,
185 config,
186 repoResolver,
187 knotstream,
188 spindlestream,
189 logger,
190 }
191
192 return state, nil
193}
194
195func (s *State) Close() error {
196 // other close up logic goes here
197 return s.db.Close()
198}
199
200func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
201 w.Header().Set("Content-Type", "text/plain")
202 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
203
204 robotsTxt := `User-agent: *
205Allow: /
206`
207 w.Write([]byte(robotsTxt))
208}
209
210func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
211 user := s.oauth.GetMultiAccountUser(r)
212 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
213 LoggedInUser: user,
214 })
215}
216
217func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
218 user := s.oauth.GetMultiAccountUser(r)
219 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
220 LoggedInUser: user,
221 })
222}
223
224func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
225 user := s.oauth.GetMultiAccountUser(r)
226 s.pages.Brand(w, pages.BrandParams{
227 LoggedInUser: user,
228 })
229}
230
231func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) {
232 if s.oauth.GetMultiAccountUser(r) != nil {
233 s.Timeline(w, r)
234 return
235 }
236 s.Home(w, r)
237}
238
239func (s *State) Timeline(w http.ResponseWriter, r *http.Request) {
240 user := s.oauth.GetMultiAccountUser(r)
241
242 // TODO: set this flag based on the UI
243 filtered := false
244
245 var userDid string
246 if user != nil && user.Active != nil {
247 userDid = user.Active.Did
248 }
249 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered)
250 if err != nil {
251 s.logger.Error("failed to make timeline", "err", err)
252 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
253 }
254
255 repos, err := db.GetTopStarredReposLastWeek(s.db)
256 if err != nil {
257 s.logger.Error("failed to get top starred repos", "err", err)
258 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
259 return
260 }
261
262 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue))
263 if err != nil {
264 // non-fatal
265 }
266
267 s.pages.Timeline(w, pages.TimelineParams{
268 LoggedInUser: user,
269 Timeline: timeline,
270 Repos: repos,
271 GfiLabel: gfiLabel,
272 })
273}
274
275func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
276 user := s.oauth.GetMultiAccountUser(r)
277 if user == nil {
278 return
279 }
280
281 l := s.logger.With("handler", "UpgradeBanner")
282 l = l.With("did", user.Active.Did)
283
284 regs, err := db.GetRegistrations(
285 s.db,
286 orm.FilterEq("did", user.Active.Did),
287 orm.FilterEq("needs_upgrade", 1),
288 )
289 if err != nil {
290 l.Error("non-fatal: failed to get registrations", "err", err)
291 }
292
293 spindles, err := db.GetSpindles(
294 s.db,
295 orm.FilterEq("owner", user.Active.Did),
296 orm.FilterEq("needs_upgrade", 1),
297 )
298 if err != nil {
299 l.Error("non-fatal: failed to get spindles", "err", err)
300 }
301
302 if regs == nil && spindles == nil {
303 return
304 }
305
306 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
307 Registrations: regs,
308 Spindles: spindles,
309 })
310}
311
312func (s *State) Home(w http.ResponseWriter, r *http.Request) {
313 // TODO: set this flag based on the UI
314 filtered := false
315
316 timeline, err := db.MakeTimeline(s.db, 5, "", filtered)
317 if err != nil {
318 s.logger.Error("failed to make timeline", "err", err)
319 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.")
320 return
321 }
322
323 repos, err := db.GetTopStarredReposLastWeek(s.db)
324 if err != nil {
325 s.logger.Error("failed to get top starred repos", "err", err)
326 s.pages.Notice(w, "topstarredrepos", "Unable to load.")
327 return
328 }
329
330 s.pages.Home(w, pages.TimelineParams{
331 LoggedInUser: nil,
332 Timeline: timeline,
333 Repos: repos,
334 })
335}
336
337func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
338 user := chi.URLParam(r, "user")
339 user = strings.TrimPrefix(user, "@")
340
341 if user == "" {
342 w.WriteHeader(http.StatusBadRequest)
343 return
344 }
345
346 id, err := s.idResolver.ResolveIdent(r.Context(), user)
347 if err != nil {
348 w.WriteHeader(http.StatusInternalServerError)
349 return
350 }
351
352 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
353 if err != nil {
354 s.logger.Error("failed to get public keys", "err", err)
355 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
356 return
357 }
358
359 if len(pubKeys) == 0 {
360 w.WriteHeader(http.StatusNoContent)
361 return
362 }
363
364 for _, k := range pubKeys {
365 key := strings.TrimRight(k.Key, "\n")
366 fmt.Fprintln(w, key)
367 }
368}
369
// validateRepoName reports whether name is acceptable as a repository name.
//
// A name is rejected when it is "." or "..", contains a slash or backslash,
// starts or ends with a dot, contains any character other than ASCII
// letters, digits, '-', '_' and '.', or contains two dots in a row. The
// checks run in that order and the first failure decides the message. The
// empty string passes; callers reject it separately.
func validateRepoName(name string) error {
	switch {
	// anything that looks like a path rather than a single component
	case name == ".", name == "..", strings.ContainsAny(name, `/\`):
		return fmt.Errorf("Repository name contains invalid path characters")
	// leading or trailing dots
	case strings.HasPrefix(name, "."), strings.HasSuffix(name, "."):
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	for _, r := range name {
		switch {
		case 'a' <= r && r <= 'z', 'A' <= r && r <= 'Z', '0' <= r && r <= '9':
			// letters and digits
		case r == '-', r == '_', r == '.':
			// permitted punctuation
		default:
			return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// single dots are fine, runs of dots are not
	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	return nil
}
401
// stripGitExt returns name without one trailing ".git"; names that do not
// end in ".git" are returned unchanged.
func stripGitExt(name string) string {
	base, _ := strings.CutSuffix(name, ".git")
	return base
}
405
406func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
407 switch r.Method {
408 case http.MethodGet:
409 user := s.oauth.GetMultiAccountUser(r)
410 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
411 if err != nil {
412 s.pages.Notice(w, "repo", "Invalid user account.")
413 return
414 }
415
416 s.pages.NewRepo(w, pages.NewRepoParams{
417 LoggedInUser: user,
418 Knots: knots,
419 })
420
421 case http.MethodPost:
422 l := s.logger.With("handler", "NewRepo")
423
424 user := s.oauth.GetMultiAccountUser(r)
425 l = l.With("did", user.Active.Did)
426
427 // form validation
428 domain := r.FormValue("domain")
429 if domain == "" {
430 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
431 return
432 }
433 l = l.With("knot", domain)
434
435 repoName := r.FormValue("name")
436 if repoName == "" {
437 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
438 return
439 }
440
441 if err := validateRepoName(repoName); err != nil {
442 s.pages.Notice(w, "repo", err.Error())
443 return
444 }
445 repoName = stripGitExt(repoName)
446 l = l.With("repoName", repoName)
447
448 defaultBranch := r.FormValue("branch")
449 if defaultBranch == "" {
450 defaultBranch = "main"
451 }
452 l = l.With("defaultBranch", defaultBranch)
453
454 description := r.FormValue("description")
455
456 // ACL validation
457 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
458 if err != nil || !ok {
459 l.Info("unauthorized")
460 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
461 return
462 }
463
464 // Check for existing repos
465 existingRepo, err := db.GetRepo(
466 s.db,
467 orm.FilterEq("did", user.Active.Did),
468 orm.FilterEq("name", repoName),
469 )
470 if err == nil && existingRepo != nil {
471 l.Info("repo exists")
472 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
473 return
474 }
475
476 // create atproto record for this repo
477 rkey := tid.TID()
478 repo := &models.Repo{
479 Did: user.Active.Did,
480 Name: repoName,
481 Knot: domain,
482 Rkey: rkey,
483 Description: description,
484 Created: time.Now(),
485 Labels: s.config.Label.DefaultLabelDefs,
486 }
487 record := repo.AsRecord()
488
489 atpClient, err := s.oauth.AuthorizedClient(r)
490 if err != nil {
491 l.Info("PDS write failed", "err", err)
492 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
493 return
494 }
495
496 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
497 Collection: tangled.RepoNSID,
498 Repo: user.Active.Did,
499 Rkey: rkey,
500 Record: &lexutil.LexiconTypeDecoder{
501 Val: &record,
502 },
503 })
504 if err != nil {
505 l.Info("PDS write failed", "err", err)
506 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
507 return
508 }
509
510 aturi := atresp.Uri
511 l = l.With("aturi", aturi)
512 l.Info("wrote to PDS")
513
514 tx, err := s.db.BeginTx(r.Context(), nil)
515 if err != nil {
516 l.Info("txn failed", "err", err)
517 s.pages.Notice(w, "repo", "Failed to save repository information.")
518 return
519 }
520
521 // The rollback function reverts a few things on failure:
522 // - the pending txn
523 // - the ACLs
524 // - the atproto record created
525 rollback := func() {
526 err1 := tx.Rollback()
527 err2 := s.enforcer.E.LoadPolicy()
528 err3 := rollbackRecord(context.Background(), aturi, atpClient)
529
530 // ignore txn complete errors, this is okay
531 if errors.Is(err1, sql.ErrTxDone) {
532 err1 = nil
533 }
534
535 if errs := errors.Join(err1, err2, err3); errs != nil {
536 l.Error("failed to rollback changes", "errs", errs)
537 return
538 }
539 }
540 defer rollback()
541
542 client, err := s.oauth.ServiceClient(
543 r,
544 oauth.WithService(domain),
545 oauth.WithLxm(tangled.RepoCreateNSID),
546 oauth.WithDev(s.config.Core.Dev),
547 )
548 if err != nil {
549 l.Error("service auth failed", "err", err)
550 s.pages.Notice(w, "repo", "Failed to reach PDS.")
551 return
552 }
553
554 xe := tangled.RepoCreate(
555 r.Context(),
556 client,
557 &tangled.RepoCreate_Input{
558 Rkey: rkey,
559 },
560 )
561 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
562 l.Error("xrpc error", "xe", xe)
563 s.pages.Notice(w, "repo", err.Error())
564 return
565 }
566
567 err = db.AddRepo(tx, repo)
568 if err != nil {
569 l.Error("db write failed", "err", err)
570 s.pages.Notice(w, "repo", "Failed to save repository information.")
571 return
572 }
573
574 // acls
575 p, _ := securejoin.SecureJoin(user.Active.Did, repoName)
576 err = s.enforcer.AddRepo(user.Active.Did, domain, p)
577 if err != nil {
578 l.Error("acl setup failed", "err", err)
579 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
580 return
581 }
582
583 err = tx.Commit()
584 if err != nil {
585 l.Error("txn commit failed", "err", err)
586 http.Error(w, err.Error(), http.StatusInternalServerError)
587 return
588 }
589
590 err = s.enforcer.E.SavePolicy()
591 if err != nil {
592 l.Error("acl save failed", "err", err)
593 http.Error(w, err.Error(), http.StatusInternalServerError)
594 return
595 }
596
597 // reset the ATURI because the transaction completed successfully
598 aturi = ""
599
600 s.notifier.NewRepo(r.Context(), repo)
601 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
602 }
603}
604
605// this is used to rollback changes made to the PDS
606//
607// it is a no-op if the provided ATURI is empty
608func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
609 if aturi == "" {
610 return nil
611 }
612
613 parsed := syntax.ATURI(aturi)
614
615 collection := parsed.Collection().String()
616 repo := parsed.Authority().String()
617 rkey := parsed.RecordKey().String()
618
619 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
620 Collection: collection,
621 Repo: repo,
622 Rkey: rkey,
623 })
624 return err
625}
626
627func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
628 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
629 if err != nil {
630 return err
631 }
632 // already present
633 if len(defaultLabels) == len(defaults) {
634 return nil
635 }
636
637 labelDefs, err := models.FetchLabelDefs(r, defaults)
638 if err != nil {
639 return err
640 }
641
642 // Insert each label definition to the database
643 for _, labelDef := range labelDefs {
644 _, err = db.AddLabelDefinition(e, &labelDef)
645 if err != nil {
646 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
647 }
648 }
649
650 return nil
651}