A vibe-coded tangled fork that supports Pijul.
at dda7e5c4760ecd9809464b15499aa4c2bfc41d72 88 lines 2.1 kB view raw
1package knotstream 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log/slog" 8 "time" 9 10 "tangled.org/core/knotmirror/config" 11 "tangled.org/core/knotmirror/db" 12 "tangled.org/core/knotmirror/models" 13 "tangled.org/core/log" 14) 15 16type KnotStream struct { 17 logger *slog.Logger 18 db *sql.DB 19 slurper *KnotSlurper 20} 21 22func NewKnotStream(l *slog.Logger, db *sql.DB, cfg *config.Config) *KnotStream { 23 l = log.SubLogger(l, "knotstream") 24 return &KnotStream{ 25 logger: l, 26 db: db, 27 slurper: NewKnotSlurper(l, db, cfg.Slurper), 28 } 29} 30 31func (s *KnotStream) Start(ctx context.Context) { 32 go s.slurper.Run(ctx) 33} 34 35func (s *KnotStream) Shutdown(ctx context.Context) error { 36 return s.slurper.Shutdown(ctx) 37} 38 39func (s *KnotStream) CheckIfSubscribed(hostname string) bool { 40 return s.slurper.CheckIfSubscribed(hostname) 41} 42 43func (s *KnotStream) SubscribeHost(ctx context.Context, hostname string, noSSL bool) error { 44 l := s.logger.With("hostname", hostname, "nossl", noSSL) 45 l.Debug("subscribe") 46 host, err := db.GetHost(ctx, s.db, hostname) 47 if err != nil { 48 return fmt.Errorf("loading host from db: %w", err) 49 } 50 51 if host == nil { 52 host = &models.Host{ 53 Hostname: hostname, 54 NoSSL: noSSL, 55 Status: models.HostStatusActive, 56 LastSeq: 0, 57 } 58 59 if err := db.UpsertHost(ctx, s.db, host); err != nil { 60 return fmt.Errorf("adding host to db: %w", err) 61 } 62 63 l.Info("adding new host subscription") 64 } 65 66 if host.Status == models.HostStatusBanned { 67 return fmt.Errorf("cannot subscribe to banned knot") 68 } 69 return s.slurper.Subscribe(ctx, *host) 70} 71 72func (s *KnotStream) ResubscribeAllHosts(ctx context.Context) error { 73 hosts, err := db.ListHosts(ctx, s.db, models.HostStatusActive) 74 if err != nil { 75 return fmt.Errorf("listing hosts: %w", err) 76 } 77 78 for _, host := range hosts { 79 l := s.logger.With("hostname", host.Hostname) 80 l.Info("re-subscribing to active host") 81 if err := 
s.slurper.Subscribe(ctx, host); err != nil { 82 l.Warn("failed to re-subscribe to host", "err", err) 83 } 84 // sleep for a very short period, so we don't open tons of sockets at the same time 85 time.Sleep(1 * time.Millisecond) 86 } 87 return nil 88}