A vibe-coded fork of Tangled that supports Pijul.
1package knotstream
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "math/rand"
10 "net/http"
11 "sync"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/bluesky-social/indigo/util/ssrf"
16 "github.com/carlmjohnson/versioninfo"
17 "github.com/gorilla/websocket"
18 "tangled.org/core/api/tangled"
19 "tangled.org/core/knotmirror/config"
20 "tangled.org/core/knotmirror/db"
21 "tangled.org/core/knotmirror/models"
22 "tangled.org/core/log"
23)
24
25type KnotSlurper struct {
26 logger *slog.Logger
27 db *sql.DB
28 cfg config.SlurperConfig
29
30 subsLk sync.Mutex
31 subs map[string]*subscription
32}
33
34func NewKnotSlurper(l *slog.Logger, db *sql.DB, cfg config.SlurperConfig) *KnotSlurper {
35 return &KnotSlurper{
36 logger: log.SubLogger(l, "slurper"),
37 db: db,
38 cfg: cfg,
39 subs: make(map[string]*subscription),
40 }
41}
42
43func (s *KnotSlurper) Run(ctx context.Context) {
44 for {
45 select {
46 case <-ctx.Done():
47 return
48 case <-time.After(s.cfg.PersistCursorPeriod):
49 if err := s.persistCursors(ctx); err != nil {
50 s.logger.Error("failed to flush cursors", "err", err)
51 }
52 }
53 }
54}
55
56func (s *KnotSlurper) CheckIfSubscribed(hostname string) bool {
57 s.subsLk.Lock()
58 defer s.subsLk.Unlock()
59
60 _, ok := s.subs[hostname]
61 return ok
62}
63
64func (s *KnotSlurper) Shutdown(ctx context.Context) error {
65 s.logger.Info("starting shutdown host cursor flush")
66 err := s.persistCursors(ctx)
67 if err != nil {
68 s.logger.Error("shutdown error", "err", err)
69 }
70 s.logger.Info("slurper shutdown complete")
71 return err
72}
73
74func (s *KnotSlurper) persistCursors(ctx context.Context) error {
75 // // gather cursor list from subscriptions and store them to DB
76 // start := time.Now()
77
78 s.subsLk.Lock()
79 cursors := make([]models.HostCursor, len(s.subs))
80 i := 0
81 for _, sub := range s.subs {
82 cursors[i] = sub.HostCursor()
83 i++
84 }
85 s.subsLk.Unlock()
86
87 err := db.StoreCursors(ctx, s.db, cursors)
88 // s.logger.Info("finished persisting cursors", "count", len(cursors), "duration", time.Since(start).String(), "err", err)
89 return err
90}
91
92func (s *KnotSlurper) Subscribe(ctx context.Context, host models.Host) error {
93 s.subsLk.Lock()
94 defer s.subsLk.Unlock()
95
96 _, ok := s.subs[host.Hostname]
97 if ok {
98 return fmt.Errorf("already subscribed: %s", host.Hostname)
99 }
100
101 // TODO: include `cancel` function to kill subscription by hostname
102 sub := &subscription{
103 hostname: host.Hostname,
104 scheduler: NewParallelScheduler(
105 s.cfg.ConcurrencyPerHost,
106 host.Hostname,
107 s.ProcessEvent,
108 ),
109 }
110 s.subs[host.Hostname] = sub
111
112 sub.scheduler.Start(ctx)
113 go s.subscribeWithRedialer(ctx, host, sub)
114 return nil
115}
116
117func (s *KnotSlurper) subscribeWithRedialer(ctx context.Context, host models.Host, sub *subscription) {
118 l := s.logger.With("host", host.Hostname)
119 defer func() {
120 s.subsLk.Lock()
121 defer s.subsLk.Unlock()
122
123 delete(s.subs, host.Hostname)
124 }()
125
126 dialer := websocket.Dialer{
127 HandshakeTimeout: time.Second * 5,
128 }
129
130 // if this isn't a localhost / private connection, then we should enable SSRF protections
131 if !host.NoSSL {
132 netDialer := ssrf.PublicOnlyDialer()
133 dialer.NetDialContext = netDialer.DialContext
134 }
135
136 cursor := host.LastSeq
137
138 connectedInbound.Inc()
139 defer connectedInbound.Dec()
140
141 var backoff int
142 for {
143 select {
144 case <-ctx.Done():
145 return
146 default:
147 }
148 u := host.LegacyEventsURL(cursor)
149 l.Debug("made url with cursor", "cursor", cursor, "url", u)
150
151 // NOTE: manual backoff retry implementation to explicitly handle fails
152 hdr := make(http.Header)
153 hdr.Add("User-Agent", userAgent())
154 conn, resp, err := dialer.DialContext(ctx, u, hdr)
155 if err != nil {
156 l.Warn("dialing failed", "err", err, "backoff", backoff)
157 time.Sleep(sleepForBackoff(backoff))
158 backoff++
159 if backoff > 30 {
160 l.Warn("host does not appear to be online, disabling for now")
161 host.Status = models.HostStatusOffline
162 if err := db.UpsertHost(ctx, s.db, &host); err != nil {
163 l.Error("failed to update host status", "err", err)
164 }
165 return
166 }
167 continue
168 }
169
170 l.Debug("knot event subscription response", "code", resp.StatusCode, "url", u)
171
172 if err := s.handleConnection(ctx, conn, sub); err != nil {
173 // TODO: measure the last N connection error times and if they're coming too fast reconnect slower or don't reconnect and wait for requestCrawl
174 l.Warn("host connection failed", "err", err, "backoff", backoff)
175 }
176
177 updatedCursor := sub.LastSeq()
178 didProgress := updatedCursor > cursor
179 l.Debug("cursor compare", "cursor", cursor, "updatedCursor", updatedCursor, "didProgress", didProgress)
180 if cursor == 0 || didProgress {
181 cursor = updatedCursor
182 backoff = 0
183
184 batch := []models.HostCursor{sub.HostCursor()}
185 if err := db.StoreCursors(ctx, s.db, batch); err != nil {
186 l.Error("failed to store cursors", "err", err)
187 }
188 }
189 }
190}
191
192// handleConnection handles websocket connection.
193// Schedules task from received event and return when connection is closed
194func (s *KnotSlurper) handleConnection(ctx context.Context, conn *websocket.Conn, sub *subscription) error {
195 // ping on every 30s
196 ctx, cancel := context.WithCancel(ctx)
197 defer cancel() // close the background ping job on connection close
198 go func() {
199 t := time.NewTicker(30 * time.Second)
200 defer t.Stop()
201 failcount := 0
202
203 for {
204 select {
205 case <-t.C:
206 if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil {
207 s.logger.Warn("failed to ping", "err", err)
208 failcount++
209 if failcount >= 4 {
210 s.logger.Error("too many ping fails", "count", failcount)
211 _ = conn.Close()
212 return
213 }
214 } else {
215 failcount = 0 // ok ping
216 }
217 case <-ctx.Done():
218 _ = conn.Close()
219 return
220 }
221 }
222 }()
223
224 conn.SetPingHandler(func(message string) error {
225 err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Minute))
226 if err == websocket.ErrCloseSent {
227 return nil
228 }
229 return err
230 })
231 conn.SetPongHandler(func(_ string) error {
232 if err := conn.SetReadDeadline(time.Now().Add(time.Minute)); err != nil {
233 s.logger.Error("failed to set read deadline", "err", err)
234 }
235 return nil
236 })
237
238 for {
239 select {
240 case <-ctx.Done():
241 return ctx.Err()
242 default:
243 }
244 msgType, msg, err := conn.ReadMessage()
245 if err != nil {
246 return err
247 }
248
249 if msgType != websocket.TextMessage {
250 continue
251 }
252
253 sub.scheduler.AddTask(ctx, &Task{
254 key: sub.hostname, // TODO: replace to repository AT-URI for better concurrency
255 message: msg,
256 })
257 }
258}
259
260type LegacyGitEvent struct {
261 Rkey string
262 Nsid string
263 Event tangled.GitRefUpdate
264}
265
266func (s *KnotSlurper) ProcessEvent(ctx context.Context, task *Task) error {
267 var legacyMessage LegacyGitEvent
268 if err := json.Unmarshal(task.message, &legacyMessage); err != nil {
269 return fmt.Errorf("unmarshaling message: %w", err)
270 }
271
272 if err := s.ProcessLegacyGitRefUpdate(ctx, task.key, &legacyMessage); err != nil {
273 return fmt.Errorf("processing gitRefUpdate: %w", err)
274 }
275 return nil
276}
277
278func (s *KnotSlurper) ProcessLegacyGitRefUpdate(ctx context.Context, source string, evt *LegacyGitEvent) error {
279 knotstreamEventsReceived.Inc()
280
281 l := s.logger.With("src", source)
282
283 ownerDid := ""
284 if evt.Event.OwnerDid != nil {
285 ownerDid = *evt.Event.OwnerDid
286 } else {
287 // handle legacy event
288 if evt.Event.RepoDid != nil {
289 ownerDid = *evt.Event.RepoDid
290 }
291 }
292 curr, err := db.GetRepoByName(ctx, s.db, syntax.DID(ownerDid), evt.Event.RepoName)
293 if err != nil {
294 return fmt.Errorf("failed to get repo '%s': %w", ownerDid+"/"+evt.Event.RepoName, err)
295 }
296 if curr == nil {
297 // if repo doesn't exist in DB, just ignore the event. That repo is unknown.
298 //
299 // Normally did+name is already enough to perform git-fetch as that's
300 // what needed to fetch the repository.
301 // But we want to store that in did/rkey in knot-mirror.
302 // Therefore, we should ignore when the repository is unknown.
303 // Hopefully crawler will sync it later.
304 l.Warn("skipping event from unknown repo", "did/name", ownerDid+"/"+evt.Event.RepoName)
305 knotstreamEventsSkipped.Inc()
306 return nil
307 }
308 l = l.With("repoAt", curr.AtUri())
309
310 // TODO: should plan resync to resyncBuffer on RepoStateResyncing
311 if curr.State != models.RepoStateActive {
312 l.Debug("skipping non-active repo")
313 knotstreamEventsSkipped.Inc()
314 return nil
315 }
316
317 if curr.GitRev != "" && evt.Rkey <= curr.GitRev.String() {
318 l.Debug("skipping replayed event", "event.Rkey", evt.Rkey, "currentRev", curr.GitRev)
319 knotstreamEventsSkipped.Inc()
320 return nil
321 }
322
323 // if curr.State == models.RepoStateResyncing {
324 // firehoseEventsSkipped.Inc()
325 // return fp.events.addToResyncBuffer(ctx, commit)
326 // }
327
328 // can't skip anything, update repo state
329 if err := db.UpdateRepoState(ctx, s.db, curr.Did, curr.Rkey, models.RepoStateDesynchronized); err != nil {
330 return err
331 }
332
333 l.Info("event processed", "eventRev", evt.Rkey)
334
335 knotstreamEventsProcessed.Inc()
336 return nil
337}
338
339func userAgent() string {
340 return fmt.Sprintf("knotmirror/%s", versioninfo.Short())
341}
342
// sleepForBackoff maps a retry counter to the delay before the next attempt:
//   - b == 0: no delay
//   - b < 10: 50*b milliseconds plus a random jitter in [0, 500) milliseconds
//   - otherwise: a fixed 30 seconds
func sleepForBackoff(b int) time.Duration {
	switch {
	case b == 0:
		return 0
	case b < 10:
		jitterMs := rand.Intn(500)
		return time.Duration(50*b+jitterMs) * time.Millisecond
	default:
		return 30 * time.Second
	}
}