A vibe coded tangled fork which supports pijul.
1package knotstream
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log/slog"
8 "time"
9
10 "tangled.org/core/knotmirror/config"
11 "tangled.org/core/knotmirror/db"
12 "tangled.org/core/knotmirror/models"
13 "tangled.org/core/log"
14)
15
16type KnotStream struct {
17 logger *slog.Logger
18 db *sql.DB
19 slurper *KnotSlurper
20}
21
22func NewKnotStream(l *slog.Logger, db *sql.DB, cfg *config.Config) *KnotStream {
23 l = log.SubLogger(l, "knotstream")
24 return &KnotStream{
25 logger: l,
26 db: db,
27 slurper: NewKnotSlurper(l, db, cfg.Slurper),
28 }
29}
30
31func (s *KnotStream) Start(ctx context.Context) {
32 go s.slurper.Run(ctx)
33}
34
35func (s *KnotStream) Shutdown(ctx context.Context) error {
36 return s.slurper.Shutdown(ctx)
37}
38
39func (s *KnotStream) CheckIfSubscribed(hostname string) bool {
40 return s.slurper.CheckIfSubscribed(hostname)
41}
42
43func (s *KnotStream) SubscribeHost(ctx context.Context, hostname string, noSSL bool) error {
44 l := s.logger.With("hostname", hostname, "nossl", noSSL)
45 l.Debug("subscribe")
46 host, err := db.GetHost(ctx, s.db, hostname)
47 if err != nil {
48 return fmt.Errorf("loading host from db: %w", err)
49 }
50
51 if host == nil {
52 host = &models.Host{
53 Hostname: hostname,
54 NoSSL: noSSL,
55 Status: models.HostStatusActive,
56 LastSeq: 0,
57 }
58
59 if err := db.UpsertHost(ctx, s.db, host); err != nil {
60 return fmt.Errorf("adding host to db: %w", err)
61 }
62
63 l.Info("adding new host subscription")
64 }
65
66 if host.Status == models.HostStatusBanned {
67 return fmt.Errorf("cannot subscribe to banned knot")
68 }
69 return s.slurper.Subscribe(ctx, *host)
70}
71
72func (s *KnotStream) ResubscribeAllHosts(ctx context.Context) error {
73 hosts, err := db.ListHosts(ctx, s.db, models.HostStatusActive)
74 if err != nil {
75 return fmt.Errorf("listing hosts: %w", err)
76 }
77
78 for _, host := range hosts {
79 l := s.logger.With("hostname", host.Hostname)
80 l.Info("re-subscribing to active host")
81 if err := s.slurper.Subscribe(ctx, host); err != nil {
82 l.Warn("failed to re-subscribe to host", "err", err)
83 }
84 // sleep for a very short period, so we don't open tons of sockets at the same time
85 time.Sleep(1 * time.Millisecond)
86 }
87 return nil
88}