A vibe coded tangled fork which supports pijul.
1package db
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "tangled.org/core/appview/pagination"
11 "tangled.org/core/knotmirror/models"
12 "tangled.org/core/orm"
13)
14
15func AddRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, cid syntax.CID, name, knot string) error {
16 if _, err := e.ExecContext(ctx,
17 `insert into repos (did, rkey, cid, name, knot_domain)
18 values (?, ?, ?, ?, ?)`,
19 did, rkey, cid, name, knot,
20 ); err != nil {
21 return fmt.Errorf("inserting repo: %w", err)
22 }
23 return nil
24}
25
26func UpsertRepo(ctx context.Context, e *sql.DB, repo *models.Repo) error {
27 if _, err := e.ExecContext(ctx,
28 `insert into repos (did, rkey, cid, name, knot_domain, git_rev, repo_sha, state, error_msg, retry_count, retry_after)
29 values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
30 on conflict(did, rkey) do update set
31 cid = excluded.cid,
32 name = excluded.name,
33 knot_domain = excluded.knot_domain,
34 git_rev = excluded.git_rev,
35 repo_sha = excluded.repo_sha,
36 state = excluded.state,
37 error_msg = excluded.error_msg,
38 retry_count = excluded.retry_count,
39 retry_after = excluded.retry_after`,
40 // where repos.cid != excluded.cid`,
41 repo.Did,
42 repo.Rkey,
43 repo.Cid,
44 repo.Name,
45 repo.KnotDomain,
46 repo.GitRev,
47 repo.RepoSha,
48 repo.State,
49 repo.ErrorMsg,
50 repo.RetryCount,
51 repo.RetryAfter,
52 ); err != nil {
53 return fmt.Errorf("upserting repo: %w", err)
54 }
55 return nil
56}
57
58func UpdateRepoState(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, state models.RepoState) error {
59 if _, err := e.ExecContext(ctx,
60 `update repos
61 set state = ?
62 where did = ? and rkey = ?`,
63 state,
64 did, rkey,
65 ); err != nil {
66 return fmt.Errorf("updating repo: %w", err)
67 }
68 return nil
69}
70
71func DeleteRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey) error {
72 if _, err := e.ExecContext(ctx,
73 `delete from repos where did = ? and rkey = ?`,
74 did,
75 rkey,
76 ); err != nil {
77 return fmt.Errorf("deleting repo: %w", err)
78 }
79 return nil
80}
81
82func GetRepoByName(ctx context.Context, e *sql.DB, did syntax.DID, name string) (*models.Repo, error) {
83 var repo models.Repo
84 if err := e.QueryRowContext(ctx,
85 `select
86 did,
87 rkey,
88 cid,
89 name,
90 knot_domain,
91 git_rev,
92 repo_sha,
93 state,
94 error_msg,
95 retry_count,
96 retry_after
97 from repos
98 where did = ? and name = ?`,
99 did,
100 name,
101 ).Scan(
102 &repo.Did,
103 &repo.Rkey,
104 &repo.Cid,
105 &repo.Name,
106 &repo.KnotDomain,
107 &repo.GitRev,
108 &repo.RepoSha,
109 &repo.State,
110 &repo.ErrorMsg,
111 &repo.RetryCount,
112 &repo.RetryAfter,
113 ); err != nil {
114 if errors.Is(err, sql.ErrNoRows) {
115 return nil, nil
116 }
117 return nil, fmt.Errorf("querying repo: %w", err)
118 }
119 return &repo, nil
120}
121
122func GetRepoByAtUri(ctx context.Context, e *sql.DB, aturi syntax.ATURI) (*models.Repo, error) {
123 var repo models.Repo
124 if err := e.QueryRowContext(ctx,
125 `select
126 did,
127 rkey,
128 cid,
129 name,
130 knot_domain,
131 git_rev,
132 repo_sha,
133 state,
134 error_msg,
135 retry_count,
136 retry_after
137 from repos
138 where at_uri = ?`,
139 aturi,
140 ).Scan(
141 &repo.Did,
142 &repo.Rkey,
143 &repo.Cid,
144 &repo.Name,
145 &repo.KnotDomain,
146 &repo.GitRev,
147 &repo.RepoSha,
148 &repo.State,
149 &repo.ErrorMsg,
150 &repo.RetryCount,
151 &repo.RetryAfter,
152 ); err != nil {
153 if errors.Is(err, sql.ErrNoRows) {
154 return nil, nil
155 }
156 return nil, fmt.Errorf("querying repo: %w", err)
157 }
158 return &repo, nil
159}
160
161func ListRepos(ctx context.Context, e *sql.DB, page pagination.Page, filters ...orm.Filter) ([]models.Repo, error) {
162 var conditions []string
163 var args []any
164
165 for _, filter := range filters {
166 conditions = append(conditions, filter.Condition())
167 args = append(args, filter.Arg()...)
168 }
169
170 whereClause := ""
171 if len(conditions) > 0 {
172 whereClause = "WHERE " + conditions[0]
173 for _, condition := range conditions[1:] {
174 whereClause += " AND " + condition
175 }
176 }
177 pageClause := ""
178 if page.Limit > 0 {
179 pageClause = " limit ? offset ? "
180 args = append(args, page.Limit, page.Offset)
181 }
182
183 query := `
184 select
185 did,
186 rkey,
187 cid,
188 name,
189 knot_domain,
190 git_rev,
191 repo_sha,
192 state,
193 error_msg,
194 retry_count,
195 retry_after
196 from repos
197 ` + whereClause + pageClause
198 rows, err := e.QueryContext(ctx, query, args...)
199 if err != nil {
200 return nil, err
201 }
202 defer rows.Close()
203
204 var repos []models.Repo
205 for rows.Next() {
206 var repo models.Repo
207 if err := rows.Scan(
208 &repo.Did,
209 &repo.Rkey,
210 &repo.Cid,
211 &repo.Name,
212 &repo.KnotDomain,
213 &repo.GitRev,
214 &repo.RepoSha,
215 &repo.State,
216 &repo.ErrorMsg,
217 &repo.RetryCount,
218 &repo.RetryAfter,
219 ); err != nil {
220 return nil, fmt.Errorf("scanning row: %w", err)
221 }
222 repos = append(repos, repo)
223 }
224 if err := rows.Err(); err != nil {
225 return nil, fmt.Errorf("scanning rows: %w ", err)
226 }
227
228 return repos, nil
229}
230
231func GetRepoCountsByState(ctx context.Context, e *sql.DB) (map[models.RepoState]int64, error) {
232 const q = `
233 SELECT state, COUNT(*)
234 FROM repos
235 GROUP BY state
236 `
237
238 rows, err := e.QueryContext(ctx, q)
239 if err != nil {
240 return nil, err
241 }
242 defer rows.Close()
243
244 counts := make(map[models.RepoState]int64)
245
246 for rows.Next() {
247 var state string
248 var count int64
249
250 if err := rows.Scan(&state, &count); err != nil {
251 return nil, err
252 }
253
254 counts[models.RepoState(state)] = count
255 }
256
257 if err := rows.Err(); err != nil {
258 return nil, err
259 }
260
261 for _, s := range models.AllRepoStates {
262 if _, ok := counts[s]; !ok {
263 counts[s] = 0
264 }
265 }
266
267 return counts, nil
268}