A vibe-coded fork of Tangled that supports Pijul.
at c70617eb024bff418bf546a18dd8e26305243447 256 lines 5.8 kB view raw
1package jetstream 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "os" 8 "os/signal" 9 "sync" 10 "syscall" 11 "time" 12 13 "github.com/bluesky-social/jetstream/pkg/client" 14 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 15 "github.com/bluesky-social/jetstream/pkg/models" 16 "tangled.org/core/log" 17) 18 19type DB interface { 20 GetLastTimeUs() (int64, error) 21 SaveLastTimeUs(int64) error 22} 23 24type Set[T comparable] map[T]struct{} 25 26type JetstreamClient struct { 27 cfg *client.ClientConfig 28 client *client.Client 29 ident string 30 l *slog.Logger 31 32 logDids bool 33 wantedDids Set[string] 34 db DB 35 waitForDid bool 36 mu sync.RWMutex 37 38 cancel context.CancelFunc 39 cancelMu sync.Mutex 40} 41 42func (j *JetstreamClient) AddDid(did string) { 43 if did == "" { 44 return 45 } 46 47 if j.logDids { 48 j.l.Info("adding did to in-memory filter", "did", did) 49 } 50 j.mu.Lock() 51 j.wantedDids[did] = struct{}{} 52 j.mu.Unlock() 53} 54 55func (j *JetstreamClient) RemoveDid(did string) { 56 if did == "" { 57 return 58 } 59 60 if j.logDids { 61 j.l.Info("removing did from in-memory filter", "did", did) 62 } 63 j.mu.Lock() 64 delete(j.wantedDids, did) 65 j.mu.Unlock() 66} 67 68type processor func(context.Context, *models.Event) error 69 70func (j *JetstreamClient) withDidFilter(processFunc processor) processor { 71 // since this closure references j.WantedDids; it should auto-update 72 // existing instances of the closure when j.WantedDids is mutated 73 return func(ctx context.Context, evt *models.Event) error { 74 75 j.mu.RLock() 76 // empty filter => all dids allowed 77 matches := len(j.wantedDids) == 0 78 if !matches { 79 if _, ok := j.wantedDids[evt.Did]; ok { 80 matches = true 81 } 82 } 83 j.mu.RUnlock() 84 85 if matches { 86 return processFunc(ctx, evt) 87 } else { 88 return nil 89 } 90 } 91} 92 93func NewJetstreamClient(endpoint, ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid, 
logDids bool) (*JetstreamClient, error) { 94 if cfg == nil { 95 cfg = client.DefaultClientConfig() 96 cfg.WebsocketURL = endpoint 97 cfg.WantedCollections = collections 98 } 99 100 return &JetstreamClient{ 101 cfg: cfg, 102 ident: ident, 103 db: db, 104 l: logger, 105 wantedDids: make(map[string]struct{}), 106 107 logDids: logDids, 108 109 // This will make the goroutine in StartJetstream wait until 110 // j.wantedDids has been populated, typically using addDids. 111 waitForDid: waitForDid, 112 }, nil 113} 114 115// StartJetstream starts the jetstream client and processes events using the provided processFunc. 116// The caller is responsible for saving the last time_us to the database (just use your db.UpdateLastTimeUs). 117func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error { 118 logger := j.l 119 120 sched := sequential.NewScheduler(j.ident, logger, j.withDidFilter(processFunc)) 121 122 client, err := client.NewClient(j.cfg, logger, sched) 123 if err != nil { 124 return fmt.Errorf("failed to create jetstream client: %w", err) 125 } 126 j.client = client 127 128 go func() { 129 if j.waitForDid { 130 for { 131 j.mu.RLock() 132 hasDid := len(j.wantedDids) != 0 133 j.mu.RUnlock() 134 if hasDid { 135 break 136 } 137 time.Sleep(time.Second) 138 } 139 } 140 logger.Info("done waiting for did") 141 142 go j.periodicLastTimeSave(ctx) 143 j.saveIfKilled(ctx) 144 145 j.connectAndRead(ctx) 146 }() 147 148 return nil 149} 150 151func (j *JetstreamClient) connectAndRead(ctx context.Context) { 152 l := log.FromContext(ctx) 153 for { 154 cursor := j.getLastTimeUs(ctx) 155 156 connCtx, cancel := context.WithCancel(ctx) 157 j.cancelMu.Lock() 158 j.cancel = cancel 159 j.cancelMu.Unlock() 160 161 if err := j.client.ConnectAndRead(connCtx, cursor); err != nil { 162 l.Error("error reading jetstream, retry in 3s", "error", err) 163 cancel() 164 time.Sleep(3 * time.Second) 165 continue 166 } 167 168 select { 169 case 
<-ctx.Done(): 170 l.Info("context done, stopping jetstream") 171 return 172 case <-connCtx.Done(): 173 l.Info("connection context done, reconnecting") 174 continue 175 } 176 } 177} 178 179// save cursor periodically 180func (j *JetstreamClient) periodicLastTimeSave(ctx context.Context) { 181 ticker := time.NewTicker(time.Minute) 182 defer ticker.Stop() 183 184 for { 185 select { 186 case <-ctx.Done(): 187 return 188 case <-ticker.C: 189 j.db.SaveLastTimeUs(time.Now().UnixMicro()) 190 } 191 } 192} 193 194func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 { 195 l := log.FromContext(ctx) 196 lastTimeUs, err := j.db.GetLastTimeUs() 197 if err != nil { 198 l.Warn("couldn't get last time us, starting from now", "error", err) 199 lastTimeUs = time.Now().UnixMicro() 200 err = j.db.SaveLastTimeUs(lastTimeUs) 201 if err != nil { 202 l.Error("failed to save last time us", "error", err) 203 } 204 } 205 206 // If last time is older than 2 days, start from now 207 if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 { 208 lastTimeUs = time.Now().UnixMicro() 209 l.Warn("last time us is older than 2 days; discarding that and starting from now") 210 err = j.db.SaveLastTimeUs(lastTimeUs) 211 if err != nil { 212 l.Error("failed to save last time us", "error", err) 213 } 214 } 215 216 l.Info("found last time_us", "time_us", lastTimeUs) 217 return &lastTimeUs 218} 219 220func (j *JetstreamClient) saveIfKilled(ctx context.Context) context.Context { 221 ctxWithCancel, cancel := context.WithCancel(ctx) 222 223 sigChan := make(chan os.Signal, 1) 224 225 signal.Notify(sigChan, 226 syscall.SIGINT, 227 syscall.SIGTERM, 228 syscall.SIGQUIT, 229 syscall.SIGHUP, 230 syscall.SIGKILL, 231 syscall.SIGSTOP, 232 ) 233 234 go func() { 235 sig := <-sigChan 236 j.l.Info("Received signal, initiating graceful shutdown", "signal", sig) 237 238 lastTimeUs := time.Now().UnixMicro() 239 if err := j.db.SaveLastTimeUs(lastTimeUs); err != nil { 240 j.l.Error("Failed to save last time 
during shutdown", "error", err) 241 } 242 j.l.Info("Saved lastTimeUs before shutdown", "lastTimeUs", lastTimeUs) 243 244 j.cancelMu.Lock() 245 if j.cancel != nil { 246 j.cancel() 247 } 248 j.cancelMu.Unlock() 249 250 cancel() 251 252 os.Exit(0) 253 }() 254 255 return ctxWithCancel 256}