A vibe-coded Tangled fork that supports Pijul.
at sl/knotmirror 176 lines 3.9 kB view raw
1/// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md> 2 3package tapc 4 5import ( 6 "bytes" 7 "context" 8 "encoding/json" 9 "fmt" 10 "net/http" 11 "net/url" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 15 "github.com/gorilla/websocket" 16 "tangled.org/core/log" 17) 18 19// type WebsocketOptions struct { 20// maxReconnectSeconds int 21// heartbeatIntervalMs int 22// // onReconnectError 23// } 24 25type Handler interface { 26 OnEvent(ctx context.Context, evt Event) error 27 OnError(ctx context.Context, err error) 28} 29 30type Client struct { 31 Url string 32 AdminPassword string 33 HTTPClient *http.Client 34} 35 36func NewClient(url, adminPassword string) Client { 37 return Client{ 38 Url: url, 39 AdminPassword: adminPassword, 40 HTTPClient: &http.Client{}, 41 } 42} 43 44func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error { 45 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 46 if err != nil { 47 return err 48 } 49 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body)) 50 if err != nil { 51 return err 52 } 53 req.SetBasicAuth("admin", c.AdminPassword) 54 req.Header.Set("Content-Type", "application/json") 55 56 resp, err := c.HTTPClient.Do(req) 57 if err != nil { 58 return err 59 } 60 defer resp.Body.Close() 61 if resp.StatusCode != http.StatusOK { 62 return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode) 63 } 64 return nil 65} 66 67func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error { 68 body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 69 if err != nil { 70 return err 71 } 72 req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body)) 73 if err != nil { 74 return err 75 } 76 req.SetBasicAuth("admin", c.AdminPassword) 77 req.Header.Set("Content-Type", "application/json") 78 79 resp, err := 
c.HTTPClient.Do(req) 80 if err != nil { 81 return err 82 } 83 defer resp.Body.Close() 84 if resp.StatusCode != http.StatusOK { 85 return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode) 86 } 87 return nil 88} 89 90func (c *Client) Connect(ctx context.Context, handler Handler) error { 91 l := log.FromContext(ctx) 92 93 u, err := url.Parse(c.Url) 94 if err != nil { 95 return err 96 } 97 if u.Scheme == "https" { 98 u.Scheme = "wss" 99 } else { 100 u.Scheme = "ws" 101 } 102 u.Path = "/channel" 103 104 // TODO: set auth on dial 105 106 url := u.String() 107 108 var backoff int 109 for { 110 select { 111 case <-ctx.Done(): 112 return ctx.Err() 113 default: 114 } 115 116 header := http.Header{ 117 "Authorization": []string{""}, 118 } 119 conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header) 120 if err != nil { 121 l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff) 122 time.Sleep(time.Duration(5+backoff) * time.Second) 123 backoff++ 124 125 continue 126 } 127 l.Info("connected to tap service") 128 129 l.Info("tap event subscription response", "code", res.StatusCode) 130 131 if err = c.handleConnection(ctx, conn, handler); err != nil { 132 l.Warn("tap connection failed", "err", err, "backoff", backoff) 133 } 134 } 135} 136 137func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error { 138 l := log.FromContext(ctx) 139 140 defer func() { 141 conn.Close() 142 l.Warn("closed tap conection") 143 }() 144 l.Info("established tap conection") 145 146 for { 147 select { 148 case <-ctx.Done(): 149 return ctx.Err() 150 default: 151 } 152 _, message, err := conn.ReadMessage() 153 if err != nil { 154 return err 155 } 156 157 var ev Event 158 if err := json.Unmarshal(message, &ev); err != nil { 159 handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err)) 160 continue 161 } 162 if err := handler.OnEvent(ctx, ev); err != nil { 163 handler.OnError(ctx, fmt.Errorf("failed to 
process event %d: %w", ev.ID, err)) 164 continue 165 } 166 167 ack := map[string]any{ 168 "type": "ack", 169 "id": ev.ID, 170 } 171 if err := conn.WriteJSON(ack); err != nil { 172 l.Warn("failed to send ack", "err", err) 173 continue 174 } 175 } 176}