A vibe coded tangled fork which supports pijul.
at 1237bf9f58e4ba5d13d5437f2f82a2078572e229 144 lines 4.0 kB view raw
1package knotmirror 2 3import ( 4 "context" 5 "fmt" 6 "net/http" 7 _ "net/http/pprof" 8 "time" 9 10 "github.com/go-chi/chi/v5" 11 "github.com/prometheus/client_golang/prometheus/promhttp" 12 "tangled.org/core/idresolver" 13 "tangled.org/core/knotmirror/config" 14 "tangled.org/core/knotmirror/db" 15 "tangled.org/core/knotmirror/knotstream" 16 "tangled.org/core/knotmirror/models" 17 "tangled.org/core/knotmirror/xrpc" 18 "tangled.org/core/log" 19) 20 21func Run(ctx context.Context) error { 22 // make sure every services are cleaned up on fast return 23 ctx, cancel := context.WithCancel(ctx) 24 defer cancel() 25 26 logger := log.FromContext(ctx) 27 cfg, err := config.Load(ctx) 28 if err != nil { 29 return fmt.Errorf("loading config: %w", err) 30 } 31 32 logger.Debug("config loaded:", "config", cfg) 33 34 db, err := db.Make(ctx, cfg.DbUrl, 32) 35 if err != nil { 36 return fmt.Errorf("initializing db: %w", err) 37 } 38 39 // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive. 40 gitm := NewCliGitMirrorManager(cfg.GitRepoBasePath, cfg.KnotUseSSL) 41 42 resolver := idresolver.DefaultResolver(cfg.PlcUrl) 43 44 res, err := db.ExecContext(ctx, 45 `update repos set state = $1 where state = $2`, 46 models.RepoStateDesynchronized, 47 models.RepoStateResyncing, 48 ) 49 if err != nil { 50 return fmt.Errorf("clearing resyning states: %w", err) 51 } 52 rows, err := res.RowsAffected() 53 if err != nil { 54 return fmt.Errorf("getting affected rows: %w", err) 55 } 56 logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows)) 57 58 xrpc := xrpc.New(logger, cfg, db, resolver) 59 knotstream := knotstream.NewKnotStream(logger, db, cfg) 60 crawler := NewCrawler(logger, db) 61 resyncer := NewResyncer(logger, db, gitm, cfg) 62 adminpage := NewAdminServer(logger, db, resyncer) 63 64 // maintain repository list with tap 65 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events. 66 tap := NewTapClient(logger, cfg, db, gitm, knotstream) 67 68 // start http server 69 go func() { 70 logger.Info("starting http server", "addr", cfg.Listen) 71 72 mux := chi.NewRouter() 73 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 74 w.Write([]byte("Welcome to a knotmirror server.")) 75 }) 76 mux.Mount("/xrpc", xrpc.Router()) 77 78 if err := http.ListenAndServe(cfg.Listen, mux); err != nil { 79 logger.Error("xrpc server failed", "error", err) 80 } 81 }() 82 83 // start metrics endpoint 84 go func() { 85 metricsAddr := cfg.MetricsListen 86 logger.Info("starting metrics server", "addr", metricsAddr) 87 http.Handle("/metrics", promhttp.Handler()) 88 if err := http.ListenAndServe(metricsAddr, nil); err != nil { 89 logger.Error("metrics server failed", "error", err) 90 } 91 }() 92 93 // start admin page endpoint 94 go func() { 95 logger.Info("starting admin server", "addr", cfg.AdminListen) 96 if err := http.ListenAndServe(cfg.AdminListen, adminpage.Router()); err != nil { 97 logger.Error("admin server failed", "error", err) 98 } 99 }() 100 101 tap.Start(ctx) 102 103 resyncer.Start(ctx) 104 105 // periodically crawl the entire network to mirror the repositories 106 crawler.Start(ctx) 107 108 // listen to knotstream (currently we don't have relay for knots, so subscribe every known knots) 109 knotstream.Start(ctx) 110 111 svcErr := make(chan error, 1) 112 if err := knotstream.ResubscribeAllHosts(ctx); err != nil { 113 svcErr <- fmt.Errorf("resubscribing known hosts: %w", err) 114 } 115 116 logger.Info("startup complete") 117 select { 118 case <-ctx.Done(): 119 logger.Info("received shutdown signal", "reason", ctx.Err()) 120 case err := <-svcErr: 121 if err != nil { 122 logger.Error("service error", "error", err) 123 } 124 cancel() 125 } 126 127 logger.Info("shutting down knotmirror") 128 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) 129 defer shutdownCancel() 130 131 var errs []error 132 if err := knotstream.Shutdown(shutdownCtx); err != nil { 133 errs = append(errs, err) 134 } 135 if err := db.Close(); err != nil { 136 errs = append(errs, err) 137 } 138 for _, err := range errs { 139 logger.Error("error during shutdown", "err", err) 140 } 141 142 logger.Info("shutdown complete") 143 return nil 144}