A vibe coded tangled fork which supports pijul.
at e620e86a00c541e18e9e8097f97e51962631c5cd 89 lines 2.2 kB view raw
1package knotstream 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log/slog" 8 "time" 9 10 "tangled.org/core/knotmirror/config" 11 "tangled.org/core/knotmirror/db" 12 "tangled.org/core/knotmirror/models" 13 "tangled.org/core/log" 14 "tangled.org/core/orm" 15) 16 17type KnotStream struct { 18 logger *slog.Logger 19 db *sql.DB 20 slurper *KnotSlurper 21} 22 23func NewKnotStream(l *slog.Logger, db *sql.DB, cfg *config.Config) *KnotStream { 24 l = log.SubLogger(l, "knotstream") 25 return &KnotStream{ 26 logger: l, 27 db: db, 28 slurper: NewKnotSlurper(l, db, cfg.Slurper), 29 } 30} 31 32func (s *KnotStream) Start(ctx context.Context) { 33 go s.slurper.Run(ctx) 34} 35 36func (s *KnotStream) Shutdown(ctx context.Context) error { 37 return s.slurper.Shutdown(ctx) 38} 39 40func (s *KnotStream) CheckIfSubscribed(hostname string) bool { 41 return s.slurper.CheckIfSubscribed(hostname) 42} 43 44func (s *KnotStream) SubscribeHost(ctx context.Context, hostname string, noSSL bool) error { 45 l := s.logger.With("hostname", hostname, "nossl", noSSL) 46 l.Debug("subscribe") 47 host, err := db.GetHost(ctx, s.db, hostname) 48 if err != nil { 49 return fmt.Errorf("loading host from db: %w", err) 50 } 51 52 if host == nil { 53 host = &models.Host{ 54 Hostname: hostname, 55 NoSSL: noSSL, 56 Status: models.HostStatusActive, 57 LastSeq: 0, 58 } 59 60 if err := db.UpsertHost(ctx, s.db, host); err != nil { 61 return fmt.Errorf("adding host to db: %w", err) 62 } 63 64 l.Info("adding new host subscription") 65 } 66 67 if host.Status == models.HostStatusBanned { 68 return fmt.Errorf("cannot subscribe to banned knot") 69 } 70 return s.slurper.Subscribe(ctx, *host) 71} 72 73func (s *KnotStream) ResubscribeAllHosts(ctx context.Context) error { 74 hosts, err := db.ListHosts(ctx, s.db, orm.FilterEq("status", "active")) 75 if err != nil { 76 return fmt.Errorf("listing hosts: %w", err) 77 } 78 79 for _, host := range hosts { 80 l := s.logger.With("hostname", host.Hostname) 81 l.Info("re-subscribing to active host") 82 if err := s.slurper.Subscribe(ctx, host); err != nil { 83 l.Warn("failed to re-subscribe to host", "err", err) 84 } 85 // sleep for a very short period, so we don't open tons of sockets at the same time 86 time.Sleep(1 * time.Millisecond) 87 } 88 return nil 89}