A vibe-coded fork of Tangled that supports Pijul.
1package knotstream
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "math/rand"
10 "net/http"
11 "sync"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/util/ssrf"
16 "github.com/carlmjohnson/versioninfo"
17 "github.com/gorilla/websocket"
18 "tangled.org/core/api/tangled"
19 "tangled.org/core/knotmirror/config"
20 "tangled.org/core/knotmirror/db"
21 "tangled.org/core/knotmirror/models"
22 "tangled.org/core/log"
23)
24
25type KnotSlurper struct {
26 logger *slog.Logger
27 db *sql.DB
28 cfg config.SlurperConfig
29
30 subsLk sync.Mutex
31 subs map[string]*subscription
32}
33
34func NewKnotSlurper(l *slog.Logger, db *sql.DB, cfg config.SlurperConfig) *KnotSlurper {
35 return &KnotSlurper{
36 logger: log.SubLogger(l, "slurper"),
37 db: db,
38 cfg: cfg,
39 subs: make(map[string]*subscription),
40 }
41}
42
43func (s *KnotSlurper) Run(ctx context.Context) {
44 for {
45 select {
46 case <-ctx.Done():
47 return
48 case <-time.After(s.cfg.PersistCursorPeriod):
49 if err := s.persistCursors(ctx); err != nil {
50 s.logger.Error("failed to flush cursors", "err", err)
51 }
52 }
53 }
54}
55
56func (s *KnotSlurper) CheckIfSubscribed(hostname string) bool {
57 s.subsLk.Lock()
58 defer s.subsLk.Unlock()
59
60 _, ok := s.subs[hostname]
61 return ok
62}
63
64func (s *KnotSlurper) Shutdown(ctx context.Context) error {
65 s.logger.Info("starting shutdown host cursor flush")
66 err := s.persistCursors(ctx)
67 if err != nil {
68 s.logger.Error("shutdown error", "err", err)
69 }
70 s.logger.Info("slurper shutdown complete")
71 return err
72}
73
74func (s *KnotSlurper) persistCursors(ctx context.Context) error {
75 // // gather cursor list from subscriptions and store them to DB
76 // start := time.Now()
77
78 s.subsLk.Lock()
79 cursors := make([]models.HostCursor, len(s.subs))
80 i := 0
81 for _, sub := range s.subs {
82 cursors[i] = sub.HostCursor()
83 i++
84 }
85 s.subsLk.Unlock()
86
87 err := db.StoreCursors(ctx, s.db, cursors)
88 // s.logger.Info("finished persisting cursors", "count", len(cursors), "duration", time.Since(start).String(), "err", err)
89 return err
90}
91
92func (s *KnotSlurper) Subscribe(ctx context.Context, host models.Host) error {
93 s.subsLk.Lock()
94 defer s.subsLk.Unlock()
95
96 _, ok := s.subs[host.Hostname]
97 if ok {
98 return fmt.Errorf("already subscribed: %s", host.Hostname)
99 }
100
101 // TODO: include `cancel` function to kill subscription by hostname
102 sub := &subscription{
103 hostname: host.Hostname,
104 scheduler: NewParallelScheduler(
105 s.cfg.ConcurrencyPerHost,
106 host.Hostname,
107 s.ProcessEvent,
108 ),
109 }
110 s.subs[host.Hostname] = sub
111
112 sub.scheduler.Start(ctx)
113 go s.subscribeWithRedialer(ctx, host, sub)
114 return nil
115}
116
117func (s *KnotSlurper) subscribeWithRedialer(ctx context.Context, host models.Host, sub *subscription) {
118 l := s.logger.With("host", host.Hostname)
119
120 dialer := websocket.Dialer{
121 HandshakeTimeout: time.Second * 5,
122 }
123
124 // if this isn't a localhost / private connection, then we should enable SSRF protections
125 if !host.NoSSL {
126 netDialer := ssrf.PublicOnlyDialer()
127 dialer.NetDialContext = netDialer.DialContext
128 }
129
130 cursor := host.LastSeq
131
132 connectedInbound.Inc()
133 defer connectedInbound.Dec()
134
135 var backoff int
136 for {
137 select {
138 case <-ctx.Done():
139 return
140 default:
141 }
142 u := host.LegacyEventsURL(cursor)
143 l.Debug("made url with cursor", "cursor", cursor, "url", u)
144
145 // NOTE: manual backoff retry implementation to explicitly handle fails
146 hdr := make(http.Header)
147 hdr.Add("User-Agent", userAgent())
148 conn, resp, err := dialer.DialContext(ctx, u, hdr)
149 if err != nil {
150 l.Warn("dialing failed", "err", err, "backoff", backoff)
151 time.Sleep(sleepForBackoff(backoff))
152 backoff++
153 if backoff > 15 {
154 l.Warn("host does not appear to be online, disabling for now")
155 host.Status = models.HostStatusOffline
156 if err := db.UpsertHost(ctx, s.db, &host); err != nil {
157 l.Error("failed to update host status", "err", err)
158 }
159 return
160 }
161 continue
162 }
163
164 l.Debug("knot event subscription response", "code", resp.StatusCode, "url", u)
165
166 if err := s.handleConnection(ctx, conn, sub); err != nil {
167 // TODO: measure the last N connection error times and if they're coming too fast reconnect slower or don't reconnect and wait for requestCrawl
168 l.Warn("host connection failed", "err", err, "backoff", backoff)
169 }
170
171 updatedCursor := sub.LastSeq()
172 didProgress := updatedCursor > cursor
173 l.Debug("cursor compare", "cursor", cursor, "updatedCursor", updatedCursor, "didProgress", didProgress)
174 if cursor == 0 || didProgress {
175 cursor = updatedCursor
176 backoff = 0
177
178 batch := []models.HostCursor{sub.HostCursor()}
179 if err := db.StoreCursors(ctx, s.db, batch); err != nil {
180 l.Error("failed to store cursors", "err", err)
181 }
182 }
183 }
184}
185
186// handleConnection handles websocket connection.
187// Schedules task from received event and return when connection is closed
188func (s *KnotSlurper) handleConnection(ctx context.Context, conn *websocket.Conn, sub *subscription) error {
189 // ping on every 30s
190 ctx, cancel := context.WithCancel(ctx)
191 defer cancel() // close the background ping job on connection close
192 go func() {
193 t := time.NewTicker(30 * time.Second)
194 defer t.Stop()
195 failcount := 0
196
197 for {
198 select {
199 case <-t.C:
200 if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil {
201 s.logger.Warn("failed to ping", "err", err)
202 failcount++
203 if failcount >= 4 {
204 s.logger.Error("too many ping fails", "count", failcount)
205 _ = conn.Close()
206 return
207 }
208 } else {
209 failcount = 0 // ok ping
210 }
211 case <-ctx.Done():
212 _ = conn.Close()
213 return
214 }
215 }
216 }()
217
218 conn.SetPingHandler(func(message string) error {
219 err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Minute))
220 if err == websocket.ErrCloseSent {
221 return nil
222 }
223 return err
224 })
225 conn.SetPongHandler(func(_ string) error {
226 if err := conn.SetReadDeadline(time.Now().Add(time.Minute)); err != nil {
227 s.logger.Error("failed to set read deadline", "err", err)
228 }
229 return nil
230 })
231
232 for {
233 select {
234 case <-ctx.Done():
235 return ctx.Err()
236 default:
237 }
238 msgType, msg, err := conn.ReadMessage()
239 if err != nil {
240 return err
241 }
242
243 if msgType != websocket.TextMessage {
244 continue
245 }
246
247 sub.scheduler.AddTask(ctx, &Task{
248 key: sub.hostname, // TODO: replace to repository AT-URI for better concurrency
249 message: msg,
250 })
251 }
252}
253
254type LegacyGitEvent struct {
255 Rkey string
256 Nsid string
257 Event tangled.GitRefUpdate
258}
259
260func (s *KnotSlurper) ProcessEvent(ctx context.Context, task *Task) error {
261 var legacyMessage LegacyGitEvent
262 if err := json.Unmarshal(task.message, &legacyMessage); err != nil {
263 return fmt.Errorf("unmarshaling message: %w", err)
264 }
265
266 if err := s.ProcessLegacyGitRefUpdate(ctx, &legacyMessage); err != nil {
267 return fmt.Errorf("processing gitRefUpdate: %w", err)
268 }
269 return nil
270}
271
272func (s *KnotSlurper) ProcessLegacyGitRefUpdate(ctx context.Context, evt *LegacyGitEvent) error {
273 knotstreamEventsReceived.Inc()
274
275 curr, err := db.GetRepoByName(ctx, s.db, syntax.DID(evt.Event.RepoDid), evt.Event.RepoName)
276 if err != nil {
277 return fmt.Errorf("failed to get repo '%s': %w", evt.Event.RepoDid+"/"+evt.Event.RepoName, err)
278 }
279 if curr == nil {
280 // if repo doesn't exist in DB, just ignore the event. That repo is unknown.
281 //
282 // Normally did+name is already enough to perform git-fetch as that's
283 // what needed to fetch the repository.
284 // But we want to store that in did/rkey in knot-mirror.
285 // Therefore, we should ignore when the repository is unknown.
286 // Hopefully crawler will sync it later.
287 s.logger.Warn("skipping event from unknown repo", "did/repo", evt.Event.RepoDid+"/"+evt.Event.RepoName)
288 knotstreamEventsSkipped.Inc()
289 return nil
290 }
291 l := s.logger.With("repoAt", curr.AtUri())
292
293 // TODO: should plan resync to resyncBuffer on RepoStateResyncing
294 if curr.State != models.RepoStateActive {
295 l.Debug("skipping non-active repo")
296 knotstreamEventsSkipped.Inc()
297 return nil
298 }
299
300 if curr.GitRev != "" && evt.Rkey <= curr.GitRev.String() {
301 l.Debug("skipping replayed event", "event.Rkey", evt.Rkey, "currentRev", curr.GitRev)
302 knotstreamEventsSkipped.Inc()
303 return nil
304 }
305
306 // if curr.State == models.RepoStateResyncing {
307 // firehoseEventsSkipped.Inc()
308 // return fp.events.addToResyncBuffer(ctx, commit)
309 // }
310
311 // can't skip anything, update repo state
312 if err := db.UpdateRepoState(ctx, s.db, curr.Did, curr.Rkey, models.RepoStateDesynchronized); err != nil {
313 return err
314 }
315
316 l.Info("event processed", "eventRev", evt.Rkey)
317
318 knotstreamEventsProcessed.Inc()
319 return nil
320}
321
322func userAgent() string {
323 return fmt.Sprintf("knotmirror/%s", versioninfo.Short())
324}
325
// sleepForBackoff returns how long to wait before the next dial attempt after
// b accumulated failures:
//   - b <= 0: no wait
//   - 1 <= b < 10: 2*b seconds plus up to one second of random jitter
//   - b >= 10: a flat 30 seconds
func sleepForBackoff(b int) time.Duration {
	if b <= 0 {
		return 0
	}
	if b < 10 {
		// The linear term has to be scaled to seconds explicitly: a bare
		// time.Duration(b) is b nanoseconds, which made this term a no-op.
		jitter := time.Duration(rand.Intn(1000)) * time.Millisecond
		return time.Duration(b)*2*time.Second + jitter
	}
	return 30 * time.Second
}