A vibe coded tangled fork which supports pijul.
1package knotmirror
2
3import (
4 "context"
5 "fmt"
6 "net/http"
7 _ "net/http/pprof"
8 "time"
9
10 "github.com/go-chi/chi/v5"
11 "github.com/prometheus/client_golang/prometheus/promhttp"
12 "tangled.org/core/idresolver"
13 "tangled.org/core/knotmirror/config"
14 "tangled.org/core/knotmirror/db"
15 "tangled.org/core/knotmirror/knotstream"
16 "tangled.org/core/knotmirror/models"
17 "tangled.org/core/knotmirror/xrpc"
18 "tangled.org/core/log"
19)
20
21func Run(ctx context.Context) error {
22 // make sure every services are cleaned up on fast return
23 ctx, cancel := context.WithCancel(ctx)
24 defer cancel()
25
26 logger := log.FromContext(ctx)
27 cfg, err := config.Load(ctx)
28 if err != nil {
29 return fmt.Errorf("loading config: %w", err)
30 }
31
32 logger.Debug("config loaded:", "config", cfg)
33
34 db, err := db.Make(ctx, cfg.DbUrl, 32)
35 if err != nil {
36 return fmt.Errorf("initializing db: %w", err)
37 }
38
39 // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive.
40 gitm := NewCliGitMirrorManager(cfg.GitRepoBasePath, cfg.KnotUseSSL)
41
42 resolver := idresolver.DefaultResolver(cfg.PlcUrl)
43
44 res, err := db.ExecContext(ctx,
45 `update repos set state = $1 where state = $2`,
46 models.RepoStateDesynchronized,
47 models.RepoStateResyncing,
48 )
49 if err != nil {
50 return fmt.Errorf("clearing resyning states: %w", err)
51 }
52 rows, err := res.RowsAffected()
53 if err != nil {
54 return fmt.Errorf("getting affected rows: %w", err)
55 }
56 logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows))
57
58 xrpc := xrpc.New(logger, cfg, db, resolver)
59 knotstream := knotstream.NewKnotStream(logger, db, cfg)
60 crawler := NewCrawler(logger, db)
61 resyncer := NewResyncer(logger, db, gitm, cfg)
62 adminpage := NewAdminServer(logger, db, resyncer)
63
64 // maintain repository list with tap
65 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events.
66 tap := NewTapClient(logger, cfg, db, gitm, knotstream)
67
68 // start http server
69 go func() {
70 logger.Info("starting http server", "addr", cfg.Listen)
71
72 mux := chi.NewRouter()
73 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
74 w.Write([]byte("Welcome to a knotmirror server."))
75 })
76 mux.Mount("/xrpc", xrpc.Router())
77
78 if err := http.ListenAndServe(cfg.Listen, mux); err != nil {
79 logger.Error("xrpc server failed", "error", err)
80 }
81 }()
82
83 // start metrics endpoint
84 go func() {
85 metricsAddr := cfg.MetricsListen
86 logger.Info("starting metrics server", "addr", metricsAddr)
87 http.Handle("/metrics", promhttp.Handler())
88 if err := http.ListenAndServe(metricsAddr, nil); err != nil {
89 logger.Error("metrics server failed", "error", err)
90 }
91 }()
92
93 // start admin page endpoint
94 go func() {
95 logger.Info("starting admin server", "addr", cfg.AdminListen)
96 if err := http.ListenAndServe(cfg.AdminListen, adminpage.Router()); err != nil {
97 logger.Error("admin server failed", "error", err)
98 }
99 }()
100
101 tap.Start(ctx)
102
103 resyncer.Start(ctx)
104
105 // periodically crawl the entire network to mirror the repositories
106 crawler.Start(ctx)
107
108 // listen to knotstream (currently we don't have relay for knots, so subscribe every known knots)
109 knotstream.Start(ctx)
110
111 svcErr := make(chan error, 1)
112 if err := knotstream.ResubscribeAllHosts(ctx); err != nil {
113 svcErr <- fmt.Errorf("resubscribing known hosts: %w", err)
114 }
115
116 logger.Info("startup complete")
117 select {
118 case <-ctx.Done():
119 logger.Info("received shutdown signal", "reason", ctx.Err())
120 case err := <-svcErr:
121 if err != nil {
122 logger.Error("service error", "error", err)
123 }
124 cancel()
125 }
126
127 logger.Info("shutting down knotmirror")
128 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
129 defer shutdownCancel()
130
131 var errs []error
132 if err := knotstream.Shutdown(shutdownCtx); err != nil {
133 errs = append(errs, err)
134 }
135 if err := db.Close(); err != nil {
136 errs = append(errs, err)
137 }
138 for _, err := range errs {
139 logger.Error("error during shutdown", "err", err)
140 }
141
142 logger.Info("shutdown complete")
143 return nil
144}