// Package ingestor polls market connectors for the configured currency pairs
// and persists the resulting rate snapshots.
package ingestor

import (
	"context"
	"math/big"
	"time"

	"github.com/tech/sendico/fx/ingestor/internal/config"
	"github.com/tech/sendico/fx/ingestor/internal/market"
	mmodel "github.com/tech/sendico/fx/ingestor/internal/model"
	"github.com/tech/sendico/fx/storage"
	"github.com/tech/sendico/fx/storage/model"
	"github.com/tech/sendico/pkg/merrors"
	"github.com/tech/sendico/pkg/mlogger"
	"go.uber.org/zap"
)

// Service periodically fetches tickers for every configured pair from the
// connector registered for the pair's source and upserts a rate snapshot
// into the rates store.
type Service struct {
	logger     mlogger.Logger
	cfg        *config.Config
	rates      storage.RatesStore
	pairs      []config.Pair
	connectors map[mmodel.Driver]mmodel.Connector
	metrics    *serviceMetrics
}

// New validates its dependencies, builds the market connectors described by
// cfg.Market.Sources and returns a ready-to-run Service.
//
// It returns an invalid-argument error when logger, cfg or repo is nil and an
// internal error when the connectors cannot be built.
func New(logger mlogger.Logger, cfg *config.Config, repo storage.Repository) (*Service, error) {
	if logger == nil {
		return nil, merrors.InvalidArgument("ingestor: nil logger")
	}
	if cfg == nil {
		return nil, merrors.InvalidArgument("ingestor: nil config")
	}
	if repo == nil {
		return nil, merrors.InvalidArgument("ingestor: nil repository")
	}

	connectors, err := market.BuildConnectors(logger, cfg.Market.Sources)
	if err != nil {
		return nil, merrors.InternalWrap(err, "build connectors")
	}

	return &Service{
		logger:     logger.Named("ingestor"),
		cfg:        cfg,
		rates:      repo.Rates(),
		pairs:      cfg.Pairs(),
		connectors: connectors,
		metrics:    getServiceMetrics(),
	}, nil
}

// Run executes one poll immediately and then one poll per configured interval
// until ctx is done, at which point it returns ctx.Err().
//
// Errors from individual polls are logged and do not stop the loop. A
// non-positive poll interval is rejected up front because time.NewTicker
// panics on it.
func (s *Service) Run(ctx context.Context) error {
	interval := s.cfg.PollInterval()
	if interval <= 0 {
		return merrors.InvalidArgument("ingestor: poll interval must be positive")
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	s.logger.Info("FX ingestion service started", zap.Duration("poll_interval", interval), zap.Int("pairs", len(s.pairs)))

	if err := s.executePoll(ctx); err != nil {
		s.logger.Warn("Initial poll completed with errors", zap.Error(err))
	}

	for {
		select {
		case <-ctx.Done():
			s.logger.Info("Context cancelled, stopping ingestor")
			return ctx.Err()
		case <-ticker.C:
			if err := s.executePoll(ctx); err != nil {
				s.logger.Warn("Polling cycle completed with errors", zap.Error(err))
			}
		}
	}
}

// executePoll runs a single polling cycle and, when metrics are available,
// records its duration and outcome. It returns the error from pollOnce.
func (s *Service) executePoll(ctx context.Context) error {
	start := time.Now()
	err := s.pollOnce(ctx)
	if s.metrics != nil {
		s.metrics.observePoll(time.Since(start), err)
	}
	return err
}

// pollOnce ingests every configured pair once. A failing pair is logged and
// counted but does not prevent the remaining pairs from being processed.
// The first error encountered (if any) is returned.
func (s *Service) pollOnce(ctx context.Context) error {
	var firstErr error
	failures := 0

	for _, pair := range s.pairs {
		start := time.Now()
		err := s.upsertPair(ctx, pair)
		elapsed := time.Since(start)

		if s.metrics != nil {
			s.metrics.observePair(pair, elapsed, err)
		}
		if err == nil {
			continue
		}

		if firstErr == nil {
			firstErr = err
		}
		failures++
		s.logger.Warn("Failed to ingest pair",
			zap.String("symbol", pair.Symbol),
			zap.String("source", pair.Source.String()),
			zap.String("provider", pair.Provider),
			zap.String("base", pair.Base),
			zap.String("quote", pair.Quote),
			zap.Bool("invert", pair.Invert),
			zap.Duration("elapsed", elapsed),
			zap.Error(err),
		)
	}

	if failures > 0 {
		s.logger.Warn("Ingestion poll completed with failures", zap.Int("failures", failures), zap.Int("total", len(s.pairs)))
	} else {
		s.logger.Info("Ingestion poll completed", zap.Int("total", len(s.pairs)))
	}

	return firstErr
}

// upsertPair fetches the ticker for pair from its source connector, derives
// bid/ask/mid and the spread in basis points, and upserts the resulting
// snapshot into the rates store.
//
// It returns an error when no connector is configured for the pair's source,
// when the ticker cannot be fetched or its prices cannot be parsed, when an
// inverted pair carries a zero price, or when the snapshot cannot be stored.
func (s *Service) upsertPair(ctx context.Context, pair config.Pair) error {
	connector, ok := s.connectors[pair.Source]
	if !ok {
		return merrors.InvalidArgument("connector not configured for source "+pair.Source.String(), "source")
	}

	ticker, err := connector.FetchTicker(ctx, pair.Symbol)
	if err != nil {
		return merrors.InternalWrap(err, "fetch ticker: "+pair.Symbol)
	}

	bid, err := parseDecimal(ticker.BidPrice)
	if err != nil {
		return merrors.InvalidArgumentWrap(err, "parse bid price", "bid")
	}
	ask, err := parseDecimal(ticker.AskPrice)
	if err != nil {
		return merrors.InvalidArgumentWrap(err, "parse ask price", "ask")
	}

	if pair.Invert {
		// invertPrices hands zero prices back untouched; persisting them
		// would store the un-inverted quote under the inverted pair, so
		// reject the ticker instead.
		if bid.Sign() == 0 || ask.Sign() == 0 {
			return merrors.InvalidArgument("cannot invert zero price for "+pair.Symbol, "price")
		}
		bid, ask = invertPrices(bid, ask)
	}

	if ask.Cmp(bid) < 0 {
		// Ensure bid <= ask to keep downstream logic predictable.
		bid, ask = ask, bid
	}

	mid := new(big.Rat).Add(bid, ask)
	mid.Quo(mid, big.NewRat(2, 1))

	spread := big.NewRat(0, 1)
	if mid.Sign() != 0 {
		// ask >= bid is guaranteed by the swap above, so the difference
		// is never negative.
		spread.Sub(ask, bid)
		spread.Quo(spread, mid)
		spread.Mul(spread, big.NewRat(10000, 1)) // basis points
	}

	now := time.Now().UTC()
	asOf := now

	snapshot := &model.RateSnapshot{
		RateRef:     market.BuildRateReference(pair.Provider, pair.Symbol, now),
		Pair:        model.CurrencyPair{Base: pair.Base, Quote: pair.Quote},
		Provider:    pair.Provider,
		Mid:         formatDecimal(mid),
		Bid:         formatDecimal(bid),
		Ask:         formatDecimal(ask),
		SpreadBps:   formatDecimal(spread),
		AsOfUnixMs:  now.UnixMilli(),
		AsOf:        &asOf,
		Source:      ticker.Provider,
		ProviderRef: ticker.Symbol,
	}

	if err := s.rates.UpsertSnapshot(ctx, snapshot); err != nil {
		return merrors.InternalWrap(err, "upsert snapshot")
	}

	s.logger.Debug("Snapshot ingested",
		zap.String("pair", pair.Base+"/"+pair.Quote),
		zap.String("provider", pair.Provider),
		zap.String("source", pair.Source.String()),
		zap.String("provider_ref", snapshot.ProviderRef),
		zap.String("bid", snapshot.Bid),
		zap.String("ask", snapshot.Ask),
		zap.String("mid", snapshot.Mid),
		zap.String("spread_bps", snapshot.SpreadBps),
		zap.Int64("asof_unix_ms", snapshot.AsOfUnixMs),
		zap.String("rate_ref", snapshot.RateRef),
	)

	return nil
}

// parseDecimal converts value into an exact rational using big.Rat.SetString.
// It returns an invalid-argument error when the string is not accepted.
func parseDecimal(value string) (*big.Rat, error) {
	r := new(big.Rat)
	if _, ok := r.SetString(value); !ok {
		return nil, merrors.InvalidArgument("invalid decimal \""+value+"\"", "value")
	}
	return r, nil
}

// invertPrices returns the reciprocal quote for bid/ask: the inverted bid is
// 1/ask and the inverted ask is 1/bid.
//
// When either input is zero the inputs are returned unchanged, which avoids
// dividing by zero; callers that need a real inversion must check for zero
// themselves.
func invertPrices(bid, ask *big.Rat) (*big.Rat, *big.Rat) {
	if bid.Sign() == 0 || ask.Sign() == 0 {
		return bid, ask
	}
	one := big.NewRat(1, 1)
	invBid := new(big.Rat).Quo(one, ask) // invert ask to get bid
	invAsk := new(big.Rat).Quo(one, bid) // invert bid to get ask
	return invBid, invAsk
}

// formatDecimal renders r as a decimal string with exactly 8 digits after the
// decimal point (trailing zeros are kept). A nil r is rendered as "0".
func formatDecimal(r *big.Rat) string {
	if r == nil {
		return "0"
	}
	return r.FloatString(8)
}