A vibe coded tangled fork which supports pijul.
at sl/knotmirror 146 lines 3.5 kB view raw
1package knotmirror 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "net/netip" 10 "net/url" 11 "time" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/knotmirror/config" 15 "tangled.org/core/knotmirror/db" 16 "tangled.org/core/knotmirror/knotstream" 17 "tangled.org/core/knotmirror/models" 18 "tangled.org/core/log" 19 "tangled.org/core/tapc" 20) 21 22type Tap struct { 23 logger *slog.Logger 24 cfg *config.Config 25 tap tapc.Client 26 db *sql.DB 27 gitm GitMirrorManager 28 ks *knotstream.KnotStream 29} 30 31func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, gitm GitMirrorManager, ks *knotstream.KnotStream) *Tap { 32 return &Tap{ 33 logger: log.SubLogger(l, "tapclient"), 34 cfg: cfg, 35 tap: tapc.NewClient(cfg.TapUrl, ""), 36 db: db, 37 gitm: gitm, 38 ks: ks, 39 } 40} 41 42func (t *Tap) Start(ctx context.Context) { 43 // TODO: better reconnect logic 44 go func() { 45 for { 46 t.tap.Connect(ctx, &tapc.SimpleIndexer{ 47 EventHandler: t.processEvent, 48 }) 49 time.Sleep(time.Second) 50 } 51 }() 52} 53 54func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error { 55 l := t.logger.With("component", "tapIndexer") 56 57 var err error 58 switch evt.Type { 59 case tapc.EvtRecord: 60 switch evt.Record.Collection.String() { 61 case tangled.RepoNSID: 62 err = t.processRepo(ctx, evt.Record) 63 } 64 } 65 66 if err != nil { 67 l.Error("failed to process message. 
will retry later", "event.ID", evt.ID, "err", err) 68 return err 69 } 70 return nil 71} 72 73func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error { 74 switch evt.Action { 75 case tapc.RecordCreateAction, tapc.RecordUpdateAction: 76 record := tangled.Repo{} 77 if err := json.Unmarshal(evt.Record, &record); err != nil { 78 return fmt.Errorf("parsing record: %w", err) 79 } 80 81 status := models.RepoStatePending 82 errMsg := "" 83 u, err := url.Parse("http://" + record.Knot) // parsing with fake scheme 84 if err != nil { 85 status = models.RepoStateSuspended 86 errMsg = "failed to parse knot url" 87 } else if t.cfg.KnotSSRF && isPrivate(u.Hostname()) { 88 status = models.RepoStateSuspended 89 errMsg = "suspending non-public knot" 90 } 91 92 repo := &models.Repo{ 93 Did: evt.Did, 94 Rkey: evt.Rkey, 95 Cid: evt.CID, 96 Name: record.Name, 97 KnotDomain: record.Knot, 98 State: status, 99 ErrorMsg: errMsg, 100 RetryAfter: 0, // clear retry info 101 RetryCount: 0, 102 } 103 104 if evt.Action == tapc.RecordUpdateAction { 105 // update git repo remote url 106 if err := t.gitm.RemoteSetUrl(ctx, repo); err != nil { 107 return fmt.Errorf("updating git repo remote url: %w", err) 108 } 109 } 110 111 if err := db.UpsertRepo(ctx, t.db, repo); err != nil { 112 return fmt.Errorf("upserting repo to db: %w", err) 113 } 114 115 if !t.ks.CheckIfSubscribed(record.Knot) { 116 if err := t.ks.SubscribeHost(ctx, record.Knot, !t.cfg.KnotUseSSL); err != nil { 117 return fmt.Errorf("subscribing to knot: %w", err) 118 } 119 } 120 121 case tapc.RecordDeleteAction: 122 if err := db.DeleteRepo(ctx, t.db, evt.Did, evt.Rkey); err != nil { 123 return fmt.Errorf("deleting repo from db: %w", err) 124 } 125 } 126 return nil 127} 128 129// isPrivate checks if host is private network. 
// It doesn't perform DNS resolution: the only hostname recognised is the
// exact, lower-case string "localhost"; any other value that is not an IP
// literal is reported as not private. IP literals are classified by
// isPrivateAddr.
func isPrivate(host string) bool {
	if host == "localhost" {
		return true
	}
	addr, err := netip.ParseAddr(host)
	if err != nil {
		// Not an IP literal; without DNS resolution nothing more can be said.
		return false
	}
	return isPrivateAddr(addr)
}

// isPrivateAddr reports whether addr is a loopback, private-range,
// link-local (unicast or multicast) or unspecified address. IPv4-mapped IPv6
// addresses are classified by the IPv4 address they embed.
func isPrivateAddr(addr netip.Addr) bool {
	// Unmap explicitly so that "::ffff:127.0.0.1" is judged as 127.0.0.1
	// regardless of whether the toolchain's Is* methods unmap on their own.
	addr = addr.Unmap()

	return addr.IsLoopback() ||
		addr.IsPrivate() ||
		addr.IsLinkLocalUnicast() ||
		addr.IsLinkLocalMulticast() ||
		// 0.0.0.0 and :: are not routable destinations; connecting to them
		// typically reaches the local host.
		addr.IsUnspecified()
}