A vibe-coded Tangled fork that supports Pijul.
at cdcae4dfed24485726686bc8fc5131b818ae56ff 133 lines 3.1 kB view raw
1package knotmirror 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/netip" 10 "net/url" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/knotmirror/config" 15 "tangled.org/core/knotmirror/db" 16 "tangled.org/core/knotmirror/knotstream" 17 "tangled.org/core/knotmirror/models" 18 "tangled.org/core/log" 19 "tangled.org/core/tapc" 20) 21 22type Tap struct { 23 logger *slog.Logger 24 cfg *config.Config 25 tap tapc.Client 26 db *sql.DB 27 ks *knotstream.KnotStream 28} 29 30func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, ks *knotstream.KnotStream) *Tap { 31 return &Tap{ 32 logger: log.SubLogger(l, "tapclient"), 33 cfg: cfg, 34 tap: tapc.NewClient(cfg.TapUrl, ""), 35 db: db, 36 ks: ks, 37 } 38} 39 40func (t *Tap) Start(ctx context.Context) { 41 // TODO: better reconnect logic 42 go func() { 43 for { 44 t.tap.Connect(ctx, &tapc.SimpleIndexer{ 45 EventHandler: t.processEvent, 46 }) 47 time.Sleep(time.Second) 48 } 49 }() 50} 51 52func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error { 53 l := t.logger.With("component", "tapIndexer") 54 55 var err error 56 switch evt.Type { 57 case tapc.EvtRecord: 58 switch evt.Record.Collection.String() { 59 case tangled.RepoNSID: 60 err = t.processRepo(ctx, evt.Record) 61 } 62 } 63 64 if err != nil { 65 l.Error("failed to process message. 
will retry later", "event.ID", evt.ID, "err", err) 66 return err 67 } 68 return nil 69} 70 71func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error { 72 switch evt.Action { 73 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 74 record := tangled.Repo{} 75 if err := json.Unmarshal(evt.Record, &record); err != nil { 76 return fmt.Errorf("parsing record: %w", err) 77 } 78 79 status := models.RepoStatePending 80 errMsg := "" 81 u, err := url.Parse("http://" + record.Knot) // parsing with fake scheme 82 if err != nil { 83 status = models.RepoStateSuspended 84 errMsg = "failed to parse knot url" 85 } else if t.cfg.KnotSSRF && isPrivate(u.Hostname()) { 86 status = models.RepoStateSuspended 87 errMsg = "suspending non-public knot" 88 } 89 90 if err := db.UpsertRepo(ctx, t.db, &models.Repo{ 91 Did: evt.Did, 92 Rkey: evt.Rkey, 93 Cid: evt.CID, 94 Name: record.Name, 95 KnotDomain: record.Knot, 96 State: status, 97 ErrorMsg: errMsg, 98 }); err != nil { 99 return fmt.Errorf("upserting repo to db: %w", err) 100 } 101 102 if !t.ks.CheckIfSubscribed(record.Knot) { 103 if err := t.ks.SubscribeHost(ctx, record.Knot, !t.cfg.KnotUseSSL); err != nil { 104 return fmt.Errorf("subscribing to knot: %w", err) 105 } 106 } 107 108 case tapc.RecordDeleteAction: 109 if err := db.DeleteRepo(ctx, t.db, evt.Did, evt.Rkey); err != nil { 110 return fmt.Errorf("deleting repo from db: %w", err) 111 } 112 } 113 return nil 114} 115 116// isPrivate checks if host is private network. It doesn't perform DNS resolution 117func isPrivate(host string) bool { 118 if host == "localhost" { 119 return true 120 } 121 addr, err := netip.ParseAddr(host) 122 if err != nil { 123 return false 124 } 125 return isPrivateAddr(addr) 126} 127 128func isPrivateAddr(addr netip.Addr) bool { 129 return addr.IsLoopback() || 130 addr.IsPrivate() || 131 addr.IsLinkLocalUnicast() || 132 addr.IsLinkLocalMulticast() 133}