A vibe-coded fork of Tangled that supports Pijul.
1package state
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "net/http"
10 "strings"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview"
15 "tangled.org/core/appview/bsky"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/indexer"
19 "tangled.org/core/appview/mentions"
20 "tangled.org/core/appview/models"
21 "tangled.org/core/appview/notify"
22 dbnotify "tangled.org/core/appview/notify/db"
23 phnotify "tangled.org/core/appview/notify/posthog"
24 "tangled.org/core/appview/oauth"
25 "tangled.org/core/appview/pages"
26 "tangled.org/core/appview/reporesolver"
27 "tangled.org/core/appview/validator"
28 xrpcclient "tangled.org/core/appview/xrpcclient"
29 "tangled.org/core/consts"
30 "tangled.org/core/eventconsumer"
31 "tangled.org/core/idresolver"
32 "tangled.org/core/jetstream"
33 "tangled.org/core/log"
34 tlog "tangled.org/core/log"
35 "tangled.org/core/orm"
36 "tangled.org/core/rbac"
37 "tangled.org/core/tid"
38
39 comatproto "github.com/bluesky-social/indigo/api/atproto"
40 atpclient "github.com/bluesky-social/indigo/atproto/client"
41 "github.com/bluesky-social/indigo/atproto/syntax"
42 lexutil "github.com/bluesky-social/indigo/lex/util"
43 "github.com/bluesky-social/indigo/xrpc"
44 securejoin "github.com/cyphar/filepath-securejoin"
45 "github.com/go-chi/chi/v5"
46 "github.com/posthog/posthog-go"
47)
48
49type State struct {
50 db *db.DB
51 notifier notify.Notifier
52 indexer *indexer.Indexer
53 oauth *oauth.OAuth
54 enforcer *rbac.Enforcer
55 pages *pages.Pages
56 idResolver *idresolver.Resolver
57 mentionsResolver *mentions.Resolver
58 posthog posthog.Client
59 jc *jetstream.JetstreamClient
60 config *config.Config
61 repoResolver *reporesolver.RepoResolver
62 knotstream *eventconsumer.Consumer
63 spindlestream *eventconsumer.Consumer
64 logger *slog.Logger
65 validator *validator.Validator
66}
67
68func Make(ctx context.Context, config *config.Config) (*State, error) {
69 logger := tlog.FromContext(ctx)
70
71 d, err := db.Make(ctx, config.Core.DbPath)
72 if err != nil {
73 return nil, fmt.Errorf("failed to create db: %w", err)
74 }
75
76 indexer := indexer.New(log.SubLogger(logger, "indexer"))
77 err = indexer.Init(ctx, d)
78 if err != nil {
79 return nil, fmt.Errorf("failed to create indexer: %w", err)
80 }
81
82 enforcer, err := rbac.NewEnforcer(config.Core.DbPath)
83 if err != nil {
84 return nil, fmt.Errorf("failed to create enforcer: %w", err)
85 }
86
87 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL)
88 if err != nil {
89 logger.Error("failed to create redis resolver", "err", err)
90 res = idresolver.DefaultResolver(config.Plc.PLCURL)
91 }
92
93 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint})
94 if err != nil {
95 return nil, fmt.Errorf("failed to create posthog client: %w", err)
96 }
97
98 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages"))
99 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth"))
100 if err != nil {
101 return nil, fmt.Errorf("failed to start oauth handler: %w", err)
102 }
103 validator := validator.New(d, res, enforcer)
104
105 repoResolver := reporesolver.New(config, enforcer, d)
106
107 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
108
109 wrapper := db.DbWrapper{Execer: d}
110 jc, err := jetstream.NewJetstreamClient(
111 config.Jetstream.Endpoint,
112 "appview",
113 []string{
114 tangled.GraphFollowNSID,
115 tangled.FeedStarNSID,
116 tangled.PublicKeyNSID,
117 tangled.RepoArtifactNSID,
118 tangled.ActorProfileNSID,
119 tangled.KnotMemberNSID,
120 tangled.SpindleMemberNSID,
121 tangled.SpindleNSID,
122 tangled.StringNSID,
123 tangled.RepoIssueNSID,
124 tangled.RepoIssueCommentNSID,
125 tangled.CommentNSID,
126 tangled.LabelDefinitionNSID,
127 tangled.LabelOpNSID,
128 },
129 nil,
130 tlog.SubLogger(logger, "jetstream"),
131 wrapper,
132 false,
133
134 // in-memory filter is inapplicable to appview so
135 // we'll never log dids anyway.
136 false,
137 )
138 if err != nil {
139 return nil, fmt.Errorf("failed to create jetstream client: %w", err)
140 }
141
142 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil {
143 return nil, fmt.Errorf("failed to backfill default label defs: %w", err)
144 }
145
146 ingester := appview.Ingester{
147 Db: wrapper,
148 Enforcer: enforcer,
149 IdResolver: res,
150 Config: config,
151 Logger: log.SubLogger(logger, "ingester"),
152 Validator: validator,
153 }
154 err = jc.StartJetstream(ctx, ingester.Ingest())
155 if err != nil {
156 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err)
157 }
158
159 var notifiers []notify.Notifier
160
161 // Always add the database notifier
162 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res))
163
164 // Add other notifiers in production only
165 if !config.Core.Dev {
166 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog))
167 }
168 notifiers = append(notifiers, indexer)
169
170 // Add webhook notifier
171 notifiers = append(notifiers, notify.NewWebhookNotifier(d))
172
173 notifier := notify.NewMergedNotifier(notifiers)
174 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify"))
175
176 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier)
177 if err != nil {
178 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err)
179 }
180 knotstream.Start(ctx)
181
182 spindlestream, err := Spindlestream(ctx, config, d, enforcer)
183 if err != nil {
184 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err)
185 }
186 spindlestream.Start(ctx)
187
188 state := &State{
189 d,
190 notifier,
191 indexer,
192 oauth,
193 enforcer,
194 pages,
195 res,
196 mentionsResolver,
197 posthog,
198 jc,
199 config,
200 repoResolver,
201 knotstream,
202 spindlestream,
203 logger,
204 validator,
205 }
206
207 // fetch initial bluesky posts if configured
208 go fetchBskyPosts(ctx, res, config, d, logger)
209
210 return state, nil
211}
212
213func (s *State) Close() error {
214 // other close up logic goes here
215 return s.db.Close()
216}
217
218func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) {
219 w.Header().Set("Content-Type", "text/plain")
220 w.Header().Set("Cache-Control", "public, max-age=86400") // one day
221
222 robotsTxt := `# Hello, Tanglers!
223User-agent: *
224Allow: /
225Disallow: /*/*/settings
226Disallow: /settings
227Disallow: /*/*/compare
228Disallow: /*/*/fork
229
230Crawl-delay: 1
231`
232 w.Write([]byte(robotsTxt))
233}
234
235func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) {
236 user := s.oauth.GetMultiAccountUser(r)
237 s.pages.TermsOfService(w, pages.TermsOfServiceParams{
238 LoggedInUser: user,
239 })
240}
241
242func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) {
243 user := s.oauth.GetMultiAccountUser(r)
244 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{
245 LoggedInUser: user,
246 })
247}
248
249func (s *State) Brand(w http.ResponseWriter, r *http.Request) {
250 user := s.oauth.GetMultiAccountUser(r)
251 s.pages.Brand(w, pages.BrandParams{
252 LoggedInUser: user,
253 })
254}
255
256func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) {
257 user := s.oauth.GetMultiAccountUser(r)
258 if user == nil {
259 return
260 }
261
262 l := s.logger.With("handler", "UpgradeBanner")
263 l = l.With("did", user.Active.Did)
264
265 regs, err := db.GetRegistrations(
266 s.db,
267 orm.FilterEq("did", user.Active.Did),
268 orm.FilterEq("needs_upgrade", 1),
269 )
270 if err != nil {
271 l.Error("non-fatal: failed to get registrations", "err", err)
272 }
273
274 spindles, err := db.GetSpindles(
275 s.db,
276 orm.FilterEq("owner", user.Active.Did),
277 orm.FilterEq("needs_upgrade", 1),
278 )
279 if err != nil {
280 l.Error("non-fatal: failed to get spindles", "err", err)
281 }
282
283 if regs == nil && spindles == nil {
284 return
285 }
286
287 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{
288 Registrations: regs,
289 Spindles: spindles,
290 })
291}
292
293func (s *State) Keys(w http.ResponseWriter, r *http.Request) {
294 user := chi.URLParam(r, "user")
295 user = strings.TrimPrefix(user, "@")
296
297 if user == "" {
298 w.WriteHeader(http.StatusBadRequest)
299 return
300 }
301
302 id, err := s.idResolver.ResolveIdent(r.Context(), user)
303 if err != nil {
304 w.WriteHeader(http.StatusInternalServerError)
305 return
306 }
307
308 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String())
309 if err != nil {
310 s.logger.Error("failed to get public keys", "err", err)
311 http.Error(w, "failed to get public keys", http.StatusInternalServerError)
312 return
313 }
314
315 if len(pubKeys) == 0 {
316 w.WriteHeader(http.StatusNoContent)
317 return
318 }
319
320 for _, k := range pubKeys {
321 key := strings.TrimRight(k.Key, "\n")
322 fmt.Fprintln(w, key)
323 }
324}
325
// validateRepoName reports whether name is acceptable as a repository name.
//
// Checks run in this order, and the first failure determines the error:
//  1. "." and "..", or any "/" or "\", are rejected as path characters;
//  2. a leading or trailing "." is rejected as a path sequence;
//  3. every character must be an ASCII letter, digit, '-', '_' or '.';
//  4. ".." anywhere in the name is rejected.
//
// An empty name passes every check and yields nil.
func validateRepoName(name string) error {
	// path traversal attempts
	switch {
	case name == ".", name == "..", strings.ContainsAny(name, `/\`):
		return fmt.Errorf("Repository name contains invalid path characters")

	// "/" has already been excluded above, so "./" and "../" cannot occur
	// here; only the leading/trailing dot cases remain.
	case strings.HasPrefix(name, "."), strings.HasSuffix(name, "."):
		return fmt.Errorf("Repository name contains invalid path sequence")
	}

	// character whitelist
	disallowed := func(c rune) bool {
		switch {
		case 'a' <= c && c <= 'z', 'A' <= c && c <= 'Z', '0' <= c && c <= '9':
			return false
		case c == '-', c == '_', c == '.':
			return false
		}
		return true
	}
	if strings.IndexFunc(name, disallowed) >= 0 {
		return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores")
	}

	// no runs of dots
	if strings.Contains(name, "..") {
		return fmt.Errorf("Repository name cannot contain sequential dots")
	}

	return nil
}
357
// stripGitExt returns name without a single trailing ".git"; names that do
// not end in ".git" are returned unchanged.
func stripGitExt(name string) string {
	const ext = ".git"
	if !strings.HasSuffix(name, ext) {
		return name
	}
	return name[:len(name)-len(ext)]
}
361
362func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) {
363 switch r.Method {
364 case http.MethodGet:
365 user := s.oauth.GetMultiAccountUser(r)
366 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did)
367 if err != nil {
368 s.pages.Notice(w, "repo", "Invalid user account.")
369 return
370 }
371
372 s.pages.NewRepo(w, pages.NewRepoParams{
373 LoggedInUser: user,
374 Knots: knots,
375 })
376
377 case http.MethodPost:
378 l := s.logger.With("handler", "NewRepo")
379
380 user := s.oauth.GetMultiAccountUser(r)
381 l = l.With("did", user.Active.Did)
382
383 // form validation
384 domain := r.FormValue("domain")
385 if domain == "" {
386 s.pages.Notice(w, "repo", "Invalid form submission—missing knot domain.")
387 return
388 }
389 l = l.With("knot", domain)
390
391 repoName := r.FormValue("name")
392 if repoName == "" {
393 s.pages.Notice(w, "repo", "Repository name cannot be empty.")
394 return
395 }
396
397 if err := validateRepoName(repoName); err != nil {
398 s.pages.Notice(w, "repo", err.Error())
399 return
400 }
401 repoName = stripGitExt(repoName)
402 l = l.With("repoName", repoName)
403
404 defaultBranch := r.FormValue("branch")
405 if defaultBranch == "" {
406 defaultBranch = "main"
407 }
408 l = l.With("defaultBranch", defaultBranch)
409
410 description := r.FormValue("description")
411 if len([]rune(description)) > 140 {
412 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.")
413 return
414 }
415
416 // ACL validation
417 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create")
418 if err != nil || !ok {
419 l.Info("unauthorized")
420 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.")
421 return
422 }
423
424 // Check for existing repos
425 existingRepo, err := db.GetRepo(
426 s.db,
427 orm.FilterEq("did", user.Active.Did),
428 orm.FilterEq("name", repoName),
429 )
430 if err == nil && existingRepo != nil {
431 l.Info("repo exists")
432 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot))
433 return
434 }
435
436 // create atproto record for this repo
437 rkey := tid.TID()
438 repo := &models.Repo{
439 Did: user.Active.Did,
440 Name: repoName,
441 Knot: domain,
442 Rkey: rkey,
443 Description: description,
444 Created: time.Now(),
445 Labels: s.config.Label.DefaultLabelDefs,
446 }
447 record := repo.AsRecord()
448
449 atpClient, err := s.oauth.AuthorizedClient(r)
450 if err != nil {
451 l.Info("PDS write failed", "err", err)
452 s.pages.Notice(w, "repo", "Failed to write record to PDS.")
453 return
454 }
455
456 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{
457 Collection: tangled.RepoNSID,
458 Repo: user.Active.Did,
459 Rkey: rkey,
460 Record: &lexutil.LexiconTypeDecoder{
461 Val: &record,
462 },
463 })
464 if err != nil {
465 l.Info("PDS write failed", "err", err)
466 s.pages.Notice(w, "repo", "Failed to announce repository creation.")
467 return
468 }
469
470 aturi := atresp.Uri
471 l = l.With("aturi", aturi)
472 l.Info("wrote to PDS")
473
474 tx, err := s.db.BeginTx(r.Context(), nil)
475 if err != nil {
476 l.Info("txn failed", "err", err)
477 s.pages.Notice(w, "repo", "Failed to save repository information.")
478 return
479 }
480
481 // The rollback function reverts a few things on failure:
482 // - the pending txn
483 // - the ACLs
484 // - the atproto record created
485 rollback := func() {
486 err1 := tx.Rollback()
487 err2 := s.enforcer.E.LoadPolicy()
488 err3 := rollbackRecord(context.Background(), aturi, atpClient)
489
490 // ignore txn complete errors, this is okay
491 if errors.Is(err1, sql.ErrTxDone) {
492 err1 = nil
493 }
494
495 if errs := errors.Join(err1, err2, err3); errs != nil {
496 l.Error("failed to rollback changes", "errs", errs)
497 return
498 }
499 }
500 defer rollback()
501
502 client, err := s.oauth.ServiceClient(
503 r,
504 oauth.WithService(domain),
505 oauth.WithLxm(tangled.RepoCreateNSID),
506 oauth.WithDev(s.config.Core.Dev),
507 )
508 if err != nil {
509 l.Error("service auth failed", "err", err)
510 s.pages.Notice(w, "repo", "Failed to reach PDS.")
511 return
512 }
513
514 xe := tangled.RepoCreate(
515 r.Context(),
516 client,
517 &tangled.RepoCreate_Input{
518 Rkey: rkey,
519 },
520 )
521 if err := xrpcclient.HandleXrpcErr(xe); err != nil {
522 l.Error("xrpc error", "xe", xe)
523 s.pages.Notice(w, "repo", err.Error())
524 return
525 }
526
527 err = db.AddRepo(tx, repo)
528 if err != nil {
529 l.Error("db write failed", "err", err)
530 s.pages.Notice(w, "repo", "Failed to save repository information.")
531 return
532 }
533
534 // acls
535 p, _ := securejoin.SecureJoin(user.Active.Did, repoName)
536 err = s.enforcer.AddRepo(user.Active.Did, domain, p)
537 if err != nil {
538 l.Error("acl setup failed", "err", err)
539 s.pages.Notice(w, "repo", "Failed to set up repository permissions.")
540 return
541 }
542
543 err = tx.Commit()
544 if err != nil {
545 l.Error("txn commit failed", "err", err)
546 http.Error(w, err.Error(), http.StatusInternalServerError)
547 return
548 }
549
550 err = s.enforcer.E.SavePolicy()
551 if err != nil {
552 l.Error("acl save failed", "err", err)
553 http.Error(w, err.Error(), http.StatusInternalServerError)
554 return
555 }
556
557 // reset the ATURI because the transaction completed successfully
558 aturi = ""
559
560 s.notifier.NewRepo(r.Context(), repo)
561 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName))
562 }
563}
564
565// this is used to rollback changes made to the PDS
566//
567// it is a no-op if the provided ATURI is empty
568func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error {
569 if aturi == "" {
570 return nil
571 }
572
573 parsed := syntax.ATURI(aturi)
574
575 collection := parsed.Collection().String()
576 repo := parsed.Authority().String()
577 rkey := parsed.RecordKey().String()
578
579 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{
580 Collection: collection,
581 Repo: repo,
582 Rkey: rkey,
583 })
584 return err
585}
586
587func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error {
588 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults))
589 if err != nil {
590 return err
591 }
592 // already present
593 if len(defaultLabels) == len(defaults) {
594 return nil
595 }
596
597 labelDefs, err := models.FetchLabelDefs(r, defaults)
598 if err != nil {
599 return err
600 }
601
602 // Insert each label definition to the database
603 for _, labelDef := range labelDefs {
604 _, err = db.AddLabelDefinition(e, &labelDef)
605 if err != nil {
606 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err)
607 }
608 }
609
610 return nil
611}
612
613func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) {
614 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid)
615 if err != nil {
616 logger.Error("failed to resolve tangled.org DID", "err", err)
617 return
618 }
619
620 pdsEndpoint := resolved.PDSEndpoint()
621 if pdsEndpoint == "" {
622 logger.Error("no PDS endpoint found for tangled.sh DID")
623 return
624 }
625
626 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, config.Core.RateLimitBypass, logger)
627 if err != nil {
628 logger.Error("failed to create appassword session... skipping fetch", "err", err)
629 return
630 }
631
632 client := xrpc.Client{
633 Auth: &xrpc.AuthInfo{
634 AccessJwt: session.AccessJwt,
635 Did: session.Did,
636 },
637 Host: session.PdsEndpoint,
638 }
639
640 l := log.SubLogger(logger, "bluesky")
641
642 ticker := time.NewTicker(config.Bluesky.UpdateInterval)
643 defer ticker.Stop()
644
645 for {
646 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "")
647 if err != nil {
648 l.Error("failed to fetch bluesky posts", "err", err)
649 } else if err := db.InsertBlueskyPosts(d, posts); err != nil {
650 l.Error("failed to insert bluesky posts", "err", err)
651 } else {
652 l.Info("inserted bluesky posts", "count", len(posts))
653 }
654
655 select {
656 case <-ticker.C:
657 case <-ctx.Done():
658 l.Info("stopping bluesky updater")
659 return
660 }
661 }
662}