A vibe coded tangled fork which supports pijul.
at master 686 lines 18 kB view raw
1package state 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview" 15 "tangled.org/core/appview/bsky" 16 "tangled.org/core/appview/cloudflare" 17 "tangled.org/core/appview/config" 18 "tangled.org/core/appview/db" 19 "tangled.org/core/appview/indexer" 20 "tangled.org/core/appview/mentions" 21 "tangled.org/core/appview/models" 22 "tangled.org/core/appview/notify" 23 dbnotify "tangled.org/core/appview/notify/db" 24 phnotify "tangled.org/core/appview/notify/posthog" 25 "tangled.org/core/appview/oauth" 26 "tangled.org/core/appview/pages" 27 "tangled.org/core/appview/reporesolver" 28 "tangled.org/core/appview/validator" 29 xrpcclient "tangled.org/core/appview/xrpcclient" 30 "tangled.org/core/consts" 31 "tangled.org/core/eventconsumer" 32 "tangled.org/core/idresolver" 33 "tangled.org/core/jetstream" 34 "tangled.org/core/log" 35 tlog "tangled.org/core/log" 36 "tangled.org/core/orm" 37 "tangled.org/core/rbac" 38 "tangled.org/core/tid" 39 40 comatproto "github.com/bluesky-social/indigo/api/atproto" 41 atpclient "github.com/bluesky-social/indigo/atproto/client" 42 "github.com/bluesky-social/indigo/atproto/syntax" 43 lexutil "github.com/bluesky-social/indigo/lex/util" 44 "github.com/bluesky-social/indigo/xrpc" 45 securejoin "github.com/cyphar/filepath-securejoin" 46 "github.com/go-chi/chi/v5" 47 "github.com/posthog/posthog-go" 48) 49 50type State struct { 51 db *db.DB 52 notifier notify.Notifier 53 indexer *indexer.Indexer 54 oauth *oauth.OAuth 55 enforcer *rbac.Enforcer 56 pages *pages.Pages 57 idResolver *idresolver.Resolver 58 mentionsResolver *mentions.Resolver 59 posthog posthog.Client 60 jc *jetstream.JetstreamClient 61 config *config.Config 62 repoResolver *reporesolver.RepoResolver 63 knotstream *eventconsumer.Consumer 64 spindlestream *eventconsumer.Consumer 65 logger *slog.Logger 66 validator *validator.Validator 67 cfClient *cloudflare.Client 68} 69 70func Make(ctx context.Context, config *config.Config) (*State, error) { 71 logger := tlog.FromContext(ctx) 72 73 d, err := db.Make(ctx, config.Core.DbPath) 74 if err != nil { 75 return nil, fmt.Errorf("failed to create db: %w", err) 76 } 77 78 indexer := indexer.New(log.SubLogger(logger, "indexer")) 79 err = indexer.Init(ctx, d) 80 if err != nil { 81 return nil, fmt.Errorf("failed to create indexer: %w", err) 82 } 83 84 enforcer, err := rbac.NewEnforcer(config.Core.DbPath) 85 if err != nil { 86 return nil, fmt.Errorf("failed to create enforcer: %w", err) 87 } 88 89 res, err := idresolver.RedisResolver(config.Redis.ToURL(), config.Plc.PLCURL) 90 if err != nil { 91 logger.Error("failed to create redis resolver", "err", err) 92 res = idresolver.DefaultResolver(config.Plc.PLCURL) 93 } 94 95 posthog, err := posthog.NewWithConfig(config.Posthog.ApiKey, posthog.Config{Endpoint: config.Posthog.Endpoint}) 96 if err != nil { 97 return nil, fmt.Errorf("failed to create posthog client: %w", err) 98 } 99 100 pages := pages.NewPages(config, res, d, log.SubLogger(logger, "pages")) 101 oauth, err := oauth.New(config, posthog, d, enforcer, res, log.SubLogger(logger, "oauth")) 102 if err != nil { 103 return nil, fmt.Errorf("failed to start oauth handler: %w", err) 104 } 105 validator := validator.New(d, res, enforcer) 106 107 repoResolver := reporesolver.New(config, enforcer, d) 108 109 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 110 111 wrapper := db.DbWrapper{Execer: d} 112 jc, err := jetstream.NewJetstreamClient( 113 config.Jetstream.Endpoint, 114 "appview", 115 []string{ 116 tangled.GraphFollowNSID, 117 tangled.FeedStarNSID, 118 tangled.PublicKeyNSID, 119 tangled.RepoArtifactNSID, 120 tangled.ActorProfileNSID, 121 tangled.KnotMemberNSID, 122 tangled.SpindleMemberNSID, 123 tangled.SpindleNSID, 124 tangled.StringNSID, 125 tangled.RepoIssueNSID, 126 tangled.RepoIssueCommentNSID, 127 tangled.LabelDefinitionNSID, 128 tangled.LabelOpNSID, 129 }, 130 nil, 131 tlog.SubLogger(logger, "jetstream"), 132 wrapper, 133 false, 134 135 // in-memory filter is inapplicable to appview so 136 // we'll never log dids anyway. 137 false, 138 ) 139 if err != nil { 140 return nil, fmt.Errorf("failed to create jetstream client: %w", err) 141 } 142 143 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 144 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 145 } 146 147 ingester := appview.Ingester{ 148 Db: wrapper, 149 Enforcer: enforcer, 150 IdResolver: res, 151 Config: config, 152 Logger: log.SubLogger(logger, "ingester"), 153 Validator: validator, 154 } 155 err = jc.StartJetstream(ctx, ingester.Ingest()) 156 if err != nil { 157 return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 158 } 159 160 var notifiers []notify.Notifier 161 162 // Always add the database notifier 163 notifiers = append(notifiers, dbnotify.NewDatabaseNotifier(d, res)) 164 165 // Add other notifiers in production only 166 if !config.Core.Dev { 167 notifiers = append(notifiers, phnotify.NewPosthogNotifier(posthog)) 168 } 169 notifiers = append(notifiers, indexer) 170 171 // Add webhook notifier 172 notifiers = append(notifiers, notify.NewWebhookNotifier(d)) 173 174 notifier := notify.NewMergedNotifier(notifiers) 175 notifier = notify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 176 177 var cfClient *cloudflare.Client 178 if config.Cloudflare.ApiToken != "" { 179 cfClient, err = cloudflare.New(config) 180 if err != nil { 181 logger.Warn("failed to create cloudflare client, sites upload will be disabled", "err", err) 182 cfClient = nil 183 } 184 } 185 186 knotstream, err := Knotstream(ctx, config, d, enforcer, posthog, notifier, cfClient) 187 if err != nil { 188 return nil, fmt.Errorf("failed to start knotstream consumer: %w", err) 189 } 190 knotstream.Start(ctx) 191 192 spindlestream, err := Spindlestream(ctx, config, d, enforcer) 193 if err != nil { 194 return nil, fmt.Errorf("failed to start spindlestream consumer: %w", err) 195 } 196 spindlestream.Start(ctx) 197 198 state := &State{ 199 db: d, 200 notifier: notifier, 201 indexer: indexer, 202 oauth: oauth, 203 enforcer: enforcer, 204 pages: pages, 205 idResolver: res, 206 mentionsResolver: mentionsResolver, 207 posthog: posthog, 208 jc: jc, 209 config: config, 210 repoResolver: repoResolver, 211 knotstream: knotstream, 212 spindlestream: spindlestream, 213 logger: logger, 214 validator: validator, 215 cfClient: cfClient, 216 } 217 218 // fetch initial bluesky posts if configured 219 go fetchBskyPosts(ctx, res, config, d, logger) 220 221 return state, nil 222} 223 224func (s *State) Close() error { 225 // other close up logic goes here 226 return s.db.Close() 227} 228 229func (s *State) RobotsTxt(w http.ResponseWriter, r *http.Request) { 230 w.Header().Set("Content-Type", "text/plain") 231 w.Header().Set("Cache-Control", "public, max-age=86400") // one day 232 233 robotsTxt := `# Hello, Tanglers! 234User-agent: * 235Allow: / 236Disallow: /*/*/settings 237Disallow: /settings 238Disallow: /*/*/compare 239Disallow: /*/*/fork 240 241Crawl-delay: 1 242` 243 w.Write([]byte(robotsTxt)) 244} 245 246func (s *State) TermsOfService(w http.ResponseWriter, r *http.Request) { 247 user := s.oauth.GetMultiAccountUser(r) 248 s.pages.TermsOfService(w, pages.TermsOfServiceParams{ 249 LoggedInUser: user, 250 }) 251} 252 253func (s *State) PrivacyPolicy(w http.ResponseWriter, r *http.Request) { 254 user := s.oauth.GetMultiAccountUser(r) 255 s.pages.PrivacyPolicy(w, pages.PrivacyPolicyParams{ 256 LoggedInUser: user, 257 }) 258} 259 260func (s *State) Brand(w http.ResponseWriter, r *http.Request) { 261 user := s.oauth.GetMultiAccountUser(r) 262 s.pages.Brand(w, pages.BrandParams{ 263 LoggedInUser: user, 264 }) 265} 266 267func (s *State) UpgradeBanner(w http.ResponseWriter, r *http.Request) { 268 user := s.oauth.GetMultiAccountUser(r) 269 if user == nil { 270 return 271 } 272 273 l := s.logger.With("handler", "UpgradeBanner") 274 l = l.With("did", user.Active.Did) 275 276 regs, err := db.GetRegistrations( 277 s.db, 278 orm.FilterEq("did", user.Active.Did), 279 orm.FilterEq("needs_upgrade", 1), 280 ) 281 if err != nil { 282 l.Error("non-fatal: failed to get registrations", "err", err) 283 } 284 285 spindles, err := db.GetSpindles( 286 s.db, 287 orm.FilterEq("owner", user.Active.Did), 288 orm.FilterEq("needs_upgrade", 1), 289 ) 290 if err != nil { 291 l.Error("non-fatal: failed to get spindles", "err", err) 292 } 293 294 if regs == nil && spindles == nil { 295 return 296 } 297 298 s.pages.UpgradeBanner(w, pages.UpgradeBannerParams{ 299 Registrations: regs, 300 Spindles: spindles, 301 }) 302} 303 304func (s *State) Keys(w http.ResponseWriter, r *http.Request) { 305 user := chi.URLParam(r, "user") 306 user = strings.TrimPrefix(user, "@") 307 308 if user == "" { 309 w.WriteHeader(http.StatusBadRequest) 310 return 311 } 312 313 id, err := s.idResolver.ResolveIdent(r.Context(), user) 314 if err != nil { 315 w.WriteHeader(http.StatusInternalServerError) 316 return 317 } 318 319 pubKeys, err := db.GetPublicKeysForDid(s.db, id.DID.String()) 320 if err != nil { 321 s.logger.Error("failed to get public keys", "err", err) 322 http.Error(w, "failed to get public keys", http.StatusInternalServerError) 323 return 324 } 325 326 if len(pubKeys) == 0 { 327 w.WriteHeader(http.StatusNoContent) 328 return 329 } 330 331 for _, k := range pubKeys { 332 key := strings.TrimRight(k.Key, "\n") 333 fmt.Fprintln(w, key) 334 } 335} 336 337func validateRepoName(name string) error { 338 // check for path traversal attempts 339 if name == "." || name == ".." || 340 strings.Contains(name, "/") || strings.Contains(name, "\\") { 341 return fmt.Errorf("Repository name contains invalid path characters") 342 } 343 344 // check for sequences that could be used for traversal when normalized 345 if strings.Contains(name, "./") || strings.Contains(name, "../") || 346 strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".") { 347 return fmt.Errorf("Repository name contains invalid path sequence") 348 } 349 350 // then continue with character validation 351 for _, char := range name { 352 if !((char >= 'a' && char <= 'z') || 353 (char >= 'A' && char <= 'Z') || 354 (char >= '0' && char <= '9') || 355 char == '-' || char == '_' || char == '.') { 356 return fmt.Errorf("Repository name can only contain alphanumeric characters, periods, hyphens, and underscores") 357 } 358 } 359 360 // additional check to prevent multiple sequential dots 361 if strings.Contains(name, "..") { 362 return fmt.Errorf("Repository name cannot contain sequential dots") 363 } 364 365 // if all checks pass 366 return nil 367} 368 369func stripGitExt(name string) string { 370 return strings.TrimSuffix(name, ".git") 371} 372 373func (s *State) NewRepo(w http.ResponseWriter, r *http.Request) { 374 switch r.Method { 375 case http.MethodGet: 376 user := s.oauth.GetMultiAccountUser(r) 377 knots, err := s.enforcer.GetKnotsForUser(user.Active.Did) 378 if err != nil { 379 s.pages.Notice(w, "repo", "Invalid user account.") 380 return 381 } 382 383 s.pages.NewRepo(w, pages.NewRepoParams{ 384 LoggedInUser: user, 385 Knots: knots, 386 }) 387 388 case http.MethodPost: 389 l := s.logger.With("handler", "NewRepo") 390 391 user := s.oauth.GetMultiAccountUser(r) 392 l = l.With("did", user.Active.Did) 393 394 // form validation 395 domain := r.FormValue("domain") 396 if domain == "" { 397 s.pages.Notice(w, "repo", "Invalid form submission&mdash;missing knot domain.") 398 return 399 } 400 l = l.With("knot", domain) 401 402 repoName := r.FormValue("name") 403 if repoName == "" { 404 s.pages.Notice(w, "repo", "Repository name cannot be empty.") 405 return 406 } 407 408 if err := validateRepoName(repoName); err != nil { 409 s.pages.Notice(w, "repo", err.Error()) 410 return 411 } 412 repoName = stripGitExt(repoName) 413 l = l.With("repoName", repoName) 414 415 defaultBranch := r.FormValue("branch") 416 if defaultBranch == "" { 417 defaultBranch = "main" 418 } 419 l = l.With("defaultBranch", defaultBranch) 420 421 vcs := strings.ToLower(strings.TrimSpace(r.FormValue("vcs"))) 422 if vcs == "" { 423 vcs = "git" 424 } 425 switch vcs { 426 case "git", "pijul": 427 default: 428 s.pages.Notice(w, "repo", "Invalid repository type.") 429 return 430 } 431 l = l.With("vcs", vcs) 432 433 description := r.FormValue("description") 434 if len([]rune(description)) > 140 { 435 s.pages.Notice(w, "repo", "Description must be 140 characters or fewer.") 436 return 437 } 438 439 // ACL validation 440 ok, err := s.enforcer.E.Enforce(user.Active.Did, domain, domain, "repo:create") 441 if err != nil || !ok { 442 l.Info("unauthorized") 443 s.pages.Notice(w, "repo", "You do not have permission to create a repo in this knot.") 444 return 445 } 446 447 // Check for existing repos 448 existingRepo, err := db.GetRepo( 449 s.db, 450 orm.FilterEq("did", user.Active.Did), 451 orm.FilterEq("name", repoName), 452 ) 453 if err == nil && existingRepo != nil { 454 l.Info("repo exists") 455 s.pages.Notice(w, "repo", fmt.Sprintf("You already have a repository by this name on %s", existingRepo.Knot)) 456 return 457 } 458 459 // create atproto record for this repo 460 rkey := tid.TID() 461 repo := &models.Repo{ 462 Did: user.Active.Did, 463 Name: repoName, 464 Knot: domain, 465 Rkey: rkey, 466 Description: description, 467 Created: time.Now(), 468 Labels: s.config.Label.DefaultLabelDefs, 469 Vcs: vcs, 470 } 471 record := repo.AsRecord() 472 473 atpClient, err := s.oauth.AuthorizedClient(r) 474 if err != nil { 475 l.Info("PDS write failed", "err", err) 476 s.pages.Notice(w, "repo", "Failed to write record to PDS.") 477 return 478 } 479 480 atresp, err := comatproto.RepoPutRecord(r.Context(), atpClient, &comatproto.RepoPutRecord_Input{ 481 Collection: tangled.RepoNSID, 482 Repo: user.Active.Did, 483 Rkey: rkey, 484 Record: &lexutil.LexiconTypeDecoder{ 485 Val: &record, 486 }, 487 }) 488 if err != nil { 489 l.Info("PDS write failed", "err", err) 490 s.pages.Notice(w, "repo", "Failed to announce repository creation.") 491 return 492 } 493 494 aturi := atresp.Uri 495 l = l.With("aturi", aturi) 496 l.Info("wrote to PDS") 497 498 tx, err := s.db.BeginTx(r.Context(), nil) 499 if err != nil { 500 l.Info("txn failed", "err", err) 501 s.pages.Notice(w, "repo", "Failed to save repository information.") 502 return 503 } 504 505 // The rollback function reverts a few things on failure: 506 // - the pending txn 507 // - the ACLs 508 // - the atproto record created 509 rollback := func() { 510 err1 := tx.Rollback() 511 err2 := s.enforcer.E.LoadPolicy() 512 err3 := rollbackRecord(context.Background(), aturi, atpClient) 513 514 // ignore txn complete errors, this is okay 515 if errors.Is(err1, sql.ErrTxDone) { 516 err1 = nil 517 } 518 519 if errs := errors.Join(err1, err2, err3); errs != nil { 520 l.Error("failed to rollback changes", "errs", errs) 521 return 522 } 523 } 524 defer rollback() 525 526 client, err := s.oauth.ServiceClient( 527 r, 528 oauth.WithService(domain), 529 oauth.WithLxm(tangled.RepoCreateNSID), 530 oauth.WithDev(s.config.Core.Dev), 531 ) 532 if err != nil { 533 l.Error("service auth failed", "err", err) 534 s.pages.Notice(w, "repo", "Failed to reach PDS.") 535 return 536 } 537 538 xe := tangled.RepoCreate( 539 r.Context(), 540 client, 541 &tangled.RepoCreate_Input{ 542 Rkey: rkey, 543 Vcs: &vcs, 544 }, 545 ) 546 if err := xrpcclient.HandleXrpcErr(xe); err != nil { 547 l.Error("xrpc error", "xe", xe) 548 s.pages.Notice(w, "repo", err.Error()) 549 return 550 } 551 552 err = db.AddRepo(tx, repo) 553 if err != nil { 554 l.Error("db write failed", "err", err) 555 s.pages.Notice(w, "repo", "Failed to save repository information.") 556 return 557 } 558 559 // acls 560 p, _ := securejoin.SecureJoin(user.Active.Did, repoName) 561 err = s.enforcer.AddRepo(user.Active.Did, domain, p) 562 if err != nil { 563 l.Error("acl setup failed", "err", err) 564 s.pages.Notice(w, "repo", "Failed to set up repository permissions.") 565 return 566 } 567 err = tx.Commit() 568 if err != nil { 569 l.Error("txn commit failed", "err", err) 570 http.Error(w, err.Error(), http.StatusInternalServerError) 571 return 572 } 573 574 err = s.enforcer.E.SavePolicy() 575 if err != nil { 576 l.Error("acl save failed", "err", err) 577 http.Error(w, err.Error(), http.StatusInternalServerError) 578 return 579 } 580 581 // reset the ATURI because the transaction completed successfully 582 aturi = "" 583 584 s.notifier.NewRepo(r.Context(), repo) 585 s.pages.HxLocation(w, fmt.Sprintf("/%s/%s", user.Active.Did, repoName)) 586 } 587} 588 589// this is used to rollback changes made to the PDS 590// 591// it is a no-op if the provided ATURI is empty 592func rollbackRecord(ctx context.Context, aturi string, client *atpclient.APIClient) error { 593 if aturi == "" { 594 return nil 595 } 596 597 parsed := syntax.ATURI(aturi) 598 599 collection := parsed.Collection().String() 600 repo := parsed.Authority().String() 601 rkey := parsed.RecordKey().String() 602 603 _, err := comatproto.RepoDeleteRecord(ctx, client, &comatproto.RepoDeleteRecord_Input{ 604 Collection: collection, 605 Repo: repo, 606 Rkey: rkey, 607 }) 608 return err 609} 610 611func BackfillDefaultDefs(e db.Execer, r *idresolver.Resolver, defaults []string) error { 612 defaultLabels, err := db.GetLabelDefinitions(e, orm.FilterIn("at_uri", defaults)) 613 if err != nil { 614 return err 615 } 616 // already present 617 if len(defaultLabels) == len(defaults) { 618 return nil 619 } 620 621 labelDefs, err := models.FetchLabelDefs(r, defaults) 622 if err != nil { 623 return err 624 } 625 626 // Insert each label definition to the database 627 for _, labelDef := range labelDefs { 628 _, err = db.AddLabelDefinition(e, &labelDef) 629 if err != nil { 630 return fmt.Errorf("failed to add label definition %s: %v", labelDef.Name, err) 631 } 632 } 633 634 return nil 635} 636 637func fetchBskyPosts(ctx context.Context, res *idresolver.Resolver, config *config.Config, d *db.DB, logger *slog.Logger) { 638 resolved, err := res.ResolveIdent(context.Background(), consts.TangledDid) 639 if err != nil { 640 logger.Error("failed to resolve tangled.org DID", "err", err) 641 return 642 } 643 644 pdsEndpoint := resolved.PDSEndpoint() 645 if pdsEndpoint == "" { 646 logger.Error("no PDS endpoint found for tangled.sh DID") 647 return 648 } 649 650 session, err := oauth.CreateAppPasswordSession(res, config.Core.AppPassword, consts.TangledDid, logger) 651 if err != nil { 652 logger.Error("failed to create appassword session... skipping fetch", "err", err) 653 return 654 } 655 656 client := xrpc.Client{ 657 Auth: &xrpc.AuthInfo{ 658 AccessJwt: session.AccessJwt, 659 Did: session.Did, 660 }, 661 Host: session.PdsEndpoint, 662 } 663 664 l := log.SubLogger(logger, "bluesky") 665 666 ticker := time.NewTicker(config.Bluesky.UpdateInterval) 667 defer ticker.Stop() 668 669 for { 670 posts, _, err := bsky.FetchPosts(ctx, &client, 20, "") 671 if err != nil { 672 l.Error("failed to fetch bluesky posts", "err", err) 673 } else if err := db.InsertBlueskyPosts(d, posts); err != nil { 674 l.Error("failed to insert bluesky posts", "err", err) 675 } else { 676 l.Info("inserted bluesky posts", "count", len(posts)) 677 } 678 679 select { 680 case <-ticker.C: 681 case <-ctx.Done(): 682 l.Info("stopping bluesky updater") 683 return 684 } 685 } 686}