A vibe-coded Tangled fork that supports Pijul.
At commit dda7e5c4760ecd9809464b15499aa4c2bfc41d72 (334 lines, 8.9 kB).
1package knotstream 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "math/rand" 10 "net/http" 11 "sync" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/util/ssrf" 16 "github.com/carlmjohnson/versioninfo" 17 "github.com/gorilla/websocket" 18 "tangled.org/core/api/tangled" 19 "tangled.org/core/knotmirror/config" 20 "tangled.org/core/knotmirror/db" 21 "tangled.org/core/knotmirror/models" 22 "tangled.org/core/log" 23) 24 25type KnotSlurper struct { 26 logger *slog.Logger 27 db *sql.DB 28 cfg config.SlurperConfig 29 30 subsLk sync.Mutex 31 subs map[string]*subscription 32} 33 34func NewKnotSlurper(l *slog.Logger, db *sql.DB, cfg config.SlurperConfig) *KnotSlurper { 35 return &KnotSlurper{ 36 logger: log.SubLogger(l, "slurper"), 37 db: db, 38 cfg: cfg, 39 subs: make(map[string]*subscription), 40 } 41} 42 43func (s *KnotSlurper) Run(ctx context.Context) { 44 for { 45 select { 46 case <-ctx.Done(): 47 return 48 case <-time.After(s.cfg.PersistCursorPeriod): 49 if err := s.persistCursors(ctx); err != nil { 50 s.logger.Error("failed to flush cursors", "err", err) 51 } 52 } 53 } 54} 55 56func (s *KnotSlurper) CheckIfSubscribed(hostname string) bool { 57 s.subsLk.Lock() 58 defer s.subsLk.Unlock() 59 60 _, ok := s.subs[hostname] 61 return ok 62} 63 64func (s *KnotSlurper) Shutdown(ctx context.Context) error { 65 s.logger.Info("starting shutdown host cursor flush") 66 err := s.persistCursors(ctx) 67 if err != nil { 68 s.logger.Error("shutdown error", "err", err) 69 } 70 s.logger.Info("slurper shutdown complete") 71 return err 72} 73 74func (s *KnotSlurper) persistCursors(ctx context.Context) error { 75 // // gather cursor list from subscriptions and store them to DB 76 // start := time.Now() 77 78 s.subsLk.Lock() 79 cursors := make([]models.HostCursor, len(s.subs)) 80 i := 0 81 for _, sub := range s.subs { 82 cursors[i] = sub.HostCursor() 83 i++ 84 } 85 s.subsLk.Unlock() 86 87 err := 
db.StoreCursors(ctx, s.db, cursors) 88 // s.logger.Info("finished persisting cursors", "count", len(cursors), "duration", time.Since(start).String(), "err", err) 89 return err 90} 91 92func (s *KnotSlurper) Subscribe(ctx context.Context, host models.Host) error { 93 s.subsLk.Lock() 94 defer s.subsLk.Unlock() 95 96 _, ok := s.subs[host.Hostname] 97 if ok { 98 return fmt.Errorf("already subscribed: %s", host.Hostname) 99 } 100 101 // TODO: include `cancel` function to kill subscription by hostname 102 sub := &subscription{ 103 hostname: host.Hostname, 104 scheduler: NewParallelScheduler( 105 s.cfg.ConcurrencyPerHost, 106 host.Hostname, 107 s.ProcessEvent, 108 ), 109 } 110 s.subs[host.Hostname] = sub 111 112 sub.scheduler.Start(ctx) 113 go s.subscribeWithRedialer(ctx, host, sub) 114 return nil 115} 116 117func (s *KnotSlurper) subscribeWithRedialer(ctx context.Context, host models.Host, sub *subscription) { 118 l := s.logger.With("host", host.Hostname) 119 120 dialer := websocket.Dialer{ 121 HandshakeTimeout: time.Second * 5, 122 } 123 124 // if this isn't a localhost / private connection, then we should enable SSRF protections 125 if !host.NoSSL { 126 netDialer := ssrf.PublicOnlyDialer() 127 dialer.NetDialContext = netDialer.DialContext 128 } 129 130 cursor := host.LastSeq 131 132 connectedInbound.Inc() 133 defer connectedInbound.Dec() 134 135 var backoff int 136 for { 137 select { 138 case <-ctx.Done(): 139 return 140 default: 141 } 142 u := host.LegacyEventsURL(cursor) 143 l.Debug("made url with cursor", "cursor", cursor, "url", u) 144 145 // NOTE: manual backoff retry implementation to explicitly handle fails 146 hdr := make(http.Header) 147 hdr.Add("User-Agent", userAgent()) 148 conn, resp, err := dialer.DialContext(ctx, u, hdr) 149 if err != nil { 150 l.Warn("dialing failed", "err", err, "backoff", backoff) 151 time.Sleep(sleepForBackoff(backoff)) 152 backoff++ 153 if backoff > 15 { 154 l.Warn("host does not appear to be online, disabling for now") 155 
host.Status = models.HostStatusOffline 156 if err := db.UpsertHost(ctx, s.db, &host); err != nil { 157 l.Error("failed to update host status", "err", err) 158 } 159 return 160 } 161 continue 162 } 163 164 l.Debug("knot event subscription response", "code", resp.StatusCode, "url", u) 165 166 if err := s.handleConnection(ctx, conn, sub); err != nil { 167 // TODO: measure the last N connection error times and if they're coming too fast reconnect slower or don't reconnect and wait for requestCrawl 168 l.Warn("host connection failed", "err", err, "backoff", backoff) 169 } 170 171 updatedCursor := sub.LastSeq() 172 didProgress := updatedCursor > cursor 173 l.Debug("cursor compare", "cursor", cursor, "updatedCursor", updatedCursor, "didProgress", didProgress) 174 if cursor == 0 || didProgress { 175 cursor = updatedCursor 176 backoff = 0 177 178 batch := []models.HostCursor{sub.HostCursor()} 179 if err := db.StoreCursors(ctx, s.db, batch); err != nil { 180 l.Error("failed to store cursors", "err", err) 181 } 182 } 183 } 184} 185 186// handleConnection handles websocket connection. 
187// Schedules task from received event and return when connection is closed 188func (s *KnotSlurper) handleConnection(ctx context.Context, conn *websocket.Conn, sub *subscription) error { 189 // ping on every 30s 190 ctx, cancel := context.WithCancel(ctx) 191 defer cancel() // close the background ping job on connection close 192 go func() { 193 t := time.NewTicker(30 * time.Second) 194 defer t.Stop() 195 failcount := 0 196 197 for { 198 select { 199 case <-t.C: 200 if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil { 201 s.logger.Warn("failed to ping", "err", err) 202 failcount++ 203 if failcount >= 4 { 204 s.logger.Error("too many ping fails", "count", failcount) 205 _ = conn.Close() 206 return 207 } 208 } else { 209 failcount = 0 // ok ping 210 } 211 case <-ctx.Done(): 212 _ = conn.Close() 213 return 214 } 215 } 216 }() 217 218 conn.SetPingHandler(func(message string) error { 219 err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Minute)) 220 if err == websocket.ErrCloseSent { 221 return nil 222 } 223 return err 224 }) 225 conn.SetPongHandler(func(_ string) error { 226 if err := conn.SetReadDeadline(time.Now().Add(time.Minute)); err != nil { 227 s.logger.Error("failed to set read deadline", "err", err) 228 } 229 return nil 230 }) 231 232 for { 233 select { 234 case <-ctx.Done(): 235 return ctx.Err() 236 default: 237 } 238 msgType, msg, err := conn.ReadMessage() 239 if err != nil { 240 return err 241 } 242 243 if msgType != websocket.TextMessage { 244 continue 245 } 246 247 sub.scheduler.AddTask(ctx, &Task{ 248 key: sub.hostname, // TODO: replace to repository AT-URI for better concurrency 249 message: msg, 250 }) 251 } 252} 253 254type LegacyGitEvent struct { 255 Rkey string 256 Nsid string 257 Event tangled.GitRefUpdate 258} 259 260func (s *KnotSlurper) ProcessEvent(ctx context.Context, task *Task) error { 261 var legacyMessage LegacyGitEvent 262 if err := 
json.Unmarshal(task.message, &legacyMessage); err != nil { 263 return fmt.Errorf("unmarshaling message: %w", err) 264 } 265 266 if err := s.ProcessLegacyGitRefUpdate(ctx, &legacyMessage); err != nil { 267 return fmt.Errorf("processing gitRefUpdate: %w", err) 268 } 269 return nil 270} 271 272func (s *KnotSlurper) ProcessLegacyGitRefUpdate(ctx context.Context, evt *LegacyGitEvent) error { 273 knotstreamEventsReceived.Inc() 274 275 curr, err := db.GetRepoByName(ctx, s.db, syntax.DID(evt.Event.RepoDid), evt.Event.RepoName) 276 if err != nil { 277 return fmt.Errorf("failed to get repo '%s': %w", evt.Event.RepoDid+"/"+evt.Event.RepoName, err) 278 } 279 if curr == nil { 280 // if repo doesn't exist in DB, just ignore the event. That repo is unknown. 281 // 282 // Normally did+name is already enough to perform git-fetch as that's 283 // what needed to fetch the repository. 284 // But we want to store that in did/rkey in knot-mirror. 285 // Therefore, we should ignore when the repository is unknown. 286 // Hopefully crawler will sync it later. 
287 s.logger.Warn("skipping event from unknown repo", "did/repo", evt.Event.RepoDid+"/"+evt.Event.RepoName) 288 knotstreamEventsSkipped.Inc() 289 return nil 290 } 291 l := s.logger.With("repoAt", curr.AtUri()) 292 293 // TODO: should plan resync to resyncBuffer on RepoStateResyncing 294 if curr.State != models.RepoStateActive { 295 l.Debug("skipping non-active repo") 296 knotstreamEventsSkipped.Inc() 297 return nil 298 } 299 300 if curr.GitRev != "" && evt.Rkey <= curr.GitRev.String() { 301 l.Debug("skipping replayed event", "event.Rkey", evt.Rkey, "currentRev", curr.GitRev) 302 knotstreamEventsSkipped.Inc() 303 return nil 304 } 305 306 // if curr.State == models.RepoStateResyncing { 307 // firehoseEventsSkipped.Inc() 308 // return fp.events.addToResyncBuffer(ctx, commit) 309 // } 310 311 // can't skip anything, update repo state 312 if err := db.UpdateRepoState(ctx, s.db, curr.Did, curr.Rkey, models.RepoStateDesynchronized); err != nil { 313 return err 314 } 315 316 l.Info("event processed", "eventRev", evt.Rkey) 317 318 knotstreamEventsProcessed.Inc() 319 return nil 320} 321 322func userAgent() string { 323 return fmt.Sprintf("knotmirror/%s", versioninfo.Short()) 324} 325 326func sleepForBackoff(b int) time.Duration { 327 if b == 0 { 328 return 0 329 } 330 if b < 10 { 331 return (time.Duration(b) * 2) + (time.Millisecond * time.Duration(rand.Intn(1000))) 332 } 333 return time.Second * 30 334}