A vibe-coded fork of Tangled that supports Pijul.
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cloudflare"
17 "tangled.org/core/appview/config"
18 "tangled.org/core/appview/db"
19 "tangled.org/core/appview/indexer"
20 "tangled.org/core/appview/mentions"
21 "tangled.org/core/appview/models"
22 "tangled.org/core/appview/notify"
23 dbnotify "tangled.org/core/appview/notify/db"
24 phnotify "tangled.org/core/appview/notify/posthog"
25 "tangled.org/core/appview/oauth"
26 "tangled.org/core/appview/pages"
27 "tangled.org/core/appview/reporesolver"
28 xrpcclient "tangled.org/core/appview/xrpcclient"
29 "tangled.org/core/consts"
30 "tangled.org/core/eventconsumer"
31 "tangled.org/core/idresolver"
32 "tangled.org/core/jetstream"
33 "tangled.org/core/log"
34 tlog "tangled.org/core/log"
35 "tangled.org/core/orm"
36 "tangled.org/core/rbac"
37 "tangled.org/core/tid"
38
39 comatproto "github.com/bluesky-social/indigo/api/atproto"
40 atpclient "github.com/bluesky-social/indigo/atproto/client"
41 "github.com/bluesky-social/indigo/atproto/syntax"
42 lexutil "github.com/bluesky-social/indigo/lex/util"
43 "github.com/bluesky-social/indigo/xrpc"
44 securejoin "github.com/cyphar/filepath-securejoin"
45 "github.com/go-chi/chi/v5"
46 "github.com/posthog/posthog-go"
47)
48
49type State struct {
50 db *db.DB
51 notifier notify.Notifier
52 indexer *indexer.Indexer
53 oauth *oauth.OAuth
54 enforcer *rbac.Enforcer
55 pages *pages.Pages
56 idResolver *idresolver.Resolver
57 mentionsResolver *mentions.Resolver
58 posthog posthog.Client
59 jc *jetstream.JetstreamClient
60 config *config.Config
61 repoResolver *reporesolver.RepoResolver
62 knotstream *eventconsumer.Consumer
63 spindlestream *eventconsumer.Consumer
64 logger *slog.Logger
65 cfClient *cloudflare.Client
66}
67
68func Make(ctx context.Context, config *config.Config) (*State, error) {
69 logger := tlog.FromContext(ctx)
70
71 d, err := db.Make(ctx, config.Core.DbPath)
72 if err != nil {
73 return nil, fmt.Errorf("failed to create db: %w", err)
74 }
75
76 indexer := indexer.New(log.SubLogger(logger, "indexer"))
77 err = indexer.Init(ctx, d)
78 if err != nil {
79 return nil, fmt.Errorf("failed to create indexer: %w", err)
80 }
81
82 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
83 if err != nil {
84 return nil, fmt.Errorf("failed to create enforcer: %w", err)
85 }
86
87 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
88 if err != nil {
89 logger.Error("failed to create redis resolver", "err", err)
90 res = idresolver.DefaultResolver(config.Plc.PLCURL)
91 }
92
93 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
94 if err != nil {
95 return nil, fmt.Errorf("failed to create posthog client: %w", err)
96 }
97
98 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
99 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
100 if err != nil {
101 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
102 }
103
104 repoResolver := reporesolver.New(config, enforcer, d)
105
106 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
107
108 wrapper := db.DbWrapper{Execer: d}
109 jc, err := jetstream.NewJetstreamClient(
110 config.Jetstream.Endpoint,
111 "appview",
112 []string{
113 tangled.GraphFollowNSID,
114 tangled.FeedStarNSID,
115 tangled.PublicKeyNSID,
116 tangled.RepoArtifactNSID,
117 tangled.ActorProfileNSID,
118 tangled.KnotMemberNSID,
119 tangled.SpindleMemberNSID,
120 tangled.SpindleNSID,
121 tangled.StringNSID,
122 tangled.RepoIssueNSID,
123 tangled.RepoIssueCommentNSID,
124 tangled.LabelDefinitionNSID,
125 tangled.LabelOpNSID,
126 },
127 nil,
128 tlog.SubLogger(logger, "jetstream"),
129 wrapper,
130 false,
131
132 // in-memory filter is inapplicable to appview so
133 // we'll never log dids anyway.
134 false,
135 )
136 if err != nil {
137 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
138 }
139
140 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
141 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
142 }
143
144 ingester := appview.Ingester{
145 Db: wrapper,
146 Enforcer: enforcer,
147 IdResolver: res,
148 Config: config,
149 Logger: log.SubLogger(logger, "ingester"),
150 }
151 err = jc.StartJetstream(ctx, ingester.Ingest())
152 if err != nil {
153 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
154 }
155
156 var notifiers []notify.Notifier
157
158 // Always add the database notifier
159 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
160
161 // Add other notifiers in production only
162 if !config.Core.Dev {
163 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
164 }
165 notifiers = append(notifiers, indexer)
166
167 // Add webhook notifier
168 notifiers = append(notifiers, notify.NewWebhookNotifier(d))
169
170 notifier := notify.NewMergedNotifier(notifiers)
171 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
172
173 var cfClient *cloudflare.Client
174 if config.Cloudflare.ApiToken != "" {
175 cfClient, err = cloudflare.New(config)
176 if err != nil {
177 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
178 cfClient = nil
179 }
180 }
181
182 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient)
183 if err != nil {
184 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
185 }
186 knotstream.Start(ctx)
187
188 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
189 if err != nil {
190 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
191 }
192 spindlestream.Start(ctx)
193
194 state := &State{
195 db: d,
196 notifier: notifier,
197 indexer: indexer,
198 oauth: oauth,
199 enforcer: enforcer,
200 pages: pages,
201 idResolver: res,
202 mentionsResolver: mentionsResolver,
203 posthog: posthog,
204 jc: jc,
205 config: config,
206 repoResolver: repoResolver,
207 knotstream: knotstream,
208 spindlestream: spindlestream,
209 logger: logger,
210 cfClient: cfClient,
211 }
212
213 // fetch initial bluesky posts if configured
214 go fetchBskyPosts(ctx, res, config, d, logger)
215
216 return state, nil
217}
218
219func (s *State) Close() error {
220 // other close up logic goes here
221 return s.db.Close()
222}
223
224func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
225 w.Header().Set("Content-Type", "text/plain")
226 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
227
228 robotsTxt := `# Hello, Tanglers!
229User-agent: *
230Allow: /
231Disallow: /*/*/settings
232Disallow: /settings
233Disallow: /*/*/compare
234Disallow: /*/*/fork
235
236Crawl-delay: 1
237`
238 w.Write([]byte(robotsTxt))
239}
240
241func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
242 user := s.oauth.GetMultiAccountUser(r)
243 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
244 LoggedInUser: user,
245 })
246}
247
248func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
249 user := s.oauth.GetMultiAccountUser(r)
250 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
251 LoggedInUser: user,
252 })
253}
254
255func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
256 user := s.oauth.GetMultiAccountUser(r)
257 s.pages.Brand(w, pages.BrandParams{
258 LoggedInUser: user,
259 })
260}
261
262func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
263 user := s.oauth.GetMultiAccountUser(r)
264 if user == nil {
265 return
266 }
267
268 l := s.logger.With("handler", "UpgradeBanner")
269 l = l.With("did", user.Active.Did)
270
271 regs, err := db.GetRegistrations(
272 s.db,
273 orm.FilterEq("did", user.Active.Did),
274 orm.FilterEq("needs_upgrade", 1),
275 )
276 if err != nil {
277 l.Error("non-fatal: failed to get registrations", "err", err)
278 }
279
280 spindles, err := db.GetSpindles(
281 s.db,
282 orm.FilterEq("owner", user.Active.Did),
283 orm.FilterEq("needs_upgrade", 1),
284 )
285 if err != nil {
286 l.Error("non-fatal: failed to get spindles", "err", err)
287 }
288
289 if regs == nil && spindles == nil {
290 return
291 }
292
293 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
294 Registrations: regs,
295 Spindles: spindles,
296 })
297}
298
299func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
300 user := chi.URLParam(r, "user")
301 user = strings.TrimPrefix(user, "@")
302
303 if user == "" {
304 w.WriteHeader(http.StatusBadRequest)
305 return
306 }
307
308 id, err := s.idResolver.ResolveIdent(r.Context(), user)
309 if err != nil {
310 w.WriteHeader(http.StatusInternalServerError)
311 return
312 }
313
314 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
315 if err != nil {
316 s.logger.Error("failed to get public keys", "err", err)
317 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
318 return
319 }
320
321 if len(pubKeys) == 0 {
322 w.WriteHeader(http.StatusNoContent)
323 return
324 }
325
326 for _, k := range pubKeys {
327 key := strings.TrimRight(k.Key, "\n")
328 fmt.Fprintln(w, key)
329 }
330}
331
// validateRepoName reports whether name is acceptable as a repository name.
//
// A valid name is non-empty, consists only of ASCII letters, digits,
// hyphens, underscores and periods, does not start or end with a period,
// and contains no two consecutive periods. Path separators are rejected
// outright so the name can never be used for path traversal.
//
// It returns nil for a valid name and a descriptive error otherwise. The
// messages are capitalised on purpose: handlers in this file show them to
// the user verbatim.
func validateRepoName(name string) error {
	// an empty name would otherwise slip through every check below
	if name == "" {
		return errors.New("Repository name cannot be empty")
	}

	// check for path traversal attempts
	if name == "." || name == ".." ||
		strings.Contains(name, "/") || strings.Contains(name, "\\") {
		return errors.New("Repository name contains invalid path characters")
	}

	// Leading or trailing dots could be used for traversal when the name is
	// normalized. "./" and "../" need no separate check here: any name
	// containing "/" has already been rejected above.
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return errors.New("Repository name contains invalid path sequence")
	}

	// then continue with character validation
	for _, char := range name {
		switch {
		case char >= 'a' && char <= 'z':
		case char >= 'A' && char <= 'Z':
		case char >= '0' && char <= '9':
		case char == '-', char == '_', char == '.':
		default:
			return errors.New("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
		}
	}

	// additional check to prevent multiple sequential dots
	if strings.Contains(name, "..") {
		return errors.New("Repository name cannot contain sequential dots")
	}

	// if all checks pass
	return nil
}
363
// stripGitExt removes a single trailing ".git" from name, if present, and
// returns every other name unchanged.
func stripGitExt(name string) string {
	const ext = ".git"
	if !strings.HasSuffix(name, ext) {
		return name
	}
	return name[:len(name)-len(ext)]
}
367
368func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
369 switch r.Method {
370 case http.MethodGet:
371 user := s.oauth.GetMultiAccountUser(r)
372 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
373 if err != nil {
374 s.pages.Notice(w, "repo", "Invalid user account.")
375 return
376 }
377
378 s.pages.NewRepo(w, pages.NewRepoParams{
379 LoggedInUser: user,
380 Knots: knots,
381 })
382
383 case http.MethodPost:
384 l := s.logger.With("handler", "NewRepo")
385
386 user := s.oauth.GetMultiAccountUser(r)
387 l = l.With("did", user.Active.Did)
388
389 // form validation
390 domain := r.FormValue("domain")
391 if domain == "" {
392 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
393 return
394 }
395 l = l.With("knot", domain)
396
397 repoName := r.FormValue("name")
398 if repoName == "" {
399 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
400 return
401 }
402
403 if err := validateRepoName(repoName); err != nil {
404 s.pages.Notice(w, "repo", err.Error())
405 return
406 }
407 repoName = stripGitExt(repoName)
408 l = l.With("repoName", repoName)
409
410 defaultBranch := r.FormValue("branch")
411 if defaultBranch == "" {
412 defaultBranch = "main"
413 }
414 l = l.With("defaultBranch", defaultBranch)
415
416 description := r.FormValue("description")
417 if len([]rune(description)) > 140 {
418 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
419 return
420 }
421
422 // ACL validation
423 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
424 if err != nil || !ok {
425 l.Info("unauthorized")
426 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
427 return
428 }
429
430 // Check for existing repos
431 existingRepo, err := db.GetRepo(
432 s.db,
433 orm.FilterEq("did", user.Active.Did),
434 orm.FilterEq("name", repoName),
435 )
436 if err == nil && existingRepo != nil {
437 l.Info("repo exists")
438 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
439 return
440 }
441
442 // create atproto record for this repo
443 rkey := tid.TID()
444 repo := &models.Repo{
445 Did: user.Active.Did,
446 Name: repoName,
447 Knot: domain,
448 Rkey: rkey,
449 Description: description,
450 Created: time.Now(),
451 Labels: s.config.Label.DefaultLabelDefs,
452 }
453 record := repo.AsRecord()
454
455 atpClient, err := s.oauth.AuthorizedClient(r)
456 if err != nil {
457 l.Info("PDS write failed", "err", err)
458 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
459 return
460 }
461
462 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
463 Collection: tangled.RepoNSID,
464 Repo: user.Active.Did,
465 Rkey: rkey,
466 Record: &lexutil.LexiconTypeDecoder{
467 Val: &record,
468 },
469 })
470 if err != nil {
471 l.Info("PDS write failed", "err", err)
472 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
473 return
474 }
475
476 aturi := atresp.Uri
477 l = l.With("aturi", aturi)
478 l.Info("wrote to PDS")
479
480 tx, err := s.db.BeginTx(r.Context(), nil)
481 if err != nil {
482 l.Info("txn failed", "err", err)
483 s.pages.Notice(w, "repo", "Failed to save repository information.")
484 return
485 }
486
487 // The rollback function reverts a few things on failure:
488 // - the pending txn
489 // - the ACLs
490 // - the atproto record created
491 rollback := func() {
492 err1 := tx.Rollback()
493 err2 := s.enforcer.E.LoadPolicy()
494 err3 := rollbackRecord(context.Background(), aturi, atpClient)
495
496 // ignore txn complete errors, this is okay
497 if errors.Is(err1, sql.ErrTxDone) {
498 err1 = nil
499 }
500
501 if errs := errors.Join(err1, err2, err3); errs != nil {
502 l.Error("failed to rollback changes", "errs", errs)
503 return
504 }
505 }
506 defer rollback()
507
508 client, err := s.oauth.ServiceClient(
509 r,
510 oauth.WithService(domain),
511 oauth.WithLxm(tangled.RepoCreateNSID),
512 oauth.WithDev(s.config.Core.Dev),
513 )
514 if err != nil {
515 l.Error("service auth failed", "err", err)
516 s.pages.Notice(w, "repo", "Failed to reach PDS.")
517 return
518 }
519
520 xe := tangled.RepoCreate(
521 r.Context(),
522 client,
523 &tangled.RepoCreate_Input{
524 Rkey: rkey,
525 },
526 )
527 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
528 l.Error("xrpc error", "xe", xe)
529 s.pages.Notice(w, "repo", err.Error())
530 return
531 }
532
533 err = db.AddRepo(tx, repo)
534 if err != nil {
535 l.Error("db write failed", "err", err)
536 s.pages.Notice(w, "repo", "Failed to save repository information.")
537 return
538 }
539
540 // acls
541 p, _ := securejoin.SecureJoin(user.Active.Did, repoName)
542 err = s.enforcer.AddRepo(user.Active.Did, domain, p)
543 if err != nil {
544 l.Error("acl setup failed", "err", err)
545 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
546 return
547 }
548
549 err = tx.Commit()
550 if err != nil {
551 l.Error("txn commit failed", "err", err)
552 http.Error(w, err.Error(), http.StatusInternalServerError)
553 return
554 }
555
556 err = s.enforcer.E.SavePolicy()
557 if err != nil {
558 l.Error("acl save failed", "err", err)
559 http.Error(w, err.Error(), http.StatusInternalServerError)
560 return
561 }
562
563 // reset the ATURI because the transaction completed successfully
564 aturi = ""
565
566 s.notifier.NewRepo(r.Context(), repo)
567 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
568 }
569}
570
571// this is used to rollback changes made to the PDS
572//
573// it is a no-op if the provided ATURI is empty
574func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
575 if aturi == "" {
576 return nil
577 }
578
579 parsed := syntax.ATURI(aturi)
580
581 collection := parsed.Collection().String()
582 repo := parsed.Authority().String()
583 rkey := parsed.RecordKey().String()
584
585 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
586 Collection: collection,
587 Repo: repo,
588 Rkey: rkey,
589 })
590 return err
591}
592
593func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
594 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
595 if err != nil {
596 return err
597 }
598 // already present
599 if len(defaultLabels) == len(defaults) {
600 return nil
601 }
602
603 labelDefs, err := models.FetchLabelDefs(r, defaults)
604 if err != nil {
605 return err
606 }
607
608 // Insert each label definition to the database
609 for _, labelDef := range labelDefs {
610 _, err = db.AddLabelDefinition(e, &labelDef)
611 if err != nil {
612 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
613 }
614 }
615
616 return nil
617}
618
619func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
620 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
621 if err != nil {
622 logger.Error("failed to resolve tangled.org DID", "err", err)
623 return
624 }
625
626 pdsEndpoint := resolved.PDSEndpoint()
627 if pdsEndpoint == "" {
628 logger.Error("no PDS endpoint found for tangled.sh DID")
629 return
630 }
631
632 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
633 if err != nil {
634 logger.Error("failed to create appassword session... skipping fetch", "err", err)
635 return
636 }
637
638 client := xrpc.Client{
639 Auth: &xrpc.AuthInfo{
640 AccessJwt: session.AccessJwt,
641 Did: session.Did,
642 },
643 Host: session.PdsEndpoint,
644 }
645
646 l := log.SubLogger(logger, "bluesky")
647
648 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
649 defer ticker.Stop()
650
651 for {
652 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
653 if err != nil {
654 l.Error("failed to fetch bluesky posts", "err", err)
655 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
656 l.Error("failed to insert bluesky posts", "err", err)
657 } else {
658 l.Info("inserted bluesky posts", "count", len(posts))
659 }
660
661 select {
662 case <-ticker.C:
663 case <-ctx.Done():
664 l.Info("stopping bluesky updater")
665 return
666 }
667 }
668}