A vibe-coded tangled fork that supports pijul.
at dda7e5c4760ecd9809464b15499aa4c2bfc41d72 120 lines 3.2 kB view raw
1package knotmirror 2 3import ( 4 "context" 5 "fmt" 6 "net/http" 7 _ "net/http/pprof" 8 "time" 9 10 "github.com/prometheus/client_golang/prometheus/promhttp" 11 "tangled.org/core/knotmirror/config" 12 "tangled.org/core/knotmirror/db" 13 "tangled.org/core/knotmirror/knotstream" 14 "tangled.org/core/knotmirror/models" 15 "tangled.org/core/log" 16) 17 18func Run(ctx context.Context) error { 19 // make sure every services are cleaned up on fast return 20 ctx, cancel := context.WithCancel(ctx) 21 defer cancel() 22 23 logger := log.FromContext(ctx) 24 cfg, err := config.Load(ctx) 25 if err != nil { 26 return fmt.Errorf("loading config: %w", err) 27 } 28 29 logger.Debug("config loaded:", "config", cfg) 30 31 db, err := db.Make(ctx, cfg.DbUrl, 32) 32 if err != nil { 33 return fmt.Errorf("initializing db: %w", err) 34 } 35 36 res, err := db.ExecContext(ctx, 37 `update repos set state = $1 where state = $2`, 38 models.RepoStateDesynchronized, 39 models.RepoStateResyncing, 40 ) 41 if err != nil { 42 return fmt.Errorf("clearing resyning states: %w", err) 43 } 44 rows, err := res.RowsAffected() 45 if err != nil { 46 return fmt.Errorf("getting affected rows: %w", err) 47 } 48 logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows)) 49 50 knotstream := knotstream.NewKnotStream(logger, db, cfg) 51 crawler := NewCrawler(logger, db) 52 resyncer := NewResyncer(logger, db, cfg) 53 adminpage := NewAdminServer(db) 54 55 // maintain repository list with tap 56 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events. 
57 tap := NewTapClient(logger, cfg, db, knotstream) 58 59 // start metrics endpoint 60 go func() { 61 metricsAddr := cfg.MetricsListen 62 logger.Info("starting metrics server", "addr", metricsAddr) 63 http.Handle("/metrics", promhttp.Handler()) 64 if err := http.ListenAndServe(metricsAddr, nil); err != nil { 65 logger.Error("metrics server failed", "error", err) 66 } 67 }() 68 69 // start admin page endpoint 70 go func() { 71 logger.Info("starting admin server", "addr", cfg.AdminListen) 72 if err := http.ListenAndServe(cfg.AdminListen, adminpage.Router()); err != nil { 73 logger.Error("admin server failed", "error", err) 74 } 75 }() 76 77 tap.Start(ctx) 78 79 resyncer.Start(ctx) 80 81 // periodically crawl the entire network to mirror the repositories 82 crawler.Start(ctx) 83 84 // listen to knotstream (currently we don't have relay for knots, so subscribe every known knots) 85 knotstream.Start(ctx) 86 87 svcErr := make(chan error, 1) 88 if err := knotstream.ResubscribeAllHosts(ctx); err != nil { 89 svcErr <- fmt.Errorf("resubscribing known hosts: %w", err) 90 } 91 92 logger.Info("startup complete") 93 select { 94 case <-ctx.Done(): 95 logger.Info("received shutdown signal", "reason", ctx.Err()) 96 case err := <-svcErr: 97 if err != nil { 98 logger.Error("service error", "error", err) 99 } 100 cancel() 101 } 102 103 logger.Info("shutting down knotmirror") 104 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) 105 defer shutdownCancel() 106 107 var errs []error 108 if err := knotstream.Shutdown(shutdownCtx); err != nil { 109 errs = append(errs, err) 110 } 111 if err := db.Close(); err != nil { 112 errs = append(errs, err) 113 } 114 for _, err := range errs { 115 logger.Error("error during shutdown", "err", err) 116 } 117 118 logger.Info("shutdown complete") 119 return nil 120}