A vibe-coded fork of Tangled that supports Pijul.
1package jetstream
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "os"
8 "os/signal"
9 "sync"
10 "syscall"
11 "time"
12
13 "github.com/bluesky-social/jetstream/pkg/client"
14 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
15 "github.com/bluesky-social/jetstream/pkg/models"
16 "tangled.org/core/log"
17)
18
// DB is the persistence interface JetstreamClient uses to load and store its
// cursor, a timestamp expressed in microseconds.
type DB interface {
	// GetLastTimeUs returns the most recently saved microsecond timestamp.
	GetLastTimeUs() (int64, error)
	// SaveLastTimeUs persists the given microsecond timestamp.
	SaveLastTimeUs(int64) error
}
23
// Set is a generic set of comparable values, represented as a map whose
// values are empty structs.
type Set[T comparable] map[T]struct{}
25
// JetstreamClient wraps a jetstream client together with an in-memory DID
// filter and a DB used to persist the stream cursor.
type JetstreamClient struct {
	// cfg is the client configuration passed to client.NewClient.
	cfg *client.ClientConfig
	// client is the underlying jetstream client; it is set by StartJetstream.
	client *client.Client
	// ident is passed to the sequential scheduler created in StartJetstream.
	ident string
	// l is the logger used by this type and handed to the scheduler and the
	// underlying client.
	l *slog.Logger

	// logDids makes AddDid and RemoveDid log every change to the filter.
	logDids bool
	// wantedDids is the in-memory DID filter; when it is empty, events from
	// all DIDs are let through.
	wantedDids Set[string]
	// db loads and stores the cursor (a microsecond timestamp).
	db DB
	// waitForDid makes StartJetstream hold off connecting until wantedDids is
	// non-empty.
	waitForDid bool
	// mu guards wantedDids.
	mu sync.RWMutex

	// cancel cancels the context of the current connection; connectAndRead
	// replaces it on every loop iteration.
	cancel context.CancelFunc
	// cancelMu guards cancel.
	cancelMu sync.Mutex
}
41
42func (j *JetstreamClient) AddDid(did string) {
43 if did == "" {
44 return
45 }
46
47 if j.logDids {
48 j.l.Info("adding did to in-memory filter", "did", did)
49 }
50 j.mu.Lock()
51 j.wantedDids[did] = struct{}{}
52 j.mu.Unlock()
53}
54
55func (j *JetstreamClient) RemoveDid(did string) {
56 if did == "" {
57 return
58 }
59
60 if j.logDids {
61 j.l.Info("removing did from in-memory filter", "did", did)
62 }
63 j.mu.Lock()
64 delete(j.wantedDids, did)
65 j.mu.Unlock()
66}
67
// processor is the signature of a function that handles a single jetstream
// event; withDidFilter both accepts and returns a processor.
type processor func(context.Context, *models.Event) error
69
70func (j *JetstreamClient) withDidFilter(processFunc processor) processor {
71 // since this closure references j.WantedDids; it should auto-update
72 // existing instances of the closure when j.WantedDids is mutated
73 return func(ctx context.Context, evt *models.Event) error {
74
75 j.mu.RLock()
76 // empty filter => all dids allowed
77 matches := len(j.wantedDids) == 0
78 if !matches {
79 if _, ok := j.wantedDids[evt.Did]; ok {
80 matches = true
81 }
82 }
83 j.mu.RUnlock()
84
85 if matches {
86 return processFunc(ctx, evt)
87 } else {
88 return nil
89 }
90 }
91}
92
93func NewJetstreamClient(endpoint, ident string, collections []string, cfg *client.ClientConfig, logger *slog.Logger, db DB, waitForDid, logDids bool) (*JetstreamClient, error) {
94 if cfg == nil {
95 cfg = client.DefaultClientConfig()
96 cfg.WebsocketURL = endpoint
97 cfg.WantedCollections = collections
98 }
99
100 return &JetstreamClient{
101 cfg: cfg,
102 ident: ident,
103 db: db,
104 l: logger,
105 wantedDids: make(map[string]struct{}),
106
107 logDids: logDids,
108
109 // This will make the goroutine in StartJetstream wait until
110 // j.wantedDids has been populated, typically using addDids.
111 waitForDid: waitForDid,
112 }, nil
113}
114
115// StartJetstream starts the jetstream client and processes events using the provided processFunc.
116// The caller is responsible for saving the last time_us to the database (just use your db.UpdateLastTimeUs).
117func (j *JetstreamClient) StartJetstream(ctx context.Context, processFunc func(context.Context, *models.Event) error) error {
118 logger := j.l
119
120 sched := sequential.NewScheduler(j.ident, logger, j.withDidFilter(processFunc))
121
122 client, err := client.NewClient(j.cfg, logger, sched)
123 if err != nil {
124 return fmt.Errorf("failed to create jetstream client: %w", err)
125 }
126 j.client = client
127
128 go func() {
129 if j.waitForDid {
130 for {
131 j.mu.RLock()
132 hasDid := len(j.wantedDids) != 0
133 j.mu.RUnlock()
134 if hasDid {
135 break
136 }
137 time.Sleep(time.Second)
138 }
139 }
140 logger.Info("done waiting for did")
141
142 go j.periodicLastTimeSave(ctx)
143 j.saveIfKilled(ctx)
144
145 j.connectAndRead(ctx)
146 }()
147
148 return nil
149}
150
151func (j *JetstreamClient) connectAndRead(ctx context.Context) {
152 l := log.FromContext(ctx)
153 for {
154 cursor := j.getLastTimeUs(ctx)
155
156 connCtx, cancel := context.WithCancel(ctx)
157 j.cancelMu.Lock()
158 j.cancel = cancel
159 j.cancelMu.Unlock()
160
161 if err := j.client.ConnectAndRead(connCtx, cursor); err != nil {
162 l.Error("error reading jetstream, retry in 3s", "error", err)
163 cancel()
164 time.Sleep(3 * time.Second)
165 continue
166 }
167
168 select {
169 case <-ctx.Done():
170 l.Info("context done, stopping jetstream")
171 return
172 case <-connCtx.Done():
173 l.Info("connection context done, reconnecting")
174 continue
175 }
176 }
177}
178
179// save cursor periodically
180func (j *JetstreamClient) periodicLastTimeSave(ctx context.Context) {
181 ticker := time.NewTicker(time.Minute)
182 defer ticker.Stop()
183
184 for {
185 select {
186 case <-ctx.Done():
187 return
188 case <-ticker.C:
189 j.db.SaveLastTimeUs(time.Now().UnixMicro())
190 }
191 }
192}
193
194func (j *JetstreamClient) getLastTimeUs(ctx context.Context) *int64 {
195 l := log.FromContext(ctx)
196 lastTimeUs, err := j.db.GetLastTimeUs()
197 if err != nil {
198 l.Warn("couldn't get last time us, starting from now", "error", err)
199 lastTimeUs = time.Now().UnixMicro()
200 err = j.db.SaveLastTimeUs(lastTimeUs)
201 if err != nil {
202 l.Error("failed to save last time us", "error", err)
203 }
204 }
205
206 // If last time is older than 2 days, start from now
207 if time.Now().UnixMicro()-lastTimeUs > 2*24*60*60*1000*1000 {
208 lastTimeUs = time.Now().UnixMicro()
209 l.Warn("last time us is older than 2 days; discarding that and starting from now")
210 err = j.db.SaveLastTimeUs(lastTimeUs)
211 if err != nil {
212 l.Error("failed to save last time us", "error", err)
213 }
214 }
215
216 l.Info("found last time_us", "time_us", lastTimeUs)
217 return &lastTimeUs
218}
219
220func (j *JetstreamClient) saveIfKilled(ctx context.Context) context.Context {
221 ctxWithCancel, cancel := context.WithCancel(ctx)
222
223 sigChan := make(chan os.Signal, 1)
224
225 signal.Notify(sigChan,
226 syscall.SIGINT,
227 syscall.SIGTERM,
228 syscall.SIGQUIT,
229 syscall.SIGHUP,
230 syscall.SIGKILL,
231 syscall.SIGSTOP,
232 )
233
234 go func() {
235 sig := <-sigChan
236 j.l.Info("Received signal, initiating graceful shutdown", "signal", sig)
237
238 lastTimeUs := time.Now().UnixMicro()
239 if err := j.db.SaveLastTimeUs(lastTimeUs); err != nil {
240 j.l.Error("Failed to save last time during shutdown", "error", err)
241 }
242 j.l.Info("Saved lastTimeUs before shutdown", "lastTimeUs", lastTimeUs)
243
244 j.cancelMu.Lock()
245 if j.cancel != nil {
246 j.cancel()
247 }
248 j.cancelMu.Unlock()
249
250 cancel()
251
252 os.Exit(0)
253 }()
254
255 return ctxWithCancel
256}