Merge pull request 'improved fx/ingestor logging' (#93) from logging-92 into main
Some checks failed
ci/woodpecker/push/billing_fees Pipeline was successful
ci/woodpecker/push/bff Pipeline was successful
ci/woodpecker/push/db Pipeline was successful
ci/woodpecker/push/chain_gateway Pipeline was successful
ci/woodpecker/push/fx_ingestor Pipeline was successful
ci/woodpecker/push/fx_oracle Pipeline was successful
ci/woodpecker/push/frontend Pipeline was successful
ci/woodpecker/push/payments_orchestrator Pipeline failed
ci/woodpecker/push/ledger Pipeline was successful
ci/woodpecker/push/nats Pipeline was successful
ci/woodpecker/push/mntx_gateway Pipeline was successful
ci/woodpecker/push/notification Pipeline was successful

Reviewed-on: #93
This commit was merged in pull request #93.
This commit is contained in:
2025-12-12 00:07:25 +00:00
3 changed files with 28 additions and 12 deletions

View File

@@ -85,6 +85,7 @@ func (s *Service) executePoll(ctx context.Context) error {
func (s *Service) pollOnce(ctx context.Context) error {
var firstErr error
failures := 0
for _, pair := range s.pairs {
start := time.Now()
err := s.upsertPair(ctx, pair)
@@ -96,14 +97,24 @@ func (s *Service) pollOnce(ctx context.Context) error {
if firstErr == nil {
firstErr = err
}
failures++
s.logger.Warn("Failed to ingest pair",
zap.String("symbol", pair.Symbol),
zap.String("source", pair.Source.String()),
zap.String("provider", pair.Provider),
zap.String("base", pair.Base),
zap.String("quote", pair.Quote),
zap.Bool("invert", pair.Invert),
zap.Duration("elapsed", elapsed),
zap.Error(err),
)
}
}
if failures > 0 {
s.logger.Warn("Ingestion poll completed with failures", zap.Int("failures", failures), zap.Int("total", len(s.pairs)))
} else {
s.logger.Info("Ingestion poll completed", zap.Int("total", len(s.pairs)))
}
return firstErr
}
@@ -115,7 +126,7 @@ func (s *Service) upsertPair(ctx context.Context, pair config.Pair) error {
ticker, err := connector.FetchTicker(ctx, pair.Symbol)
if err != nil {
return merrors.InternalWrap(err, "fetch ticker")
return merrors.InternalWrap(err, "fetch ticker: "+pair.Symbol)
}
bid, err := parseDecimal(ticker.BidPrice)
@@ -172,9 +183,14 @@ func (s *Service) upsertPair(ctx context.Context, pair config.Pair) error {
s.logger.Debug("Snapshot ingested",
zap.String("pair", pair.Base+"/"+pair.Quote),
zap.String("provider", pair.Provider),
zap.String("source", pair.Source.String()),
zap.String("provider_ref", snapshot.ProviderRef),
zap.String("bid", snapshot.Bid),
zap.String("ask", snapshot.Ask),
zap.String("mid", snapshot.Mid),
zap.String("spread_bps", snapshot.SpreadBps),
zap.Int64("asof_unix_ms", snapshot.AsOfUnixMs),
zap.String("rate_ref", snapshot.RateRef),
)
return nil

View File

@@ -168,13 +168,13 @@ func (c *cbrConnector) refreshDirectory() error {
resp, err := c.client.Do(req)
if err != nil {
c.logger.Warn("CBR directory request failed", zap.Error(err))
c.logger.Warn("CBR directory request failed", zap.Error(err), zap.String("endpoint", endpoint))
return merrors.InternalWrap(err, "cbr: directory request failed")
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
c.logger.Warn("CBR directory returned non-OK status", zap.Int("status", resp.StatusCode))
c.logger.Warn("CBR directory returned non-OK status", zap.Int("status", resp.StatusCode), zap.String("endpoint", endpoint))
return merrors.Internal("cbr: unexpected status " + strconv.Itoa(resp.StatusCode))
}
@@ -183,7 +183,7 @@ func (c *cbrConnector) refreshDirectory() error {
var directory valuteDirectory
if err := decoder.Decode(&directory); err != nil {
c.logger.Warn("CBR directory decode failed", zap.Error(err))
c.logger.Warn("CBR directory decode failed", zap.Error(err), zap.String("endpoint", endpoint))
return merrors.InternalWrap(err, "cbr: decode directory")
}

View File

@@ -40,15 +40,15 @@ func main() {
application, err := app.New(logger, *configFile)
if err != nil {
logger.Fatal("Failed to initialise application", zap.Error(err))
}
if err := application.Run(ctx); err != nil {
if errors.Is(err, context.Canceled) {
logger.Info("FX ingestor stopped")
return
logger.Error("Failed to initialise application", zap.Error(err))
} else {
if err := application.Run(ctx); err != nil {
if errors.Is(err, context.Canceled) {
logger.Info("FX ingestor stopped")
return
}
logger.Error("Ingestor terminated with error", zap.Error(err))
}
logger.Fatal("Ingestor terminated with error", zap.Error(err))
}
logger.Info("FX ingestor stopped")