A vibe-coded fork of Tangled that supports Pijul.
1/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md>
2
3package tapc
4
5import (
6 "bytes"
7 "context"
8 "encoding/json"
9 "fmt"
10 "net/http"
11 "net/url"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/gorilla/websocket"
16 "tangled.org/core/log"
17)
18
19// type WebsocketOptions struct {
20// maxReconnectSeconds int
21// heartbeatIntervalMs int
22// // onReconnectError
23// }
24
25type Handler interface {
26 OnEvent(ctx context.Context, evt Event) error
27 OnError(ctx context.Context, err error)
28}
29
// Client talks to a tap instance: the repo helpers use its HTTP API and
// Connect consumes its websocket event channel.
type Client struct {
	// Url is the base URL of the tap instance. HTTP endpoints are formed by
	// appending a path to it, so it should not end with a slash.
	Url string
	// AdminPassword is the password sent for the "admin" Basic-auth user on
	// HTTP API requests.
	AdminPassword string
	// HTTPClient performs the HTTP API requests.
	HTTPClient *http.Client

	// connectedSig carries a signal from Connect to Wait each time the
	// websocket has been established.
	connectedSig chan struct{}
}

// NewClient returns a Client for the tap instance at url that authenticates
// with adminPassword and uses a fresh http.Client for HTTP requests.
func NewClient(url, adminPassword string) Client {
	return Client{
		Url:           url,
		AdminPassword: adminPassword,
		HTTPClient:    &http.Client{},

		// Buffered by one so that Connect can record "connected" without
		// stalling until some goroutine happens to call Wait; the pending
		// signal is picked up by the next Wait call.
		connectedSig: make(chan struct{}, 1),
	}
}
47
48func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error {
49 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
50 if err != nil {
51 return err
52 }
53 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body))
54 if err != nil {
55 return err
56 }
57 req.SetBasicAuth("admin", c.AdminPassword)
58 req.Header.Set("Content-Type", "application/json")
59
60 resp, err := c.HTTPClient.Do(req)
61 if err != nil {
62 return err
63 }
64 defer resp.Body.Close()
65 if resp.StatusCode != http.StatusOK {
66 return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode)
67 }
68 return nil
69}
70
71func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error {
72 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
73 if err != nil {
74 return err
75 }
76 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body))
77 if err != nil {
78 return err
79 }
80 req.SetBasicAuth("admin", c.AdminPassword)
81 req.Header.Set("Content-Type", "application/json")
82
83 resp, err := c.HTTPClient.Do(req)
84 if err != nil {
85 return err
86 }
87 defer resp.Body.Close()
88 if resp.StatusCode != http.StatusOK {
89 return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode)
90 }
91 return nil
92}
93
94func (c *Client) Wait(ctx context.Context) error {
95 select {
96 case <-ctx.Done():
97 return ctx.Err()
98 case <-c.connectedSig:
99 return nil
100 }
101}
102
103func (c *Client) Connect(ctx context.Context, handler Handler) error {
104 l := log.FromContext(ctx)
105
106 u, err := url.Parse(c.Url)
107 if err != nil {
108 return err
109 }
110 if u.Scheme == "https" {
111 u.Scheme = "wss"
112 } else {
113 u.Scheme = "ws"
114 }
115 u.Path = "/channel"
116
117 // TODO: set auth on dial
118
119 url := u.String()
120
121 var backoff int
122 for {
123 select {
124 case <-ctx.Done():
125 return ctx.Err()
126 default:
127 }
128
129 header := http.Header{
130 "Authorization": []string{""},
131 }
132 conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header)
133 if err != nil {
134 l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff)
135 time.Sleep(time.Duration(5+backoff) * time.Second)
136 backoff++
137
138 continue
139 } else {
140 backoff = 0
141 }
142
143 c.connectedSig <- struct{}{}
144
145 l.Info("tap event subscription response", "code", res.StatusCode)
146
147 if err = c.handleConnection(ctx, conn, handler); err != nil {
148 l.Warn("tap connection failed", "err", err, "backoff", backoff)
149 }
150 }
151}
152
153func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error {
154 l := log.FromContext(ctx)
155
156 defer func() {
157 conn.Close()
158 l.Warn("closed tap conection")
159 }()
160 l.Info("established tap conection")
161
162 for {
163 select {
164 case <-ctx.Done():
165 return ctx.Err()
166 default:
167 }
168 _, message, err := conn.ReadMessage()
169 if err != nil {
170 return err
171 }
172
173 var ev Event
174 if err := json.Unmarshal(message, &ev); err != nil {
175 handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err))
176 continue
177 }
178 if err := handler.OnEvent(ctx, ev); err != nil {
179 handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err))
180 continue
181 }
182
183 ack := map[string]any{
184 "type": "ack",
185 "id": ev.ID,
186 }
187 if err := conn.WriteJSON(ack); err != nil {
188 l.Warn("failed to send ack", "err", err)
189 continue
190 }
191 }
192}