diff --git a/api/fx/ingestor/internal/ingestor/service.go b/api/fx/ingestor/internal/ingestor/service.go index 75bc752..7ebc7dd 100644 --- a/api/fx/ingestor/internal/ingestor/service.go +++ b/api/fx/ingestor/internal/ingestor/service.go @@ -85,6 +85,7 @@ func (s *Service) executePoll(ctx context.Context) error { func (s *Service) pollOnce(ctx context.Context) error { var firstErr error + failures := 0 for _, pair := range s.pairs { start := time.Now() err := s.upsertPair(ctx, pair) @@ -96,14 +97,24 @@ func (s *Service) pollOnce(ctx context.Context) error { if firstErr == nil { firstErr = err } + failures++ s.logger.Warn("Failed to ingest pair", zap.String("symbol", pair.Symbol), zap.String("source", pair.Source.String()), + zap.String("provider", pair.Provider), + zap.String("base", pair.Base), + zap.String("quote", pair.Quote), + zap.Bool("invert", pair.Invert), zap.Duration("elapsed", elapsed), zap.Error(err), ) } } + if failures > 0 { + s.logger.Warn("Ingestion poll completed with failures", zap.Int("failures", failures), zap.Int("total", len(s.pairs))) + } else { + s.logger.Info("Ingestion poll completed", zap.Int("total", len(s.pairs))) + } return firstErr } @@ -115,7 +126,7 @@ func (s *Service) upsertPair(ctx context.Context, pair config.Pair) error { ticker, err := connector.FetchTicker(ctx, pair.Symbol) if err != nil { - return merrors.InternalWrap(err, "fetch ticker") + return merrors.InternalWrap(err, "fetch ticker: "+pair.Symbol) } bid, err := parseDecimal(ticker.BidPrice) @@ -172,9 +183,14 @@ func (s *Service) upsertPair(ctx context.Context, pair config.Pair) error { s.logger.Debug("Snapshot ingested", zap.String("pair", pair.Base+"/"+pair.Quote), zap.String("provider", pair.Provider), + zap.String("source", pair.Source.String()), + zap.String("provider_ref", snapshot.ProviderRef), zap.String("bid", snapshot.Bid), zap.String("ask", snapshot.Ask), zap.String("mid", snapshot.Mid), + zap.String("spread_bps", snapshot.SpreadBps), + zap.Int64("asof_unix_ms", snapshot.AsOfUnixMs), + zap.String("rate_ref", snapshot.RateRef), ) return nil diff --git a/api/fx/ingestor/internal/market/cbr/connector.go b/api/fx/ingestor/internal/market/cbr/connector.go index ed4f6fb..516feb5 100644 --- a/api/fx/ingestor/internal/market/cbr/connector.go +++ b/api/fx/ingestor/internal/market/cbr/connector.go @@ -168,13 +168,13 @@ func (c *cbrConnector) refreshDirectory() error { resp, err := c.client.Do(req) if err != nil { - c.logger.Warn("CBR directory request failed", zap.Error(err)) + c.logger.Warn("CBR directory request failed", zap.Error(err), zap.String("endpoint", endpoint)) return merrors.InternalWrap(err, "cbr: directory request failed") } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - c.logger.Warn("CBR directory returned non-OK status", zap.Int("status", resp.StatusCode)) + c.logger.Warn("CBR directory returned non-OK status", zap.Int("status", resp.StatusCode), zap.String("endpoint", endpoint)) return merrors.Internal("cbr: unexpected status " + strconv.Itoa(resp.StatusCode)) } @@ -183,7 +183,7 @@ func (c *cbrConnector) refreshDirectory() error { var directory valuteDirectory if err := decoder.Decode(&directory); err != nil { - c.logger.Warn("CBR directory decode failed", zap.Error(err)) + c.logger.Warn("CBR directory decode failed", zap.Error(err), zap.String("endpoint", endpoint)) return merrors.InternalWrap(err, "cbr: decode directory") } diff --git a/api/fx/ingestor/main.go b/api/fx/ingestor/main.go index f2ad5dd..ce51c38 100644 --- a/api/fx/ingestor/main.go +++ b/api/fx/ingestor/main.go @@ -40,15 +40,15 @@ func main() { application, err := app.New(logger, *configFile) if err != nil { - logger.Fatal("Failed to initialise application", zap.Error(err)) - } - - if err := application.Run(ctx); err != nil { - if errors.Is(err, context.Canceled) { - logger.Info("FX ingestor stopped") - return + logger.Error("Failed to initialise application", zap.Error(err)) + } else { + if err := application.Run(ctx); err != nil { + if errors.Is(err, context.Canceled) { + logger.Info("FX ingestor stopped") + return + } + logger.Error("Ingestor terminated with error", zap.Error(err)) } - logger.Fatal("Ingestor terminated with error", zap.Error(err)) } logger.Info("FX ingestor stopped")