A vibe-coded Tangled fork that supports Pijul.
1package state
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "slices"
9 "time"
10
11 "tangled.org/core/appview/notify"
12
13 "tangled.org/core/api/tangled"
14 "tangled.org/core/appview/cache"
15 "tangled.org/core/appview/config"
16 "tangled.org/core/appview/db"
17 "tangled.org/core/appview/models"
18 ec "tangled.org/core/eventconsumer"
19 "tangled.org/core/eventconsumer/cursor"
20 "tangled.org/core/log"
21 "tangled.org/core/orm"
22 "tangled.org/core/rbac"
23
24 "github.com/go-git/go-git/v5/plumbing"
25 "github.com/posthog/posthog-go"
26)
27
28func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier) (*ec.Consumer, error) {
29 logger := log.FromContext(ctx)
30 logger = log.SubLogger(logger, "knotstream")
31
32 knots, err := db.GetRegistrations(
33 d,
34 orm.FilterIsNot("registered", "null"),
35 )
36 if err != nil {
37 return nil, err
38 }
39
40 srcs := make(map[ec.Source]struct{})
41 for _, k := range knots {
42 s := ec.NewKnotSource(k.Domain)
43 srcs[s] = struct{}{}
44 }
45
46 cache := cache.New(c.Redis.Addr)
47 cursorStore := cursor.NewRedisCursorStore(cache)
48
49 cfg := ec.ConsumerConfig{
50 Sources: srcs,
51 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev),
52 RetryInterval: c.Knotstream.RetryInterval,
53 MaxRetryInterval: c.Knotstream.MaxRetryInterval,
54 ConnectionTimeout: c.Knotstream.ConnectionTimeout,
55 WorkerCount: c.Knotstream.WorkerCount,
56 QueueSize: c.Knotstream.QueueSize,
57 Logger: logger,
58 Dev: c.Core.Dev,
59 CursorStore: &cursorStore,
60 }
61
62 return ec.NewConsumer(cfg), nil
63}
64
65func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool) ec.ProcessFunc {
66 return func(ctx context.Context, source ec.Source, msg ec.Message) error {
67 switch msg.Nsid {
68 case tangled.GitRefUpdateNSID:
69 return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx)
70 }
71
72 return nil
73 }
74}
75
76func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, source ec.Source, msg ec.Message, ctx context.Context) error {
77 logger := log.FromContext(ctx)
78
79 var record tangled.GitRefUpdate
80 err := json.Unmarshal(msg.EventJson, &record)
81 if err != nil {
82 return err
83 }
84
85 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid)
86 if err != nil {
87 return err
88 }
89 if !slices.Contains(knownKnots, source.Key()) {
90 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key())
91 }
92
93 logger.Info("processing gitRefUpdate event",
94 "repo_did", record.RepoDid,
95 "repo_name", record.RepoName,
96 "ref", record.Ref,
97 "old_sha", record.OldSha,
98 "new_sha", record.NewSha)
99
100 // trigger webhook notifications first (before other ops that might fail)
101 var errWebhook error
102 repos, err := db.GetRepos(
103 d,
104 0,
105 orm.FilterEq("did", record.RepoDid),
106 orm.FilterEq("name", record.RepoName),
107 )
108 if err != nil {
109 errWebhook = fmt.Errorf("failed to lookup repo for webhooks: %w", err)
110 } else if len(repos) == 1 {
111 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid)
112 } else if len(repos) == 0 {
113 errWebhook = fmt.Errorf("no repo found for webhooks: %s/%s", record.RepoDid, record.RepoName)
114 }
115
116 errPunchcard := populatePunchcard(d, record)
117 errLanguages := updateRepoLanguages(d, record)
118
119 var errPosthog error
120 if !dev && record.CommitterDid != "" {
121 errPosthog = pc.Enqueue(posthog.Capture{
122 DistinctId: record.CommitterDid,
123 Event: "git_ref_update",
124 })
125 }
126
127 return errors.Join(errWebhook, errPunchcard, errLanguages, errPosthog)
128}
129
130func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error {
131 if record.CommitterDid == "" {
132 return nil
133 }
134
135 knownEmails, err := db.GetAllEmails(d, record.CommitterDid)
136 if err != nil {
137 return err
138 }
139
140 count := 0
141 for _, ke := range knownEmails {
142 if record.Meta == nil {
143 continue
144 }
145 if record.Meta.CommitCount == nil {
146 continue
147 }
148 for _, ce := range record.Meta.CommitCount.ByEmail {
149 if ce == nil {
150 continue
151 }
152 if ce.Email == ke.Address || ce.Email == record.CommitterDid {
153 count += int(ce.Count)
154 }
155 }
156 }
157
158 punch := models.Punch{
159 Did: record.CommitterDid,
160 Date: time.Now(),
161 Count: count,
162 }
163 return db.AddPunch(d, punch)
164}
165
166func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error {
167 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil {
168 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName)
169 }
170
171 repos, err := db.GetRepos(
172 d,
173 0,
174 orm.FilterEq("did", record.RepoDid),
175 orm.FilterEq("name", record.RepoName),
176 )
177 if err != nil {
178 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err)
179 }
180 if len(repos) != 1 {
181 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos))
182 }
183 repo := repos[0]
184
185 ref := plumbing.ReferenceName(record.Ref)
186 if !ref.IsBranch() {
187 return fmt.Errorf("%s is not a valid reference name", ref)
188 }
189
190 var langs []models.RepoLanguage
191 for _, l := range record.Meta.LangBreakdown.Inputs {
192 if l == nil {
193 continue
194 }
195
196 langs = append(langs, models.RepoLanguage{
197 RepoAt: repo.RepoAt(),
198 Ref: ref.Short(),
199 IsDefaultRef: record.Meta.IsDefaultRef,
200 Language: l.Lang,
201 Bytes: l.Size,
202 })
203 }
204
205 tx, err := d.Begin()
206 if err != nil {
207 return err
208 }
209 defer tx.Rollback()
210
211 // update appview's cache
212 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs)
213 if err != nil {
214 fmt.Printf("failed; %s\n", err)
215 // non-fatal
216 }
217
218 return tx.Commit()
219}