A vibe coded tangled fork which supports pijul.
at 7033a0e257d514dcfd36aa332e934845ca4afa5d 192 lines 4.2 kB view raw
1/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md> 2 3package tapc 4 5import ( 6 "bytes" 7 "context" 8 "encoding/json" 9 "fmt" 10 "net/http" 11 "net/url" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/gorilla/websocket" 16 "tangled.org/core/log" 17) 18 19// type WebsocketOptions struct { 20// maxReconnectSeconds int 21// heartbeatIntervalMs int 22// // onReconnectError 23// } 24 25type Handler interface { 26 OnEvent(ctx context.Context, evt Event) error 27 OnError(ctx context.Context, err error) 28} 29 30type Client struct { 31 Url string 32 AdminPassword string 33 HTTPClient *http.Client 34 35 connectedSig chan struct{} 36} 37 38func NewClient(url, adminPassword string) Client { 39 return Client{ 40 Url: url, 41 AdminPassword: adminPassword, 42 HTTPClient: &http.Client{}, 43 44 connectedSig: make(chan struct{}), 45 } 46} 47 48func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error { 49 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 50 if err != nil { 51 return err 52 } 53 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body)) 54 if err != nil { 55 return err 56 } 57 req.SetBasicAuth("admin", c.AdminPassword) 58 req.Header.Set("Content-Type", "application/json") 59 60 resp, err := c.HTTPClient.Do(req) 61 if err != nil { 62 return err 63 } 64 defer resp.Body.Close() 65 if resp.StatusCode != http.StatusOK { 66 return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode) 67 } 68 return nil 69} 70 71func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error { 72 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 73 if err != nil { 74 return err 75 } 76 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body)) 77 if err != nil { 78 return err 79 } 80 req.SetBasicAuth("admin", c.AdminPassword) 81 req.Header.Set("Content-Type", "application/json") 82 83 resp, err := c.HTTPClient.Do(req) 84 if err != nil { 85 return err 86 } 87 defer resp.Body.Close() 88 if resp.StatusCode != http.StatusOK { 89 return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode) 90 } 91 return nil 92} 93 94func (c *Client) Wait(ctx context.Context) error { 95 select { 96 case <-ctx.Done(): 97 return ctx.Err() 98 case <-c.connectedSig: 99 return nil 100 } 101} 102 103func (c *Client) Connect(ctx context.Context, handler Handler) error { 104 l := log.FromContext(ctx) 105 106 u, err := url.Parse(c.Url) 107 if err != nil { 108 return err 109 } 110 if u.Scheme == "https" { 111 u.Scheme = "wss" 112 } else { 113 u.Scheme = "ws" 114 } 115 u.Path = "/channel" 116 117 // TODO: set auth on dial 118 119 url := u.String() 120 121 var backoff int 122 for { 123 select { 124 case <-ctx.Done(): 125 return ctx.Err() 126 default: 127 } 128 129 header := http.Header{ 130 "Authorization": []string{""}, 131 } 132 conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header) 133 if err != nil { 134 l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff) 135 time.Sleep(time.Duration(5+backoff) * time.Second) 136 backoff++ 137 138 continue 139 } else { 140 backoff = 0 141 } 142 143 c.connectedSig <- struct{}{} 144 145 l.Info("tap event subscription response", "code", res.StatusCode) 146 147 if err = c.handleConnection(ctx, conn, handler); err != nil { 148 l.Warn("tap connection failed", "err", err, "backoff", backoff) 149 } 150 } 151} 152 153func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error { 154 l := log.FromContext(ctx) 155 156 defer func() { 157 conn.Close() 158 l.Warn("closed tap conection") 159 }() 160 l.Info("established tap conection") 161 162 for { 163 select { 164 case <-ctx.Done(): 165 return ctx.Err() 166 default: 167 } 168 _, message, err := conn.ReadMessage() 169 if err != nil { 170 return err 171 } 172 173 var ev Event 174 if err := json.Unmarshal(message, &ev); err != nil { 175 handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err)) 176 continue 177 } 178 if err := handler.OnEvent(ctx, ev); err != nil { 179 handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err)) 180 continue 181 } 182 183 ack := map[string]any{ 184 "type": "ack", 185 "id": ev.ID, 186 } 187 if err := conn.WriteJSON(ack); err != nil { 188 l.Warn("failed to send ack", "err", err) 189 continue 190 } 191 } 192}