A vibe-coded Tangled fork that supports Pijul.
at sl/tap-appview 668 lines 18 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cloudflare" 17 "tangled.org/core/appview/config" 18 "tangled.org/core/appview/db" 19 "tangled.org/core/appview/indexer" 20 "tangled.org/core/appview/mentions" 21 "tangled.org/core/appview/models" 22 "tangled.org/core/appview/notify" 23 dbnotify "tangled.org/core/appview/notify/db" 24 phnotify "tangled.org/core/appview/notify/posthog" 25 "tangled.org/core/appview/oauth" 26 "tangled.org/core/appview/pages" 27 "tangled.org/core/appview/reporesolver" 28 xrpcclient "tangled.org/core/appview/xrpcclient" 29 "tangled.org/core/consts" 30 "tangled.org/core/eventconsumer" 31 "tangled.org/core/idresolver" 32 "tangled.org/core/jetstream" 33 "tangled.org/core/log" 34 tlog "tangled.org/core/log" 35 "tangled.org/core/orm" 36 "tangled.org/core/rbac" 37 "tangled.org/core/tid" 38 39 comatproto "github.com/bluesky-social/indigo/api/atproto" 40 atpclient "github.com/bluesky-social/indigo/atproto/client" 41 "github.com/bluesky-social/indigo/atproto/syntax" 42 lexutil "github.com/bluesky-social/indigo/lex/util" 43 "github.com/bluesky-social/indigo/xrpc" 44 securejoin "github.com/cyphar/filepath-securejoin" 45 "github.com/go-chi/chi/v5" 46 "github.com/posthog/posthog-go" 47) 48 49type State struct { 50 db *db.DB 51 notifier notify.Notifier 52 indexer *indexer.Indexer 53 oauth *oauth.OAuth 54 enforcer *rbac.Enforcer 55 pages *pages.Pages 56 idResolver *idresolver.Resolver 57 mentionsResolver *mentions.Resolver 58 posthog posthog.Client 59 jc *jetstream.JetstreamClient 60 config *config.Config 61 repoResolver *reporesolver.RepoResolver 62 knotstream *eventconsumer.Consumer 63 spindlestream *eventconsumer.Consumer 64 logger *slog.Logger 65 cfClient *cloudflare.Client 66} 67 68func Make(ctx context.Context, config *config.Config) (*State, 
error) { 69 logger := tlog.FromContext(ctx) 70 71 d, err := db.Make(ctx, config.Core.DbPath) 72 if err != nil { 73 return nil, fmt.Errorf("failed to create db: %w", err) 74 } 75 76 indexer := indexer.New(log.SubLogger(logger, "indexer")) 77 err = indexer.Init(ctx, d) 78 if err != nil { 79 return nil, fmt.Errorf("failed to create indexer: %w", err) 80 } 81 82 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 83 if err != nil { 84 return nil, fmt.Errorf("failed to create enforcer: %w", err) 85 } 86 87 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 88 if err != nil { 89 logger.Error("failed to create redis resolver", "err", err) 90 res = idresolver.DefaultResolver(config.Plc.PLCURL) 91 } 92 93 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 94 if err != nil { 95 return nil, fmt.Errorf("failed to create posthog client: %w", err) 96 } 97 98 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 99 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 100 if err != nil { 101 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 102 } 103 104 repoResolver := reporesolver.New(config, enforcer, d) 105 106 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 107 108 wrapper := db.DbWrapper{Execer: d} 109 jc, err := jetstream.NewJetstreamClient( 110 config.Jetstream.Endpoint, 111 "appview", 112 []string{ 113 tangled.GraphFollowNSID, 114 tangled.FeedStarNSID, 115 tangled.PublicKeyNSID, 116 tangled.RepoArtifactNSID, 117 tangled.ActorProfileNSID, 118 tangled.KnotMemberNSID, 119 tangled.SpindleMemberNSID, 120 tangled.SpindleNSID, 121 tangled.StringNSID, 122 tangled.RepoIssueNSID, 123 tangled.RepoIssueCommentNSID, 124 tangled.LabelDefinitionNSID, 125 tangled.LabelOpNSID, 126 }, 127 nil, 128 tlog.SubLogger(logger, "jetstream"), 129 wrapper, 130 false, 131 132 // in-memory filter is 
inapplicable to appview so 133 // we'll never log dids anyway. 134 false, 135 ) 136 if err != nil { 137 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 138 } 139 140 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 141 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 142 } 143 144 ingester := appview.Ingester{ 145 Db: wrapper, 146 Enforcer: enforcer, 147 IdResolver: res, 148 Config: config, 149 Logger: log.SubLogger(logger, "ingester"), 150 } 151 err = jc.StartJetstream(ctx, ingester.Ingest()) 152 if err != nil { 153 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 154 } 155 156 var notifiers []notify.Notifier 157 158 // Always add the database notifier 159 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 160 161 // Add other notifiers in production only 162 if !config.Core.Dev { 163 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 164 } 165 notifiers = append(notifiers, indexer) 166 167 // Add webhook notifier 168 notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 169 170 notifier := notify.NewMergedNotifier(notifiers) 171 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 172 173 var cfClient *cloudflare.Client 174 if config.Cloudflare.ApiToken != "" { 175 cfClient, err = cloudflare.New(config) 176 if err != nil { 177 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 178 cfClient = nil 179 } 180 } 181 182 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 183 if err != nil { 184 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 185 } 186 knotstream.Start(ctx) 187 188 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 189 if err != nil { 190 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 191 } 192 spindlestream.Start(ctx) 193 194 state := &State{ 195 db: d, 
196 notifier: notifier, 197 indexer: indexer, 198 oauth: oauth, 199 enforcer: enforcer, 200 pages: pages, 201 idResolver: res, 202 mentionsResolver: mentionsResolver, 203 posthog: posthog, 204 jc: jc, 205 config: config, 206 repoResolver: repoResolver, 207 knotstream: knotstream, 208 spindlestream: spindlestream, 209 logger: logger, 210 cfClient: cfClient, 211 } 212 213 // fetch initial bluesky posts if configured 214 go fetchBskyPosts(ctx, res, config, d, logger) 215 216 return state, nil 217} 218 219func (s *State) Close() error { 220 // other close up logic goes here 221 return s.db.Close() 222} 223 224func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 225 w.Header().Set("Content-Type", "text/plain") 226 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 227 228 robotsTxt := `# Hello, Tanglers! 229User-agent: * 230Allow: / 231Disallow: /*/*/settings 232Disallow: /settings 233Disallow: /*/*/compare 234Disallow: /*/*/fork 235 236Crawl-delay: 1 237` 238 w.Write([]byte(robotsTxt)) 239} 240 241func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 242 user := s.oauth.GetMultiAccountUser(r) 243 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 244 LoggedInUser: user, 245 }) 246} 247 248func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 249 user := s.oauth.GetMultiAccountUser(r) 250 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 251 LoggedInUser: user, 252 }) 253} 254 255func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 256 user := s.oauth.GetMultiAccountUser(r) 257 s.pages.Brand(w, pages.BrandParams{ 258 LoggedInUser: user, 259 }) 260} 261 262func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 263 user := s.oauth.GetMultiAccountUser(r) 264 if user == nil { 265 return 266 } 267 268 l := s.logger.With("handler", "UpgradeBanner") 269 l = l.With("did", user.Active.Did) 270 271 regs, err := db.GetRegistrations( 272 s.db, 273 orm.FilterEq("did", 
user.Active.Did), 274 orm.FilterEq("needs_upgrade", 1), 275 ) 276 if err != nil { 277 l.Error("non-fatal: failed to get registrations", "err", err) 278 } 279 280 spindles, err := db.GetSpindles( 281 s.db, 282 orm.FilterEq("owner", user.Active.Did), 283 orm.FilterEq("needs_upgrade", 1), 284 ) 285 if err != nil { 286 l.Error("non-fatal: failed to get spindles", "err", err) 287 } 288 289 if regs == nil && spindles == nil { 290 return 291 } 292 293 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 294 Registrations: regs, 295 Spindles: spindles, 296 }) 297} 298 299func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 300 user := chi.URLParam(r, "user") 301 user = strings.TrimPrefix(user, "@") 302 303 if user == "" { 304 w.WriteHeader(http.StatusBadRequest) 305 return 306 } 307 308 id, err := s.idResolver.ResolveIdent(r.Context(), user) 309 if err != nil { 310 w.WriteHeader(http.StatusInternalServerError) 311 return 312 } 313 314 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 315 if err != nil { 316 s.logger.Error("failed to get public keys", "err", err) 317 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 318 return 319 } 320 321 if len(pubKeys) == 0 { 322 w.WriteHeader(http.StatusNoContent) 323 return 324 } 325 326 for _, k := range pubKeys { 327 key := strings.TrimRight(k.Key, "\n") 328 fmt.Fprintln(w, key) 329 } 330} 331 332func validateRepoName(name string) error { 333 // check for path traversal attempts 334 if name == "." || name == ".." 
|| 335 strings.Contains(name, "/") || strings.Contains(name, "\\") { 336 return fmt.Errorf("Repository name contains invalid path characters") 337 } 338 339 // check for sequences that could be used for traversal when normalized 340 if strings.Contains(name, "./") || strings.Contains(name, "../") || 341 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 342 return fmt.Errorf("Repository name contains invalid path sequence") 343 } 344 345 // then continue with character validation 346 for _, char := range name { 347 if !((char >= 'a' && char <= 'z') || 348 (char >= 'A' && char <= 'Z') || 349 (char >= '0' && char <= '9') || 350 char == '-' || char == '_' || char == '.') { 351 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 352 } 353 } 354 355 // additional check to prevent multiple sequential dots 356 if strings.Contains(name, "..") { 357 return fmt.Errorf("Repository name cannot contain sequential dots") 358 } 359 360 // if all checks pass 361 return nil 362} 363 364func stripGitExt(name string) string { 365 return strings.TrimSuffix(name, ".git") 366} 367 368func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 369 switch r.Method { 370 case http.MethodGet: 371 user := s.oauth.GetMultiAccountUser(r) 372 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 373 if err != nil { 374 s.pages.Notice(w, "repo", "Invalid user account.") 375 return 376 } 377 378 s.pages.NewRepo(w, pages.NewRepoParams{ 379 LoggedInUser: user, 380 Knots: knots, 381 }) 382 383 case http.MethodPost: 384 l := s.logger.With("handler", "NewRepo") 385 386 user := s.oauth.GetMultiAccountUser(r) 387 l = l.With("did", user.Active.Did) 388 389 // form validation 390 domain := r.FormValue("domain") 391 if domain == "" { 392 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 393 return 394 } 395 l = l.With("knot", domain) 396 397 repoName := r.FormValue("name") 398 if repoName == "" 
{ 399 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 400 return 401 } 402 403 if err := validateRepoName(repoName); err != nil { 404 s.pages.Notice(w, "repo", err.Error()) 405 return 406 } 407 repoName = stripGitExt(repoName) 408 l = l.With("repoName", repoName) 409 410 defaultBranch := r.FormValue("branch") 411 if defaultBranch == "" { 412 defaultBranch = "main" 413 } 414 l = l.With("defaultBranch", defaultBranch) 415 416 description := r.FormValue("description") 417 if len([]rune(description)) > 140 { 418 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 419 return 420 } 421 422 // ACL validation 423 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 424 if err != nil || !ok { 425 l.Info("unauthorized") 426 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 427 return 428 } 429 430 // Check for existing repos 431 existingRepo, err := db.GetRepo( 432 s.db, 433 orm.FilterEq("did", user.Active.Did), 434 orm.FilterEq("name", repoName), 435 ) 436 if err == nil && existingRepo != nil { 437 l.Info("repo exists") 438 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 439 return 440 } 441 442 // create atproto record for this repo 443 rkey := tid.TID() 444 repo := &models.Repo{ 445 Did: user.Active.Did, 446 Name: repoName, 447 Knot: domain, 448 Rkey: rkey, 449 Description: description, 450 Created: time.Now(), 451 Labels: s.config.Label.DefaultLabelDefs, 452 } 453 record := repo.AsRecord() 454 455 atpClient, err := s.oauth.AuthorizedClient(r) 456 if err != nil { 457 l.Info("PDS write failed", "err", err) 458 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 459 return 460 } 461 462 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 463 Collection: tangled.RepoNSID, 464 Repo: user.Active.Did, 465 Rkey: rkey, 466 Record: &lexutil.LexiconTypeDecoder{ 467 Val: 
&record, 468 }, 469 }) 470 if err != nil { 471 l.Info("PDS write failed", "err", err) 472 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 473 return 474 } 475 476 aturi := atresp.Uri 477 l = l.With("aturi", aturi) 478 l.Info("wrote to PDS") 479 480 tx, err := s.db.BeginTx(r.Context(), nil) 481 if err != nil { 482 l.Info("txn failed", "err", err) 483 s.pages.Notice(w, "repo", "Failed to save repository information.") 484 return 485 } 486 487 // The rollback function reverts a few things on failure: 488 // - the pending txn 489 // - the ACLs 490 // - the atproto record created 491 rollback := func() { 492 err1 := tx.Rollback() 493 err2 := s.enforcer.E.LoadPolicy() 494 err3 := rollbackRecord(context.Background(), aturi, atpClient) 495 496 // ignore txn complete errors, this is okay 497 if errors.Is(err1, sql.ErrTxDone) { 498 err1 = nil 499 } 500 501 if errs := errors.Join(err1, err2, err3); errs != nil { 502 l.Error("failed to rollback changes", "errs", errs) 503 return 504 } 505 } 506 defer rollback() 507 508 client, err := s.oauth.ServiceClient( 509 r, 510 oauth.WithService(domain), 511 oauth.WithLxm(tangled.RepoCreateNSID), 512 oauth.WithDev(s.config.Core.Dev), 513 ) 514 if err != nil { 515 l.Error("service auth failed", "err", err) 516 s.pages.Notice(w, "repo", "Failed to reach PDS.") 517 return 518 } 519 520 xe := tangled.RepoCreate( 521 r.Context(), 522 client, 523 &tangled.RepoCreate_Input{ 524 Rkey: rkey, 525 }, 526 ) 527 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 528 l.Error("xrpc error", "xe", xe) 529 s.pages.Notice(w, "repo", err.Error()) 530 return 531 } 532 533 err = db.AddRepo(tx, repo) 534 if err != nil { 535 l.Error("db write failed", "err", err) 536 s.pages.Notice(w, "repo", "Failed to save repository information.") 537 return 538 } 539 540 // acls 541 p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 542 err = s.enforcer.AddRepo(user.Active.Did, domain, p) 543 if err != nil { 544 l.Error("acl setup failed", 
"err", err) 545 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 546 return 547 } 548 549 err = tx.Commit() 550 if err != nil { 551 l.Error("txn commit failed", "err", err) 552 http.Error(w, err.Error(), http.StatusInternalServerError) 553 return 554 } 555 556 err = s.enforcer.E.SavePolicy() 557 if err != nil { 558 l.Error("acl save failed", "err", err) 559 http.Error(w, err.Error(), http.StatusInternalServerError) 560 return 561 } 562 563 // reset the ATURI because the transaction completed successfully 564 aturi = "" 565 566 s.notifier.NewRepo(r.Context(), repo) 567 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 568 } 569} 570 571// this is used to rollback changes made to the PDS 572// 573// it is a no-op if the provided ATURI is empty 574func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 575 if aturi == "" { 576 return nil 577 } 578 579 parsed := syntax.ATURI(aturi) 580 581 collection := parsed.Collection().String() 582 repo := parsed.Authority().String() 583 rkey := parsed.RecordKey().String() 584 585 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 586 Collection: collection, 587 Repo: repo, 588 Rkey: rkey, 589 }) 590 return err 591} 592 593func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 594 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 595 if err != nil { 596 return err 597 } 598 // already present 599 if len(defaultLabels) == len(defaults) { 600 return nil 601 } 602 603 labelDefs, err := models.FetchLabelDefs(r, defaults) 604 if err != nil { 605 return err 606 } 607 608 // Insert each label definition to the database 609 for _, labelDef := range labelDefs { 610 _, err = db.AddLabelDefinition(e, &labelDef) 611 if err != nil { 612 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 613 } 614 } 615 616 return nil 617} 618 619func 
fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 620 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 621 if err != nil { 622 logger.Error("failed to resolve tangled.org DID", "err", err) 623 return 624 } 625 626 pdsEndpoint := resolved.PDSEndpoint() 627 if pdsEndpoint == "" { 628 logger.Error("no PDS endpoint found for tangled.sh DID") 629 return 630 } 631 632 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 633 if err != nil { 634 logger.Error("failed to create appassword session... skipping fetch", "err", err) 635 return 636 } 637 638 client := xrpc.Client{ 639 Auth: &xrpc.AuthInfo{ 640 AccessJwt: session.AccessJwt, 641 Did: session.Did, 642 }, 643 Host: session.PdsEndpoint, 644 } 645 646 l := log.SubLogger(logger, "bluesky") 647 648 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 649 defer ticker.Stop() 650 651 for { 652 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 653 if err != nil { 654 l.Error("failed to fetch bluesky posts", "err", err) 655 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 656 l.Error("failed to insert bluesky posts", "err", err) 657 } else { 658 l.Info("inserted bluesky posts", "count", len(posts)) 659 } 660 661 select { 662 case <-ticker.C: 663 case <-ctx.Done(): 664 l.Info("stopping bluesky updater") 665 return 666 } 667 } 668}