A vibe-coded fork of Tangled that supports Pijul.
at master 351 lines 9.2 kB view raw
1package knotstream 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "math/rand" 10 "net/http" 11 "sync" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/bluesky-social/indigo/util/ssrf" 16 "github.com/carlmjohnson/versioninfo" 17 "github.com/gorilla/websocket" 18 "tangled.org/core/api/tangled" 19 "tangled.org/core/knotmirror/config" 20 "tangled.org/core/knotmirror/db" 21 "tangled.org/core/knotmirror/models" 22 "tangled.org/core/log" 23) 24 25type KnotSlurper struct { 26 logger *slog.Logger 27 db *sql.DB 28 cfg config.SlurperConfig 29 30 subsLk sync.Mutex 31 subs map[string]*subscription 32} 33 34func NewKnotSlurper(l *slog.Logger, db *sql.DB, cfg config.SlurperConfig) *KnotSlurper { 35 return &KnotSlurper{ 36 logger: log.SubLogger(l, "slurper"), 37 db: db, 38 cfg: cfg, 39 subs: make(map[string]*subscription), 40 } 41} 42 43func (s *KnotSlurper) Run(ctx context.Context) { 44 for { 45 select { 46 case <-ctx.Done(): 47 return 48 case <-time.After(s.cfg.PersistCursorPeriod): 49 if err := s.persistCursors(ctx); err != nil { 50 s.logger.Error("failed to flush cursors", "err", err) 51 } 52 } 53 } 54} 55 56func (s *KnotSlurper) CheckIfSubscribed(hostname string) bool { 57 s.subsLk.Lock() 58 defer s.subsLk.Unlock() 59 60 _, ok := s.subs[hostname] 61 return ok 62} 63 64func (s *KnotSlurper) Shutdown(ctx context.Context) error { 65 s.logger.Info("starting shutdown host cursor flush") 66 err := s.persistCursors(ctx) 67 if err != nil { 68 s.logger.Error("shutdown error", "err", err) 69 } 70 s.logger.Info("slurper shutdown complete") 71 return err 72} 73 74func (s *KnotSlurper) persistCursors(ctx context.Context) error { 75 // // gather cursor list from subscriptions and store them to DB 76 // start := time.Now() 77 78 s.subsLk.Lock() 79 cursors := make([]models.HostCursor, len(s.subs)) 80 i := 0 81 for _, sub := range s.subs { 82 cursors[i] = sub.HostCursor() 83 i++ 84 } 85 s.subsLk.Unlock() 86 87 err := 
db.StoreCursors(ctx, s.db, cursors) 88 // s.logger.Info("finished persisting cursors", "count", len(cursors), "duration", time.Since(start).String(), "err", err) 89 return err 90} 91 92func (s *KnotSlurper) Subscribe(ctx context.Context, host models.Host) error { 93 s.subsLk.Lock() 94 defer s.subsLk.Unlock() 95 96 _, ok := s.subs[host.Hostname] 97 if ok { 98 return fmt.Errorf("already subscribed: %s", host.Hostname) 99 } 100 101 // TODO: include `cancel` function to kill subscription by hostname 102 sub := &subscription{ 103 hostname: host.Hostname, 104 scheduler: NewParallelScheduler( 105 s.cfg.ConcurrencyPerHost, 106 host.Hostname, 107 s.ProcessEvent, 108 ), 109 } 110 s.subs[host.Hostname] = sub 111 112 sub.scheduler.Start(ctx) 113 go s.subscribeWithRedialer(ctx, host, sub) 114 return nil 115} 116 117func (s *KnotSlurper) subscribeWithRedialer(ctx context.Context, host models.Host, sub *subscription) { 118 l := s.logger.With("host", host.Hostname) 119 defer func() { 120 s.subsLk.Lock() 121 defer s.subsLk.Unlock() 122 123 delete(s.subs, host.Hostname) 124 }() 125 126 dialer := websocket.Dialer{ 127 HandshakeTimeout: time.Second * 5, 128 } 129 130 // if this isn't a localhost / private connection, then we should enable SSRF protections 131 if !host.NoSSL { 132 netDialer := ssrf.PublicOnlyDialer() 133 dialer.NetDialContext = netDialer.DialContext 134 } 135 136 cursor := host.LastSeq 137 138 connectedInbound.Inc() 139 defer connectedInbound.Dec() 140 141 var backoff int 142 for { 143 select { 144 case <-ctx.Done(): 145 return 146 default: 147 } 148 u := host.LegacyEventsURL(cursor) 149 l.Debug("made url with cursor", "cursor", cursor, "url", u) 150 151 // NOTE: manual backoff retry implementation to explicitly handle fails 152 hdr := make(http.Header) 153 hdr.Add("User-Agent", userAgent()) 154 conn, resp, err := dialer.DialContext(ctx, u, hdr) 155 if err != nil { 156 l.Warn("dialing failed", "err", err, "backoff", backoff) 157 time.Sleep(sleepForBackoff(backoff)) 158 
backoff++ 159 if backoff > 30 { 160 l.Warn("host does not appear to be online, disabling for now") 161 host.Status = models.HostStatusOffline 162 if err := db.UpsertHost(ctx, s.db, &host); err != nil { 163 l.Error("failed to update host status", "err", err) 164 } 165 return 166 } 167 continue 168 } 169 170 l.Debug("knot event subscription response", "code", resp.StatusCode, "url", u) 171 172 if err := s.handleConnection(ctx, conn, sub); err != nil { 173 // TODO: measure the last N connection error times and if they're coming too fast reconnect slower or don't reconnect and wait for requestCrawl 174 l.Warn("host connection failed", "err", err, "backoff", backoff) 175 } 176 177 updatedCursor := sub.LastSeq() 178 didProgress := updatedCursor > cursor 179 l.Debug("cursor compare", "cursor", cursor, "updatedCursor", updatedCursor, "didProgress", didProgress) 180 if cursor == 0 || didProgress { 181 cursor = updatedCursor 182 backoff = 0 183 184 batch := []models.HostCursor{sub.HostCursor()} 185 if err := db.StoreCursors(ctx, s.db, batch); err != nil { 186 l.Error("failed to store cursors", "err", err) 187 } 188 } 189 } 190} 191 192// handleConnection handles websocket connection. 
193// Schedules task from received event and return when connection is closed 194func (s *KnotSlurper) handleConnection(ctx context.Context, conn *websocket.Conn, sub *subscription) error { 195 // ping on every 30s 196 ctx, cancel := context.WithCancel(ctx) 197 defer cancel() // close the background ping job on connection close 198 go func() { 199 t := time.NewTicker(30 * time.Second) 200 defer t.Stop() 201 failcount := 0 202 203 for { 204 select { 205 case <-t.C: 206 if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil { 207 s.logger.Warn("failed to ping", "err", err) 208 failcount++ 209 if failcount >= 4 { 210 s.logger.Error("too many ping fails", "count", failcount) 211 _ = conn.Close() 212 return 213 } 214 } else { 215 failcount = 0 // ok ping 216 } 217 case <-ctx.Done(): 218 _ = conn.Close() 219 return 220 } 221 } 222 }() 223 224 conn.SetPingHandler(func(message string) error { 225 err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Minute)) 226 if err == websocket.ErrCloseSent { 227 return nil 228 } 229 return err 230 }) 231 conn.SetPongHandler(func(_ string) error { 232 if err := conn.SetReadDeadline(time.Now().Add(time.Minute)); err != nil { 233 s.logger.Error("failed to set read deadline", "err", err) 234 } 235 return nil 236 }) 237 238 for { 239 select { 240 case <-ctx.Done(): 241 return ctx.Err() 242 default: 243 } 244 msgType, msg, err := conn.ReadMessage() 245 if err != nil { 246 return err 247 } 248 249 if msgType != websocket.TextMessage { 250 continue 251 } 252 253 sub.scheduler.AddTask(ctx, &Task{ 254 key: sub.hostname, // TODO: replace to repository AT-URI for better concurrency 255 message: msg, 256 }) 257 } 258} 259 260type LegacyGitEvent struct { 261 Rkey string 262 Nsid string 263 Event tangled.GitRefUpdate 264} 265 266func (s *KnotSlurper) ProcessEvent(ctx context.Context, task *Task) error { 267 var legacyMessage LegacyGitEvent 268 if err := 
json.Unmarshal(task.message, &legacyMessage); err != nil { 269 return fmt.Errorf("unmarshaling message: %w", err) 270 } 271 272 if err := s.ProcessLegacyGitRefUpdate(ctx, task.key, &legacyMessage); err != nil { 273 return fmt.Errorf("processing gitRefUpdate: %w", err) 274 } 275 return nil 276} 277 278func (s *KnotSlurper) ProcessLegacyGitRefUpdate(ctx context.Context, source string, evt *LegacyGitEvent) error { 279 knotstreamEventsReceived.Inc() 280 281 l := s.logger.With("src", source) 282 283 ownerDid := "" 284 if evt.Event.OwnerDid != nil { 285 ownerDid = *evt.Event.OwnerDid 286 } else { 287 // handle legacy event 288 if evt.Event.RepoDid != nil { 289 ownerDid = *evt.Event.RepoDid 290 } 291 } 292 curr, err := db.GetRepoByName(ctx, s.db, syntax.DID(ownerDid), evt.Event.RepoName) 293 if err != nil { 294 return fmt.Errorf("failed to get repo '%s': %w", ownerDid+"/"+evt.Event.RepoName, err) 295 } 296 if curr == nil { 297 // if repo doesn't exist in DB, just ignore the event. That repo is unknown. 298 // 299 // Normally did+name is already enough to perform git-fetch as that's 300 // what needed to fetch the repository. 301 // But we want to store that in did/rkey in knot-mirror. 302 // Therefore, we should ignore when the repository is unknown. 303 // Hopefully crawler will sync it later. 
304 l.Warn("skipping event from unknown repo", "did/name", ownerDid+"/"+evt.Event.RepoName) 305 knotstreamEventsSkipped.Inc() 306 return nil 307 } 308 l = l.With("repoAt", curr.AtUri()) 309 310 // TODO: should plan resync to resyncBuffer on RepoStateResyncing 311 if curr.State != models.RepoStateActive { 312 l.Debug("skipping non-active repo") 313 knotstreamEventsSkipped.Inc() 314 return nil 315 } 316 317 if curr.GitRev != "" && evt.Rkey <= curr.GitRev.String() { 318 l.Debug("skipping replayed event", "event.Rkey", evt.Rkey, "currentRev", curr.GitRev) 319 knotstreamEventsSkipped.Inc() 320 return nil 321 } 322 323 // if curr.State == models.RepoStateResyncing { 324 // firehoseEventsSkipped.Inc() 325 // return fp.events.addToResyncBuffer(ctx, commit) 326 // } 327 328 // can't skip anything, update repo state 329 if err := db.UpdateRepoState(ctx, s.db, curr.Did, curr.Rkey, models.RepoStateDesynchronized); err != nil { 330 return err 331 } 332 333 l.Info("event processed", "eventRev", evt.Rkey) 334 335 knotstreamEventsProcessed.Inc() 336 return nil 337} 338 339func userAgent() string { 340 return fmt.Sprintf("knotmirror/%s", versioninfo.Short()) 341} 342 343func sleepForBackoff(b int) time.Duration { 344 if b == 0 { 345 return 0 346 } 347 if b < 10 { 348 return time.Millisecond * time.Duration((50*b)+rand.Intn(500)) 349 } 350 return time.Second * 30 351}