A vibe-coded Tangled fork that supports Pijul.
at 10b0fbe912120e844758910613f7b10b5a1297c2 667 lines 17 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/config" 16 "tangled.org/core/appview/db" 17 "tangled.org/core/appview/indexer" 18 "tangled.org/core/appview/mentions" 19 "tangled.org/core/appview/models" 20 "tangled.org/core/appview/notify" 21 dbnotify "tangled.org/core/appview/notify/db" 22 phnotify "tangled.org/core/appview/notify/posthog" 23 "tangled.org/core/appview/oauth" 24 "tangled.org/core/appview/pages" 25 "tangled.org/core/appview/reporesolver" 26 xrpcclient "tangled.org/core/appview/xrpcclient" 27 "tangled.org/core/eventconsumer" 28 "tangled.org/core/idresolver" 29 "tangled.org/core/jetstream" 30 "tangled.org/core/log" 31 tlog "tangled.org/core/log" 32 "tangled.org/core/orm" 33 "tangled.org/core/rbac" 34 "tangled.org/core/tid" 35 36 comatproto "github.com/bluesky-social/indigo/api/atproto" 37 atpclient "github.com/bluesky-social/indigo/atproto/client" 38 "github.com/bluesky-social/indigo/atproto/syntax" 39 lexutil "github.com/bluesky-social/indigo/lex/util" 40 securejoin "github.com/cyphar/filepath-securejoin" 41 "github.com/go-chi/chi/v5" 42 "github.com/posthog/posthog-go" 43) 44 45type State struct { 46 db *db.DB 47 notifier notify.Notifier 48 indexer *indexer.Indexer 49 oauth *oauth.OAuth 50 enforcer *rbac.Enforcer 51 pages *pages.Pages 52 idResolver *idresolver.Resolver 53 mentionsResolver *mentions.Resolver 54 posthog posthog.Client 55 jc *jetstream.JetstreamClient 56 config *config.Config 57 repoResolver *reporesolver.RepoResolver 58 knotstream *eventconsumer.Consumer 59 spindlestream *eventconsumer.Consumer 60 logger *slog.Logger 61} 62 63func Make(ctx context.Context, config *config.Config) (*State, error) { 64 logger := tlog.FromContext(ctx) 65 66 d, err := db.Make(ctx, config.Core.DbPath) 67 if err != nil { 68 return nil, fmt.Errorf("failed to create db: %w", err) 69 } 70 71 
indexer := indexer.New(log.SubLogger(logger, "indexer")) 72 err = indexer.Init(ctx, d) 73 if err != nil { 74 return nil, fmt.Errorf("failed to create indexer: %w", err) 75 } 76 77 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 78 if err != nil { 79 return nil, fmt.Errorf("failed to create enforcer: %w", err) 80 } 81 82 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 83 if err != nil { 84 logger.Error("failed to create redis resolver", "err", err) 85 res = idresolver.DefaultResolver(config.Plc.PLCURL) 86 } 87 88 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 89 if err != nil { 90 return nil, fmt.Errorf("failed to create posthog client: %w", err) 91 } 92 93 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 94 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 95 if err != nil { 96 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 97 } 98 99 repoResolver := reporesolver.New(config, enforcer, d) 100 101 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 102 103 wrapper := db.DbWrapper{Execer: d} 104 jc, err := jetstream.NewJetstreamClient( 105 config.Jetstream.Endpoint, 106 "appview", 107 []string{ 108 tangled.GraphFollowNSID, 109 tangled.FeedStarNSID, 110 tangled.PublicKeyNSID, 111 tangled.RepoArtifactNSID, 112 tangled.ActorProfileNSID, 113 tangled.SpindleMemberNSID, 114 tangled.SpindleNSID, 115 tangled.StringNSID, 116 tangled.RepoIssueNSID, 117 tangled.RepoIssueCommentNSID, 118 tangled.LabelDefinitionNSID, 119 tangled.LabelOpNSID, 120 }, 121 nil, 122 tlog.SubLogger(logger, "jetstream"), 123 wrapper, 124 false, 125 126 // in-memory filter is inapplicable to appview so 127 // we'll never log dids anyway. 
128 false, 129 ) 130 if err != nil { 131 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 132 } 133 134 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 135 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 136 } 137 138 ingester := appview.Ingester{ 139 Db: wrapper, 140 Enforcer: enforcer, 141 IdResolver: res, 142 Config: config, 143 Logger: log.SubLogger(logger, "ingester"), 144 } 145 err = jc.StartJetstream(ctx, ingester.Ingest()) 146 if err != nil { 147 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 148 } 149 150 var notifiers []notify.Notifier 151 152 // Always add the database notifier 153 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 154 155 // Add other notifiers in production only 156 if !config.Core.Dev { 157 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 158 } 159 notifiers = append(notifiers, indexer) 160 161 // Add webhook notifier 162 notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 163 164 notifier := notify.NewMergedNotifier(notifiers) 165 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 166 167 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier) 168 if err != nil { 169 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 170 } 171 knotstream.Start(ctx) 172 173 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 174 if err != nil { 175 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 176 } 177 spindlestream.Start(ctx) 178 179 state := &State{ 180 d, 181 notifier, 182 indexer, 183 oauth, 184 enforcer, 185 pages, 186 res, 187 mentionsResolver, 188 posthog, 189 jc, 190 config, 191 repoResolver, 192 knotstream, 193 spindlestream, 194 logger, 195 } 196 197 return state, nil 198} 199 200func (s *State) Close() error { 201 // other close up logic goes here 202 return s.db.Close() 203} 204 205func (s 
*State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 206 w.Header().Set("Content-Type", "text/plain") 207 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 208 209 robotsTxt := `# Hello, Tanglers! 210User-agent: * 211Allow: / 212Disallow: /*/*/settings 213Disallow: /settings 214Disallow: /*/*/compare 215Disallow: /*/*/fork 216 217Crawl-delay: 1 218` 219 w.Write([]byte(robotsTxt)) 220} 221 222func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 223 user := s.oauth.GetMultiAccountUser(r) 224 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 225 LoggedInUser: user, 226 }) 227} 228 229func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 230 user := s.oauth.GetMultiAccountUser(r) 231 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 232 LoggedInUser: user, 233 }) 234} 235 236func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 237 user := s.oauth.GetMultiAccountUser(r) 238 s.pages.Brand(w, pages.BrandParams{ 239 LoggedInUser: user, 240 }) 241} 242 243func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 244 if s.oauth.GetMultiAccountUser(r) != nil { 245 s.Timeline(w, r) 246 return 247 } 248 s.Home(w, r) 249} 250 251func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 252 user := s.oauth.GetMultiAccountUser(r) 253 254 // TODO: set this flag based on the UI 255 filtered := false 256 257 var userDid string 258 if user != nil && user.Active != nil { 259 userDid = user.Active.Did 260 } 261 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered) 262 if err != nil { 263 s.logger.Error("failed to make timeline", "err", err) 264 s.pages.Notice(w, "timeline", "Uh oh! 
Failed to load timeline.") 265 } 266 267 repos, err := db.GetTopStarredReposLastWeek(s.db) 268 if err != nil { 269 s.logger.Error("failed to get top starred repos", "err", err) 270 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 271 return 272 } 273 274 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue)) 275 if err != nil { 276 // non-fatal 277 } 278 279 s.pages.Timeline(w, pages.TimelineParams{ 280 LoggedInUser: user, 281 Timeline: timeline, 282 Repos: repos, 283 GfiLabel: gfiLabel, 284 }) 285} 286 287func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 288 user := s.oauth.GetMultiAccountUser(r) 289 if user == nil { 290 return 291 } 292 293 l := s.logger.With("handler", "UpgradeBanner") 294 l = l.With("did", user.Active.Did) 295 296 regs, err := db.GetRegistrations( 297 s.db, 298 orm.FilterEq("did", user.Active.Did), 299 orm.FilterEq("needs_upgrade", 1), 300 ) 301 if err != nil { 302 l.Error("non-fatal: failed to get registrations", "err", err) 303 } 304 305 spindles, err := db.GetSpindles( 306 s.db, 307 orm.FilterEq("owner", user.Active.Did), 308 orm.FilterEq("needs_upgrade", 1), 309 ) 310 if err != nil { 311 l.Error("non-fatal: failed to get spindles", "err", err) 312 } 313 314 if regs == nil && spindles == nil { 315 return 316 } 317 318 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 319 Registrations: regs, 320 Spindles: spindles, 321 }) 322} 323 324func (s *State) Home(w http.ResponseWriter, r *http.Request) { 325 // TODO: set this flag based on the UI 326 filtered := false 327 328 timeline, err := db.MakeTimeline(s.db, 5, "", filtered) 329 if err != nil { 330 s.logger.Error("failed to make timeline", "err", err) 331 s.pages.Notice(w, "timeline", "Uh oh! 
Failed to load timeline.") 332 return 333 } 334 335 repos, err := db.GetTopStarredReposLastWeek(s.db) 336 if err != nil { 337 s.logger.Error("failed to get top starred repos", "err", err) 338 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 339 return 340 } 341 342 s.pages.Home(w, pages.TimelineParams{ 343 LoggedInUser: nil, 344 Timeline: timeline, 345 Repos: repos, 346 }) 347} 348 349func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 350 user := chi.URLParam(r, "user") 351 user = strings.TrimPrefix(user, "@") 352 353 if user == "" { 354 w.WriteHeader(http.StatusBadRequest) 355 return 356 } 357 358 id, err := s.idResolver.ResolveIdent(r.Context(), user) 359 if err != nil { 360 w.WriteHeader(http.StatusInternalServerError) 361 return 362 } 363 364 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 365 if err != nil { 366 s.logger.Error("failed to get public keys", "err", err) 367 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 368 return 369 } 370 371 if len(pubKeys) == 0 { 372 w.WriteHeader(http.StatusNoContent) 373 return 374 } 375 376 for _, k := range pubKeys { 377 key := strings.TrimRight(k.Key, "\n") 378 fmt.Fprintln(w, key) 379 } 380} 381 382func validateRepoName(name string) error { 383 // check for path traversal attempts 384 if name == "." || name == ".." 
|| 385 strings.Contains(name, "/") || strings.Contains(name, "\\") { 386 return fmt.Errorf("Repository name contains invalid path characters") 387 } 388 389 // check for sequences that could be used for traversal when normalized 390 if strings.Contains(name, "./") || strings.Contains(name, "../") || 391 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 392 return fmt.Errorf("Repository name contains invalid path sequence") 393 } 394 395 // then continue with character validation 396 for _, char := range name { 397 if !((char >= 'a' && char <= 'z') || 398 (char >= 'A' && char <= 'Z') || 399 (char >= '0' && char <= '9') || 400 char == '-' || char == '_' || char == '.') { 401 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 402 } 403 } 404 405 // additional check to prevent multiple sequential dots 406 if strings.Contains(name, "..") { 407 return fmt.Errorf("Repository name cannot contain sequential dots") 408 } 409 410 // if all checks pass 411 return nil 412} 413 414func stripGitExt(name string) string { 415 return strings.TrimSuffix(name, ".git") 416} 417 418func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 419 switch r.Method { 420 case http.MethodGet: 421 user := s.oauth.GetMultiAccountUser(r) 422 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 423 if err != nil { 424 s.pages.Notice(w, "repo", "Invalid user account.") 425 return 426 } 427 428 s.pages.NewRepo(w, pages.NewRepoParams{ 429 LoggedInUser: user, 430 Knots: knots, 431 }) 432 433 case http.MethodPost: 434 l := s.logger.With("handler", "NewRepo") 435 436 user := s.oauth.GetMultiAccountUser(r) 437 l = l.With("did", user.Active.Did) 438 439 // form validation 440 domain := r.FormValue("domain") 441 if domain == "" { 442 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 443 return 444 } 445 l = l.With("knot", domain) 446 447 repoName := r.FormValue("name") 448 if repoName == "" 
{ 449 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 450 return 451 } 452 453 if err := validateRepoName(repoName); err != nil { 454 s.pages.Notice(w, "repo", err.Error()) 455 return 456 } 457 repoName = stripGitExt(repoName) 458 l = l.With("repoName", repoName) 459 460 defaultBranch := r.FormValue("branch") 461 if defaultBranch == "" { 462 defaultBranch = "main" 463 } 464 l = l.With("defaultBranch", defaultBranch) 465 466 description := r.FormValue("description") 467 if len([]rune(description)) > 140 { 468 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 469 return 470 } 471 472 // ACL validation 473 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 474 if err != nil || !ok { 475 l.Info("unauthorized") 476 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 477 return 478 } 479 480 // Check for existing repos 481 existingRepo, err := db.GetRepo( 482 s.db, 483 orm.FilterEq("did", user.Active.Did), 484 orm.FilterEq("name", repoName), 485 ) 486 if err == nil && existingRepo != nil { 487 l.Info("repo exists") 488 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 489 return 490 } 491 492 // create atproto record for this repo 493 rkey := tid.TID() 494 repo := &models.Repo{ 495 Did: user.Active.Did, 496 Name: repoName, 497 Knot: domain, 498 Rkey: rkey, 499 Description: description, 500 Created: time.Now(), 501 Labels: s.config.Label.DefaultLabelDefs, 502 } 503 record := repo.AsRecord() 504 505 atpClient, err := s.oauth.AuthorizedClient(r) 506 if err != nil { 507 l.Info("PDS write failed", "err", err) 508 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 509 return 510 } 511 512 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 513 Collection: tangled.RepoNSID, 514 Repo: user.Active.Did, 515 Rkey: rkey, 516 Record: &lexutil.LexiconTypeDecoder{ 517 Val: 
&record, 518 }, 519 }) 520 if err != nil { 521 l.Info("PDS write failed", "err", err) 522 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 523 return 524 } 525 526 aturi := atresp.Uri 527 l = l.With("aturi", aturi) 528 l.Info("wrote to PDS") 529 530 tx, err := s.db.BeginTx(r.Context(), nil) 531 if err != nil { 532 l.Info("txn failed", "err", err) 533 s.pages.Notice(w, "repo", "Failed to save repository information.") 534 return 535 } 536 537 // The rollback function reverts a few things on failure: 538 // - the pending txn 539 // - the ACLs 540 // - the atproto record created 541 rollback := func() { 542 err1 := tx.Rollback() 543 err2 := s.enforcer.E.LoadPolicy() 544 err3 := rollbackRecord(context.Background(), aturi, atpClient) 545 546 // ignore txn complete errors, this is okay 547 if errors.Is(err1, sql.ErrTxDone) { 548 err1 = nil 549 } 550 551 if errs := errors.Join(err1, err2, err3); errs != nil { 552 l.Error("failed to rollback changes", "errs", errs) 553 return 554 } 555 } 556 defer rollback() 557 558 client, err := s.oauth.ServiceClient( 559 r, 560 oauth.WithService(domain), 561 oauth.WithLxm(tangled.RepoCreateNSID), 562 oauth.WithDev(s.config.Core.Dev), 563 ) 564 if err != nil { 565 l.Error("service auth failed", "err", err) 566 s.pages.Notice(w, "repo", "Failed to reach PDS.") 567 return 568 } 569 570 xe := tangled.RepoCreate( 571 r.Context(), 572 client, 573 &tangled.RepoCreate_Input{ 574 Rkey: rkey, 575 }, 576 ) 577 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 578 l.Error("xrpc error", "xe", xe) 579 s.pages.Notice(w, "repo", err.Error()) 580 return 581 } 582 583 err = db.AddRepo(tx, repo) 584 if err != nil { 585 l.Error("db write failed", "err", err) 586 s.pages.Notice(w, "repo", "Failed to save repository information.") 587 return 588 } 589 590 // acls 591 p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 592 err = s.enforcer.AddRepo(user.Active.Did, domain, p) 593 if err != nil { 594 l.Error("acl setup failed", 
"err", err) 595 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 596 return 597 } 598 599 err = tx.Commit() 600 if err != nil { 601 l.Error("txn commit failed", "err", err) 602 http.Error(w, err.Error(), http.StatusInternalServerError) 603 return 604 } 605 606 err = s.enforcer.E.SavePolicy() 607 if err != nil { 608 l.Error("acl save failed", "err", err) 609 http.Error(w, err.Error(), http.StatusInternalServerError) 610 return 611 } 612 613 // reset the ATURI because the transaction completed successfully 614 aturi = "" 615 616 s.notifier.NewRepo(r.Context(), repo) 617 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 618 } 619} 620 621// this is used to rollback changes made to the PDS 622// 623// it is a no-op if the provided ATURI is empty 624func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 625 if aturi == "" { 626 return nil 627 } 628 629 parsed := syntax.ATURI(aturi) 630 631 collection := parsed.Collection().String() 632 repo := parsed.Authority().String() 633 rkey := parsed.RecordKey().String() 634 635 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 636 Collection: collection, 637 Repo: repo, 638 Rkey: rkey, 639 }) 640 return err 641} 642 643func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 644 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 645 if err != nil { 646 return err 647 } 648 // already present 649 if len(defaultLabels) == len(defaults) { 650 return nil 651 } 652 653 labelDefs, err := models.FetchLabelDefs(r, defaults) 654 if err != nil { 655 return err 656 } 657 658 // Insert each label definition to the database 659 for _, labelDef := range labelDefs { 660 _, err = db.AddLabelDefinition(e, &labelDef) 661 if err != nil { 662 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 663 } 664 } 665 666 return nil 667}