A vibe coded tangled fork which supports pijul.

knotmirror: switch to postgres for concurrent writes

Signed-off-by: Seongmin Lee <git@boltless.me>

+97 -94
+4
go.mod
··· 35 35 github.com/hiddeco/sshsig v0.2.0 36 36 github.com/hpcloud/tail v1.0.0 37 37 github.com/ipfs/go-cid v0.5.0 38 + github.com/jackc/pgx/v5 v5.8.0 38 39 github.com/mattn/go-sqlite3 v1.14.24 39 40 github.com/microcosm-cc/bluemonday v1.0.27 40 41 github.com/openbao/openbao/api/v2 v2.3.0 ··· 160 161 github.com/ipfs/go-log v1.0.5 // indirect 161 162 github.com/ipfs/go-log/v2 v2.6.0 // indirect 162 163 github.com/ipfs/go-metrics-interface v0.3.0 // indirect 164 + github.com/jackc/pgpassfile v1.0.0 // indirect 165 + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect 166 + github.com/jackc/puddle/v2 v2.2.2 // indirect 163 167 github.com/json-iterator/go v1.1.12 // indirect 164 168 github.com/kevinburke/ssh_config v1.2.0 // indirect 165 169 github.com/klauspost/compress v1.18.0 // indirect
+8
go.sum
··· 347 347 github.com/ipfs/go-log/v2 v2.6.0/go.mod h1:p+Efr3qaY5YXpx9TX7MoLCSEZX5boSWj9wh86P5HJa8= 348 348 github.com/ipfs/go-metrics-interface v0.3.0 h1:YwG7/Cy4R94mYDUuwsBfeziJCVm9pBMJ6q/JR9V40TU= 349 349 github.com/ipfs/go-metrics-interface v0.3.0/go.mod h1:OxxQjZDGocXVdyTPocns6cOLwHieqej/jos7H4POwoY= 350 + github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= 351 + github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= 352 + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= 353 + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= 354 + github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= 355 + github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= 356 + github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= 357 + github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= 350 358 github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= 351 359 github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= 352 360 github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
+7 -25
knotmirror/adminpage.go
··· 13 13 "tangled.org/core/appview/pagination" 14 14 "tangled.org/core/knotmirror/db" 15 15 "tangled.org/core/knotmirror/models" 16 - "tangled.org/core/orm" 17 16 ) 18 17 19 18 //go:embed templates/*.html ··· 56 55 } 57 56 58 57 func (s *AdminServer) handleRepos() http.HandlerFunc { 59 - // TODO: prepare template 60 58 tpl := template.Must(template.New("").Funcs(funcmap()).ParseFS(templateFS, "templates/base.html", "templates/repos.html")) 61 59 return func(w http.ResponseWriter, r *http.Request) { 62 60 pageNum, _ := strconv.Atoi(r.URL.Query().Get("page")) 63 61 if pageNum < 1 { 64 62 pageNum = 1 65 63 } 66 - var ( 67 - did = r.URL.Query().Get("did") 68 - knot = r.URL.Query().Get("knot") 69 - state = r.URL.Query().Get("state") 70 - ) 71 - 72 64 page := pagination.Page{ 73 65 Offset: (pageNum - 1) * repoPageSize, 74 66 Limit: repoPageSize, 75 67 } 76 - var filters []orm.Filter 77 68 78 - if did != "" { 79 - filters = append(filters, orm.FilterEq("did", did)) 80 - } 81 - if knot != "" { 82 - filters = append(filters, orm.FilterEq("knot_domain", knot)) 83 - } 84 - if state != "" { 85 - filters = append(filters, orm.FilterEq("state", state)) 86 - } 69 + var ( 70 + did = r.URL.Query().Get("did") 71 + knot = r.URL.Query().Get("knot") 72 + state = r.URL.Query().Get("state") 73 + ) 87 74 88 - repos, err := db.ListRepos(r.Context(), s.db, page, filters...) 75 + repos, err := db.ListRepos(r.Context(), s.db, page, did, knot, state) 89 76 if err != nil { 90 77 http.Error(w, err.Error(), http.StatusInternalServerError) 91 78 } ··· 112 99 return func(w http.ResponseWriter, r *http.Request) { 113 100 var status = r.URL.Query().Get("status") 114 101 115 - var filters []orm.Filter 116 - if status != "" { 117 - filters = append(filters, orm.FilterEq("status", status)) 118 - } 119 - 120 - hosts, err := db.ListHosts(r.Context(), s.db, filters...) 102 + hosts, err := db.ListHosts(r.Context(), s.db, models.HostStatus(status)) 121 103 if err != nil { 122 104 http.Error(w, err.Error(), http.StatusInternalServerError) 123 105 }
+1 -1
knotmirror/config/config.go
··· 9 9 10 10 type Config struct { 11 11 TapUrl string `env:"MIRROR_TAP_URL, default=http://localhost:2480"` 12 - DbPath string `env:"MIRROR_DB_PATH, default=mirror.db"` 12 + DbUrl string `env:"MIRROR_DB_URL, required"` 13 13 KnotUseSSL bool `env:"MIRROR_KNOT_USE_SSL, default=false"` // use SSL for Knot when not scheme is not specified 14 14 KnotSSRF bool `env:"MIRROR_KNOT_SSRF, default=false"` 15 15 GitRepoBasePath string `env:"MIRROR_GIT_BASEPATH, default=repos"`
+22 -16
knotmirror/db/db.go
··· 4 4 "context" 5 5 "database/sql" 6 6 "fmt" 7 - "strings" 8 - _ "github.com/mattn/go-sqlite3" 7 + "time" 8 + 9 + _ "github.com/jackc/pgx/v5/stdlib" 9 10 ) 10 11 11 - func Make(ctx context.Context, dbPath string) (*sql.DB, error) { 12 - // https://github.com/mattn/go-sqlite3#connection-string 13 - opts := []string{ 14 - "_foreign_keys=1", 15 - "_journal_mode=WAL", 16 - "_synchronous=NORMAL", 17 - "_auto_vacuum=incremental", 12 + func Make(ctx context.Context, dbUrl string, maxConns int) (*sql.DB, error) { 13 + db, err := sql.Open("pgx", dbUrl) 14 + if err != nil { 15 + return nil, fmt.Errorf("opening db: %w", err) 18 16 } 19 17 20 - db, err := sql.Open("sqlite3", dbPath+"?"+strings.Join(opts, "&")) 21 - if err != nil { 22 - return nil, err 18 + db.SetMaxOpenConns(maxConns) 19 + db.SetMaxIdleConns(maxConns) 20 + db.SetConnMaxIdleTime(time.Hour) 21 + 22 + pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) 23 + defer cancel() 24 + if err := db.PingContext(pingCtx); err != nil { 25 + db.Close() 26 + return nil, fmt.Errorf("ping db: %w", err) 23 27 } 24 28 25 29 conn, err := db.Conn(ctx) ··· 47 51 retry_count integer not null default 0, 48 52 retry_after integer not null default 0, 49 53 50 - unique(did, rkey) 54 + constraint repos_pkey primary key (did, rkey) 51 55 ); 52 56 53 57 -- knot hosts 54 58 create table if not exists hosts ( 55 59 hostname text not null, 56 - no_ssl integer not null default 0, 60 + no_ssl boolean not null default false, 57 61 status text not null default 'active', 58 - last_seq integer not null default -1, 62 + last_seq bigint not null default -1, 59 63 60 - unique(hostname) 64 + constraint hosts_pkey primary key (hostname) 61 65 ); 66 + 67 + create index if not exists idx_repos_aturi on repos (at_uri); 62 68 `) 63 69 if err != nil { 64 70 return nil, fmt.Errorf("initializing db schema: %w", err)
+8 -22
knotmirror/db/hosts.go
··· 6 6 "errors" 7 7 "fmt" 8 8 "log" 9 - "strings" 10 9 11 10 "tangled.org/core/knotmirror/models" 12 - "tangled.org/core/orm" 13 11 ) 14 12 15 13 func UpsertHost(ctx context.Context, e *sql.DB, host *models.Host) error { 16 14 if _, err := e.ExecContext(ctx, 17 15 `insert into hosts (hostname, no_ssl, status, last_seq) 18 - values (?, ?, ?, ?) 16 + values ($1, $2, $3, $4) 19 17 on conflict(hostname) do update set 20 18 no_ssl = excluded.no_ssl, 21 19 status = excluded.status, ··· 35 33 var host models.Host 36 34 if err := e.QueryRowContext(ctx, 37 35 `select hostname, no_ssl, status, last_seq 38 - from hosts where hostname = ?`, 36 + from hosts where hostname = $1`, 39 37 hostname, 40 38 ).Scan( 41 39 &host.Hostname, ··· 62 60 continue 63 61 } 64 62 if _, err := tx.ExecContext(ctx, 65 - `update hosts set last_seq = ? where hostname = ?`, 63 + `update hosts set last_seq = $1 where hostname = $2`, 66 64 cur.LastSeq, 67 65 cur.Hostname, 68 66 ); err != nil { 69 - log.Println("failed to persist host cursor", "host:", cur.Hostname, "lastSeq", cur.LastSeq) 67 + log.Println("failed to persist host cursor", "host", cur.Hostname, "lastSeq", cur.LastSeq, "err", err) 70 68 } 71 69 } 72 70 return tx.Commit() 73 71 } 74 72 75 - func ListHosts(ctx context.Context, e *sql.DB, filters ...orm.Filter) ([]models.Host, error) { 76 - var conditions []string 77 - var args []any 78 - 79 - for _, filter := range filters { 80 - conditions = append(conditions, filter.Condition()) 81 - args = append(args, filter.Arg()...) 82 - } 83 - 84 - whereClause := "" 85 - if len(conditions) > 0 { 86 - whereClause = " where " + strings.Join(conditions, " and ") 87 - } 88 - 73 + func ListHosts(ctx context.Context, e *sql.DB, status models.HostStatus) ([]models.Host, error) { 89 74 rows, err := e.QueryContext(ctx, 90 75 `select hostname, no_ssl, status, last_seq 91 - from hosts` + whereClause, 92 - args..., 76 + from hosts 77 + where status = $1`, 78 + status, 93 79 ) 94 80 if err != nil { 95 81 return nil, fmt.Errorf("querying hosts: %w", err)
+24 -17
knotmirror/db/repos.go
··· 9 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 10 "tangled.org/core/appview/pagination" 11 11 "tangled.org/core/knotmirror/models" 12 - "tangled.org/core/orm" 13 12 ) 14 13 15 14 func AddRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, cid syntax.CID, name, knot string) error { 16 15 if _, err := e.ExecContext(ctx, 17 16 `insert into repos (did, rkey, cid, name, knot_domain) 18 - values (?, ?, ?, ?, ?)`, 17 + values ($1, $2, $3, $4, $5)`, 19 18 did, rkey, cid, name, knot, 20 19 ); err != nil { 21 20 return fmt.Errorf("inserting repo: %w", err) ··· 26 25 func UpsertRepo(ctx context.Context, e *sql.DB, repo *models.Repo) error { 27 26 if _, err := e.ExecContext(ctx, 28 27 `insert into repos (did, rkey, cid, name, knot_domain, git_rev, repo_sha, state, error_msg, retry_count, retry_after) 29 - values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 28 + values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) 30 29 on conflict(did, rkey) do update set 31 30 cid = excluded.cid, 32 31 name = excluded.name, ··· 58 57 func UpdateRepoState(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, state models.RepoState) error { 59 58 if _, err := e.ExecContext(ctx, 60 59 `update repos 61 - set state = ? 62 - where did = ? and rkey = ?`, 60 + set state = $1 61 + where did = $2 and rkey = $3`, 63 62 state, 64 63 did, rkey, 65 64 ); err != nil { ··· 70 69 71 70 func DeleteRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey) error { 72 71 if _, err := e.ExecContext(ctx, 73 - `delete from repos where did = ? and rkey = ?`, 72 + `delete from repos where did = $1 and rkey = $2`, 74 73 did, 75 74 rkey, 76 75 ); err != nil { ··· 95 94 retry_count, 96 95 retry_after 97 96 from repos 98 - where did = ? and name = ?`, 97 + where did = $1 and name = $2`, 99 98 did, 100 99 name, 101 100 ).Scan( ··· 135 134 retry_count, 136 135 retry_after 137 136 from repos 138 - where at_uri = ?`, 137 + where at_uri = $1`, 139 138 aturi, 140 139 ).Scan( 141 140 &repo.Did, ··· 158 157 return &repo, nil 159 158 } 160 159 161 - func ListRepos(ctx context.Context, e *sql.DB, page pagination.Page, filters ...orm.Filter) ([]models.Repo, error) { 160 + func ListRepos(ctx context.Context, e *sql.DB, page pagination.Page, did, knot, state string) ([]models.Repo, error) { 162 161 var conditions []string 163 162 var args []any 164 163 165 - for _, filter := range filters { 166 - conditions = append(conditions, filter.Condition()) 167 - args = append(args, filter.Arg()...) 164 + pageClause := "" 165 + if page.Limit > 0 { 166 + pageClause = " limit $1 offset $2 " 167 + args = append(args, page.Limit, page.Offset) 168 168 } 169 169 170 170 whereClause := "" 171 + if did != "" { 172 + conditions = append(conditions, fmt.Sprintf("did = $%d", len(args)+1)) 173 + args = append(args, did) 174 + } 175 + if knot != "" { 176 + conditions = append(conditions, fmt.Sprintf("knot_domain = $%d", len(args)+1)) 177 + args = append(args, knot) 178 + } 179 + if state != "" { 180 + conditions = append(conditions, fmt.Sprintf("state = $%d", len(args)+1)) 181 + args = append(args, state) 182 + } 171 183 if len(conditions) > 0 { 172 184 whereClause = "WHERE " + conditions[0] 173 185 for _, condition := range conditions[1:] { 174 186 whereClause += " AND " + condition 175 187 } 176 - } 177 - pageClause := "" 178 - if page.Limit > 0 { 179 - pageClause = " limit ? offset ? " 180 - args = append(args, page.Limit, page.Offset) 181 188 } 182 189 183 190 query := `
+2 -2
knotmirror/knotmirror.go
··· 28 28 29 29 logger.Debug("config loaded:", "config", cfg) 30 30 31 - db, err := db.Make(ctx, cfg.DbPath) 31 + db, err := db.Make(ctx, cfg.DbUrl, 32) 32 32 if err != nil { 33 33 return fmt.Errorf("initializing db: %w", err) 34 34 } 35 35 36 36 res, err := db.ExecContext(ctx, 37 - `update repos set state = ? where state = ?`, 37 + `update repos set state = $1 where state = $2`, 38 38 models.RepoStateDesynchronized, 39 39 models.RepoStateResyncing, 40 40 )
+1 -2
knotmirror/knotstream/knotstream.go
··· 11 11 "tangled.org/core/knotmirror/db" 12 12 "tangled.org/core/knotmirror/models" 13 13 "tangled.org/core/log" 14 - "tangled.org/core/orm" 15 14 ) 16 15 17 16 type KnotStream struct { ··· 71 70 } 72 71 73 72 func (s *KnotStream) ResubscribeAllHosts(ctx context.Context) error { 74 - hosts, err := db.ListHosts(ctx, s.db, orm.FilterEq("status", "active")) 73 + hosts, err := db.ListHosts(ctx, s.db, models.HostStatusActive) 75 74 if err != nil { 76 75 return fmt.Errorf("listing hosts: %w", err) 77 76 }
+8 -4
knotmirror/resyncer.go
··· 10 10 "net/url" 11 11 "os" 12 12 "path" 13 + "strings" 13 14 "sync" 14 15 "time" 15 16 ··· 85 86 now := time.Now().Unix() 86 87 if err := r.db.QueryRowContext(ctx, 87 88 `update repos 88 - set state = ? 89 + set state = $1 89 90 where at_uri = ( 90 91 select at_uri from repos 91 - where state in (?, ?, ?) 92 - and (retry_after = 0 or retry_after < ?) 92 + where state in ($2, $3, $4) 93 + and (retry_after = 0 or retry_after < $5) 93 94 limit 1 94 95 ) 95 96 returning at_uri ··· 216 217 retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 217 218 } 218 219 220 + // remove null bytes 221 + errMsg = strings.ReplaceAll(errMsg, "\x00", "") 222 + 219 223 repo.State = state 220 224 repo.ErrorMsg = errMsg 221 225 repo.RetryCount = retryCount 222 226 repo.RetryAfter = retryAfter 223 227 if dbErr := db.UpsertRepo(ctx, r.db, repo); dbErr != nil { 224 - return dbErr 228 + return fmt.Errorf("failed to update repo state: %w", err) 225 229 } 226 230 return err 227 231 }
+12
nix/gomod2nix.toml
··· 398 398 [mod."github.com/ipfs/go-metrics-interface"] 399 399 version = "v0.3.0" 400 400 hash = "sha256-b3tp3jxecLmJEGx2kW7MiKGlAKPEWg/LJ7hXylSC8jQ=" 401 + [mod."github.com/jackc/pgpassfile"] 402 + version = "v1.0.0" 403 + hash = "sha256-H0nFbC34/3pZUFnuiQk9W7yvAMh6qJDrqvHp+akBPLM=" 404 + [mod."github.com/jackc/pgservicefile"] 405 + version = "v0.0.0-20240606120523-5a60cdf6a761" 406 + hash = "sha256-ETpGsLAA2wcm5xJBayr/mZrCE1YsWbnkbSSX3ptrFn0=" 407 + [mod."github.com/jackc/pgx/v5"] 408 + version = "v5.8.0" 409 + hash = "sha256-Mq5/A/Obcceu6kKxUv30DPC2ZaVvD8Iq/YtmLm1BVec=" 410 + [mod."github.com/jackc/puddle/v2"] 411 + version = "v2.2.2" 412 + hash = "sha256-IUxdu4JYfsCh/qlz2SiUWu7EVPHhyooiVA4oaS2Z6yk=" 401 413 [mod."github.com/json-iterator/go"] 402 414 version = "v1.1.12" 403 415 hash = "sha256-To8A0h+lbfZ/6zM+2PpRpY3+L6725OPC66lffq6fUoM="
-5
nix/pkgs/knot-mirror.nix
··· 1 1 { 2 2 buildGoApplication, 3 3 modules, 4 - sqlite-lib, 5 4 src, 6 5 }: 7 6 buildGoApplication { ··· 12 11 doCheck = false; 13 12 14 13 subPackages = ["cmd/knotmirror"]; 15 - tags = ["libsqlite3"]; 16 14 17 - env.CGO_CFLAGS = "-I ${sqlite-lib}/include "; 18 - env.CGO_LDFLAGS = "-L ${sqlite-lib}/lib"; 19 - CGO_ENABLED = 1; 20 15 meta = { 21 16 mainProgram = "knotmirror"; 22 17 };