A vibe coded tangled fork which supports pijul.
1/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md>
2
3package tapc
4
5import (
6 "bytes"
7 "context"
8 "encoding/json"
9 "fmt"
10 "net/http"
11 "net/url"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/gorilla/websocket"
16 "tangled.org/core/log"
17)
18
19// type WebsocketOptions struct {
20// maxReconnectSeconds int
21// heartbeatIntervalMs int
22// // onReconnectError
23// }
24
25type Handler interface {
26 OnEvent(ctx context.Context, evt Event) error
27 OnError(ctx context.Context, err error)
28}
29
30type Client struct {
31 Url string
32 AdminPassword string
33 HTTPClient *http.Client
34
35 ready chan struct{}
36}
37
38func NewClient(url, adminPassword string) Client {
39 return Client{
40 Url: url,
41 AdminPassword: adminPassword,
42 HTTPClient: &http.Client{},
43
44 ready: make(chan struct{}),
45 }
46}
47
48func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error {
49 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
50 if err != nil {
51 return err
52 }
53 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body))
54 if err != nil {
55 return err
56 }
57 req.SetBasicAuth("admin", c.AdminPassword)
58 req.Header.Set("Content-Type", "application/json")
59
60 resp, err := c.HTTPClient.Do(req)
61 if err != nil {
62 return err
63 }
64 defer resp.Body.Close()
65 if resp.StatusCode != http.StatusOK {
66 return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode)
67 }
68 return nil
69}
70
71func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error {
72 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
73 if err != nil {
74 return err
75 }
76 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body))
77 if err != nil {
78 return err
79 }
80 req.SetBasicAuth("admin", c.AdminPassword)
81 req.Header.Set("Content-Type", "application/json")
82
83 resp, err := c.HTTPClient.Do(req)
84 if err != nil {
85 return err
86 }
87 defer resp.Body.Close()
88 if resp.StatusCode != http.StatusOK {
89 return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode)
90 }
91 return nil
92}
93
94func (c *Client) markReady() {
95 select {
96 case <-c.ready:
97 // already closed
98 default:
99 close(c.ready)
100 }
101}
102
103func (c *Client) WaitReady(ctx context.Context) error {
104 select {
105 case <-c.ready:
106 return nil
107 case <-ctx.Done():
108 return ctx.Err()
109 }
110}
111
112func (c *Client) Connect(ctx context.Context, handler Handler) error {
113 l := log.FromContext(ctx)
114
115 u, err := url.Parse(c.Url)
116 if err != nil {
117 return err
118 }
119 if u.Scheme == "https" {
120 u.Scheme = "wss"
121 } else {
122 u.Scheme = "ws"
123 }
124 u.Path = "/channel"
125
126 // TODO: set auth on dial
127
128 url := u.String()
129
130 var backoff int
131 for {
132 select {
133 case <-ctx.Done():
134 return ctx.Err()
135 default:
136 }
137
138 header := http.Header{
139 "Authorization": []string{""},
140 }
141 conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header)
142 if err != nil {
143 l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff)
144 time.Sleep(time.Duration(5+backoff) * time.Second)
145 backoff++
146
147 continue
148 }
149 l.Info("connected to tap service")
150
151 backoff = 0
152 c.markReady()
153
154 l.Info("tap event subscription response", "code", res.StatusCode)
155
156 if err = c.handleConnection(ctx, conn, handler); err != nil {
157 l.Warn("tap connection failed", "err", err, "backoff", backoff)
158 }
159 }
160}
161
162func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error {
163 l := log.FromContext(ctx)
164
165 defer func() {
166 conn.Close()
167 l.Warn("closed tap conection")
168 }()
169 l.Info("established tap conection")
170
171 for {
172 select {
173 case <-ctx.Done():
174 return ctx.Err()
175 default:
176 }
177 _, message, err := conn.ReadMessage()
178 if err != nil {
179 return err
180 }
181
182 var ev Event
183 if err := json.Unmarshal(message, &ev); err != nil {
184 handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err))
185 continue
186 }
187 if err := handler.OnEvent(ctx, ev); err != nil {
188 handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err))
189 continue
190 }
191
192 ack := map[string]any{
193 "type": "ack",
194 "id": ev.ID,
195 }
196 if err := conn.WriteJSON(ack); err != nil {
197 l.Warn("failed to send ack", "err", err)
198 continue
199 }
200 }
201}