A vibe coded tangled fork which supports pijul.
at e620e86a00c541e18e9e8097f97e51962631c5cd 268 lines 5.5 kB view raw
1package db 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/appview/pagination" 11 "tangled.org/core/knotmirror/models" 12 "tangled.org/core/orm" 13) 14 15func AddRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, cid syntax.CID, name, knot string) error { 16 if _, err := e.ExecContext(ctx, 17 `insert into repos (did, rkey, cid, name, knot_domain) 18 values (?, ?, ?, ?, ?)`, 19 did, rkey, cid, name, knot, 20 ); err != nil { 21 return fmt.Errorf("inserting repo: %w", err) 22 } 23 return nil 24} 25 26func UpsertRepo(ctx context.Context, e *sql.DB, repo *models.Repo) error { 27 if _, err := e.ExecContext(ctx, 28 `insert into repos (did, rkey, cid, name, knot_domain, git_rev, repo_sha, state, error_msg, retry_count, retry_after) 29 values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 30 on conflict(did, rkey) do update set 31 cid = excluded.cid, 32 name = excluded.name, 33 knot_domain = excluded.knot_domain, 34 git_rev = excluded.git_rev, 35 repo_sha = excluded.repo_sha, 36 state = excluded.state, 37 error_msg = excluded.error_msg, 38 retry_count = excluded.retry_count, 39 retry_after = excluded.retry_after`, 40 // where repos.cid != excluded.cid`, 41 repo.Did, 42 repo.Rkey, 43 repo.Cid, 44 repo.Name, 45 repo.KnotDomain, 46 repo.GitRev, 47 repo.RepoSha, 48 repo.State, 49 repo.ErrorMsg, 50 repo.RetryCount, 51 repo.RetryAfter, 52 ); err != nil { 53 return fmt.Errorf("upserting repo: %w", err) 54 } 55 return nil 56} 57 58func UpdateRepoState(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, state models.RepoState) error { 59 if _, err := e.ExecContext(ctx, 60 `update repos 61 set state = ? 62 where did = ? and rkey = ?`, 63 state, 64 did, rkey, 65 ); err != nil { 66 return fmt.Errorf("updating repo: %w", err) 67 } 68 return nil 69} 70 71func DeleteRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey) error { 72 if _, err := e.ExecContext(ctx, 73 `delete from repos where did = ? and rkey = ?`, 74 did, 75 rkey, 76 ); err != nil { 77 return fmt.Errorf("deleting repo: %w", err) 78 } 79 return nil 80} 81 82func GetRepoByName(ctx context.Context, e *sql.DB, did syntax.DID, name string) (*models.Repo, error) { 83 var repo models.Repo 84 if err := e.QueryRowContext(ctx, 85 `select 86 did, 87 rkey, 88 cid, 89 name, 90 knot_domain, 91 git_rev, 92 repo_sha, 93 state, 94 error_msg, 95 retry_count, 96 retry_after 97 from repos 98 where did = ? and name = ?`, 99 did, 100 name, 101 ).Scan( 102 &repo.Did, 103 &repo.Rkey, 104 &repo.Cid, 105 &repo.Name, 106 &repo.KnotDomain, 107 &repo.GitRev, 108 &repo.RepoSha, 109 &repo.State, 110 &repo.ErrorMsg, 111 &repo.RetryCount, 112 &repo.RetryAfter, 113 ); err != nil { 114 if errors.Is(err, sql.ErrNoRows) { 115 return nil, nil 116 } 117 return nil, fmt.Errorf("querying repo: %w", err) 118 } 119 return &repo, nil 120} 121 122func GetRepoByAtUri(ctx context.Context, e *sql.DB, aturi syntax.ATURI) (*models.Repo, error) { 123 var repo models.Repo 124 if err := e.QueryRowContext(ctx, 125 `select 126 did, 127 rkey, 128 cid, 129 name, 130 knot_domain, 131 git_rev, 132 repo_sha, 133 state, 134 error_msg, 135 retry_count, 136 retry_after 137 from repos 138 where at_uri = ?`, 139 aturi, 140 ).Scan( 141 &repo.Did, 142 &repo.Rkey, 143 &repo.Cid, 144 &repo.Name, 145 &repo.KnotDomain, 146 &repo.GitRev, 147 &repo.RepoSha, 148 &repo.State, 149 &repo.ErrorMsg, 150 &repo.RetryCount, 151 &repo.RetryAfter, 152 ); err != nil { 153 if errors.Is(err, sql.ErrNoRows) { 154 return nil, nil 155 } 156 return nil, fmt.Errorf("querying repo: %w", err) 157 } 158 return &repo, nil 159} 160 161func ListRepos(ctx context.Context, e *sql.DB, page pagination.Page, filters ...orm.Filter) ([]models.Repo, error) { 162 var conditions []string 163 var args []any 164 165 for _, filter := range filters { 166 conditions = append(conditions, filter.Condition()) 167 args = append(args, filter.Arg()...) 168 } 169 170 whereClause := "" 171 if len(conditions) > 0 { 172 whereClause = "WHERE " + conditions[0] 173 for _, condition := range conditions[1:] { 174 whereClause += " AND " + condition 175 } 176 } 177 pageClause := "" 178 if page.Limit > 0 { 179 pageClause = " limit ? offset ? " 180 args = append(args, page.Limit, page.Offset) 181 } 182 183 query := ` 184 select 185 did, 186 rkey, 187 cid, 188 name, 189 knot_domain, 190 git_rev, 191 repo_sha, 192 state, 193 error_msg, 194 retry_count, 195 retry_after 196 from repos 197 ` + whereClause + pageClause 198 rows, err := e.QueryContext(ctx, query, args...) 199 if err != nil { 200 return nil, err 201 } 202 defer rows.Close() 203 204 var repos []models.Repo 205 for rows.Next() { 206 var repo models.Repo 207 if err := rows.Scan( 208 &repo.Did, 209 &repo.Rkey, 210 &repo.Cid, 211 &repo.Name, 212 &repo.KnotDomain, 213 &repo.GitRev, 214 &repo.RepoSha, 215 &repo.State, 216 &repo.ErrorMsg, 217 &repo.RetryCount, 218 &repo.RetryAfter, 219 ); err != nil { 220 return nil, fmt.Errorf("scanning row: %w", err) 221 } 222 repos = append(repos, repo) 223 } 224 if err := rows.Err(); err != nil { 225 return nil, fmt.Errorf("scanning rows: %w ", err) 226 } 227 228 return repos, nil 229} 230 231func GetRepoCountsByState(ctx context.Context, e *sql.DB) (map[models.RepoState]int64, error) { 232 const q = ` 233 SELECT state, COUNT(*) 234 FROM repos 235 GROUP BY state 236 ` 237 238 rows, err := e.QueryContext(ctx, q) 239 if err != nil { 240 return nil, err 241 } 242 defer rows.Close() 243 244 counts := make(map[models.RepoState]int64) 245 246 for rows.Next() { 247 var state string 248 var count int64 249 250 if err := rows.Scan(&state, &count); err != nil { 251 return nil, err 252 } 253 254 counts[models.RepoState(state)] = count 255 } 256 257 if err := rows.Err(); err != nil { 258 return nil, err 259 } 260 261 for _, s := range models.AllRepoStates { 262 if _, ok := counts[s]; !ok { 263 counts[s] = 0 264 } 265 } 266 267 return counts, nil 268}