A vibe coded tangled fork which supports pijul.
at 496ecb95e21f8a6971876535b6a2f505a3208890 219 lines 5.7 kB view raw
1package state 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "slices" 9 "time" 10 11 "tangled.org/core/appview/notify" 12 13 "tangled.org/core/api/tangled" 14 "tangled.org/core/appview/cache" 15 "tangled.org/core/appview/config" 16 "tangled.org/core/appview/db" 17 "tangled.org/core/appview/models" 18 ec "tangled.org/core/eventconsumer" 19 "tangled.org/core/eventconsumer/cursor" 20 "tangled.org/core/log" 21 "tangled.org/core/orm" 22 "tangled.org/core/rbac" 23 24 "github.com/go-git/go-git/v5/plumbing" 25 "github.com/posthog/posthog-go" 26) 27 28func Knotstream(ctx context.Context, c *config.Config, d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier) (*ec.Consumer, error) { 29 logger := log.FromContext(ctx) 30 logger = log.SubLogger(logger, "knotstream") 31 32 knots, err := db.GetRegistrations( 33 d, 34 orm.FilterIsNot("registered", "null"), 35 ) 36 if err != nil { 37 return nil, err 38 } 39 40 srcs := make(map[ec.Source]struct{}) 41 for _, k := range knots { 42 s := ec.NewKnotSource(k.Domain) 43 srcs[s] = struct{}{} 44 } 45 46 cache := cache.New(c.Redis.Addr) 47 cursorStore := cursor.NewRedisCursorStore(cache) 48 49 cfg := ec.ConsumerConfig{ 50 Sources: srcs, 51 ProcessFunc: knotIngester(d, enforcer, posthog, notifier, c.Core.Dev), 52 RetryInterval: c.Knotstream.RetryInterval, 53 MaxRetryInterval: c.Knotstream.MaxRetryInterval, 54 ConnectionTimeout: c.Knotstream.ConnectionTimeout, 55 WorkerCount: c.Knotstream.WorkerCount, 56 QueueSize: c.Knotstream.QueueSize, 57 Logger: logger, 58 Dev: c.Core.Dev, 59 CursorStore: &cursorStore, 60 } 61 62 return ec.NewConsumer(cfg), nil 63} 64 65func knotIngester(d *db.DB, enforcer *rbac.Enforcer, posthog posthog.Client, notifier notify.Notifier, dev bool) ec.ProcessFunc { 66 return func(ctx context.Context, source ec.Source, msg ec.Message) error { 67 switch msg.Nsid { 68 case tangled.GitRefUpdateNSID: 69 return ingestRefUpdate(d, enforcer, posthog, notifier, dev, source, msg, ctx) 70 } 71 72 return nil 73 } 74} 75 76func ingestRefUpdate(d *db.DB, enforcer *rbac.Enforcer, pc posthog.Client, notifier notify.Notifier, dev bool, source ec.Source, msg ec.Message, ctx context.Context) error { 77 logger := log.FromContext(ctx) 78 79 var record tangled.GitRefUpdate 80 err := json.Unmarshal(msg.EventJson, &record) 81 if err != nil { 82 return err 83 } 84 85 knownKnots, err := enforcer.GetKnotsForUser(record.CommitterDid) 86 if err != nil { 87 return err 88 } 89 if !slices.Contains(knownKnots, source.Key()) { 90 return fmt.Errorf("%s does not belong to %s, something is fishy", record.CommitterDid, source.Key()) 91 } 92 93 logger.Info("processing gitRefUpdate event", 94 "repo_did", record.RepoDid, 95 "repo_name", record.RepoName, 96 "ref", record.Ref, 97 "old_sha", record.OldSha, 98 "new_sha", record.NewSha) 99 100 // trigger webhook notifications first (before other ops that might fail) 101 var errWebhook error 102 repos, err := db.GetRepos( 103 d, 104 0, 105 orm.FilterEq("did", record.RepoDid), 106 orm.FilterEq("name", record.RepoName), 107 ) 108 if err != nil { 109 errWebhook = fmt.Errorf("failed to lookup repo for webhooks: %w", err) 110 } else if len(repos) == 1 { 111 notifier.Push(ctx, &repos[0], record.Ref, record.OldSha, record.NewSha, record.CommitterDid) 112 } else if len(repos) == 0 { 113 errWebhook = fmt.Errorf("no repo found for webhooks: %s/%s", record.RepoDid, record.RepoName) 114 } 115 116 errPunchcard := populatePunchcard(d, record) 117 errLanguages := updateRepoLanguages(d, record) 118 119 var errPosthog error 120 if !dev && record.CommitterDid != "" { 121 errPosthog = pc.Enqueue(posthog.Capture{ 122 DistinctId: record.CommitterDid, 123 Event: "git_ref_update", 124 }) 125 } 126 127 return errors.Join(errWebhook, errPunchcard, errLanguages, errPosthog) 128} 129 130func populatePunchcard(d *db.DB, record tangled.GitRefUpdate) error { 131 if record.CommitterDid == "" { 132 return nil 133 } 134 135 knownEmails, err := db.GetAllEmails(d, record.CommitterDid) 136 if err != nil { 137 return err 138 } 139 140 count := 0 141 for _, ke := range knownEmails { 142 if record.Meta == nil { 143 continue 144 } 145 if record.Meta.CommitCount == nil { 146 continue 147 } 148 for _, ce := range record.Meta.CommitCount.ByEmail { 149 if ce == nil { 150 continue 151 } 152 if ce.Email == ke.Address || ce.Email == record.CommitterDid { 153 count += int(ce.Count) 154 } 155 } 156 } 157 158 punch := models.Punch{ 159 Did: record.CommitterDid, 160 Date: time.Now(), 161 Count: count, 162 } 163 return db.AddPunch(d, punch) 164} 165 166func updateRepoLanguages(d *db.DB, record tangled.GitRefUpdate) error { 167 if record.Meta == nil || record.Meta.LangBreakdown == nil || record.Meta.LangBreakdown.Inputs == nil { 168 return fmt.Errorf("empty language data for repo: %s/%s", record.RepoDid, record.RepoName) 169 } 170 171 repos, err := db.GetRepos( 172 d, 173 0, 174 orm.FilterEq("did", record.RepoDid), 175 orm.FilterEq("name", record.RepoName), 176 ) 177 if err != nil { 178 return fmt.Errorf("failed to look for repo in DB (%s/%s): %w", record.RepoDid, record.RepoName, err) 179 } 180 if len(repos) != 1 { 181 return fmt.Errorf("incorrect number of repos returned: %d (expected 1)", len(repos)) 182 } 183 repo := repos[0] 184 185 ref := plumbing.ReferenceName(record.Ref) 186 if !ref.IsBranch() { 187 return fmt.Errorf("%s is not a valid reference name", ref) 188 } 189 190 var langs []models.RepoLanguage 191 for _, l := range record.Meta.LangBreakdown.Inputs { 192 if l == nil { 193 continue 194 } 195 196 langs = append(langs, models.RepoLanguage{ 197 RepoAt: repo.RepoAt(), 198 Ref: ref.Short(), 199 IsDefaultRef: record.Meta.IsDefaultRef, 200 Language: l.Lang, 201 Bytes: l.Size, 202 }) 203 } 204 205 tx, err := d.Begin() 206 if err != nil { 207 return err 208 } 209 defer tx.Rollback() 210 211 // update appview's cache 212 err = db.UpdateRepoLanguages(tx, repo.RepoAt(), ref.Short(), langs) 213 if err != nil { 214 fmt.Printf("failed; %s\n", err) 215 // non-fatal 216 } 217 218 return tx.Commit() 219}