A vibe coded tangled fork which supports pijul.
at 1237bf9f58e4ba5d13d5437f2f82a2078572e229 102 lines 2.0 kB view raw
1package knotstream 2 3import ( 4 "context" 5 "log/slog" 6 "sync" 7 "sync/atomic" 8 "time" 9 10 "tangled.org/core/log" 11) 12 13type ParallelScheduler struct { 14 concurrency int 15 16 do func(ctx context.Context, task *Task) error 17 18 feeder chan *Task 19 lk sync.Mutex 20 scheduled map[string][]*Task 21 lastSeq atomic.Int64 22 23 logger *slog.Logger 24} 25 26type Task struct { 27 key string 28 message []byte 29} 30 31func NewParallelScheduler(maxC int, ident string, do func(context.Context, *Task) error) *ParallelScheduler { 32 return &ParallelScheduler{ 33 concurrency: maxC, 34 do: do, 35 feeder: make(chan *Task), 36 scheduled: make(map[string][]*Task), 37 logger: log.New("parallel-scheduler"), 38 } 39} 40 41func (s *ParallelScheduler) Start(ctx context.Context) { 42 for range s.concurrency { 43 go s.ForEach(ctx, s.do) 44 } 45} 46 47func (s *ParallelScheduler) AddTask(ctx context.Context, task *Task) { 48 s.lk.Lock() 49 if st, ok := s.scheduled[task.key]; ok { 50 // schedule task 51 s.scheduled[task.key] = append(st, task) 52 s.lk.Unlock() 53 return 54 } 55 s.scheduled[task.key] = []*Task{} 56 s.lk.Unlock() 57 58 select { 59 case <-ctx.Done(): 60 return 61 case s.feeder <- task: 62 return 63 } 64} 65 66func (s *ParallelScheduler) ForEach(ctx context.Context, fn func(context.Context, *Task) error) { 67 for task := range s.feeder { 68 for task != nil { 69 select { 70 case <-ctx.Done(): 71 return 72 default: 73 } 74 if err := fn(ctx, task); err != nil { 75 s.logger.Error("event handler failed", "err", err) 76 } 77 78 s.lk.Lock() 79 func() { 80 rem, ok := s.scheduled[task.key] 81 if !ok { 82 s.logger.Error("should always have an 'active' entry if a worker is processing a job") 83 } 84 if len(rem) == 0 { 85 delete(s.scheduled, task.key) 86 task = nil 87 } else { 88 task = rem[0] 89 s.scheduled[task.key] = rem[1:] 90 } 91 92 // TODO: update seq from received message 93 s.lastSeq.Store(time.Now().UnixNano()) 94 }() 95 s.lk.Unlock() 96 } 97 } 98} 99 100func (s *ParallelScheduler) LastSeq() int64 { 101 return s.lastSeq.Load() 102}