A vibe coded tangled fork which supports pijul.
1/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md>
2
3package tapc
4
5import (
6 "bytes"
7 "context"
8 "encoding/json"
9 "fmt"
10 "net/http"
11 "net/url"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/syntax"
15 "github.com/gorilla/websocket"
16 "tangled.org/core/log"
17)
18
19// type WebsocketOptions struct {
20// maxReconnectSeconds int
21// heartbeatIntervalMs int
22// // onReconnectError
23// }
24
25type Handler interface {
26 OnEvent(ctx context.Context, evt Event) error
27 OnError(ctx context.Context, err error)
28}
29
// Client talks to a tap service: it sends admin requests over HTTP and
// consumes the event channel over a websocket.
type Client struct {
	// Url is the base URL of the tap service. HTTP endpoints are built by
	// appending a path to it, and Connect derives the websocket URL from it.
	Url string

	// AdminPassword is sent as the basic-auth password (user "admin") on the
	// /repos/add and /repos/remove requests.
	AdminPassword string

	// HTTPClient performs the /repos/add and /repos/remove requests.
	HTTPClient *http.Client
}
35
36func NewClient(url, adminPassword string) Client {
37 return Client{
38 Url: url,
39 AdminPassword: adminPassword,
40 HTTPClient: &http.Client{},
41 }
42}
43
44func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error {
45 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
46 if err != nil {
47 return err
48 }
49 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body))
50 if err != nil {
51 return err
52 }
53 req.SetBasicAuth("admin", c.AdminPassword)
54 req.Header.Set("Content-Type", "application/json")
55
56 resp, err := c.HTTPClient.Do(req)
57 if err != nil {
58 return err
59 }
60 defer resp.Body.Close()
61 if resp.StatusCode != http.StatusOK {
62 return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode)
63 }
64 return nil
65}
66
67func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error {
68 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids})
69 if err != nil {
70 return err
71 }
72 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body))
73 if err != nil {
74 return err
75 }
76 req.SetBasicAuth("admin", c.AdminPassword)
77 req.Header.Set("Content-Type", "application/json")
78
79 resp, err := c.HTTPClient.Do(req)
80 if err != nil {
81 return err
82 }
83 defer resp.Body.Close()
84 if resp.StatusCode != http.StatusOK {
85 return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode)
86 }
87 return nil
88}
89
90func (c *Client) Connect(ctx context.Context, handler Handler) error {
91 l := log.FromContext(ctx)
92
93 u, err := url.Parse(c.Url)
94 if err != nil {
95 return err
96 }
97 if u.Scheme == "https" {
98 u.Scheme = "wss"
99 } else {
100 u.Scheme = "ws"
101 }
102 u.Path = "/channel"
103
104 // TODO: set auth on dial
105
106 url := u.String()
107
108 var backoff int
109 for {
110 select {
111 case <-ctx.Done():
112 return ctx.Err()
113 default:
114 }
115
116 header := http.Header{
117 "Authorization": []string{""},
118 }
119 conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header)
120 if err != nil {
121 l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff)
122 time.Sleep(time.Duration(5+backoff) * time.Second)
123 backoff++
124
125 continue
126 }
127 l.Info("connected to tap service")
128
129 l.Info("tap event subscription response", "code", res.StatusCode)
130
131 if err = c.handleConnection(ctx, conn, handler); err != nil {
132 l.Warn("tap connection failed", "err", err, "backoff", backoff)
133 }
134 }
135}
136
137func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error {
138 l := log.FromContext(ctx)
139
140 defer func() {
141 conn.Close()
142 l.Warn("closed tap conection")
143 }()
144 l.Info("established tap conection")
145
146 for {
147 select {
148 case <-ctx.Done():
149 return ctx.Err()
150 default:
151 }
152 _, message, err := conn.ReadMessage()
153 if err != nil {
154 return err
155 }
156
157 var ev Event
158 if err := json.Unmarshal(message, &ev); err != nil {
159 handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err))
160 continue
161 }
162 if err := handler.OnEvent(ctx, ev); err != nil {
163 handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err))
164 continue
165 }
166
167 ack := map[string]any{
168 "type": "ack",
169 "id": ev.ID,
170 }
171 if err := conn.WriteJSON(ack); err != nil {
172 l.Warn("failed to send ack", "err", err)
173 continue
174 }
175 }
176}