A vibe coded tangled fork which supports pijul.
1package knotmirror
2
3import (
4 "context"
5 "fmt"
6 "net/http"
7 _ "net/http/pprof"
8 "time"
9
10 "github.com/go-chi/chi/v5"
11 "github.com/prometheus/client_golang/prometheus/promhttp"
12 "tangled.org/core/idresolver"
13 "tangled.org/core/knotmirror/config"
14 "tangled.org/core/knotmirror/db"
15 "tangled.org/core/knotmirror/knotstream"
16 "tangled.org/core/knotmirror/models"
17 "tangled.org/core/knotmirror/xrpc"
18 "tangled.org/core/log"
19)
20
21func Run(ctx context.Context, cfg *config.Config) error {
22 // make sure every services are cleaned up on fast return
23 ctx, cancel := context.WithCancel(ctx)
24 defer cancel()
25
26 logger := log.FromContext(ctx)
27
28 db, err := db.Make(ctx, cfg.DbUrl, 32)
29 if err != nil {
30 return fmt.Errorf("initializing db: %w", err)
31 }
32
33 resolver := idresolver.DefaultResolver(cfg.PlcUrl)
34
35 // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive.
36 gitm := NewCliGitMirrorManager(cfg.GitRepoBasePath, cfg.KnotUseSSL)
37
38 res, err := db.ExecContext(ctx,
39 `update repos set state = $1 where state = $2`,
40 models.RepoStateDesynchronized,
41 models.RepoStateResyncing,
42 )
43 if err != nil {
44 return fmt.Errorf("clearing resyning states: %w", err)
45 }
46 rows, err := res.RowsAffected()
47 if err != nil {
48 return fmt.Errorf("getting affected rows: %w", err)
49 }
50 logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows))
51
52 knotstream := knotstream.NewKnotStream(logger, db, cfg)
53 crawler := NewCrawler(logger, db)
54 resyncer := NewResyncer(logger, db, gitm, cfg)
55 adminpage := NewAdminServer(logger, db, resyncer)
56 xrpc := xrpc.New(logger, cfg, db, resolver, knotstream)
57
58 // maintain repository list with tap
59 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events.
60 tap := NewTapClient(logger, cfg, db, gitm, knotstream)
61
62 // start http server
63 go func() {
64 logger.Info("starting http server", "addr", cfg.Listen)
65
66 mux := chi.NewRouter()
67 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
68 w.Write([]byte("Welcome to a knotmirror server.\n"))
69 })
70 mux.Mount("/xrpc", xrpc.Router())
71
72 if err := http.ListenAndServe(cfg.Listen, mux); err != nil {
73 logger.Error("xrpc server failed", "error", err)
74 }
75 }()
76
77 // start metrics endpoint
78 go func() {
79 metricsAddr := cfg.MetricsListen
80 logger.Info("starting metrics server", "addr", metricsAddr)
81 http.Handle("/metrics", promhttp.Handler())
82 if err := http.ListenAndServe(metricsAddr, nil); err != nil {
83 logger.Error("metrics server failed", "error", err)
84 }
85 }()
86
87 // start admin page endpoint
88 go func() {
89 logger.Info("starting admin server", "addr", cfg.AdminListen)
90 if err := http.ListenAndServe(cfg.AdminListen, adminpage.Router()); err != nil {
91 logger.Error("admin server failed", "error", err)
92 }
93 }()
94
95 tap.Start(ctx)
96
97 resyncer.Start(ctx)
98
99 // periodically crawl the entire network to mirror the repositories
100 crawler.Start(ctx)
101
102 // listen to knotstream (currently we don't have relay for knots, so subscribe every known knots)
103 knotstream.Start(ctx)
104
105 svcErr := make(chan error, 1)
106 if err := knotstream.ResubscribeAllHosts(ctx); err != nil {
107 svcErr <- fmt.Errorf("resubscribing known hosts: %w", err)
108 }
109
110 logger.Info("startup complete")
111 select {
112 case <-ctx.Done():
113 logger.Info("received shutdown signal", "reason", ctx.Err())
114 case err := <-svcErr:
115 if err != nil {
116 logger.Error("service error", "error", err)
117 }
118 cancel()
119 }
120
121 logger.Info("shutting down knotmirror")
122 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
123 defer shutdownCancel()
124
125 var errs []error
126 if err := knotstream.Shutdown(shutdownCtx); err != nil {
127 errs = append(errs, err)
128 }
129 if err := db.Close(); err != nil {
130 errs = append(errs, err)
131 }
132 for _, err := range errs {
133 logger.Error("error during shutdown", "err", err)
134 }
135
136 logger.Info("shutdown complete")
137 return nil
138}