A vibe coded tangled fork which supports pijul.
at 42d2a2e1cccb37c6b7f8b789a8aaa95e755b2b85 201 lines 4.3 kB view raw
1/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md> 2 3package tapc 4 5import ( 6 "bytes" 7 "context" 8 "encoding/json" 9 "fmt" 10 "net/http" 11 "net/url" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/gorilla/websocket" 16 "tangled.org/core/log" 17) 18 19// type WebsocketOptions struct { 20// maxReconnectSeconds int 21// heartbeatIntervalMs int 22// // onReconnectError 23// } 24 25type Handler interface { 26 OnEvent(ctx context.Context, evt Event) error 27 OnError(ctx context.Context, err error) 28} 29 30type Client struct { 31 Url string 32 AdminPassword string 33 HTTPClient *http.Client 34 35 ready chan struct{} 36} 37 38func NewClient(url, adminPassword string) Client { 39 return Client{ 40 Url: url, 41 AdminPassword: adminPassword, 42 HTTPClient: &http.Client{}, 43 44 ready: make(chan struct{}), 45 } 46} 47 48func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error { 49 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 50 if err != nil { 51 return err 52 } 53 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body)) 54 if err != nil { 55 return err 56 } 57 req.SetBasicAuth("admin", c.AdminPassword) 58 req.Header.Set("Content-Type", "application/json") 59 60 resp, err := c.HTTPClient.Do(req) 61 if err != nil { 62 return err 63 } 64 defer resp.Body.Close() 65 if resp.StatusCode != http.StatusOK { 66 return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode) 67 } 68 return nil 69} 70 71func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error { 72 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 73 if err != nil { 74 return err 75 } 76 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body)) 77 if err != nil { 78 return err 79 } 80 req.SetBasicAuth("admin", c.AdminPassword) 81 req.Header.Set("Content-Type", "application/json") 82 83 resp, err := c.HTTPClient.Do(req) 84 if err != nil { 85 return err 86 } 87 defer resp.Body.Close() 88 if resp.StatusCode != http.StatusOK { 89 return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode) 90 } 91 return nil 92} 93 94func (c *Client) markReady() { 95 select { 96 case <-c.ready: 97 // already closed 98 default: 99 close(c.ready) 100 } 101} 102 103func (c *Client) WaitReady(ctx context.Context) error { 104 select { 105 case <-c.ready: 106 return nil 107 case <-ctx.Done(): 108 return ctx.Err() 109 } 110} 111 112func (c *Client) Connect(ctx context.Context, handler Handler) error { 113 l := log.FromContext(ctx) 114 115 u, err := url.Parse(c.Url) 116 if err != nil { 117 return err 118 } 119 if u.Scheme == "https" { 120 u.Scheme = "wss" 121 } else { 122 u.Scheme = "ws" 123 } 124 u.Path = "/channel" 125 126 // TODO: set auth on dial 127 128 url := u.String() 129 130 var backoff int 131 for { 132 select { 133 case <-ctx.Done(): 134 return ctx.Err() 135 default: 136 } 137 138 header := http.Header{ 139 "Authorization": []string{""}, 140 } 141 conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header) 142 if err != nil { 143 l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff) 144 time.Sleep(time.Duration(5+backoff) * time.Second) 145 backoff++ 146 147 continue 148 } 149 l.Info("connected to tap service") 150 151 backoff = 0 152 c.markReady() 153 154 l.Info("tap event subscription response", "code", res.StatusCode) 155 156 if err = c.handleConnection(ctx, conn, handler); err != nil { 157 l.Warn("tap connection failed", "err", err, "backoff", backoff) 158 } 159 } 160} 161 162func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error { 163 l := log.FromContext(ctx) 164 165 defer func() { 166 conn.Close() 167 l.Warn("closed tap conection") 168 }() 169 l.Info("established tap conection") 170 171 for { 172 select { 173 case <-ctx.Done(): 174 return ctx.Err() 175 default: 176 } 177 _, message, err := conn.ReadMessage() 178 if err != nil { 179 return err 180 } 181 182 var ev Event 183 if err := json.Unmarshal(message, &ev); err != nil { 184 handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err)) 185 continue 186 } 187 if err := handler.OnEvent(ctx, ev); err != nil { 188 handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err)) 189 continue 190 } 191 192 ack := map[string]any{ 193 "type": "ack", 194 "id": ev.ID, 195 } 196 if err := conn.WriteJSON(ack); err != nil { 197 l.Warn("failed to send ack", "err", err) 198 continue 199 } 200 } 201}