A vibe-coded Tangled fork that supports Pijul.
at sl/spindle-rewrite 286 lines 7.8 kB view raw
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "slices" 9 "time" 10 11 "tangled.org/core/appview/cloudflare" 12 "tangled.org/core/appview/notify" 13 14 "tangled.org/core/api/tangled" 15 "tangled.org/core/appview/cache" 16 "tangled.org/core/appview/config" 17 "tangled.org/core/appview/db" 18 "tangled.org/core/appview/models" 19 "tangled.org/core/appview/sites" 20 ec "tangled.org/core/eventconsumer" 21 "tangled.org/core/eventconsumer/cursor" 22 "tangled.org/core/log" 23 "tangled.org/core/orm" 24 "tangled.org/core/rbac" 25 26 "github.com/go-git/go-git/v5/plumbing" 27 "github.com/posthog/posthog-go" 28) 29 30func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) { 31 logger := log.FromContext(ctx) 32 logger = log.SubLogger(logger, "knotstream") 33 34 knots, err := db.GetRegistrations( 35 d, 36 orm.FilterIsNot("registered", "null"), 37 ) 38 if err != nil { 39 return nil, err 40 } 41 42 srcs := make(map[ec.Source]struct{}) 43 for _, k := range knots { 44 s := ec.NewKnotSource(k.Domain) 45 srcs[s] = struct{}{} 46 } 47 48 cache := cache.New(c.Redis.Addr) 49 cursorStore := cursor.NewRedisCursorStore(cache) 50 51 cfg := ec.ConsumerConfig{ 52 Sources: srcs, 53 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev, c, cfClient), 54 RetryInterval: c.Knotstream.RetryInterval, 55 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 56 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 57 WorkerCount: c.Knotstream.WorkerCount, 58 QueueSize: c.Knotstream.QueueSize, 59 Logger: logger, 60 Dev: c.Core.Dev, 61 CursorStore: &cursorStore, 62 } 63 64 return ec.NewConsumer(cfg), nil 65} 66 67func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc { 68 return func(ctx context.Context, source 
ec.Source, msg ec.Message) error { 69 switch msg.Nsid { 70 case tangled.GitRefUpdateNSID: 71 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg) 72 } 73 74 return nil 75 } 76} 77 78func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg ec.Message) error { 79 logger := log.FromContext(ctx) 80 81 var record tangled.GitRefUpdate 82 err := json.Unmarshal(msg.EventJson, &record) 83 if err != nil { 84 return err 85 } 86 87 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 88 if err != nil { 89 return err 90 } 91 if !slices.Contains(knownKnots, source.Key()) { 92 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 93 } 94 95 logger.Info("processing gitRefUpdate event", 96 "repo_did", record.RepoDid, 97 "repo_name", record.RepoName, 98 "ref", record.Ref, 99 "old_sha", record.OldSha, 100 "new_sha", record.NewSha) 101 102 // trigger webhook notifications first (before other ops that might fail) 103 var errWebhook error 104 repos, err := db.GetRepos( 105 d, 106 0, 107 orm.FilterEq("did", record.RepoDid), 108 orm.FilterEq("name", record.RepoName), 109 ) 110 if err != nil { 111 errWebhook = fmt.Errorf("failed to lookup repo for webhooks: %w", err) 112 } else if len(repos) == 1 { 113 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 114 } 115 116 errPunchcard := populatePunchcard(d, record) 117 errLanguages := updateRepoLanguages(d, record) 118 119 var errPosthog error 120 if !dev && record.CommitterDid != "" { 121 errPosthog = pc.Enqueue(posthog.Capture{ 122 DistinctId: record.CommitterDid, 123 Event: "git_ref_update", 124 }) 125 } 126 127 // Trigger a sites redeploy if this push is to the configured sites branch. 
128 if cfClient.Enabled() { 129 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source) 130 } 131 132 return errors.Join(errWebhook, errPunchcard, errLanguages, errPosthog) 133} 134 135// triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites 136// branch configured for this repo and, if so, syncs the site to R2 137func triggerSitesDeployIfNeeded(ctx context.Context, d *db.DB, cfClient *cloudflare.Client, c *config.Config, record tangled.GitRefUpdate, source ec.Source) { 138 logger := log.FromContext(ctx) 139 140 ref := plumbing.ReferenceName(record.Ref) 141 if !ref.IsBranch() { 142 return 143 } 144 pushedBranch := ref.Short() 145 146 repos, err := db.GetRepos( 147 d, 148 0, 149 orm.FilterEq("did", record.RepoDid), 150 orm.FilterEq("name", record.RepoName), 151 ) 152 if err != nil || len(repos) != 1 { 153 return 154 } 155 repo := repos[0] 156 157 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoAt().String()) 158 if err != nil || siteConfig == nil { 159 return 160 } 161 if siteConfig.Branch != pushedBranch { 162 return 163 } 164 165 scheme := "https" 166 if c.Core.Dev { 167 scheme = "http" 168 } 169 knotHost := fmt.Sprintf("%s://%s", scheme, source.Key()) 170 171 deploy := &models.SiteDeploy{ 172 RepoAt: repo.RepoAt().String(), 173 Branch: siteConfig.Branch, 174 Dir: siteConfig.Dir, 175 CommitSHA: record.NewSha, 176 Trigger: models.SiteDeployTriggerPush, 177 } 178 179 deployErr := sites.Deploy(ctx, cfClient, knotHost, record.RepoDid, record.RepoName, siteConfig.Branch, siteConfig.Dir) 180 if deployErr != nil { 181 logger.Error("sites: R2 sync failed on push", "repo", record.RepoDid+"/"+record.RepoName, "err", deployErr) 182 deploy.Status = models.SiteDeployStatusFailure 183 deploy.Error = deployErr.Error() 184 } else { 185 deploy.Status = models.SiteDeployStatusSuccess 186 } 187 188 if err := db.AddSiteDeploy(d, deploy); err != nil { 189 logger.Error("sites: failed to record deploy", "repo", record.RepoDid+"/"+record.RepoName, 
"err", err) 190 } 191 192 if deployErr == nil { 193 logger.Info("site deployed to r2", "repo", record.RepoDid+"/"+record.RepoName) 194 } 195} 196 197func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 198 if record.CommitterDid == "" { 199 return nil 200 } 201 202 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 203 if err != nil { 204 return err 205 } 206 207 count := 0 208 for _, ke := range knownEmails { 209 if record.Meta == nil { 210 continue 211 } 212 if record.Meta.CommitCount == nil { 213 continue 214 } 215 for _, ce := range record.Meta.CommitCount.ByEmail { 216 if ce == nil { 217 continue 218 } 219 if ce.Email == ke.Address || ce.Email == record.CommitterDid { 220 count += int(ce.Count) 221 } 222 } 223 } 224 225 punch := models.Punch{ 226 Did: record.CommitterDid, 227 Date: time.Now(), 228 Count: count, 229 } 230 return db.AddPunch(d, punch) 231} 232 233func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 234 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 235 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName) 236 } 237 238 repos, err := db.GetRepos( 239 d, 240 0, 241 orm.FilterEq("did", record.RepoDid), 242 orm.FilterEq("name", record.RepoName), 243 ) 244 if err != nil { 245 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err) 246 } 247 if len(repos) != 1 { 248 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 249 } 250 repo := repos[0] 251 252 ref := plumbing.ReferenceName(record.Ref) 253 if !ref.IsBranch() { 254 return fmt.Errorf("%s is not a valid reference name", ref) 255 } 256 257 var langs []models.RepoLanguage 258 for _, l := range record.Meta.LangBreakdown.Inputs { 259 if l == nil { 260 continue 261 } 262 263 langs = append(langs, models.RepoLanguage{ 264 RepoAt: repo.RepoAt(), 265 Ref: ref.Short(), 266 IsDefaultRef: 
record.Meta.IsDefaultRef, 267 Language: l.Lang, 268 Bytes: l.Size, 269 }) 270 } 271 272 tx, err := d.Begin() 273 if err != nil { 274 return err 275 } 276 defer tx.Rollback() 277 278 // update appview's cache 279 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs) 280 if err != nil { 281 fmt.Printf("failed; %s\n", err) 282 // non-fatal 283 } 284 285 return tx.Commit() 286}