A vibe-coded Tangled fork that supports Pijul.
at 3b1c134c75dcc90060f2c83aa675e123e3e863c8 177 lines 3.9 kB view raw
1/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md> 2 3package tapc 4 5import ( 6 "bytes" 7 "context" 8 "encoding/json" 9 "fmt" 10 "net/http" 11 "net/url" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/gorilla/websocket" 16 "tangled.org/core/log" 17) 18 19// type WebsocketOptions struct { 20// maxReconnectSeconds int 21// heartbeatIntervalMs int 22// // onReconnectError 23// } 24 25type Handler interface { 26 OnEvent(ctx context.Context, evt Event) error 27 OnError(ctx context.Context, err error) 28} 29 30type Client struct { 31 Url string 32 AdminPassword string 33 HTTPClient *http.Client 34} 35 36func NewClient(url, adminPassword string) Client { 37 return Client{ 38 Url: url, 39 AdminPassword: adminPassword, 40 HTTPClient: &http.Client{}, 41 } 42} 43 44func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error { 45 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 46 if err != nil { 47 return err 48 } 49 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body)) 50 if err != nil { 51 return err 52 } 53 req.SetBasicAuth("admin", c.AdminPassword) 54 req.Header.Set("Content-Type", "application/json") 55 56 resp, err := c.HTTPClient.Do(req) 57 if err != nil { 58 return err 59 } 60 defer resp.Body.Close() 61 if resp.StatusCode != http.StatusOK { 62 return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode) 63 } 64 return nil 65} 66 67func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error { 68 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 69 if err != nil { 70 return err 71 } 72 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body)) 73 if err != nil { 74 return err 75 } 76 req.SetBasicAuth("admin", c.AdminPassword) 77 req.Header.Set("Content-Type", "application/json") 78 79 resp, err := 
c.HTTPClient.Do(req) 80 if err != nil { 81 return err 82 } 83 defer resp.Body.Close() 84 if resp.StatusCode != http.StatusOK { 85 return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode) 86 } 87 return nil 88} 89 90func (c *Client) Connect(ctx context.Context, handler Handler) error { 91 l := log.FromContext(ctx) 92 93 u, err := url.Parse(c.Url) 94 if err != nil { 95 return err 96 } 97 if u.Scheme == "https" { 98 u.Scheme = "wss" 99 } else { 100 u.Scheme = "ws" 101 } 102 u.Path = "/channel" 103 104 // TODO: set auth on dial 105 106 url := u.String() 107 108 var backoff int 109 for { 110 select { 111 case <-ctx.Done(): 112 return ctx.Err() 113 default: 114 } 115 116 header := http.Header{ 117 "Authorization": []string{""}, 118 } 119 conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header) 120 if err != nil { 121 l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff) 122 time.Sleep(time.Duration(5+backoff) * time.Second) 123 backoff++ 124 125 continue 126 } else { 127 backoff = 0 128 } 129 130 l.Info("tap event subscription response", "code", res.StatusCode) 131 132 if err = c.handleConnection(ctx, conn, handler); err != nil { 133 l.Warn("tap connection failed", "err", err, "backoff", backoff) 134 } 135 } 136} 137 138func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error { 139 l := log.FromContext(ctx) 140 141 defer func() { 142 conn.Close() 143 l.Warn("closed tap conection") 144 }() 145 l.Info("established tap conection") 146 147 for { 148 select { 149 case <-ctx.Done(): 150 return ctx.Err() 151 default: 152 } 153 _, message, err := conn.ReadMessage() 154 if err != nil { 155 return err 156 } 157 158 var ev Event 159 if err := json.Unmarshal(message, &ev); err != nil { 160 handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err)) 161 continue 162 } 163 if err := handler.OnEvent(ctx, ev); err != nil { 164 handler.OnError(ctx, fmt.Errorf("failed to process event 
%d: %w", ev.ID, err)) 165 continue 166 } 167 168 ack := map[string]any{ 169 "type": "ack", 170 "id": ev.ID, 171 } 172 if err := conn.WriteJSON(ack); err != nil { 173 l.Warn("failed to send ack", "err", err) 174 continue 175 } 176 } 177}