A vibe coded tangled fork which supports pijul.
at 0a8187a566ee0e8b2efbcc746fe5014f92fcc8e3 678 lines 18 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/indexer" 19 "tangled.org/core/appview/mentions" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/notify" 22 dbnotify "tangled.org/core/appview/notify/db" 23 phnotify "tangled.org/core/appview/notify/posthog" 24 "tangled.org/core/appview/oauth" 25 "tangled.org/core/appview/pages" 26 "tangled.org/core/appview/reporesolver" 27 "tangled.org/core/appview/validator" 28 xrpcclient "tangled.org/core/appview/xrpcclient" 29 "tangled.org/core/consts" 30 "tangled.org/core/eventconsumer" 31 "tangled.org/core/idresolver" 32 "tangled.org/core/jetstream" 33 "tangled.org/core/log" 34 tlog "tangled.org/core/log" 35 "tangled.org/core/orm" 36 "tangled.org/core/rbac" 37 "tangled.org/core/tid" 38 39 comatproto "github.com/bluesky-social/indigo/api/atproto" 40 atpclient "github.com/bluesky-social/indigo/atproto/client" 41 "github.com/bluesky-social/indigo/atproto/syntax" 42 lexutil "github.com/bluesky-social/indigo/lex/util" 43 "github.com/bluesky-social/indigo/xrpc" 44 securejoin "github.com/cyphar/filepath-securejoin" 45 "github.com/go-chi/chi/v5" 46 "github.com/posthog/posthog-go" 47) 48 49type State struct { 50 db *db.DB 51 notifier notify.Notifier 52 indexer *indexer.Indexer 53 oauth *oauth.OAuth 54 enforcer *rbac.Enforcer 55 pages *pages.Pages 56 idResolver *idresolver.Resolver 57 mentionsResolver *mentions.Resolver 58 posthog posthog.Client 59 jc *jetstream.JetstreamClient 60 config *config.Config 61 repoResolver *reporesolver.RepoResolver 62 knotstream *eventconsumer.Consumer 63 spindlestream *eventconsumer.Consumer 64 logger *slog.Logger 65 validator *validator.Validator 66} 67 68func Make(ctx context.Context, config *config.Config) (*State, error) { 69 logger := tlog.FromContext(ctx) 70 71 d, err := db.Make(ctx, config.Core.DbPath) 72 if err != nil { 73 return nil, fmt.Errorf("failed to create db: %w", err) 74 } 75 76 indexer := indexer.New(log.SubLogger(logger, "indexer")) 77 err = indexer.Init(ctx, d) 78 if err != nil { 79 return nil, fmt.Errorf("failed to create indexer: %w", err) 80 } 81 82 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 83 if err != nil { 84 return nil, fmt.Errorf("failed to create enforcer: %w", err) 85 } 86 87 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 88 if err != nil { 89 logger.Error("failed to create redis resolver", "err", err) 90 res = idresolver.DefaultResolver(config.Plc.PLCURL) 91 } 92 93 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 94 if err != nil { 95 return nil, fmt.Errorf("failed to create posthog client: %w", err) 96 } 97 98 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 99 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 100 if err != nil { 101 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 102 } 103 validator := validator.New(d, res, enforcer) 104 105 repoResolver := reporesolver.New(config, enforcer, d) 106 107 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 108 109 wrapper := db.DbWrapper{Execer: d} 110 jc, err := jetstream.NewJetstreamClient( 111 config.Jetstream.Endpoint, 112 "appview", 113 []string{ 114 tangled.GraphFollowNSID, 115 tangled.FeedStarNSID, 116 tangled.PublicKeyNSID, 117 tangled.RepoArtifactNSID, 118 tangled.ActorProfileNSID, 119 tangled.SpindleMemberNSID, 120 tangled.SpindleNSID, 121 tangled.StringNSID, 122 tangled.RepoIssueNSID, 123 tangled.RepoIssueCommentNSID, 124 tangled.LabelDefinitionNSID, 125 tangled.LabelOpNSID, 126 }, 127 nil, 128 tlog.SubLogger(logger, "jetstream"), 129 wrapper, 130 false, 131 132 // in-memory filter is inapplicable to appview so 133 // we'll never log dids anyway. 134 false, 135 ) 136 if err != nil { 137 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 138 } 139 140 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 141 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 142 } 143 144 ingester := appview.Ingester{ 145 Db: wrapper, 146 Enforcer: enforcer, 147 IdResolver: res, 148 Config: config, 149 Logger: log.SubLogger(logger, "ingester"), 150 Validator: validator, 151 } 152 err = jc.StartJetstream(ctx, ingester.Ingest()) 153 if err != nil { 154 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 155 } 156 157 var notifiers []notify.Notifier 158 159 // Always add the database notifier 160 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 161 162 // Add other notifiers in production only 163 if !config.Core.Dev { 164 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 165 } 166 notifiers = append(notifiers, indexer) 167 168 // Add webhook notifier 169 notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 170 171 notifier := notify.NewMergedNotifier(notifiers) 172 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 173 174 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier) 175 if err != nil { 176 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 177 } 178 knotstream.Start(ctx) 179 180 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 181 if err != nil { 182 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 183 } 184 spindlestream.Start(ctx) 185 186 state := &State{ 187 d, 188 notifier, 189 indexer, 190 oauth, 191 enforcer, 192 pages, 193 res, 194 mentionsResolver, 195 posthog, 196 jc, 197 config, 198 repoResolver, 199 knotstream, 200 spindlestream, 201 logger, 202 validator, 203 } 204 205 // fetch initial bluesky posts if configured 206 go fetchBskyPosts(ctx, res, config, d, logger) 207 208 return state, nil 209} 210 211func (s *State) Close() error { 212 // other close up logic goes here 213 return s.db.Close() 214} 215 216func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 217 w.Header().Set("Content-Type", "text/plain") 218 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 219 220 robotsTxt := `# Hello, Tanglers! 221User-agent: * 222Allow: / 223Disallow: /*/*/settings 224Disallow: /settings 225Disallow: /*/*/compare 226Disallow: /*/*/fork 227 228Crawl-delay: 1 229` 230 w.Write([]byte(robotsTxt)) 231} 232 233func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 234 user := s.oauth.GetMultiAccountUser(r) 235 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 236 LoggedInUser: user, 237 }) 238} 239 240func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 241 user := s.oauth.GetMultiAccountUser(r) 242 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 243 LoggedInUser: user, 244 }) 245} 246 247func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 248 user := s.oauth.GetMultiAccountUser(r) 249 s.pages.Brand(w, pages.BrandParams{ 250 LoggedInUser: user, 251 }) 252} 253 254func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 255 if s.oauth.GetMultiAccountUser(r) != nil { 256 s.Timeline(w, r) 257 return 258 } 259 s.Home(w, r) 260} 261 262func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 263 user := s.oauth.GetMultiAccountUser(r) 264 265 // TODO: set this flag based on the UI 266 filtered := false 267 268 var userDid string 269 if user != nil && user.Active != nil { 270 userDid = user.Active.Did 271 } 272 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered) 273 if err != nil { 274 s.logger.Error("failed to make timeline", "err", err) 275 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 276 } 277 278 repos, err := db.GetTopStarredReposLastWeek(s.db) 279 if err != nil { 280 s.logger.Error("failed to get top starred repos", "err", err) 281 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 282 return 283 } 284 285 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue)) 286 if err != nil { 287 // non-fatal 288 } 289 290 s.pages.Timeline(w, pages.TimelineParams{ 291 LoggedInUser: user, 292 Timeline: timeline, 293 Repos: repos, 294 GfiLabel: gfiLabel, 295 }) 296} 297 298func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 299 user := s.oauth.GetMultiAccountUser(r) 300 if user == nil { 301 return 302 } 303 304 l := s.logger.With("handler", "UpgradeBanner") 305 l = l.With("did", user.Active.Did) 306 307 regs, err := db.GetRegistrations( 308 s.db, 309 orm.FilterEq("did", user.Active.Did), 310 orm.FilterEq("needs_upgrade", 1), 311 ) 312 if err != nil { 313 l.Error("non-fatal: failed to get registrations", "err", err) 314 } 315 316 spindles, err := db.GetSpindles( 317 s.db, 318 orm.FilterEq("owner", user.Active.Did), 319 orm.FilterEq("needs_upgrade", 1), 320 ) 321 if err != nil { 322 l.Error("non-fatal: failed to get spindles", "err", err) 323 } 324 325 if regs == nil && spindles == nil { 326 return 327 } 328 329 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 330 Registrations: regs, 331 Spindles: spindles, 332 }) 333} 334 335func (s *State) Home(w http.ResponseWriter, r *http.Request) { 336 // TODO: set this flag based on the UI 337 filtered := false 338 339 timeline, err := db.MakeTimeline(s.db, 5, "", filtered) 340 if err != nil { 341 s.logger.Error("failed to make timeline", "err", err) 342 s.pages.Notice(w, "timeline", "Uh oh! Failed to load timeline.") 343 return 344 } 345 346 repos, err := db.GetTopStarredReposLastWeek(s.db) 347 if err != nil { 348 s.logger.Error("failed to get top starred repos", "err", err) 349 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 350 return 351 } 352 353 s.pages.Home(w, pages.TimelineParams{ 354 LoggedInUser: nil, 355 Timeline: timeline, 356 Repos: repos, 357 }) 358} 359 360func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 361 user := chi.URLParam(r, "user") 362 user = strings.TrimPrefix(user, "@") 363 364 if user == "" { 365 w.WriteHeader(http.StatusBadRequest) 366 return 367 } 368 369 id, err := s.idResolver.ResolveIdent(r.Context(), user) 370 if err != nil { 371 w.WriteHeader(http.StatusInternalServerError) 372 return 373 } 374 375 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 376 if err != nil { 377 s.logger.Error("failed to get public keys", "err", err) 378 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 379 return 380 } 381 382 if len(pubKeys) == 0 { 383 w.WriteHeader(http.StatusNoContent) 384 return 385 } 386 387 for _, k := range pubKeys { 388 key := strings.TrimRight(k.Key, "\n") 389 fmt.Fprintln(w, key) 390 } 391} 392 393func validateRepoName(name string) error { 394 // check for path traversal attempts 395 if name == "." || name == ".." || 396 strings.Contains(name, "/") || strings.Contains(name, "\\") { 397 return fmt.Errorf("Repository name contains invalid path characters") 398 } 399 400 // check for sequences that could be used for traversal when normalized 401 if strings.Contains(name, "./") || strings.Contains(name, "../") || 402 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 403 return fmt.Errorf("Repository name contains invalid path sequence") 404 } 405 406 // then continue with character validation 407 for _, char := range name { 408 if !((char >= 'a' && char <= 'z') || 409 (char >= 'A' && char <= 'Z') || 410 (char >= '0' && char <= '9') || 411 char == '-' || char == '_' || char == '.') { 412 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 413 } 414 } 415 416 // additional check to prevent multiple sequential dots 417 if strings.Contains(name, "..") { 418 return fmt.Errorf("Repository name cannot contain sequential dots") 419 } 420 421 // if all checks pass 422 return nil 423} 424 425func stripGitExt(name string) string { 426 return strings.TrimSuffix(name, ".git") 427} 428 429func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 430 switch r.Method { 431 case http.MethodGet: 432 user := s.oauth.GetMultiAccountUser(r) 433 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 434 if err != nil { 435 s.pages.Notice(w, "repo", "Invalid user account.") 436 return 437 } 438 439 s.pages.NewRepo(w, pages.NewRepoParams{ 440 LoggedInUser: user, 441 Knots: knots, 442 }) 443 444 case http.MethodPost: 445 l := s.logger.With("handler", "NewRepo") 446 447 user := s.oauth.GetMultiAccountUser(r) 448 l = l.With("did", user.Active.Did) 449 450 // form validation 451 domain := r.FormValue("domain") 452 if domain == "" { 453 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 454 return 455 } 456 l = l.With("knot", domain) 457 458 repoName := r.FormValue("name") 459 if repoName == "" { 460 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 461 return 462 } 463 464 if err := validateRepoName(repoName); err != nil { 465 s.pages.Notice(w, "repo", err.Error()) 466 return 467 } 468 repoName = stripGitExt(repoName) 469 l = l.With("repoName", repoName) 470 471 defaultBranch := r.FormValue("branch") 472 if defaultBranch == "" { 473 defaultBranch = "main" 474 } 475 l = l.With("defaultBranch", defaultBranch) 476 477 description := r.FormValue("description") 478 if len([]rune(description)) > 140 { 479 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 480 return 481 } 482 483 // ACL validation 484 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 485 if err != nil || !ok { 486 l.Info("unauthorized") 487 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 488 return 489 } 490 491 // Check for existing repos 492 existingRepo, err := db.GetRepo( 493 s.db, 494 orm.FilterEq("did", user.Active.Did), 495 orm.FilterEq("name", repoName), 496 ) 497 if err == nil && existingRepo != nil { 498 l.Info("repo exists") 499 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 500 return 501 } 502 503 // create atproto record for this repo 504 rkey := tid.TID() 505 repo := &models.Repo{ 506 Did: user.Active.Did, 507 Name: repoName, 508 Knot: domain, 509 Rkey: rkey, 510 Description: description, 511 Created: time.Now(), 512 Labels: s.config.Label.DefaultLabelDefs, 513 } 514 record := repo.AsRecord() 515 516 atpClient, err := s.oauth.AuthorizedClient(r) 517 if err != nil { 518 l.Info("PDS write failed", "err", err) 519 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 520 return 521 } 522 523 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 524 Collection: tangled.RepoNSID, 525 Repo: user.Active.Did, 526 Rkey: rkey, 527 Record: &lexutil.LexiconTypeDecoder{ 528 Val: &record, 529 }, 530 }) 531 if err != nil { 532 l.Info("PDS write failed", "err", err) 533 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 534 return 535 } 536 537 aturi := atresp.Uri 538 l = l.With("aturi", aturi) 539 l.Info("wrote to PDS") 540 541 tx, err := s.db.BeginTx(r.Context(), nil) 542 if err != nil { 543 l.Info("txn failed", "err", err) 544 s.pages.Notice(w, "repo", "Failed to save repository information.") 545 return 546 } 547 548 // The rollback function reverts a few things on failure: 549 // - the pending txn 550 // - the ACLs 551 // - the atproto record created 552 rollback := func() { 553 err1 := tx.Rollback() 554 err2 := s.enforcer.E.LoadPolicy() 555 err3 := rollbackRecord(context.Background(), aturi, atpClient) 556 557 // ignore txn complete errors, this is okay 558 if errors.Is(err1, sql.ErrTxDone) { 559 err1 = nil 560 } 561 562 if errs := errors.Join(err1, err2, err3); errs != nil { 563 l.Error("failed to rollback changes", "errs", errs) 564 return 565 } 566 } 567 defer rollback() 568 569 client, err := s.oauth.ServiceClient( 570 r, 571 oauth.WithService(domain), 572 oauth.WithLxm(tangled.RepoCreateNSID), 573 oauth.WithDev(s.config.Core.Dev), 574 ) 575 if err != nil { 576 l.Error("service auth failed", "err", err) 577 s.pages.Notice(w, "repo", "Failed to reach PDS.") 578 return 579 } 580 581 xe := tangled.RepoCreate( 582 r.Context(), 583 client, 584 &tangled.RepoCreate_Input{ 585 Rkey: rkey, 586 }, 587 ) 588 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 589 l.Error("xrpc error", "xe", xe) 590 s.pages.Notice(w, "repo", err.Error()) 591 return 592 } 593 594 err = db.AddRepo(tx, repo) 595 if err != nil { 596 l.Error("db write failed", "err", err) 597 s.pages.Notice(w, "repo", "Failed to save repository information.") 598 return 599 } 600 601 // acls 602 p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 603 err = s.enforcer.AddRepo(user.Active.Did, domain, p) 604 if err != nil { 605 l.Error("acl setup failed", "err", err) 606 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 607 return 608 } 609 610 err = tx.Commit() 611 if err != nil { 612 l.Error("txn commit failed", "err", err) 613 http.Error(w, err.Error(), http.StatusInternalServerError) 614 return 615 } 616 617 err = s.enforcer.E.SavePolicy() 618 if err != nil { 619 l.Error("acl save failed", "err", err) 620 http.Error(w, err.Error(), http.StatusInternalServerError) 621 return 622 } 623 624 // reset the ATURI because the transaction completed successfully 625 aturi = "" 626 627 s.notifier.NewRepo(r.Context(), repo) 628 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 629 } 630} 631 632// this is used to rollback changes made to the PDS 633// 634// it is a no-op if the provided ATURI is empty 635func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 636 if aturi == "" { 637 return nil 638 } 639 640 parsed := syntax.ATURI(aturi) 641 642 collection := parsed.Collection().String() 643 repo := parsed.Authority().String() 644 rkey := parsed.RecordKey().String() 645 646 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 647 Collection: collection, 648 Repo: repo, 649 Rkey: rkey, 650 }) 651 return err 652} 653 654func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 655 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 656 if err != nil { 657 return err 658 } 659 // already present 660 if len(defaultLabels) == len(defaults) { 661 return nil 662 } 663 664 labelDefs, err := models.FetchLabelDefs(r, defaults) 665 if err != nil { 666 return err 667 } 668 669 // Insert each label definition to the database 670 for _, labelDef := range labelDefs { 671 _, err = db.AddLabelDefinition(e, &labelDef) 672 if err != nil { 673 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 674 } 675 } 676 677 return nil 678}