package knotmirror import ( "context" "database/sql" "encoding/json" "fmt" "log/slog" "net/netip" "net/url" "time" "tangled.org/core/api/tangled" "tangled.org/core/knotmirror/config" "tangled.org/core/knotmirror/db" "tangled.org/core/knotmirror/knotstream" "tangled.org/core/knotmirror/models" "tangled.org/core/log" "tangled.org/core/tapc" ) type Tap struct { logger *slog.Logger cfg *config.Config tap tapc.Client db *sql.DB gitm GitMirrorManager ks *knotstream.KnotStream } func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, gitm GitMirrorManager, ks *knotstream.KnotStream) *Tap { return &Tap{ logger: log.SubLogger(l, "tapclient"), cfg: cfg, tap: tapc.NewClient(cfg.TapUrl, ""), db: db, gitm: gitm, ks: ks, } } func (t *Tap) Start(ctx context.Context) { // TODO: better reconnect logic go func() { for { t.tap.Connect(ctx, &tapc.SimpleIndexer{ EventHandler: t.processEvent, }) time.Sleep(time.Second) } }() } func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error { l := t.logger.With("component", "tapIndexer") var err error switch evt.Type { case tapc.EvtRecord: switch evt.Record.Collection.String() { case tangled.RepoNSID: err = t.processRepo(ctx, evt.Record) } } if err != nil { l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) return err } return nil } func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error { switch evt.Action { case tapc.RecordCreateAction, tapc.RecordUpdateAction: record := tangled.Repo{} if err := json.Unmarshal(evt.Record, &record); err != nil { return fmt.Errorf("parsing record: %w", err) } status := models.RepoStatePending errMsg := "" u, err := url.Parse("http://" + record.Knot) // parsing with fake scheme if err != nil { status = models.RepoStateSuspended errMsg = "failed to parse knot url" } else if t.cfg.KnotSSRF && isPrivate(u.Hostname()) { status = models.RepoStateSuspended errMsg = "suspending non-public knot" } repo := &models.Repo{ Did: evt.Did, Rkey: evt.Rkey, Cid: evt.CID, Name: record.Name, KnotDomain: record.Knot, State: status, ErrorMsg: errMsg, RetryAfter: 0, // clear retry info RetryCount: 0, } if evt.Action == tapc.RecordUpdateAction { // update git repo remote url if err := t.gitm.RemoteSetUrl(ctx, repo); err != nil { return fmt.Errorf("updating git repo remote url: %w", err) } } if err := db.UpsertRepo(ctx, t.db, repo); err != nil { return fmt.Errorf("upserting repo to db: %w", err) } if !t.ks.CheckIfSubscribed(record.Knot) { if err := t.ks.SubscribeHost(ctx, record.Knot, !t.cfg.KnotUseSSL); err != nil { return fmt.Errorf("subscribing to knot: %w", err) } } case tapc.RecordDeleteAction: if err := db.DeleteRepo(ctx, t.db, evt.Did, evt.Rkey); err != nil { return fmt.Errorf("deleting repo from db: %w", err) } } return nil } // isPrivate checks if host is private network. It doesn't perform DNS resolution func isPrivate(host string) bool { if host == "localhost" { return true } addr, err := netip.ParseAddr(host) if err != nil { return false } return isPrivateAddr(addr) } func isPrivateAddr(addr netip.Addr) bool { return addr.IsLoopback() || addr.IsPrivate() || addr.IsLinkLocalUnicast() || addr.IsLinkLocalMulticast() }