A vibe-coded Tangled fork that supports Pijul.
1package knotmirror
2
import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log/slog"
	"net/netip"
	"net/url"
	"strings"
	"time"

	"tangled.org/core/api/tangled"
	"tangled.org/core/knotmirror/config"
	"tangled.org/core/knotmirror/db"
	"tangled.org/core/knotmirror/knotstream"
	"tangled.org/core/knotmirror/models"
	"tangled.org/core/log"
	"tangled.org/core/tapc"
)
21
22type Tap struct {
23 logger *slog.Logger
24 cfg *config.Config
25 tap tapc.Client
26 db *sql.DB
27 ks *knotstream.KnotStream
28}
29
30func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, ks *knotstream.KnotStream) *Tap {
31 return &Tap{
32 logger: log.SubLogger(l, "tapclient"),
33 cfg: cfg,
34 tap: tapc.NewClient(cfg.TapUrl, ""),
35 db: db,
36 ks: ks,
37 }
38}
39
40func (t *Tap) Start(ctx context.Context) {
41 // TODO: better reconnect logic
42 go func() {
43 for {
44 t.tap.Connect(ctx, &tapc.SimpleIndexer{
45 EventHandler: t.processEvent,
46 })
47 time.Sleep(time.Second)
48 }
49 }()
50}
51
52func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error {
53 l := t.logger.With("component", "tapIndexer")
54
55 var err error
56 switch evt.Type {
57 case tapc.EvtRecord:
58 switch evt.Record.Collection.String() {
59 case tangled.RepoNSID:
60 err = t.processRepo(ctx, evt.Record)
61 }
62 }
63
64 if err != nil {
65 l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err)
66 return err
67 }
68 return nil
69}
70
71func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error {
72 switch evt.Action {
73 case tapc.RecordCreateAction, tapc.RecordUpdateAction:
74 record := tangled.Repo{}
75 if err := json.Unmarshal(evt.Record, &record); err != nil {
76 return fmt.Errorf("parsing record: %w", err)
77 }
78
79 status := models.RepoStatePending
80 errMsg := ""
81 u, err := url.Parse("http://" + record.Knot) // parsing with fake scheme
82 if err != nil {
83 status = models.RepoStateSuspended
84 errMsg = "failed to parse knot url"
85 } else if t.cfg.KnotSSRF && isPrivate(u.Hostname()) {
86 status = models.RepoStateSuspended
87 errMsg = "suspending non-public knot"
88 }
89
90 if err := db.UpsertRepo(ctx, t.db, &models.Repo{
91 Did: evt.Did,
92 Rkey: evt.Rkey,
93 Cid: evt.CID,
94 Name: record.Name,
95 KnotDomain: record.Knot,
96 State: status,
97 ErrorMsg: errMsg,
98 }); err != nil {
99 return fmt.Errorf("upserting repo to db: %w", err)
100 }
101
102 if !t.ks.CheckIfSubscribed(record.Knot) {
103 if err := t.ks.SubscribeHost(ctx, record.Knot, !t.cfg.KnotUseSSL); err != nil {
104 return fmt.Errorf("subscribing to knot: %w", err)
105 }
106 }
107
108 case tapc.RecordDeleteAction:
109 if err := db.DeleteRepo(ctx, t.db, evt.Did, evt.Rkey); err != nil {
110 return fmt.Errorf("deleting repo from db: %w", err)
111 }
112 }
113 return nil
114}
115
116// isPrivate checks if host is private network. It doesn't perform DNS resolution
117func isPrivate(host string) bool {
118 if host == "localhost" {
119 return true
120 }
121 addr, err := netip.ParseAddr(host)
122 if err != nil {
123 return false
124 }
125 return isPrivateAddr(addr)
126}
127
// isPrivateAddr reports whether addr is an address that must not be reached
// from a public-facing service: unspecified (0.0.0.0 or ::), loopback,
// private-use, or link-local (unicast or multicast).
//
// IPv4-mapped IPv6 addresses such as ::ffff:127.0.0.1 are classified by the
// IPv4 address they wrap. The zero Addr returns false.
func isPrivateAddr(addr netip.Addr) bool {
	// Unmap explicitly so the result does not depend on whether the Is*
	// methods of the Go release in use see through IPv4-mapped addresses.
	addr = addr.Unmap()

	return addr.IsUnspecified() ||
		addr.IsLoopback() ||
		addr.IsPrivate() ||
		addr.IsLinkLocalUnicast() ||
		addr.IsLinkLocalMulticast()
}
133}