A vibe coded tangled fork which supports pijul.
at d861bff51e1597cd6902ff3d8ff601c3b1bf1451 662 lines 17 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/indexer" 19 "tangled.org/core/appview/mentions" 20 "tangled.org/core/appview/models" 21 "tangled.org/core/appview/notify" 22 dbnotify "tangled.org/core/appview/notify/db" 23 phnotify "tangled.org/core/appview/notify/posthog" 24 "tangled.org/core/appview/oauth" 25 "tangled.org/core/appview/pages" 26 "tangled.org/core/appview/reporesolver" 27 "tangled.org/core/appview/validator" 28 xrpcclient "tangled.org/core/appview/xrpcclient" 29 "tangled.org/core/consts" 30 "tangled.org/core/eventconsumer" 31 "tangled.org/core/idresolver" 32 "tangled.org/core/jetstream" 33 "tangled.org/core/log" 34 tlog "tangled.org/core/log" 35 "tangled.org/core/orm" 36 "tangled.org/core/rbac" 37 "tangled.org/core/tid" 38 39 comatproto "github.com/bluesky-social/indigo/api/atproto" 40 atpclient "github.com/bluesky-social/indigo/atproto/client" 41 "github.com/bluesky-social/indigo/atproto/syntax" 42 lexutil "github.com/bluesky-social/indigo/lex/util" 43 "github.com/bluesky-social/indigo/xrpc" 44 securejoin "github.com/cyphar/filepath-securejoin" 45 "github.com/go-chi/chi/v5" 46 "github.com/posthog/posthog-go" 47) 48 49type State struct { 50 db *db.DB 51 notifier notify.Notifier 52 indexer *indexer.Indexer 53 oauth *oauth.OAuth 54 enforcer *rbac.Enforcer 55 pages *pages.Pages 56 idResolver *idresolver.Resolver 57 mentionsResolver *mentions.Resolver 58 posthog posthog.Client 59 jc *jetstream.JetstreamClient 60 config *config.Config 61 repoResolver *reporesolver.RepoResolver 62 knotstream *eventconsumer.Consumer 63 spindlestream *eventconsumer.Consumer 64 logger *slog.Logger 65 validator *validator.Validator 66} 67 68func Make(ctx context.Context, config *config.Config) (*State, error) { 69 logger := tlog.FromContext(ctx) 70 71 d, err := db.Make(ctx, config.Core.DbPath) 72 if err != nil { 73 return nil, fmt.Errorf("failed to create db: %w", err) 74 } 75 76 indexer := indexer.New(log.SubLogger(logger, "indexer")) 77 err = indexer.Init(ctx, d) 78 if err != nil { 79 return nil, fmt.Errorf("failed to create indexer: %w", err) 80 } 81 82 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 83 if err != nil { 84 return nil, fmt.Errorf("failed to create enforcer: %w", err) 85 } 86 87 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 88 if err != nil { 89 logger.Error("failed to create redis resolver", "err", err) 90 res = idresolver.DefaultResolver(config.Plc.PLCURL) 91 } 92 93 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 94 if err != nil { 95 return nil, fmt.Errorf("failed to create posthog client: %w", err) 96 } 97 98 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 99 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 100 if err != nil { 101 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 102 } 103 validator := validator.New(d, res, enforcer) 104 105 repoResolver := reporesolver.New(config, enforcer, d) 106 107 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 108 109 wrapper := db.DbWrapper{Execer: d} 110 jc, err := jetstream.NewJetstreamClient( 111 config.Jetstream.Endpoint, 112 "appview", 113 []string{ 114 tangled.GraphFollowNSID, 115 tangled.FeedStarNSID, 116 tangled.PublicKeyNSID, 117 tangled.RepoArtifactNSID, 118 tangled.ActorProfileNSID, 119 tangled.KnotMemberNSID, 120 tangled.SpindleMemberNSID, 121 tangled.SpindleNSID, 122 tangled.StringNSID, 123 tangled.RepoIssueNSID, 124 tangled.RepoIssueCommentNSID, 125 tangled.CommentNSID, 126 tangled.LabelDefinitionNSID, 127 tangled.LabelOpNSID, 128 }, 129 nil, 130 tlog.SubLogger(logger, "jetstream"), 131 wrapper, 132 false, 133 134 // in-memory filter is inapplicable to appview so 135 // we'll never log dids anyway. 136 false, 137 ) 138 if err != nil { 139 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 140 } 141 142 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 143 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 144 } 145 146 ingester := appview.Ingester{ 147 Db: wrapper, 148 Enforcer: enforcer, 149 IdResolver: res, 150 Config: config, 151 Logger: log.SubLogger(logger, "ingester"), 152 Validator: validator, 153 } 154 err = jc.StartJetstream(ctx, ingester.Ingest()) 155 if err != nil { 156 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 157 } 158 159 var notifiers []notify.Notifier 160 161 // Always add the database notifier 162 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 163 164 // Add other notifiers in production only 165 if !config.Core.Dev { 166 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 167 } 168 notifiers = append(notifiers, indexer) 169 170 // Add webhook notifier 171 notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 172 173 notifier := notify.NewMergedNotifier(notifiers) 174 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 175 176 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier) 177 if err != nil { 178 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 179 } 180 knotstream.Start(ctx) 181 182 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 183 if err != nil { 184 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 185 } 186 spindlestream.Start(ctx) 187 188 state := &State{ 189 d, 190 notifier, 191 indexer, 192 oauth, 193 enforcer, 194 pages, 195 res, 196 mentionsResolver, 197 posthog, 198 jc, 199 config, 200 repoResolver, 201 knotstream, 202 spindlestream, 203 logger, 204 validator, 205 } 206 207 // fetch initial bluesky posts if configured 208 go fetchBskyPosts(ctx, res, config, d, logger) 209 210 return state, nil 211} 212 213func (s *State) Close() error { 214 // other close up logic goes here 215 return s.db.Close() 216} 217 218func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 219 w.Header().Set("Content-Type", "text/plain") 220 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 221 222 robotsTxt := `# Hello, Tanglers! 223User-agent: * 224Allow: / 225Disallow: /*/*/settings 226Disallow: /settings 227Disallow: /*/*/compare 228Disallow: /*/*/fork 229 230Crawl-delay: 1 231` 232 w.Write([]byte(robotsTxt)) 233} 234 235func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 236 user := s.oauth.GetMultiAccountUser(r) 237 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 238 LoggedInUser: user, 239 }) 240} 241 242func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 243 user := s.oauth.GetMultiAccountUser(r) 244 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 245 LoggedInUser: user, 246 }) 247} 248 249func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 250 user := s.oauth.GetMultiAccountUser(r) 251 s.pages.Brand(w, pages.BrandParams{ 252 LoggedInUser: user, 253 }) 254} 255 256func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 257 user := s.oauth.GetMultiAccountUser(r) 258 if user == nil { 259 return 260 } 261 262 l := s.logger.With("handler", "UpgradeBanner") 263 l = l.With("did", user.Active.Did) 264 265 regs, err := db.GetRegistrations( 266 s.db, 267 orm.FilterEq("did", user.Active.Did), 268 orm.FilterEq("needs_upgrade", 1), 269 ) 270 if err != nil { 271 l.Error("non-fatal: failed to get registrations", "err", err) 272 } 273 274 spindles, err := db.GetSpindles( 275 s.db, 276 orm.FilterEq("owner", user.Active.Did), 277 orm.FilterEq("needs_upgrade", 1), 278 ) 279 if err != nil { 280 l.Error("non-fatal: failed to get spindles", "err", err) 281 } 282 283 if regs == nil && spindles == nil { 284 return 285 } 286 287 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 288 Registrations: regs, 289 Spindles: spindles, 290 }) 291} 292 293func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 294 user := chi.URLParam(r, "user") 295 user = strings.TrimPrefix(user, "@") 296 297 if user == "" { 298 w.WriteHeader(http.StatusBadRequest) 299 return 300 } 301 302 id, err := s.idResolver.ResolveIdent(r.Context(), user) 303 if err != nil { 304 w.WriteHeader(http.StatusInternalServerError) 305 return 306 } 307 308 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 309 if err != nil { 310 s.logger.Error("failed to get public keys", "err", err) 311 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 312 return 313 } 314 315 if len(pubKeys) == 0 { 316 w.WriteHeader(http.StatusNoContent) 317 return 318 } 319 320 for _, k := range pubKeys { 321 key := strings.TrimRight(k.Key, "\n") 322 fmt.Fprintln(w, key) 323 } 324} 325 326func validateRepoName(name string) error { 327 // check for path traversal attempts 328 if name == "." || name == ".." || 329 strings.Contains(name, "/") || strings.Contains(name, "\\") { 330 return fmt.Errorf("Repository name contains invalid path characters") 331 } 332 333 // check for sequences that could be used for traversal when normalized 334 if strings.Contains(name, "./") || strings.Contains(name, "../") || 335 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 336 return fmt.Errorf("Repository name contains invalid path sequence") 337 } 338 339 // then continue with character validation 340 for _, char := range name { 341 if !((char >= 'a' && char <= 'z') || 342 (char >= 'A' && char <= 'Z') || 343 (char >= '0' && char <= '9') || 344 char == '-' || char == '_' || char == '.') { 345 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 346 } 347 } 348 349 // additional check to prevent multiple sequential dots 350 if strings.Contains(name, "..") { 351 return fmt.Errorf("Repository name cannot contain sequential dots") 352 } 353 354 // if all checks pass 355 return nil 356} 357 358func stripGitExt(name string) string { 359 return strings.TrimSuffix(name, ".git") 360} 361 362func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 363 switch r.Method { 364 case http.MethodGet: 365 user := s.oauth.GetMultiAccountUser(r) 366 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 367 if err != nil { 368 s.pages.Notice(w, "repo", "Invalid user account.") 369 return 370 } 371 372 s.pages.NewRepo(w, pages.NewRepoParams{ 373 LoggedInUser: user, 374 Knots: knots, 375 }) 376 377 case http.MethodPost: 378 l := s.logger.With("handler", "NewRepo") 379 380 user := s.oauth.GetMultiAccountUser(r) 381 l = l.With("did", user.Active.Did) 382 383 // form validation 384 domain := r.FormValue("domain") 385 if domain == "" { 386 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 387 return 388 } 389 l = l.With("knot", domain) 390 391 repoName := r.FormValue("name") 392 if repoName == "" { 393 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 394 return 395 } 396 397 if err := validateRepoName(repoName); err != nil { 398 s.pages.Notice(w, "repo", err.Error()) 399 return 400 } 401 repoName = stripGitExt(repoName) 402 l = l.With("repoName", repoName) 403 404 defaultBranch := r.FormValue("branch") 405 if defaultBranch == "" { 406 defaultBranch = "main" 407 } 408 l = l.With("defaultBranch", defaultBranch) 409 410 description := r.FormValue("description") 411 if len([]rune(description)) > 140 { 412 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 413 return 414 } 415 416 // ACL validation 417 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 418 if err != nil || !ok { 419 l.Info("unauthorized") 420 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 421 return 422 } 423 424 // Check for existing repos 425 existingRepo, err := db.GetRepo( 426 s.db, 427 orm.FilterEq("did", user.Active.Did), 428 orm.FilterEq("name", repoName), 429 ) 430 if err == nil && existingRepo != nil { 431 l.Info("repo exists") 432 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 433 return 434 } 435 436 // create atproto record for this repo 437 rkey := tid.TID() 438 repo := &models.Repo{ 439 Did: user.Active.Did, 440 Name: repoName, 441 Knot: domain, 442 Rkey: rkey, 443 Description: description, 444 Created: time.Now(), 445 Labels: s.config.Label.DefaultLabelDefs, 446 } 447 record := repo.AsRecord() 448 449 atpClient, err := s.oauth.AuthorizedClient(r) 450 if err != nil { 451 l.Info("PDS write failed", "err", err) 452 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 453 return 454 } 455 456 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 457 Collection: tangled.RepoNSID, 458 Repo: user.Active.Did, 459 Rkey: rkey, 460 Record: &lexutil.LexiconTypeDecoder{ 461 Val: &record, 462 }, 463 }) 464 if err != nil { 465 l.Info("PDS write failed", "err", err) 466 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 467 return 468 } 469 470 aturi := atresp.Uri 471 l = l.With("aturi", aturi) 472 l.Info("wrote to PDS") 473 474 tx, err := s.db.BeginTx(r.Context(), nil) 475 if err != nil { 476 l.Info("txn failed", "err", err) 477 s.pages.Notice(w, "repo", "Failed to save repository information.") 478 return 479 } 480 481 // The rollback function reverts a few things on failure: 482 // - the pending txn 483 // - the ACLs 484 // - the atproto record created 485 rollback := func() { 486 err1 := tx.Rollback() 487 err2 := s.enforcer.E.LoadPolicy() 488 err3 := rollbackRecord(context.Background(), aturi, atpClient) 489 490 // ignore txn complete errors, this is okay 491 if errors.Is(err1, sql.ErrTxDone) { 492 err1 = nil 493 } 494 495 if errs := errors.Join(err1, err2, err3); errs != nil { 496 l.Error("failed to rollback changes", "errs", errs) 497 return 498 } 499 } 500 defer rollback() 501 502 client, err := s.oauth.ServiceClient( 503 r, 504 oauth.WithService(domain), 505 oauth.WithLxm(tangled.RepoCreateNSID), 506 oauth.WithDev(s.config.Core.Dev), 507 ) 508 if err != nil { 509 l.Error("service auth failed", "err", err) 510 s.pages.Notice(w, "repo", "Failed to reach PDS.") 511 return 512 } 513 514 xe := tangled.RepoCreate( 515 r.Context(), 516 client, 517 &tangled.RepoCreate_Input{ 518 Rkey: rkey, 519 }, 520 ) 521 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 522 l.Error("xrpc error", "xe", xe) 523 s.pages.Notice(w, "repo", err.Error()) 524 return 525 } 526 527 err = db.AddRepo(tx, repo) 528 if err != nil { 529 l.Error("db write failed", "err", err) 530 s.pages.Notice(w, "repo", "Failed to save repository information.") 531 return 532 } 533 534 // acls 535 p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 536 err = s.enforcer.AddRepo(user.Active.Did, domain, p) 537 if err != nil { 538 l.Error("acl setup failed", "err", err) 539 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 540 return 541 } 542 543 err = tx.Commit() 544 if err != nil { 545 l.Error("txn commit failed", "err", err) 546 http.Error(w, err.Error(), http.StatusInternalServerError) 547 return 548 } 549 550 err = s.enforcer.E.SavePolicy() 551 if err != nil { 552 l.Error("acl save failed", "err", err) 553 http.Error(w, err.Error(), http.StatusInternalServerError) 554 return 555 } 556 557 // reset the ATURI because the transaction completed successfully 558 aturi = "" 559 560 s.notifier.NewRepo(r.Context(), repo) 561 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 562 } 563} 564 565// this is used to rollback changes made to the PDS 566// 567// it is a no-op if the provided ATURI is empty 568func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 569 if aturi == "" { 570 return nil 571 } 572 573 parsed := syntax.ATURI(aturi) 574 575 collection := parsed.Collection().String() 576 repo := parsed.Authority().String() 577 rkey := parsed.RecordKey().String() 578 579 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 580 Collection: collection, 581 Repo: repo, 582 Rkey: rkey, 583 }) 584 return err 585} 586 587func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 588 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 589 if err != nil { 590 return err 591 } 592 // already present 593 if len(defaultLabels) == len(defaults) { 594 return nil 595 } 596 597 labelDefs, err := models.FetchLabelDefs(r, defaults) 598 if err != nil { 599 return err 600 } 601 602 // Insert each label definition to the database 603 for _, labelDef := range labelDefs { 604 _, err = db.AddLabelDefinition(e, &labelDef) 605 if err != nil { 606 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 607 } 608 } 609 610 return nil 611} 612 613func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 614 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 615 if err != nil { 616 logger.Error("failed to resolve tangled.org DID", "err", err) 617 return 618 } 619 620 pdsEndpoint := resolved.PDSEndpoint() 621 if pdsEndpoint == "" { 622 logger.Error("no PDS endpoint found for tangled.sh DID") 623 return 624 } 625 626 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, config.Core.RateLimitBypass, logger) 627 if err != nil { 628 logger.Error("failed to create appassword session... skipping fetch", "err", err) 629 return 630 } 631 632 client := xrpc.Client{ 633 Auth: &xrpc.AuthInfo{ 634 AccessJwt: session.AccessJwt, 635 Did: session.Did, 636 }, 637 Host: session.PdsEndpoint, 638 } 639 640 l := log.SubLogger(logger, "bluesky") 641 642 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 643 defer ticker.Stop() 644 645 for { 646 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 647 if err != nil { 648 l.Error("failed to fetch bluesky posts", "err", err) 649 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 650 l.Error("failed to insert bluesky posts", "err", err) 651 } else { 652 l.Info("inserted bluesky posts", "count", len(posts)) 653 } 654 655 select { 656 case <-ticker.C: 657 case <-ctx.Done(): 658 l.Info("stopping bluesky updater") 659 return 660 } 661 } 662}