A vibe coded tangled fork which supports pijul.

knotmirror: add manual resync/cancel

Signed-off-by: Seongmin Lee <git@boltless.me>

+196 -9
+75 -4
knotmirror/adminpage.go
··· 3 3 import ( 4 4 "database/sql" 5 5 "embed" 6 + "fmt" 7 + "html" 6 8 "html/template" 7 9 "log/slog" 8 10 "net/http" 9 11 "strconv" 10 12 "time" 11 13 14 + "github.com/bluesky-social/indigo/atproto/syntax" 12 15 "github.com/go-chi/chi/v5" 13 16 "tangled.org/core/appview/pagination" 14 17 "tangled.org/core/knotmirror/db" ··· 21 24 const repoPageSize = 20 22 25 23 26 type AdminServer struct { 24 - db *sql.DB 27 + db *sql.DB 28 + resyncer *Resyncer 29 + logger *slog.Logger 25 30 } 26 31 27 - func NewAdminServer(database *sql.DB) *AdminServer { 28 - return &AdminServer{db: database} 32 + func NewAdminServer(l *slog.Logger, database *sql.DB, resyncer *Resyncer) *AdminServer { 33 + return &AdminServer{ 34 + db: database, 35 + resyncer: resyncer, 36 + logger: l, 37 + } 29 38 } 30 39 31 40 func (s *AdminServer) Router() http.Handler { 32 41 r := chi.NewRouter() 33 42 r.Get("/repos", s.handleRepos()) 34 43 r.Get("/hosts", s.handleHosts()) 44 + 45 + // not sure how to use these. should we vibe-code the admin page with React? 46 + r.Post("/api/triggerRepoResync", s.handleRepoResyncTrigger()) 47 + r.Post("/api/cancelRepoResync", s.handleRepoResyncCancel()) 48 + r.Post("/api/testNotif", s.handleTestNotif) 35 49 return r 36 50 } 37 51 ··· 40 54 "add": func(a, b int) int { return a + b }, 41 55 "sub": func(a, b int) int { return a - b }, 42 56 "readt": func(ts int64) string { 43 - if ts == 0 { 57 + if ts <= 0 { 44 58 return "n/a" 45 59 } 46 60 return time.Unix(ts, 0).Format("2006-01-02 15:04") ··· 112 126 } 113 127 } 114 128 } 129 + 130 + func (s *AdminServer) handleRepoResyncTrigger() http.HandlerFunc { 131 + return func(w http.ResponseWriter, r *http.Request) { 132 + var repoQuery = r.FormValue("repo") 133 + 134 + repo, err := syntax.ParseATURI(repoQuery) 135 + if err != nil || repo.RecordKey() == "" { 136 + writeNotif(w, http.StatusBadRequest, fmt.Sprintf("repo parameter invalid: %s", repoQuery)) 137 + return 138 + } 139 + 140 + if err := s.resyncer.TriggerResyncJob(r.Context(), repo); err != nil { 141 + s.logger.Error("failed to trigger resync job", "err", err) 142 + writeNotif(w, http.StatusInternalServerError, fmt.Sprintf("repo parameter invalid: %s", repoQuery)) 143 + return 144 + } 145 + writeNotif(w, http.StatusOK, "success") 146 + } 147 + } 148 + 149 + func (s *AdminServer) handleRepoResyncCancel() http.HandlerFunc { 150 + return func(w http.ResponseWriter, r *http.Request) { 151 + var repoQuery = r.FormValue("repo") 152 + 153 + repo, err := syntax.ParseATURI(repoQuery) 154 + if err != nil || repo.RecordKey() == "" { 155 + writeNotif(w, http.StatusBadRequest, fmt.Sprintf("repo parameter invalid: %s", repoQuery)) 156 + return 157 + } 158 + 159 + s.resyncer.CancelResyncJob(repo) 160 + writeNotif(w, http.StatusOK, "success") 161 + } 162 + } 163 + 164 + func (s *AdminServer) handleTestNotif(w http.ResponseWriter, r *http.Request) { 165 + writeNotif(w, http.StatusOK, "new notifi") 166 + } 167 + 168 + func writeNotif(w http.ResponseWriter, status int, msg string) { 169 + w.Header().Set("Content-Type", "text/html") 170 + w.WriteHeader(status) 171 + 172 + class := "info" 173 + switch { 174 + case status >= 500: 175 + class = "error" 176 + case status >= 400: 177 + class = "warn" 178 + } 179 + 180 + fmt.Fprintf(w, 181 + `<div hx-swap-oob="beforeend:#notifications"><div class="notif %s">%s</div></div>`, 182 + class, 183 + html.EscapeString(msg), 184 + ) 185 + }
+1 -1
knotmirror/knotmirror.go
··· 59 59 knotstream := knotstream.NewKnotStream(logger, db, cfg) 60 60 crawler := NewCrawler(logger, db) 61 61 resyncer := NewResyncer(logger, db, gitm, cfg) 62 - adminpage := NewAdminServer(db) 62 + adminpage := NewAdminServer(logger, db, resyncer) 63 63 64 64 // maintain repository list with tap 65 65 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events.
+4
knotmirror/models/models.go
··· 62 62 } 63 63 } 64 64 65 + func (s RepoState) IsResyncing() bool { 66 + return s == RepoStateResyncing 67 + } 68 + 65 69 type HostCursor struct { 66 70 Hostname string 67 71 LastSeq int64
+72 -4
knotmirror/resyncer.go
··· 25 25 26 26 claimJobMu sync.Mutex 27 27 28 - repoFetchTimeout time.Duration 28 + runningJobs map[syntax.ATURI]context.CancelFunc 29 + runningJobsMu sync.Mutex 30 + 31 + repoFetchTimeout time.Duration 32 + manualResyncTimeout time.Duration 29 33 30 34 parallelism int 31 35 } ··· 36 40 db: db, 37 41 gitm: gitm, 38 42 43 + runningJobs: make(map[syntax.ATURI]context.CancelFunc), 44 + 39 45 repoFetchTimeout: cfg.GitRepoFetchTimeout, 40 46 parallelism: cfg.ResyncParallelism, 47 + 48 + manualResyncTimeout: 30 * time.Minute, 41 49 } 42 50 } 43 51 ··· 73 81 } 74 82 } 75 83 84 + func (r *Resyncer) registerRunning(repo syntax.ATURI, cancel context.CancelFunc) { 85 + r.runningJobsMu.Lock() 86 + defer r.runningJobsMu.Unlock() 87 + 88 + if _, exists := r.runningJobs[repo]; exists { 89 + return 90 + } 91 + r.runningJobs[repo] = cancel 92 + } 93 + 94 + func (r *Resyncer) unregisterRunning(repo syntax.ATURI) { 95 + r.runningJobsMu.Lock() 96 + defer r.runningJobsMu.Unlock() 97 + 98 + delete(r.runningJobs, repo) 99 + } 100 + 101 + func (r *Resyncer) CancelResyncJob(repo syntax.ATURI) { 102 + r.runningJobsMu.Lock() 103 + defer r.runningJobsMu.Unlock() 104 + 105 + cancel, ok := r.runningJobs[repo] 106 + if !ok { 107 + return 108 + } 109 + delete(r.runningJobs, repo) 110 + cancel() 111 + } 112 + 113 + // TriggerResyncJob manually triggers the resync job 114 + func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoAt syntax.ATURI) error { 115 + repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 116 + if err != nil { 117 + return fmt.Errorf("failed to get repo: %w", err) 118 + } 119 + if repo == nil { 120 + return fmt.Errorf("repo not found: %s", repoAt) 121 + } 122 + 123 + if repo.State == models.RepoStateResyncing { 124 + return fmt.Errorf("repo already resyncing") 125 + } 126 + 127 + repo.State = models.RepoStatePending 128 + repo.RetryAfter = -1 // resyncer will prioritize this 129 + 130 + if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 131 + return fmt.Errorf("updating repo state to pending %w", err) 132 + } 133 + return nil 134 + } 135 + 76 136 func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) { 77 137 // use mutex to prevent duplicated jobs 78 138 r.claimJobMu.Lock() ··· 86 146 where at_uri = ( 87 147 select at_uri from repos 88 148 where state in ($2, $3, $4) 89 - and (retry_after = 0 or retry_after < $5) 149 + and (retry_after = -1 or retry_after = 0 or retry_after < $5) 90 150 limit 1 91 151 ) 92 152 returning at_uri ··· 112 172 resyncsStarted.Inc() 113 173 startTime := time.Now() 114 174 115 - success, err := r.doResync(ctx, repoAt) 175 + jobCtx, cancel := context.WithCancel(ctx) 176 + r.registerRunning(repoAt, cancel) 177 + defer r.unregisterRunning(repoAt) 178 + 179 + success, err := r.doResync(jobCtx, repoAt) 116 180 if !success { 117 181 resyncsFailed.Inc() 118 182 resyncDuration.Observe(time.Since(startTime).Seconds()) ··· 140 204 // TODO: check if Knot is on backoff list. If so, return (false, nil) 141 205 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list 142 206 143 - fetchCtx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout) 207 + timeout := r.repoFetchTimeout 208 + if repo.RetryAfter == -1 { 209 + timeout = r.manualResyncTimeout 210 + } 211 + fetchCtx, cancel := context.WithTimeout(ctx, timeout) 144 212 defer cancel() 145 213 146 214 if err := r.gitm.Sync(fetchCtx, repo); err != nil {
+29
knotmirror/templates/base.html
··· 11 11 th, td { text-align: left; padding: 8px; border: 1px solid #ddd; } 12 12 .pagination { margin-top: 20px; } 13 13 .filters { background: #f4f4f4; padding: 15px; margin-bottom: 20px; } 14 + #notifications { 15 + position: fixed; 16 + bottom: 8px; 17 + right: 8px; 18 + z-index: 1000; 19 + pointer-events: none; 20 + } 21 + .notif { 22 + pointer-events: auto; 23 + background: #333; 24 + color: #fff; 25 + padding: 2px 4px; 26 + margin: 4px 0; 27 + opacity: 0.95; 28 + } 29 + .notif.warn { background: #ed6c02 } 30 + .notif.error { background: #d32f2f } 14 31 </style> 15 32 </head> 16 33 <body> ··· 21 38 <main id="main"> 22 39 {{template "content" .}} 23 40 </main> 41 + <div id="notifications"></div> 42 + <script> 43 + document.body.addEventListener("htmx:oobBeforeSwap", (evt) => { 44 + evt.detail.fragment.querySelectorAll(".notif").forEach((el) => { 45 + console.debug("set timeout to notif element", el) 46 + setTimeout(() => { 47 + console.debug("clearing notif element", el); 48 + el.remove(); 49 + }, 10 * 1000); 50 + }); 51 + }); 52 + </script> 24 53 </body> 25 54 </html> 26 55 {{end}}
+15
knotmirror/templates/repos.html
··· 41 41 <th>Retry</th> 42 42 <th>Retry After</th> 43 43 <th>Error Message</th> 44 + <th>Action</th> 44 45 </tr> 45 46 </thead> 46 47 <tbody> ··· 53 54 <td>{{.RetryCount}}</td> 54 55 <td>{{readt .RetryAfter}}</td> 55 56 <td>{{.ErrorMsg}}</td> 57 + <td> 58 + <form 59 + {{ if .State.IsResyncing -}} 60 + hx-post="/api/cancelRepoResync" 61 + {{- else -}} 62 + hx-post="/api/triggerRepoResync" 63 + {{- end }} 64 + hx-swap="none" 65 + hx-disabled-elt="find button" 66 + > 67 + <input type="hidden" name="repo" value="{{.AtUri}}"> 68 + <button type="submit">{{ if .State.IsResyncing }}cancel{{ else }}resync{{ end }}</button> 69 + </form> 70 + </td> 56 71 </tr> 57 72 {{else}} 58 73 <tr><td colspan="99">No repositories found.</td></tr>