A vibe-coded Tangled fork that supports Pijul.
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/cloudflare"
17 "tangled.org/core/appview/config"
18 "tangled.org/core/appview/db"
19 "tangled.org/core/appview/indexer"
20 "tangled.org/core/appview/mentions"
21 "tangled.org/core/appview/models"
22 "tangled.org/core/appview/notify"
23 dbnotify "tangled.org/core/appview/notify/db"
24 phnotify "tangled.org/core/appview/notify/posthog"
25 "tangled.org/core/appview/oauth"
26 "tangled.org/core/appview/pages"
27 "tangled.org/core/appview/reporesolver"
28 "tangled.org/core/appview/validator"
29 xrpcclient "tangled.org/core/appview/xrpcclient"
30 "tangled.org/core/consts"
31 "tangled.org/core/eventconsumer"
32 "tangled.org/core/idresolver"
33 "tangled.org/core/jetstream"
34 "tangled.org/core/log"
35 tlog "tangled.org/core/log"
36 "tangled.org/core/orm"
37 "tangled.org/core/rbac"
38 "tangled.org/core/tid"
39
40 comatproto "github.com/bluesky-social/indigo/api/atproto"
41 atpclient "github.com/bluesky-social/indigo/atproto/client"
42 "github.com/bluesky-social/indigo/atproto/syntax"
43 lexutil "github.com/bluesky-social/indigo/lex/util"
44 "github.com/bluesky-social/indigo/xrpc"
45 securejoin "github.com/cyphar/filepath-securejoin"
46 "github.com/go-chi/chi/v5"
47 "github.com/posthog/posthog-go"
48)
49
50type State struct {
51 db *db.DB
52 notifier notify.Notifier
53 indexer *indexer.Indexer
54 oauth *oauth.OAuth
55 enforcer *rbac.Enforcer
56 pages *pages.Pages
57 idResolver *idresolver.Resolver
58 mentionsResolver *mentions.Resolver
59 posthog posthog.Client
60 jc *jetstream.JetstreamClient
61 config *config.Config
62 repoResolver *reporesolver.RepoResolver
63 knotstream *eventconsumer.Consumer
64 spindlestream *eventconsumer.Consumer
65 logger *slog.Logger
66 validator *validator.Validator
67 cfClient *cloudflare.Client
68}
69
70func Make(ctx context.Context, config *config.Config) (*State, error) {
71 logger := tlog.FromContext(ctx)
72
73 d, err := db.Make(ctx, config.Core.DbPath)
74 if err != nil {
75 return nil, fmt.Errorf("failed to create db: %w", err)
76 }
77
78 indexer := indexer.New(log.SubLogger(logger, "indexer"))
79 err = indexer.Init(ctx, d)
80 if err != nil {
81 return nil, fmt.Errorf("failed to create indexer: %w", err)
82 }
83
84 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
85 if err != nil {
86 return nil, fmt.Errorf("failed to create enforcer: %w", err)
87 }
88
89 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
90 if err != nil {
91 logger.Error("failed to create redis resolver", "err", err)
92 res = idresolver.DefaultResolver(config.Plc.PLCURL)
93 }
94
95 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
96 if err != nil {
97 return nil, fmt.Errorf("failed to create posthog client: %w", err)
98 }
99
100 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
101 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
102 if err != nil {
103 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
104 }
105 validator := validator.New(d, res, enforcer)
106
107 repoResolver := reporesolver.New(config, enforcer, d)
108
109 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
110
111 wrapper := db.DbWrapper{Execer: d}
112 jc, err := jetstream.NewJetstreamClient(
113 config.Jetstream.Endpoint,
114 "appview",
115 []string{
116 tangled.GraphFollowNSID,
117 tangled.FeedStarNSID,
118 tangled.PublicKeyNSID,
119 tangled.RepoArtifactNSID,
120 tangled.ActorProfileNSID,
121 tangled.KnotMemberNSID,
122 tangled.SpindleMemberNSID,
123 tangled.SpindleNSID,
124 tangled.StringNSID,
125 tangled.RepoIssueNSID,
126 tangled.RepoIssueCommentNSID,
127 tangled.LabelDefinitionNSID,
128 tangled.LabelOpNSID,
129 },
130 nil,
131 tlog.SubLogger(logger, "jetstream"),
132 wrapper,
133 false,
134
135 // in-memory filter is inapplicable to appview so
136 // we'll never log dids anyway.
137 false,
138 )
139 if err != nil {
140 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
141 }
142
143 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
144 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
145 }
146
147 ingester := appview.Ingester{
148 Db: wrapper,
149 Enforcer: enforcer,
150 IdResolver: res,
151 Config: config,
152 Logger: log.SubLogger(logger, "ingester"),
153 Validator: validator,
154 }
155 err = jc.StartJetstream(ctx, ingester.Ingest())
156 if err != nil {
157 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
158 }
159
160 var notifiers []notify.Notifier
161
162 // Always add the database notifier
163 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
164
165 // Add other notifiers in production only
166 if !config.Core.Dev {
167 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
168 }
169 notifiers = append(notifiers, indexer)
170
171 // Add webhook notifier
172 notifiers = append(notifiers, notify.NewWebhookNotifier(d))
173
174 notifier := notify.NewMergedNotifier(notifiers)
175 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
176
177 var cfClient *cloudflare.Client
178 if config.Cloudflare.ApiToken != "" {
179 cfClient, err = cloudflare.New(config)
180 if err != nil {
181 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err)
182 cfClient = nil
183 }
184 }
185
186 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient)
187 if err != nil {
188 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
189 }
190 knotstream.Start(ctx)
191
192 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
193 if err != nil {
194 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
195 }
196 spindlestream.Start(ctx)
197
198 state := &State{
199 db: d,
200 notifier: notifier,
201 indexer: indexer,
202 oauth: oauth,
203 enforcer: enforcer,
204 pages: pages,
205 idResolver: res,
206 mentionsResolver: mentionsResolver,
207 posthog: posthog,
208 jc: jc,
209 config: config,
210 repoResolver: repoResolver,
211 knotstream: knotstream,
212 spindlestream: spindlestream,
213 logger: logger,
214 validator: validator,
215 cfClient: cfClient,
216 }
217
218 // fetch initial bluesky posts if configured
219 go fetchBskyPosts(ctx, res, config, d, logger)
220
221 return state, nil
222}
223
224func (s *State) Close() error {
225 // other close up logic goes here
226 return s.db.Close()
227}
228
229func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
230 w.Header().Set("Content-Type", "text/plain")
231 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
232
233 robotsTxt := `# Hello, Tanglers!
234User-agent: *
235Allow: /
236Disallow: /*/*/settings
237Disallow: /settings
238Disallow: /*/*/compare
239Disallow: /*/*/fork
240
241Crawl-delay: 1
242`
243 w.Write([]byte(robotsTxt))
244}
245
246func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
247 user := s.oauth.GetMultiAccountUser(r)
248 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
249 LoggedInUser: user,
250 })
251}
252
253func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
254 user := s.oauth.GetMultiAccountUser(r)
255 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
256 LoggedInUser: user,
257 })
258}
259
260func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
261 user := s.oauth.GetMultiAccountUser(r)
262 s.pages.Brand(w, pages.BrandParams{
263 LoggedInUser: user,
264 })
265}
266
267func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
268 user := s.oauth.GetMultiAccountUser(r)
269 if user == nil {
270 return
271 }
272
273 l := s.logger.With("handler", "UpgradeBanner")
274 l = l.With("did", user.Active.Did)
275
276 regs, err := db.GetRegistrations(
277 s.db,
278 orm.FilterEq("did", user.Active.Did),
279 orm.FilterEq("needs_upgrade", 1),
280 )
281 if err != nil {
282 l.Error("non-fatal: failed to get registrations", "err", err)
283 }
284
285 spindles, err := db.GetSpindles(
286 s.db,
287 orm.FilterEq("owner", user.Active.Did),
288 orm.FilterEq("needs_upgrade", 1),
289 )
290 if err != nil {
291 l.Error("non-fatal: failed to get spindles", "err", err)
292 }
293
294 if regs == nil && spindles == nil {
295 return
296 }
297
298 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
299 Registrations: regs,
300 Spindles: spindles,
301 })
302}
303
304func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
305 user := chi.URLParam(r, "user")
306 user = strings.TrimPrefix(user, "@")
307
308 if user == "" {
309 w.WriteHeader(http.StatusBadRequest)
310 return
311 }
312
313 id, err := s.idResolver.ResolveIdent(r.Context(), user)
314 if err != nil {
315 w.WriteHeader(http.StatusInternalServerError)
316 return
317 }
318
319 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
320 if err != nil {
321 s.logger.Error("failed to get public keys", "err", err)
322 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
323 return
324 }
325
326 if len(pubKeys) == 0 {
327 w.WriteHeader(http.StatusNoContent)
328 return
329 }
330
331 for _, k := range pubKeys {
332 key := strings.TrimRight(k.Key, "\n")
333 fmt.Fprintln(w, key)
334 }
335}
336
// validateRepoName reports whether name is acceptable as a repository name.
//
// A valid name is non-empty, consists only of ASCII letters, digits, '-', '_'
// and '.', does not start or end with '.', and does not contain "..". Path
// separators ('/' and '\') and the special names "." and ".." are rejected
// first so that traversal attempts get their own message.
//
// The returned messages are capitalised on purpose: callers show err.Error()
// directly to the user.
func validateRepoName(name string) error {
	if name == "" {
		return errors.New("Repository name cannot be empty")
	}

	// path traversal attempts: separators and the dot directories
	if name == "." || name == ".." || strings.ContainsAny(name, `/\`) {
		return errors.New("Repository name contains invalid path characters")
	}

	// leading/trailing dots; "./" and "../" cannot occur here because any
	// name containing '/' has already been rejected above
	if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") {
		return errors.New("Repository name contains invalid path sequence")
	}

	// character whitelist
	disallowed := func(c rune) bool {
		switch {
		case c >= 'a' && c <= 'z', c >= 'A' && c <= 'Z', c >= '0' && c <= '9':
			return false
		case c == '-', c == '_', c == '.':
			return false
		}
		return true
	}
	if strings.IndexFunc(name, disallowed) >= 0 {
		return errors.New("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
	}

	// dots are allowed, but never two in a row
	if strings.Contains(name, "..") {
		return errors.New("Repository name cannot contain sequential dots")
	}

	return nil
}
368
// stripGitExt returns name without a single trailing ".git"; names that do
// not end in ".git" are returned unchanged.
func stripGitExt(name string) string {
	const ext = ".git"
	if !strings.HasSuffix(name, ext) {
		return name
	}
	return name[:len(name)-len(ext)]
}
372
373func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
374 switch r.Method {
375 case http.MethodGet:
376 user := s.oauth.GetMultiAccountUser(r)
377 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
378 if err != nil {
379 s.pages.Notice(w, "repo", "Invalid user account.")
380 return
381 }
382
383 s.pages.NewRepo(w, pages.NewRepoParams{
384 LoggedInUser: user,
385 Knots: knots,
386 })
387
388 case http.MethodPost:
389 l := s.logger.With("handler", "NewRepo")
390
391 user := s.oauth.GetMultiAccountUser(r)
392 l = l.With("did", user.Active.Did)
393
394 // form validation
395 domain := r.FormValue("domain")
396 if domain == "" {
397 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
398 return
399 }
400 l = l.With("knot", domain)
401
402 repoName := r.FormValue("name")
403 if repoName == "" {
404 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
405 return
406 }
407
408 if err := validateRepoName(repoName); err != nil {
409 s.pages.Notice(w, "repo", err.Error())
410 return
411 }
412 repoName = stripGitExt(repoName)
413 l = l.With("repoName", repoName)
414
415 defaultBranch := r.FormValue("branch")
416 if defaultBranch == "" {
417 defaultBranch = "main"
418 }
419 l = l.With("defaultBranch", defaultBranch)
420
421 vcs := strings.ToLower(strings.TrimSpace(r.FormValue("vcs")))
422 if vcs == "" {
423 vcs = "git"
424 }
425 switch vcs {
426 case "git", "pijul":
427 default:
428 s.pages.Notice(w, "repo", "Invalid repository type.")
429 return
430 }
431 l = l.With("vcs", vcs)
432
433 description := r.FormValue("description")
434 if len([]rune(description)) > 140 {
435 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
436 return
437 }
438
439 // ACL validation
440 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
441 if err != nil || !ok {
442 l.Info("unauthorized")
443 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
444 return
445 }
446
447 // Check for existing repos
448 existingRepo, err := db.GetRepo(
449 s.db,
450 orm.FilterEq("did", user.Active.Did),
451 orm.FilterEq("name", repoName),
452 )
453 if err == nil && existingRepo != nil {
454 l.Info("repo exists")
455 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
456 return
457 }
458
459 // create atproto record for this repo
460 rkey := tid.TID()
461 repo := &models.Repo{
462 Did: user.Active.Did,
463 Name: repoName,
464 Knot: domain,
465 Rkey: rkey,
466 Description: description,
467 Created: time.Now(),
468 Labels: s.config.Label.DefaultLabelDefs,
469 Vcs: vcs,
470 }
471 record := repo.AsRecord()
472
473 atpClient, err := s.oauth.AuthorizedClient(r)
474 if err != nil {
475 l.Info("PDS write failed", "err", err)
476 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
477 return
478 }
479
480 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
481 Collection: tangled.RepoNSID,
482 Repo: user.Active.Did,
483 Rkey: rkey,
484 Record: &lexutil.LexiconTypeDecoder{
485 Val: &record,
486 },
487 })
488 if err != nil {
489 l.Info("PDS write failed", "err", err)
490 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
491 return
492 }
493
494 aturi := atresp.Uri
495 l = l.With("aturi", aturi)
496 l.Info("wrote to PDS")
497
498 tx, err := s.db.BeginTx(r.Context(), nil)
499 if err != nil {
500 l.Info("txn failed", "err", err)
501 s.pages.Notice(w, "repo", "Failed to save repository information.")
502 return
503 }
504
505 // The rollback function reverts a few things on failure:
506 // - the pending txn
507 // - the ACLs
508 // - the atproto record created
509 rollback := func() {
510 err1 := tx.Rollback()
511 err2 := s.enforcer.E.LoadPolicy()
512 err3 := rollbackRecord(context.Background(), aturi, atpClient)
513
514 // ignore txn complete errors, this is okay
515 if errors.Is(err1, sql.ErrTxDone) {
516 err1 = nil
517 }
518
519 if errs := errors.Join(err1, err2, err3); errs != nil {
520 l.Error("failed to rollback changes", "errs", errs)
521 return
522 }
523 }
524 defer rollback()
525
526 client, err := s.oauth.ServiceClient(
527 r,
528 oauth.WithService(domain),
529 oauth.WithLxm(tangled.RepoCreateNSID),
530 oauth.WithDev(s.config.Core.Dev),
531 )
532 if err != nil {
533 l.Error("service auth failed", "err", err)
534 s.pages.Notice(w, "repo", "Failed to reach PDS.")
535 return
536 }
537
538 xe := tangled.RepoCreate(
539 r.Context(),
540 client,
541 &tangled.RepoCreate_Input{
542 Rkey: rkey,
543 Vcs: &vcs,
544 },
545 )
546 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
547 l.Error("xrpc error", "xe", xe)
548 s.pages.Notice(w, "repo", err.Error())
549 return
550 }
551
552 err = db.AddRepo(tx, repo)
553 if err != nil {
554 l.Error("db write failed", "err", err)
555 s.pages.Notice(w, "repo", "Failed to save repository information.")
556 return
557 }
558
559 // acls
560 p, _ := securejoin.SecureJoin(user.Active.Did, repoName)
561 err = s.enforcer.AddRepo(user.Active.Did, domain, p)
562 if err != nil {
563 l.Error("acl setup failed", "err", err)
564 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
565 return
566 }
567 err = tx.Commit()
568 if err != nil {
569 l.Error("txn commit failed", "err", err)
570 http.Error(w, err.Error(), http.StatusInternalServerError)
571 return
572 }
573
574 err = s.enforcer.E.SavePolicy()
575 if err != nil {
576 l.Error("acl save failed", "err", err)
577 http.Error(w, err.Error(), http.StatusInternalServerError)
578 return
579 }
580
581 // reset the ATURI because the transaction completed successfully
582 aturi = ""
583
584 s.notifier.NewRepo(r.Context(), repo)
585 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
586 }
587}
588
589// this is used to rollback changes made to the PDS
590//
591// it is a no-op if the provided ATURI is empty
592func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
593 if aturi == "" {
594 return nil
595 }
596
597 parsed := syntax.ATURI(aturi)
598
599 collection := parsed.Collection().String()
600 repo := parsed.Authority().String()
601 rkey := parsed.RecordKey().String()
602
603 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
604 Collection: collection,
605 Repo: repo,
606 Rkey: rkey,
607 })
608 return err
609}
610
611func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
612 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
613 if err != nil {
614 return err
615 }
616 // already present
617 if len(defaultLabels) == len(defaults) {
618 return nil
619 }
620
621 labelDefs, err := models.FetchLabelDefs(r, defaults)
622 if err != nil {
623 return err
624 }
625
626 // Insert each label definition to the database
627 for _, labelDef := range labelDefs {
628 _, err = db.AddLabelDefinition(e, &labelDef)
629 if err != nil {
630 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
631 }
632 }
633
634 return nil
635}
636
637func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
638 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
639 if err != nil {
640 logger.Error("failed to resolve tangled.org DID", "err", err)
641 return
642 }
643
644 pdsEndpoint := resolved.PDSEndpoint()
645 if pdsEndpoint == "" {
646 logger.Error("no PDS endpoint found for tangled.sh DID")
647 return
648 }
649
650 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger)
651 if err != nil {
652 logger.Error("failed to create appassword session... skipping fetch", "err", err)
653 return
654 }
655
656 client := xrpc.Client{
657 Auth: &xrpc.AuthInfo{
658 AccessJwt: session.AccessJwt,
659 Did: session.Did,
660 },
661 Host: session.PdsEndpoint,
662 }
663
664 l := log.SubLogger(logger, "bluesky")
665
666 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
667 defer ticker.Stop()
668
669 for {
670 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
671 if err != nil {
672 l.Error("failed to fetch bluesky posts", "err", err)
673 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
674 l.Error("failed to insert bluesky posts", "err", err)
675 } else {
676 l.Info("inserted bluesky posts", "count", len(posts))
677 }
678
679 select {
680 case <-ticker.C:
681 case <-ctx.Done():
682 l.Info("stopping bluesky updater")
683 return
684 }
685 }
686}