A vibe-coded Tangled fork that supports Pijul.
1package knotstream
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log/slog"
8 "time"
9
10 "tangled.org/core/knotmirror/config"
11 "tangled.org/core/knotmirror/db"
12 "tangled.org/core/knotmirror/models"
13 "tangled.org/core/log"
14 "tangled.org/core/orm"
15)
16
17type KnotStream struct {
18 logger *slog.Logger
19 db *sql.DB
20 slurper *KnotSlurper
21}
22
23func NewKnotStream(l *slog.Logger, db *sql.DB, cfg *config.Config) *KnotStream {
24 l = log.SubLogger(l, "knotstream")
25 return &KnotStream{
26 logger: l,
27 db: db,
28 slurper: NewKnotSlurper(l, db, cfg.Slurper),
29 }
30}
31
32func (s *KnotStream) Start(ctx context.Context) {
33 go s.slurper.Run(ctx)
34}
35
36func (s *KnotStream) Shutdown(ctx context.Context) error {
37 return s.slurper.Shutdown(ctx)
38}
39
40func (s *KnotStream) CheckIfSubscribed(hostname string) bool {
41 return s.slurper.CheckIfSubscribed(hostname)
42}
43
44func (s *KnotStream) SubscribeHost(ctx context.Context, hostname string, noSSL bool) error {
45 l := s.logger.With("hostname", hostname, "nossl", noSSL)
46 l.Debug("subscribe")
47 host, err := db.GetHost(ctx, s.db, hostname)
48 if err != nil {
49 return fmt.Errorf("loading host from db: %w", err)
50 }
51
52 if host == nil {
53 host = &models.Host{
54 Hostname: hostname,
55 NoSSL: noSSL,
56 Status: models.HostStatusActive,
57 LastSeq: 0,
58 }
59
60 if err := db.UpsertHost(ctx, s.db, host); err != nil {
61 return fmt.Errorf("adding host to db: %w", err)
62 }
63
64 l.Info("adding new host subscription")
65 }
66
67 if host.Status == models.HostStatusBanned {
68 return fmt.Errorf("cannot subscribe to banned knot")
69 }
70 return s.slurper.Subscribe(ctx, *host)
71}
72
73func (s *KnotStream) ResubscribeAllHosts(ctx context.Context) error {
74 hosts, err := db.ListHosts(ctx, s.db, orm.FilterEq("status", "active"))
75 if err != nil {
76 return fmt.Errorf("listing hosts: %w", err)
77 }
78
79 for _, host := range hosts {
80 l := s.logger.With("hostname", host.Hostname)
81 l.Info("re-subscribing to active host")
82 if err := s.slurper.Subscribe(ctx, host); err != nil {
83 l.Warn("failed to re-subscribe to host", "err", err)
84 }
85 // sleep for a very short period, so we don't open tons of sockets at the same time
86 time.Sleep(1 * time.Millisecond)
87 }
88 return nil
89}