A vibe-coded tangled fork that supports Pijul.
1package knotmirror
2
3import (
4 "context"
5 "fmt"
6 "net/http"
7 _ "net/http/pprof"
8 "time"
9
10 "github.com/prometheus/client_golang/prometheus/promhttp"
11 "tangled.org/core/knotmirror/config"
12 "tangled.org/core/knotmirror/db"
13 "tangled.org/core/knotmirror/knotstream"
14 "tangled.org/core/knotmirror/models"
15 "tangled.org/core/log"
16)
17
18func Run(ctx context.Context) error {
19 // make sure every services are cleaned up on fast return
20 ctx, cancel := context.WithCancel(ctx)
21 defer cancel()
22
23 logger := log.FromContext(ctx)
24 cfg, err := config.Load(ctx)
25 if err != nil {
26 return fmt.Errorf("loading config: %w", err)
27 }
28
29 logger.Debug("config loaded:", "config", cfg)
30
31 db, err := db.Make(ctx, cfg.DbUrl, 32)
32 if err != nil {
33 return fmt.Errorf("initializing db: %w", err)
34 }
35
36 res, err := db.ExecContext(ctx,
37 `update repos set state = $1 where state = $2`,
38 models.RepoStateDesynchronized,
39 models.RepoStateResyncing,
40 )
41 if err != nil {
42 return fmt.Errorf("clearing resyning states: %w", err)
43 }
44 rows, err := res.RowsAffected()
45 if err != nil {
46 return fmt.Errorf("getting affected rows: %w", err)
47 }
48 logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows))
49
50 knotstream := knotstream.NewKnotStream(logger, db, cfg)
51 crawler := NewCrawler(logger, db)
52 resyncer := NewResyncer(logger, db, cfg)
53 adminpage := NewAdminServer(db)
54
55 // maintain repository list with tap
56 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events.
57 tap := NewTapClient(logger, cfg, db, knotstream)
58
59 // start metrics endpoint
60 go func() {
61 metricsAddr := cfg.MetricsListen
62 logger.Info("starting metrics server", "addr", metricsAddr)
63 http.Handle("/metrics", promhttp.Handler())
64 if err := http.ListenAndServe(metricsAddr, nil); err != nil {
65 logger.Error("metrics server failed", "error", err)
66 }
67 }()
68
69 // start admin page endpoint
70 go func() {
71 logger.Info("starting admin server", "addr", cfg.AdminListen)
72 if err := http.ListenAndServe(cfg.AdminListen, adminpage.Router()); err != nil {
73 logger.Error("admin server failed", "error", err)
74 }
75 }()
76
77 tap.Start(ctx)
78
79 resyncer.Start(ctx)
80
81 // periodically crawl the entire network to mirror the repositories
82 crawler.Start(ctx)
83
84 // listen to knotstream (currently we don't have relay for knots, so subscribe every known knots)
85 knotstream.Start(ctx)
86
87 svcErr := make(chan error, 1)
88 if err := knotstream.ResubscribeAllHosts(ctx); err != nil {
89 svcErr <- fmt.Errorf("resubscribing known hosts: %w", err)
90 }
91
92 logger.Info("startup complete")
93 select {
94 case <-ctx.Done():
95 logger.Info("received shutdown signal", "reason", ctx.Err())
96 case err := <-svcErr:
97 if err != nil {
98 logger.Error("service error", "error", err)
99 }
100 cancel()
101 }
102
103 logger.Info("shutting down knotmirror")
104 shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
105 defer shutdownCancel()
106
107 var errs []error
108 if err := knotstream.Shutdown(shutdownCtx); err != nil {
109 errs = append(errs, err)
110 }
111 if err := db.Close(); err != nil {
112 errs = append(errs, err)
113 }
114 for _, err := range errs {
115 logger.Error("error during shutdown", "err", err)
116 }
117
118 logger.Info("shutdown complete")
119 return nil
120}