A vibe coded tangled fork which supports pijul.
1package knotstream
2
3import (
4 "context"
5 "log/slog"
6 "sync"
7 "sync/atomic"
8 "time"
9
10 "tangled.org/core/log"
11)
12
13type ParallelScheduler struct {
14 concurrency int
15
16 do func(ctx context.Context, task *Task) error
17
18 feeder chan *Task
19 lk sync.Mutex
20 scheduled map[string][]*Task
21 lastSeq atomic.Int64
22
23 logger *slog.Logger
24}
25
26type Task struct {
27 key string
28 message []byte
29}
30
31func NewParallelScheduler(maxC int, ident string, do func(context.Context, *Task) error) *ParallelScheduler {
32 return &ParallelScheduler{
33 concurrency: maxC,
34 do: do,
35 feeder: make(chan *Task),
36 scheduled: make(map[string][]*Task),
37 logger: log.New("parallel-scheduler"),
38 }
39}
40
41func (s *ParallelScheduler) Start(ctx context.Context) {
42 for range s.concurrency {
43 go s.ForEach(ctx, s.do)
44 }
45}
46
47func (s *ParallelScheduler) AddTask(ctx context.Context, task *Task) {
48 s.lk.Lock()
49 if st, ok := s.scheduled[task.key]; ok {
50 // schedule task
51 s.scheduled[task.key] = append(st, task)
52 s.lk.Unlock()
53 return
54 }
55 s.scheduled[task.key] = []*Task{}
56 s.lk.Unlock()
57
58 select {
59 case <-ctx.Done():
60 return
61 case s.feeder <- task:
62 return
63 }
64}
65
66func (s *ParallelScheduler) ForEach(ctx context.Context, fn func(context.Context, *Task) error) {
67 for task := range s.feeder {
68 for task != nil {
69 select {
70 case <-ctx.Done():
71 return
72 default:
73 }
74 if err := fn(ctx, task); err != nil {
75 s.logger.Error("event handler failed", "err", err)
76 }
77
78 s.lk.Lock()
79 func() {
80 rem, ok := s.scheduled[task.key]
81 if !ok {
82 s.logger.Error("should always have an 'active' entry if a worker is processing a job")
83 }
84 if len(rem) == 0 {
85 delete(s.scheduled, task.key)
86 task = nil
87 } else {
88 task = rem[0]
89 s.scheduled[task.key] = rem[1:]
90 }
91
92 // TODO: update seq from received message
93 s.lastSeq.Store(time.Now().UnixNano())
94 }()
95 s.lk.Unlock()
96 }
97 }
98}
99
100func (s *ParallelScheduler) LastSeq() int64 {
101 return s.lastSeq.Load()
102}