A vibe-coded tangled fork that supports Pijul.
at f4a09133a636451c495fa96c98b99379b7692497 651 lines 17 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/config" 16 "tangled.org/core/appview/db" 17 "tangled.org/core/appview/indexer" 18 "tangled.org/core/appview/mentions" 19 "tangled.org/core/appview/models" 20 "tangled.org/core/appview/notify" 21 dbnotify "tangled.org/core/appview/notify/db" 22 phnotify "tangled.org/core/appview/notify/posthog" 23 "tangled.org/core/appview/oauth" 24 "tangled.org/core/appview/pages" 25 "tangled.org/core/appview/reporesolver" 26 xrpcclient "tangled.org/core/appview/xrpcclient" 27 "tangled.org/core/eventconsumer" 28 "tangled.org/core/idresolver" 29 "tangled.org/core/jetstream" 30 "tangled.org/core/log" 31 tlog "tangled.org/core/log" 32 "tangled.org/core/orm" 33 "tangled.org/core/rbac" 34 "tangled.org/core/tid" 35 36 comatproto "github.com/bluesky-social/indigo/api/atproto" 37 atpclient "github.com/bluesky-social/indigo/atproto/client" 38 "github.com/bluesky-social/indigo/atproto/syntax" 39 lexutil "github.com/bluesky-social/indigo/lex/util" 40 securejoin "github.com/cyphar/filepath-securejoin" 41 "github.com/go-chi/chi/v5" 42 "github.com/posthog/posthog-go" 43) 44 45type State struct { 46 db *db.DB 47 notifier notify.Notifier 48 indexer *indexer.Indexer 49 oauth *oauth.OAuth 50 enforcer *rbac.Enforcer 51 pages *pages.Pages 52 idResolver *idresolver.Resolver 53 mentionsResolver *mentions.Resolver 54 posthog posthog.Client 55 jc *jetstream.JetstreamClient 56 config *config.Config 57 repoResolver *reporesolver.RepoResolver 58 knotstream *eventconsumer.Consumer 59 spindlestream *eventconsumer.Consumer 60 logger *slog.Logger 61} 62 63func Make(ctx context.Context, config *config.Config) (*State, error) { 64 logger := tlog.FromContext(ctx) 65 66 d, err := db.Make(ctx, config.Core.DbPath) 67 if err != nil { 68 return nil, fmt.Errorf("failed to create db: %w", err) 69 } 70 71 
indexer := indexer.New(log.SubLogger(logger, "indexer")) 72 err = indexer.Init(ctx, d) 73 if err != nil { 74 return nil, fmt.Errorf("failed to create indexer: %w", err) 75 } 76 77 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 78 if err != nil { 79 return nil, fmt.Errorf("failed to create enforcer: %w", err) 80 } 81 82 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 83 if err != nil { 84 logger.Error("failed to create redis resolver", "err", err) 85 res = idresolver.DefaultResolver(config.Plc.PLCURL) 86 } 87 88 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 89 if err != nil { 90 return nil, fmt.Errorf("failed to create posthog client: %w", err) 91 } 92 93 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 94 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 95 if err != nil { 96 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 97 } 98 99 repoResolver := reporesolver.New(config, enforcer, d) 100 101 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 102 103 wrapper := db.DbWrapper{Execer: d} 104 jc, err := jetstream.NewJetstreamClient( 105 config.Jetstream.Endpoint, 106 "appview", 107 []string{ 108 tangled.GraphFollowNSID, 109 tangled.FeedStarNSID, 110 tangled.PublicKeyNSID, 111 tangled.RepoArtifactNSID, 112 tangled.ActorProfileNSID, 113 tangled.SpindleMemberNSID, 114 tangled.SpindleNSID, 115 tangled.StringNSID, 116 tangled.RepoIssueNSID, 117 tangled.RepoIssueCommentNSID, 118 tangled.LabelDefinitionNSID, 119 tangled.LabelOpNSID, 120 }, 121 nil, 122 tlog.SubLogger(logger, "jetstream"), 123 wrapper, 124 false, 125 126 // in-memory filter is inapplicalble to appview so 127 // we'll never log dids anyway. 
128 false, 129 ) 130 if err != nil { 131 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 132 } 133 134 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 135 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 136 } 137 138 ingester := appview.Ingester{ 139 Db: wrapper, 140 Enforcer: enforcer, 141 IdResolver: res, 142 Config: config, 143 Logger: log.SubLogger(logger, "ingester"), 144 } 145 err = jc.StartJetstream(ctx, ingester.Ingest()) 146 if err != nil { 147 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 148 } 149 150 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog) 151 if err != nil { 152 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 153 } 154 knotstream.Start(ctx) 155 156 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 157 if err != nil { 158 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 159 } 160 spindlestream.Start(ctx) 161 162 var notifiers []notify.Notifier 163 164 // Always add the database notifier 165 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 166 167 // Add other notifiers in production only 168 if !config.Core.Dev { 169 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 170 } 171 notifiers = append(notifiers, indexer) 172 notifier := notify.NewMergedNotifier(notifiers, tlog.SubLogger(logger, "notify")) 173 174 state := &State{ 175 d, 176 notifier, 177 indexer, 178 oauth, 179 enforcer, 180 pages, 181 res, 182 mentionsResolver, 183 posthog, 184 jc, 185 config, 186 repoResolver, 187 knotstream, 188 spindlestream, 189 logger, 190 } 191 192 return state, nil 193} 194 195func (s *State) Close() error { 196 // other close up logic goes here 197 return s.db.Close() 198} 199 200func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 201 w.Header().Set("Content-Type", "text/plain") 202 w.Header().Set("Cache-Control", "public, 
max-age=86400") // one day 203 204 robotsTxt := `User-agent: * 205Allow: / 206` 207 w.Write([]byte(robotsTxt)) 208} 209 210func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 211 user := s.oauth.GetMultiAccountUser(r) 212 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 213 LoggedInUser: user, 214 }) 215} 216 217func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 218 user := s.oauth.GetMultiAccountUser(r) 219 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 220 LoggedInUser: user, 221 }) 222} 223 224func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 225 user := s.oauth.GetMultiAccountUser(r) 226 s.pages.Brand(w, pages.BrandParams{ 227 LoggedInUser: user, 228 }) 229} 230 231func (s *State) HomeOrTimeline(w http.ResponseWriter, r *http.Request) { 232 if s.oauth.GetMultiAccountUser(r) != nil { 233 s.Timeline(w, r) 234 return 235 } 236 s.Home(w, r) 237} 238 239func (s *State) Timeline(w http.ResponseWriter, r *http.Request) { 240 user := s.oauth.GetMultiAccountUser(r) 241 242 // TODO: set this flag based on the UI 243 filtered := false 244 245 var userDid string 246 if user != nil && user.Active != nil { 247 userDid = user.Active.Did 248 } 249 timeline, err := db.MakeTimeline(s.db, 50, userDid, filtered) 250 if err != nil { 251 s.logger.Error("failed to make timeline", "err", err) 252 s.pages.Notice(w, "timeline", "Uh oh! 
Failed to load timeline.") 253 } 254 255 repos, err := db.GetTopStarredReposLastWeek(s.db) 256 if err != nil { 257 s.logger.Error("failed to get top starred repos", "err", err) 258 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 259 return 260 } 261 262 gfiLabel, err := db.GetLabelDefinition(s.db, orm.FilterEq("at_uri", s.config.Label.GoodFirstIssue)) 263 if err != nil { 264 // non-fatal 265 } 266 267 s.pages.Timeline(w, pages.TimelineParams{ 268 LoggedInUser: user, 269 Timeline: timeline, 270 Repos: repos, 271 GfiLabel: gfiLabel, 272 }) 273} 274 275func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 276 user := s.oauth.GetMultiAccountUser(r) 277 if user == nil { 278 return 279 } 280 281 l := s.logger.With("handler", "UpgradeBanner") 282 l = l.With("did", user.Active.Did) 283 284 regs, err := db.GetRegistrations( 285 s.db, 286 orm.FilterEq("did", user.Active.Did), 287 orm.FilterEq("needs_upgrade", 1), 288 ) 289 if err != nil { 290 l.Error("non-fatal: failed to get registrations", "err", err) 291 } 292 293 spindles, err := db.GetSpindles( 294 s.db, 295 orm.FilterEq("owner", user.Active.Did), 296 orm.FilterEq("needs_upgrade", 1), 297 ) 298 if err != nil { 299 l.Error("non-fatal: failed to get spindles", "err", err) 300 } 301 302 if regs == nil && spindles == nil { 303 return 304 } 305 306 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 307 Registrations: regs, 308 Spindles: spindles, 309 }) 310} 311 312func (s *State) Home(w http.ResponseWriter, r *http.Request) { 313 // TODO: set this flag based on the UI 314 filtered := false 315 316 timeline, err := db.MakeTimeline(s.db, 5, "", filtered) 317 if err != nil { 318 s.logger.Error("failed to make timeline", "err", err) 319 s.pages.Notice(w, "timeline", "Uh oh! 
Failed to load timeline.") 320 return 321 } 322 323 repos, err := db.GetTopStarredReposLastWeek(s.db) 324 if err != nil { 325 s.logger.Error("failed to get top starred repos", "err", err) 326 s.pages.Notice(w, "topstarredrepos", "Unable to load.") 327 return 328 } 329 330 s.pages.Home(w, pages.TimelineParams{ 331 LoggedInUser: nil, 332 Timeline: timeline, 333 Repos: repos, 334 }) 335} 336 337func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 338 user := chi.URLParam(r, "user") 339 user = strings.TrimPrefix(user, "@") 340 341 if user == "" { 342 w.WriteHeader(http.StatusBadRequest) 343 return 344 } 345 346 id, err := s.idResolver.ResolveIdent(r.Context(), user) 347 if err != nil { 348 w.WriteHeader(http.StatusInternalServerError) 349 return 350 } 351 352 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 353 if err != nil { 354 s.logger.Error("failed to get public keys", "err", err) 355 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 356 return 357 } 358 359 if len(pubKeys) == 0 { 360 w.WriteHeader(http.StatusNoContent) 361 return 362 } 363 364 for _, k := range pubKeys { 365 key := strings.TrimRight(k.Key, "\n") 366 fmt.Fprintln(w, key) 367 } 368} 369 370func validateRepoName(name string) error { 371 // check for path traversal attempts 372 if name == "." || name == ".." 
|| 373 strings.Contains(name, "/") || strings.Contains(name, "\\") { 374 return fmt.Errorf("Repository name contains invalid path characters") 375 } 376 377 // check for sequences that could be used for traversal when normalized 378 if strings.Contains(name, "./") || strings.Contains(name, "../") || 379 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 380 return fmt.Errorf("Repository name contains invalid path sequence") 381 } 382 383 // then continue with character validation 384 for _, char := range name { 385 if !((char >= 'a' && char <= 'z') || 386 (char >= 'A' && char <= 'Z') || 387 (char >= '0' && char <= '9') || 388 char == '-' || char == '_' || char == '.') { 389 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 390 } 391 } 392 393 // additional check to prevent multiple sequential dots 394 if strings.Contains(name, "..") { 395 return fmt.Errorf("Repository name cannot contain sequential dots") 396 } 397 398 // if all checks pass 399 return nil 400} 401 402func stripGitExt(name string) string { 403 return strings.TrimSuffix(name, ".git") 404} 405 406func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 407 switch r.Method { 408 case http.MethodGet: 409 user := s.oauth.GetMultiAccountUser(r) 410 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 411 if err != nil { 412 s.pages.Notice(w, "repo", "Invalid user account.") 413 return 414 } 415 416 s.pages.NewRepo(w, pages.NewRepoParams{ 417 LoggedInUser: user, 418 Knots: knots, 419 }) 420 421 case http.MethodPost: 422 l := s.logger.With("handler", "NewRepo") 423 424 user := s.oauth.GetMultiAccountUser(r) 425 l = l.With("did", user.Active.Did) 426 427 // form validation 428 domain := r.FormValue("domain") 429 if domain == "" { 430 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 431 return 432 } 433 l = l.With("knot", domain) 434 435 repoName := r.FormValue("name") 436 if repoName == "" 
{ 437 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 438 return 439 } 440 441 if err := validateRepoName(repoName); err != nil { 442 s.pages.Notice(w, "repo", err.Error()) 443 return 444 } 445 repoName = stripGitExt(repoName) 446 l = l.With("repoName", repoName) 447 448 defaultBranch := r.FormValue("branch") 449 if defaultBranch == "" { 450 defaultBranch = "main" 451 } 452 l = l.With("defaultBranch", defaultBranch) 453 454 description := r.FormValue("description") 455 456 // ACL validation 457 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 458 if err != nil || !ok { 459 l.Info("unauthorized") 460 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 461 return 462 } 463 464 // Check for existing repos 465 existingRepo, err := db.GetRepo( 466 s.db, 467 orm.FilterEq("did", user.Active.Did), 468 orm.FilterEq("name", repoName), 469 ) 470 if err == nil && existingRepo != nil { 471 l.Info("repo exists") 472 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 473 return 474 } 475 476 // create atproto record for this repo 477 rkey := tid.TID() 478 repo := &models.Repo{ 479 Did: user.Active.Did, 480 Name: repoName, 481 Knot: domain, 482 Rkey: rkey, 483 Description: description, 484 Created: time.Now(), 485 Labels: s.config.Label.DefaultLabelDefs, 486 } 487 record := repo.AsRecord() 488 489 atpClient, err := s.oauth.AuthorizedClient(r) 490 if err != nil { 491 l.Info("PDS write failed", "err", err) 492 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 493 return 494 } 495 496 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 497 Collection: tangled.RepoNSID, 498 Repo: user.Active.Did, 499 Rkey: rkey, 500 Record: &lexutil.LexiconTypeDecoder{ 501 Val: &record, 502 }, 503 }) 504 if err != nil { 505 l.Info("PDS write failed", "err", err) 506 s.pages.Notice(w, "repo", "Failed to announce 
repository creation.") 507 return 508 } 509 510 aturi := atresp.Uri 511 l = l.With("aturi", aturi) 512 l.Info("wrote to PDS") 513 514 tx, err := s.db.BeginTx(r.Context(), nil) 515 if err != nil { 516 l.Info("txn failed", "err", err) 517 s.pages.Notice(w, "repo", "Failed to save repository information.") 518 return 519 } 520 521 // The rollback function reverts a few things on failure: 522 // - the pending txn 523 // - the ACLs 524 // - the atproto record created 525 rollback := func() { 526 err1 := tx.Rollback() 527 err2 := s.enforcer.E.LoadPolicy() 528 err3 := rollbackRecord(context.Background(), aturi, atpClient) 529 530 // ignore txn complete errors, this is okay 531 if errors.Is(err1, sql.ErrTxDone) { 532 err1 = nil 533 } 534 535 if errs := errors.Join(err1, err2, err3); errs != nil { 536 l.Error("failed to rollback changes", "errs", errs) 537 return 538 } 539 } 540 defer rollback() 541 542 client, err := s.oauth.ServiceClient( 543 r, 544 oauth.WithService(domain), 545 oauth.WithLxm(tangled.RepoCreateNSID), 546 oauth.WithDev(s.config.Core.Dev), 547 ) 548 if err != nil { 549 l.Error("service auth failed", "err", err) 550 s.pages.Notice(w, "repo", "Failed to reach PDS.") 551 return 552 } 553 554 xe := tangled.RepoCreate( 555 r.Context(), 556 client, 557 &tangled.RepoCreate_Input{ 558 Rkey: rkey, 559 }, 560 ) 561 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 562 l.Error("xrpc error", "xe", xe) 563 s.pages.Notice(w, "repo", err.Error()) 564 return 565 } 566 567 err = db.AddRepo(tx, repo) 568 if err != nil { 569 l.Error("db write failed", "err", err) 570 s.pages.Notice(w, "repo", "Failed to save repository information.") 571 return 572 } 573 574 // acls 575 p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 576 err = s.enforcer.AddRepo(user.Active.Did, domain, p) 577 if err != nil { 578 l.Error("acl setup failed", "err", err) 579 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 580 return 581 } 582 583 err = tx.Commit() 584 if err 
!= nil { 585 l.Error("txn commit failed", "err", err) 586 http.Error(w, err.Error(), http.StatusInternalServerError) 587 return 588 } 589 590 err = s.enforcer.E.SavePolicy() 591 if err != nil { 592 l.Error("acl save failed", "err", err) 593 http.Error(w, err.Error(), http.StatusInternalServerError) 594 return 595 } 596 597 // reset the ATURI because the transaction completed successfully 598 aturi = "" 599 600 s.notifier.NewRepo(r.Context(), repo) 601 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 602 } 603} 604 605// this is used to rollback changes made to the PDS 606// 607// it is a no-op if the provided ATURI is empty 608func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 609 if aturi == "" { 610 return nil 611 } 612 613 parsed := syntax.ATURI(aturi) 614 615 collection := parsed.Collection().String() 616 repo := parsed.Authority().String() 617 rkey := parsed.RecordKey().String() 618 619 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 620 Collection: collection, 621 Repo: repo, 622 Rkey: rkey, 623 }) 624 return err 625} 626 627func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 628 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 629 if err != nil { 630 return err 631 } 632 // already present 633 if len(defaultLabels) == len(defaults) { 634 return nil 635 } 636 637 labelDefs, err := models.FetchLabelDefs(r, defaults) 638 if err != nil { 639 return err 640 } 641 642 // Insert each label definition to the database 643 for _, labelDef := range labelDefs { 644 _, err = db.AddLabelDefinition(e, &labelDef) 645 if err != nil { 646 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 647 } 648 } 649 650 return nil 651}