A vibe-coded tangled fork that supports Pijul.
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "slices"
9 "time"
10
11 "tangled.org/core/appview/cloudflare"
12 "tangled.org/core/appview/notify"
13
14 "tangled.org/core/api/tangled"
15 "tangled.org/core/appview/cache"
16 "tangled.org/core/appview/config"
17 "tangled.org/core/appview/db"
18 "tangled.org/core/appview/models"
19 "tangled.org/core/appview/sites"
20 ec "tangled.org/core/eventconsumer"
21 "tangled.org/core/eventconsumer/cursor"
22 "tangled.org/core/log"
23 "tangled.org/core/orm"
24 "tangled.org/core/rbac"
25
26 "github.com/go-git/go-git/v5/plumbing"
27 "github.com/posthog/posthog-go"
28)
29
30func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, cfClient *cloudflare.Client) (*ec.Consumer, error) {
31 logger := log.FromContext(ctx)
32 logger = log.SubLogger(logger, "knotstream")
33
34 knots, err := db.GetRegistrations(
35 d,
36 orm.FilterIsNot("registered", "null"),
37 )
38 if err != nil {
39 return nil, err
40 }
41
42 srcs := make(map[ec.Source]struct{})
43 for _, k := range knots {
44 s := ec.NewKnotSource(k.Domain)
45 srcs[s] = struct{}{}
46 }
47
48 cache := cache.New(c.Redis.Addr)
49 cursorStore := cursor.NewRedisCursorStore(cache)
50
51 cfg := ec.ConsumerConfig{
52 Sources: srcs,
53 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev, c, cfClient),
54 RetryInterval: c.Knotstream.RetryInterval,
55 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
56 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
57 WorkerCount: c.Knotstream.WorkerCount,
58 QueueSize: c.Knotstream.QueueSize,
59 Logger: logger,
60 Dev: c.Core.Dev,
61 CursorStore: &cursorStore,
62 }
63
64 return ec.NewConsumer(cfg), nil
65}
66
67func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client) ec.ProcessFunc {
68 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
69 switch msg.Nsid {
70 case tangled.GitRefUpdateNSID:
71 return ingestRefUpdate(ctx, d, enforcer, posthog, notifier, dev, c, cfClient, source, msg)
72 }
73
74 return nil
75 }
76}
77
78func ingestRefUpdate(ctx context.Context, d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, c *config.Config, cfClient *cloudflare.Client, source ec.Source, msg ec.Message) error {
79 logger := log.FromContext(ctx)
80
81 var record tangled.GitRefUpdate
82 err := json.Unmarshal(msg.EventJson, &record)
83 if err != nil {
84 return err
85 }
86
87 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
88 if err != nil {
89 return err
90 }
91 if !slices.Contains(knownKnots, source.Key()) {
92 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
93 }
94
95 logger.Info("processing gitRefUpdate event",
96 "repo_did", record.RepoDid,
97 "repo_name", record.RepoName,
98 "ref", record.Ref,
99 "old_sha", record.OldSha,
100 "new_sha", record.NewSha)
101
102 // trigger webhook notifications first (before other ops that might fail)
103 var errWebhook error
104 repos, err := db.GetRepos(
105 d,
106 0,
107 orm.FilterEq("did", record.RepoDid),
108 orm.FilterEq("name", record.RepoName),
109 )
110 if err != nil {
111 errWebhook = fmt.Errorf("failed to lookup repo for webhooks: %w", err)
112 } else if len(repos) == 1 {
113 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid)
114 }
115
116 errPunchcard := populatePunchcard(d, record)
117 errLanguages := updateRepoLanguages(d, record)
118
119 var errPosthog error
120 if !dev && record.CommitterDid != "" {
121 errPosthog = pc.Enqueue(posthog.Capture{
122 DistinctId: record.CommitterDid,
123 Event: "git_ref_update",
124 })
125 }
126
127 // Trigger a sites redeploy if this push is to the configured sites branch.
128 if cfClient.Enabled() {
129 go triggerSitesDeployIfNeeded(ctx, d, cfClient, c, record, source)
130 }
131
132 return errors.Join(errWebhook, errPunchcard, errLanguages, errPosthog)
133}
134
135// triggerSitesDeployIfNeeded checks whether the pushed ref matches the sites
136// branch configured for this repo and, if so, syncs the site to R2
137func triggerSitesDeployIfNeeded(ctx context.Context, d *db.DB, cfClient *cloudflare.Client, c *config.Config, record tangled.GitRefUpdate, source ec.Source) {
138 logger := log.FromContext(ctx)
139
140 ref := plumbing.ReferenceName(record.Ref)
141 if !ref.IsBranch() {
142 return
143 }
144 pushedBranch := ref.Short()
145
146 repos, err := db.GetRepos(
147 d,
148 0,
149 orm.FilterEq("did", record.RepoDid),
150 orm.FilterEq("name", record.RepoName),
151 )
152 if err != nil || len(repos) != 1 {
153 return
154 }
155 repo := repos[0]
156
157 siteConfig, err := db.GetRepoSiteConfig(d, repo.RepoAt().String())
158 if err != nil || siteConfig == nil {
159 return
160 }
161 if siteConfig.Branch != pushedBranch {
162 return
163 }
164
165 scheme := "https"
166 if c.Core.Dev {
167 scheme = "http"
168 }
169 knotHost := fmt.Sprintf("%s://%s", scheme, source.Key())
170
171 deploy := &models.SiteDeploy{
172 RepoAt: repo.RepoAt().String(),
173 Branch: siteConfig.Branch,
174 Dir: siteConfig.Dir,
175 CommitSHA: record.NewSha,
176 Trigger: models.SiteDeployTriggerPush,
177 }
178
179 deployErr := sites.Deploy(ctx, cfClient, knotHost, record.RepoDid, record.RepoName, siteConfig.Branch, siteConfig.Dir)
180 if deployErr != nil {
181 logger.Error("sites: R2 sync failed on push", "repo", record.RepoDid+"/"+record.RepoName, "err", deployErr)
182 deploy.Status = models.SiteDeployStatusFailure
183 deploy.Error = deployErr.Error()
184 } else {
185 deploy.Status = models.SiteDeployStatusSuccess
186 }
187
188 if err := db.AddSiteDeploy(d, deploy); err != nil {
189 logger.Error("sites: failed to record deploy", "repo", record.RepoDid+"/"+record.RepoName, "err", err)
190 }
191
192 if deployErr == nil {
193 logger.Info("site deployed to r2", "repo", record.RepoDid+"/"+record.RepoName)
194 }
195}
196
197func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
198 if record.CommitterDid == "" {
199 return nil
200 }
201
202 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
203 if err != nil {
204 return err
205 }
206
207 count := 0
208 for _, ke := range knownEmails {
209 if record.Meta == nil {
210 continue
211 }
212 if record.Meta.CommitCount == nil {
213 continue
214 }
215 for _, ce := range record.Meta.CommitCount.ByEmail {
216 if ce == nil {
217 continue
218 }
219 if ce.Email == ke.Address || ce.Email == record.CommitterDid {
220 count += int(ce.Count)
221 }
222 }
223 }
224
225 punch := models.Punch{
226 Did: record.CommitterDid,
227 Date: time.Now(),
228 Count: count,
229 }
230 return db.AddPunch(d, punch)
231}
232
233func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
234 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
235 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName)
236 }
237
238 repos, err := db.GetRepos(
239 d,
240 0,
241 orm.FilterEq("did", record.RepoDid),
242 orm.FilterEq("name", record.RepoName),
243 )
244 if err != nil {
245 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err)
246 }
247 if len(repos) != 1 {
248 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
249 }
250 repo := repos[0]
251
252 ref := plumbing.ReferenceName(record.Ref)
253 if !ref.IsBranch() {
254 return fmt.Errorf("%s is not a valid reference name", ref)
255 }
256
257 var langs []models.RepoLanguage
258 for _, l := range record.Meta.LangBreakdown.Inputs {
259 if l == nil {
260 continue
261 }
262
263 langs = append(langs, models.RepoLanguage{
264 RepoAt: repo.RepoAt(),
265 Ref: ref.Short(),
266 IsDefaultRef: record.Meta.IsDefaultRef,
267 Language: l.Lang,
268 Bytes: l.Size,
269 })
270 }
271
272 tx, err := d.Begin()
273 if err != nil {
274 return err
275 }
276 defer tx.Rollback()
277
278 // update appview's cache
279 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs)
280 if err != nil {
281 fmt.Printf("failed; %s\n", err)
282 // non-fatal
283 }
284
285 return tx.Commit()
286}