A vibe coded tangled fork which supports pijul.
1package knotmirror
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "net/netip"
10 "net/url"
11 "time"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/knotmirror/config"
15 "tangled.org/core/knotmirror/db"
16 "tangled.org/core/knotmirror/knotstream"
17 "tangled.org/core/knotmirror/models"
18 "tangled.org/core/log"
19 "tangled.org/core/tapc"
20)
21
22type Tap struct {
23 logger *slog.Logger
24 cfg *config.Config
25 tap tapc.Client
26 db *sql.DB
27 gitm GitMirrorManager
28 ks *knotstream.KnotStream
29}
30
31func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, gitm GitMirrorManager, ks *knotstream.KnotStream) *Tap {
32 return &Tap{
33 logger: log.SubLogger(l, "tapclient"),
34 cfg: cfg,
35 tap: tapc.NewClient(cfg.TapUrl, ""),
36 db: db,
37 gitm: gitm,
38 ks: ks,
39 }
40}
41
42func (t *Tap) Start(ctx context.Context) {
43 // TODO: better reconnect logic
44 go func() {
45 for {
46 t.tap.Connect(ctx, &tapc.SimpleIndexer{
47 EventHandler: t.processEvent,
48 })
49 time.Sleep(time.Second)
50 }
51 }()
52}
53
54func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error {
55 l := t.logger.With("component", "tapIndexer")
56
57 var err error
58 switch evt.Type {
59 case tapc.EvtRecord:
60 switch evt.Record.Collection.String() {
61 case tangled.RepoNSID:
62 err = t.processRepo(ctx, evt.Record)
63 }
64 }
65
66 if err != nil {
67 l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err)
68 return err
69 }
70 return nil
71}
72
73func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error {
74 switch evt.Action {
75 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
76 record := tangled.Repo{}
77 if err := json.Unmarshal(evt.Record, &record); err != nil {
78 return fmt.Errorf("parsing record: %w", err)
79 }
80
81 status := models.RepoStatePending
82 errMsg := ""
83 u, err := url.Parse("http://" + record.Knot) // parsing with fake scheme
84 if err != nil {
85 status = models.RepoStateSuspended
86 errMsg = "failed to parse knot url"
87 } else if t.cfg.KnotSSRF && isPrivate(u.Hostname()) {
88 status = models.RepoStateSuspended
89 errMsg = "suspending non-public knot"
90 }
91
92 repo := &models.Repo{
93 Did: evt.Did,
94 Rkey: evt.Rkey,
95 Cid: evt.CID,
96 Name: record.Name,
97 KnotDomain: record.Knot,
98 State: status,
99 ErrorMsg: errMsg,
100 RetryAfter: 0, // clear retry info
101 RetryCount: 0,
102 }
103
104 if evt.Action == tapc.RecordUpdateAction {
105 // update git repo remote url
106 if err := t.gitm.RemoteSetUrl(ctx, repo); err != nil {
107 return fmt.Errorf("updating git repo remote url: %w", err)
108 }
109 }
110
111 if err := db.UpsertRepo(ctx, t.db, repo); err != nil {
112 return fmt.Errorf("upserting repo to db: %w", err)
113 }
114
115 if !t.ks.CheckIfSubscribed(record.Knot) {
116 if err := t.ks.SubscribeHost(ctx, record.Knot, !t.cfg.KnotUseSSL); err != nil {
117 return fmt.Errorf("subscribing to knot: %w", err)
118 }
119 }
120
121 case tapc.RecordDeleteAction:
122 if err := db.DeleteRepo(ctx, t.db, evt.Did, evt.Rkey); err != nil {
123 return fmt.Errorf("deleting repo from db: %w", err)
124 }
125 }
126 return nil
127}
128
129// isPrivate checks if host is private network. It doesn't perform DNS resolution
130func isPrivate(host string) bool {
131 if host == "localhost" {
132 return true
133 }
134 addr, err := netip.ParseAddr(host)
135 if err != nil {
136 return false
137 }
138 return isPrivateAddr(addr)
139}
140
141func isPrivateAddr(addr netip.Addr) bool {
142 return addr.IsLoopback() ||
143 addr.IsPrivate() ||
144 addr.IsLinkLocalUnicast() ||
145 addr.IsLinkLocalMulticast()
146}