improved fx/ingestor logging
This commit is contained in:
@@ -85,6 +85,7 @@ func (s *Service) executePoll(ctx context.Context) error {
|
|||||||
|
|
||||||
func (s *Service) pollOnce(ctx context.Context) error {
|
func (s *Service) pollOnce(ctx context.Context) error {
|
||||||
var firstErr error
|
var firstErr error
|
||||||
|
failures := 0
|
||||||
for _, pair := range s.pairs {
|
for _, pair := range s.pairs {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
err := s.upsertPair(ctx, pair)
|
err := s.upsertPair(ctx, pair)
|
||||||
@@ -96,14 +97,24 @@ func (s *Service) pollOnce(ctx context.Context) error {
|
|||||||
if firstErr == nil {
|
if firstErr == nil {
|
||||||
firstErr = err
|
firstErr = err
|
||||||
}
|
}
|
||||||
|
failures++
|
||||||
s.logger.Warn("Failed to ingest pair",
|
s.logger.Warn("Failed to ingest pair",
|
||||||
zap.String("symbol", pair.Symbol),
|
zap.String("symbol", pair.Symbol),
|
||||||
zap.String("source", pair.Source.String()),
|
zap.String("source", pair.Source.String()),
|
||||||
|
zap.String("provider", pair.Provider),
|
||||||
|
zap.String("base", pair.Base),
|
||||||
|
zap.String("quote", pair.Quote),
|
||||||
|
zap.Bool("invert", pair.Invert),
|
||||||
zap.Duration("elapsed", elapsed),
|
zap.Duration("elapsed", elapsed),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if failures > 0 {
|
||||||
|
s.logger.Warn("Ingestion poll completed with failures", zap.Int("failures", failures), zap.Int("total", len(s.pairs)))
|
||||||
|
} else {
|
||||||
|
s.logger.Info("Ingestion poll completed", zap.Int("total", len(s.pairs)))
|
||||||
|
}
|
||||||
return firstErr
|
return firstErr
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,7 +126,7 @@ func (s *Service) upsertPair(ctx context.Context, pair config.Pair) error {
|
|||||||
|
|
||||||
ticker, err := connector.FetchTicker(ctx, pair.Symbol)
|
ticker, err := connector.FetchTicker(ctx, pair.Symbol)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return merrors.InternalWrap(err, "fetch ticker")
|
return merrors.InternalWrap(err, "fetch ticker: "+pair.Symbol)
|
||||||
}
|
}
|
||||||
|
|
||||||
bid, err := parseDecimal(ticker.BidPrice)
|
bid, err := parseDecimal(ticker.BidPrice)
|
||||||
@@ -172,9 +183,14 @@ func (s *Service) upsertPair(ctx context.Context, pair config.Pair) error {
|
|||||||
s.logger.Debug("Snapshot ingested",
|
s.logger.Debug("Snapshot ingested",
|
||||||
zap.String("pair", pair.Base+"/"+pair.Quote),
|
zap.String("pair", pair.Base+"/"+pair.Quote),
|
||||||
zap.String("provider", pair.Provider),
|
zap.String("provider", pair.Provider),
|
||||||
|
zap.String("source", pair.Source.String()),
|
||||||
|
zap.String("provider_ref", snapshot.ProviderRef),
|
||||||
zap.String("bid", snapshot.Bid),
|
zap.String("bid", snapshot.Bid),
|
||||||
zap.String("ask", snapshot.Ask),
|
zap.String("ask", snapshot.Ask),
|
||||||
zap.String("mid", snapshot.Mid),
|
zap.String("mid", snapshot.Mid),
|
||||||
|
zap.String("spread_bps", snapshot.SpreadBps),
|
||||||
|
zap.Int64("asof_unix_ms", snapshot.AsOfUnixMs),
|
||||||
|
zap.String("rate_ref", snapshot.RateRef),
|
||||||
)
|
)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -168,13 +168,13 @@ func (c *cbrConnector) refreshDirectory() error {
|
|||||||
|
|
||||||
resp, err := c.client.Do(req)
|
resp, err := c.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Warn("CBR directory request failed", zap.Error(err))
|
c.logger.Warn("CBR directory request failed", zap.Error(err), zap.String("endpoint", endpoint))
|
||||||
return merrors.InternalWrap(err, "cbr: directory request failed")
|
return merrors.InternalWrap(err, "cbr: directory request failed")
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
c.logger.Warn("CBR directory returned non-OK status", zap.Int("status", resp.StatusCode))
|
c.logger.Warn("CBR directory returned non-OK status", zap.Int("status", resp.StatusCode), zap.String("endpoint", endpoint))
|
||||||
return merrors.Internal("cbr: unexpected status " + strconv.Itoa(resp.StatusCode))
|
return merrors.Internal("cbr: unexpected status " + strconv.Itoa(resp.StatusCode))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -183,7 +183,7 @@ func (c *cbrConnector) refreshDirectory() error {
|
|||||||
|
|
||||||
var directory valuteDirectory
|
var directory valuteDirectory
|
||||||
if err := decoder.Decode(&directory); err != nil {
|
if err := decoder.Decode(&directory); err != nil {
|
||||||
c.logger.Warn("CBR directory decode failed", zap.Error(err))
|
c.logger.Warn("CBR directory decode failed", zap.Error(err), zap.String("endpoint", endpoint))
|
||||||
return merrors.InternalWrap(err, "cbr: decode directory")
|
return merrors.InternalWrap(err, "cbr: decode directory")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -40,15 +40,15 @@ func main() {
|
|||||||
|
|
||||||
application, err := app.New(logger, *configFile)
|
application, err := app.New(logger, *configFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatal("Failed to initialise application", zap.Error(err))
|
logger.Error("Failed to initialise application", zap.Error(err))
|
||||||
}
|
} else {
|
||||||
|
if err := application.Run(ctx); err != nil {
|
||||||
if err := application.Run(ctx); err != nil {
|
if errors.Is(err, context.Canceled) {
|
||||||
if errors.Is(err, context.Canceled) {
|
logger.Info("FX ingestor stopped")
|
||||||
logger.Info("FX ingestor stopped")
|
return
|
||||||
return
|
}
|
||||||
|
logger.Error("Ingestor terminated with error", zap.Error(err))
|
||||||
}
|
}
|
||||||
logger.Fatal("Ingestor terminated with error", zap.Error(err))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("FX ingestor stopped")
|
logger.Info("FX ingestor stopped")
|
||||||
|
|||||||
Reference in New Issue
Block a user