Files
sendico/api/fx/ingestor/internal/ingestor/service.go
2025-12-12 01:07:03 +01:00

224 lines
5.9 KiB
Go

package ingestor
import (
"context"
"math/big"
"time"
"github.com/tech/sendico/fx/ingestor/internal/config"
"github.com/tech/sendico/fx/ingestor/internal/market"
mmodel "github.com/tech/sendico/fx/ingestor/internal/model"
"github.com/tech/sendico/fx/storage"
"github.com/tech/sendico/fx/storage/model"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"go.uber.org/zap"
)
// Service periodically pulls tickers for the configured currency pairs from
// market connectors and stores the derived rate snapshots.
type Service struct {
// logger is the service logger; New scopes it with the name "ingestor".
logger mlogger.Logger
// cfg is the ingestor configuration; Run reads the poll interval from it.
cfg *config.Config
// rates is the store that receives every snapshot via UpsertSnapshot.
rates storage.RatesStore
// pairs is the list of pairs taken from cfg.Pairs() and visited on each poll.
pairs []config.Pair
// connectors maps a pair's source driver to the connector used to fetch its ticker.
connectors map[mmodel.Driver]mmodel.Connector
// metrics records poll and per-pair observations; it is nil-checked before use.
metrics *serviceMetrics
}
// New assembles a Service from its dependencies.
//
// It returns an invalid-argument error when logger, cfg or repo is nil, and an
// internal error (wrapping the cause) when the market connectors for
// cfg.Market.Sources cannot be built.
func New(logger mlogger.Logger, cfg *config.Config, repo storage.Repository) (*Service, error) {
	// Validate dependencies up front, in the same order they are declared.
	switch {
	case logger == nil:
		return nil, merrors.InvalidArgument("ingestor: nil logger")
	case cfg == nil:
		return nil, merrors.InvalidArgument("ingestor: nil config")
	case repo == nil:
		return nil, merrors.InvalidArgument("ingestor: nil repository")
	}

	built, buildErr := market.BuildConnectors(logger, cfg.Market.Sources)
	if buildErr != nil {
		return nil, merrors.InternalWrap(buildErr, "build connectors")
	}

	svc := &Service{
		logger:     logger.Named("ingestor"),
		cfg:        cfg,
		rates:      repo.Rates(),
		pairs:      cfg.Pairs(),
		connectors: built,
		metrics:    getServiceMetrics(),
	}
	return svc, nil
}
// Run executes one poll immediately and then one poll per tick of the
// configured poll interval until ctx is cancelled.
//
// Errors from individual polls are logged as warnings and never stop the loop.
// Run returns ctx.Err() once the context is done, or an invalid-argument error
// when the configured poll interval is not strictly positive.
func (s *Service) Run(ctx context.Context) error {
	interval := s.cfg.PollInterval()
	if interval <= 0 {
		// time.NewTicker panics on a non-positive duration; surface a
		// misconfiguration as an error instead of crashing the process.
		return merrors.InvalidArgument("ingestor: poll interval must be positive")
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	s.logger.Info("FX ingestion service started", zap.Duration("poll_interval", interval), zap.Int("pairs", len(s.pairs)))

	// Poll once right away so rates are available before the first tick fires.
	if err := s.executePoll(ctx); err != nil {
		s.logger.Warn("Initial poll completed with errors", zap.Error(err))
	}

	for {
		select {
		case <-ctx.Done():
			s.logger.Info("Context cancelled, stopping ingestor")
			return ctx.Err()
		case <-ticker.C:
			if err := s.executePoll(ctx); err != nil {
				s.logger.Warn("Polling cycle completed with errors", zap.Error(err))
			}
		}
	}
}
// executePoll runs a single poll over all pairs and, when metrics are
// configured, records how long the poll took together with its outcome.
// The error returned by the poll is passed through unchanged.
func (s *Service) executePoll(ctx context.Context) error {
	began := time.Now()
	pollErr := s.pollOnce(ctx)
	if s.metrics == nil {
		return pollErr
	}
	s.metrics.observePoll(time.Since(began), pollErr)
	return pollErr
}
// pollOnce ingests every configured pair once, in order.
//
// A failure on one pair is logged and counted but does not prevent the
// remaining pairs from being processed. The first error encountered is
// returned; nil means every pair was ingested.
//
// If ctx is cancelled between pairs the poll stops early instead of trying
// (and failing) every remaining pair; in that case the first pair error is
// returned if there was one, otherwise ctx.Err().
func (s *Service) pollOnce(ctx context.Context) error {
	var firstErr error
	failures := 0
	total := len(s.pairs)

	for i, pair := range s.pairs {
		if ctxErr := ctx.Err(); ctxErr != nil {
			// Shutting down: skip the remaining pairs rather than emitting a
			// warning and a failure metric for each of them.
			s.logger.Info("Ingestion poll interrupted",
				zap.Int("processed", i),
				zap.Int("failures", failures),
				zap.Int("total", total),
				zap.Error(ctxErr),
			)
			if firstErr == nil {
				firstErr = ctxErr
			}
			return firstErr
		}

		start := time.Now()
		err := s.upsertPair(ctx, pair)
		elapsed := time.Since(start)
		if s.metrics != nil {
			s.metrics.observePair(pair, elapsed, err)
		}
		if err == nil {
			continue
		}

		if firstErr == nil {
			firstErr = err
		}
		failures++
		s.logger.Warn("Failed to ingest pair",
			zap.String("symbol", pair.Symbol),
			zap.String("source", pair.Source.String()),
			zap.String("provider", pair.Provider),
			zap.String("base", pair.Base),
			zap.String("quote", pair.Quote),
			zap.Bool("invert", pair.Invert),
			zap.Duration("elapsed", elapsed),
			zap.Error(err),
		)
	}

	if failures > 0 {
		s.logger.Warn("Ingestion poll completed with failures", zap.Int("failures", failures), zap.Int("total", total))
	} else {
		s.logger.Info("Ingestion poll completed", zap.Int("total", total))
	}
	return firstErr
}
// upsertPair fetches the current ticker for pair from the connector registered
// for pair.Source, derives bid, ask, mid and spread (in basis points) and
// stores them as a rate snapshot.
//
// It returns an invalid-argument error when no connector is registered for the
// source, when a price cannot be parsed, or when a price is not strictly
// positive. Fetch and store failures are wrapped with merrors.InternalWrap.
func (s *Service) upsertPair(ctx context.Context, pair config.Pair) error {
	connector, ok := s.connectors[pair.Source]
	if !ok {
		return merrors.InvalidArgument("connector not configured for source "+pair.Source.String(), "source")
	}

	ticker, err := connector.FetchTicker(ctx, pair.Symbol)
	if err != nil {
		return merrors.InternalWrap(err, "fetch ticker: "+pair.Symbol)
	}

	bid, err := parseDecimal(ticker.BidPrice)
	if err != nil {
		return merrors.InvalidArgumentWrap(err, "parse bid price", "bid")
	}
	ask, err := parseDecimal(ticker.AskPrice)
	if err != nil {
		return merrors.InvalidArgumentWrap(err, "parse ask price", "ask")
	}

	// Reject zero or negative quotes instead of persisting them: a zero price
	// cannot be inverted (invertPrices would hand back the un-inverted values)
	// and would yield a meaningless mid and spread.
	if bid.Sign() <= 0 {
		return merrors.InvalidArgument("non-positive bid price for "+pair.Symbol, "bid")
	}
	if ask.Sign() <= 0 {
		return merrors.InvalidArgument("non-positive ask price for "+pair.Symbol, "ask")
	}

	if pair.Invert {
		bid, ask = invertPrices(bid, ask)
	}
	if ask.Cmp(bid) < 0 {
		// Ensure bid <= ask to keep downstream logic predictable.
		bid, ask = ask, bid
	}

	mid := new(big.Rat).Add(bid, ask)
	mid.Quo(mid, big.NewRat(2, 1))

	// Spread in basis points: (ask - bid) / mid * 10000. Both prices are
	// positive and ordered at this point, so mid > 0 and ask - bid >= 0.
	spread := new(big.Rat).Sub(ask, bid)
	spread.Quo(spread, mid)
	spread.Mul(spread, big.NewRat(10000, 1))

	// The snapshot is stamped with the ingestion time, not a provider time.
	now := time.Now().UTC()
	asOf := now
	snapshot := &model.RateSnapshot{
		RateRef:     market.BuildRateReference(pair.Provider, pair.Symbol, now),
		Pair:        model.CurrencyPair{Base: pair.Base, Quote: pair.Quote},
		Provider:    pair.Provider,
		Mid:         formatDecimal(mid),
		Bid:         formatDecimal(bid),
		Ask:         formatDecimal(ask),
		SpreadBps:   formatDecimal(spread),
		AsOfUnixMs:  now.UnixMilli(),
		AsOf:        &asOf,
		Source:      ticker.Provider,
		ProviderRef: ticker.Symbol,
	}

	if err := s.rates.UpsertSnapshot(ctx, snapshot); err != nil {
		return merrors.InternalWrap(err, "upsert snapshot")
	}

	s.logger.Debug("Snapshot ingested",
		zap.String("pair", pair.Base+"/"+pair.Quote),
		zap.String("provider", pair.Provider),
		zap.String("source", pair.Source.String()),
		zap.String("provider_ref", snapshot.ProviderRef),
		zap.String("bid", snapshot.Bid),
		zap.String("ask", snapshot.Ask),
		zap.String("mid", snapshot.Mid),
		zap.String("spread_bps", snapshot.SpreadBps),
		zap.Int64("asof_unix_ms", snapshot.AsOfUnixMs),
		zap.String("rate_ref", snapshot.RateRef),
	)
	return nil
}
// parseDecimal converts value into an exact rational using big.Rat.SetString.
// It returns an invalid-argument error quoting the input when the string is
// not accepted by SetString.
func parseDecimal(value string) (*big.Rat, error) {
	parsed, ok := new(big.Rat).SetString(value)
	if ok {
		return parsed, nil
	}
	return nil, merrors.InvalidArgument("invalid decimal \""+value+"\"", "value")
}
// invertPrices returns the bid/ask quote for the opposite direction of a pair.
//
// The sides swap under inversion: the new bid is 1/ask and the new ask is
// 1/bid. When either input is zero the reciprocal is undefined, so both inputs
// are returned unchanged (the very same pointers). The inputs are never
// modified.
func invertPrices(bid, ask *big.Rat) (*big.Rat, *big.Rat) {
	if bid.Sign() != 0 && ask.Sign() != 0 {
		flippedBid := new(big.Rat).Inv(ask) // 1/ask becomes the bid
		flippedAsk := new(big.Rat).Inv(bid) // 1/bid becomes the ask
		return flippedBid, flippedAsk
	}
	return bid, ask
}
// formatDecimal renders r as a decimal string rounded to at most 8 fractional
// digits, with trailing fractional zeros (and a dangling decimal point)
// removed. A nil r, and any value that rounds to zero, is rendered as "0".
func formatDecimal(r *big.Rat) string {
	const precision = 8
	if r == nil {
		return "0"
	}

	// FloatString always emits exactly `precision` fractional digits, so the
	// output contains a '.' and trimming zeros from the right can never eat
	// into the integer part.
	s := r.FloatString(precision)
	end := len(s)
	for end > 0 && s[end-1] == '0' {
		end--
	}
	if end > 0 && s[end-1] == '.' {
		end--
	}
	s = s[:end]

	// A tiny negative value rounds to "-0.00000000"; normalize it to plain zero.
	if s == "-0" {
		return "0"
	}
	return s
}