improved FX logging #197
@@ -130,7 +130,7 @@ func (s *Service) getQuoteResponder(ctx context.Context, req *oraclev1.GetQuoteR
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
switch {
|
switch {
|
||||||
case errors.Is(err, merrors.ErrNoData):
|
case errors.Is(err, merrors.ErrNoData):
|
||||||
logger.Warn("pair not supported", zap.String("pair", pairKey.Base+"/"+pairKey.Quote))
|
logger.Warn("Pair not supported", zap.String("pair", pairKey.Base+"/"+pairKey.Quote))
|
||||||
return gsresponse.InvalidArgument[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, merrors.InvalidArgument("pair_not_supported"))
|
return gsresponse.InvalidArgument[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, merrors.InvalidArgument("pair_not_supported"))
|
||||||
default:
|
default:
|
||||||
logger.Warn("GetQuote failed to load pair", zap.Error(err))
|
logger.Warn("GetQuote failed to load pair", zap.Error(err))
|
||||||
@@ -150,7 +150,7 @@ func (s *Service) getQuoteResponder(ctx context.Context, req *oraclev1.GetQuoteR
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
switch {
|
switch {
|
||||||
case errors.Is(err, merrors.ErrNoData):
|
case errors.Is(err, merrors.ErrNoData):
|
||||||
logger.Warn("rate not found", zap.String("pair", pairKey.Base+"/"+pairKey.Quote), zap.String("provider", provider))
|
logger.Warn("Rate not found", zap.String("pair", pairKey.Base+"/"+pairKey.Quote), zap.String("provider", provider))
|
||||||
return gsresponse.FailedPrecondition[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, "rate_not_found", err)
|
return gsresponse.FailedPrecondition[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, "rate_not_found", err)
|
||||||
default:
|
default:
|
||||||
logger.Warn("GetQuote failed to load rate", zap.Error(err), zap.String("pair", pairKey.Base+"/"+pairKey.Quote), zap.String("provider", provider))
|
logger.Warn("GetQuote failed to load rate", zap.Error(err), zap.String("pair", pairKey.Base+"/"+pairKey.Quote), zap.String("provider", provider))
|
||||||
|
|||||||
@@ -50,28 +50,28 @@ func New(logger mlogger.Logger, conn *db.MongoConnection) (*Store, error) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
if err := s.Ping(ctx); err != nil {
|
if err := s.Ping(ctx); err != nil {
|
||||||
s.logger.Error("mongo ping failed during store init", zap.Error(err))
|
s.logger.Error("Mongo ping failed during store init", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ratesStore, err := store.NewRates(s.logger, db)
|
ratesStore, err := store.NewRates(s.logger, db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Error("failed to initialize rates store", zap.Error(err))
|
s.logger.Error("Failed to initialize rates store", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
quotesStore, err := store.NewQuotes(s.logger, db, txFactory)
|
quotesStore, err := store.NewQuotes(s.logger, db, txFactory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Error("failed to initialize quotes store", zap.Error(err))
|
s.logger.Error("Failed to initialize quotes store", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
pairsStore, err := store.NewPair(s.logger, db)
|
pairsStore, err := store.NewPair(s.logger, db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Error("failed to initialize pair store", zap.Error(err))
|
s.logger.Error("Failed to initialize pair store", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
currencyStore, err := store.NewCurrency(s.logger, db)
|
currencyStore, err := store.NewCurrency(s.logger, db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Error("failed to initialize currency store", zap.Error(err))
|
s.logger.Error("Failed to initialize currency store", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -80,7 +80,7 @@ func New(logger mlogger.Logger, conn *db.MongoConnection) (*Store, error) {
|
|||||||
s.pairs = pairsStore
|
s.pairs = pairsStore
|
||||||
s.currencies = currencyStore
|
s.currencies = currencyStore
|
||||||
|
|
||||||
s.logger.Info("mongo storage ready")
|
s.logger.Info("Mongo storage ready")
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -29,11 +29,11 @@ func NewCurrency(logger mlogger.Logger, db *mongo.Database) (storage.CurrencySto
|
|||||||
Unique: true,
|
Unique: true,
|
||||||
}
|
}
|
||||||
if err := repo.CreateIndex(index); err != nil {
|
if err := repo.CreateIndex(index); err != nil {
|
||||||
logger.Error("failed to ensure currencies index", zap.Error(err))
|
logger.Error("Failed to ensure currencies index", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
childLogger := logger.Named(model.CurrenciesCollection)
|
childLogger := logger.Named(model.CurrenciesCollection)
|
||||||
childLogger.Debug("currency store initialised", zap.String("collection", model.CurrenciesCollection))
|
childLogger.Debug("Currency store initialised", zap.String("collection", model.CurrenciesCollection))
|
||||||
|
|
||||||
return ¤cyStore{
|
return ¤cyStore{
|
||||||
logger: childLogger,
|
logger: childLogger,
|
||||||
@@ -43,17 +43,17 @@ func NewCurrency(logger mlogger.Logger, db *mongo.Database) (storage.CurrencySto
|
|||||||
|
|
||||||
func (c *currencyStore) Get(ctx context.Context, code string) (*model.Currency, error) {
|
func (c *currencyStore) Get(ctx context.Context, code string) (*model.Currency, error) {
|
||||||
if code == "" {
|
if code == "" {
|
||||||
c.logger.Warn("attempt to fetch currency with empty code")
|
c.logger.Warn("Attempt to fetch currency with empty code")
|
||||||
return nil, merrors.InvalidArgument("currencyStore: empty code")
|
return nil, merrors.InvalidArgument("currencyStore: empty code")
|
||||||
}
|
}
|
||||||
result := &model.Currency{}
|
result := &model.Currency{}
|
||||||
if err := c.repo.FindOneByFilter(ctx, repository.Filter("code", code), result); err != nil {
|
if err := c.repo.FindOneByFilter(ctx, repository.Filter("code", code), result); err != nil {
|
||||||
if errors.Is(err, merrors.ErrNoData) {
|
if errors.Is(err, merrors.ErrNoData) {
|
||||||
c.logger.Debug("currency not found", zap.String("code", code))
|
c.logger.Debug("Currency not found", zap.String("code", code))
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
c.logger.Debug("currency loaded", zap.String("code", code))
|
c.logger.Debug("Currency loaded", zap.String("code", code))
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -77,20 +77,20 @@ func (c *currencyStore) List(ctx context.Context, codes ...string) ([]*model.Cur
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Error("failed to list currencies", zap.Error(err))
|
c.logger.Warn("Failed to list currencies", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
c.logger.Debug("listed currencies", zap.Int("count", len(currencies)))
|
c.logger.Debug("Listed currencies", zap.Int("count", len(currencies)))
|
||||||
return currencies, nil
|
return currencies, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *currencyStore) Upsert(ctx context.Context, currency *model.Currency) error {
|
func (c *currencyStore) Upsert(ctx context.Context, currency *model.Currency) error {
|
||||||
if currency == nil {
|
if currency == nil {
|
||||||
c.logger.Warn("attempt to upsert nil currency")
|
c.logger.Warn("Attempt to upsert nil currency")
|
||||||
return merrors.InvalidArgument("currencyStore: nil currency")
|
return merrors.InvalidArgument("currencyStore: nil currency")
|
||||||
}
|
}
|
||||||
if currency.Code == "" {
|
if currency.Code == "" {
|
||||||
c.logger.Warn("attempt to upsert currency with empty code")
|
c.logger.Warn("Attempt to upsert currency with empty code")
|
||||||
return merrors.InvalidArgument("currencyStore: empty code")
|
return merrors.InvalidArgument("currencyStore: empty code")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -98,16 +98,16 @@ func (c *currencyStore) Upsert(ctx context.Context, currency *model.Currency) er
|
|||||||
filter := repository.Filter("code", currency.Code)
|
filter := repository.Filter("code", currency.Code)
|
||||||
if err := c.repo.FindOneByFilter(ctx, filter, existing); err != nil {
|
if err := c.repo.FindOneByFilter(ctx, filter, existing); err != nil {
|
||||||
if errors.Is(err, merrors.ErrNoData) {
|
if errors.Is(err, merrors.ErrNoData) {
|
||||||
c.logger.Debug("inserting new currency", zap.String("code", currency.Code))
|
c.logger.Debug("Inserting new currency", zap.String("code", currency.Code))
|
||||||
return c.repo.Insert(ctx, currency, filter)
|
return c.repo.Insert(ctx, currency, filter)
|
||||||
}
|
}
|
||||||
c.logger.Error("failed to fetch currency", zap.Error(err), zap.String("code", currency.Code))
|
c.logger.Warn("Failed to fetch currency", zap.Error(err), zap.String("code", currency.Code))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if existing.GetID() != nil {
|
if existing.GetID() != nil {
|
||||||
currency.SetID(*existing.GetID())
|
currency.SetID(*existing.GetID())
|
||||||
}
|
}
|
||||||
c.logger.Debug("updating currency", zap.String("code", currency.Code))
|
c.logger.Debug("Updating currency", zap.String("code", currency.Code))
|
||||||
return c.repo.Update(ctx, currency)
|
return c.repo.Update(ctx, currency)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,10 +29,10 @@ func NewPair(logger mlogger.Logger, db *mongo.Database) (storage.PairStore, erro
|
|||||||
Unique: true,
|
Unique: true,
|
||||||
}
|
}
|
||||||
if err := repo.CreateIndex(index); err != nil {
|
if err := repo.CreateIndex(index); err != nil {
|
||||||
logger.Error("failed to ensure pairs index", zap.Error(err))
|
logger.Error("Failed to ensure pairs index", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
logger.Debug("pair store initialised", zap.String("collection", model.PairsCollection))
|
logger.Debug("Pair store initialised", zap.String("collection", model.PairsCollection))
|
||||||
|
|
||||||
return &pairStore{
|
return &pairStore{
|
||||||
logger: logger.Named(model.PairsCollection),
|
logger: logger.Named(model.PairsCollection),
|
||||||
@@ -53,16 +53,16 @@ func (p *pairStore) ListEnabled(ctx context.Context) ([]*model.Pair, error) {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.logger.Error("failed to list enabled pairs", zap.Error(err))
|
p.logger.Warn("Failed to list enabled pairs", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
p.logger.Debug("listed enabled pairs", zap.Int("count", len(pairs)))
|
p.logger.Debug("Listed enabled pairs", zap.Int("count", len(pairs)))
|
||||||
return pairs, nil
|
return pairs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pairStore) Get(ctx context.Context, pair model.CurrencyPair) (*model.Pair, error) {
|
func (p *pairStore) Get(ctx context.Context, pair model.CurrencyPair) (*model.Pair, error) {
|
||||||
if pair.Base == "" || pair.Quote == "" {
|
if pair.Base == "" || pair.Quote == "" {
|
||||||
p.logger.Warn("attempt to fetch pair with empty currency", zap.String("base", pair.Base), zap.String("quote", pair.Quote))
|
p.logger.Warn("Attempt to fetch pair with empty currency", zap.String("base", pair.Base), zap.String("quote", pair.Quote))
|
||||||
return nil, merrors.InvalidArgument("pairStore: incomplete pair")
|
return nil, merrors.InvalidArgument("pairStore: incomplete pair")
|
||||||
}
|
}
|
||||||
result := &model.Pair{}
|
result := &model.Pair{}
|
||||||
@@ -71,21 +71,21 @@ func (p *pairStore) Get(ctx context.Context, pair model.CurrencyPair) (*model.Pa
|
|||||||
Filter(repository.Field("pair").Dot("quote"), pair.Quote)
|
Filter(repository.Field("pair").Dot("quote"), pair.Quote)
|
||||||
if err := p.repo.FindOneByFilter(ctx, query, result); err != nil {
|
if err := p.repo.FindOneByFilter(ctx, query, result); err != nil {
|
||||||
if errors.Is(err, merrors.ErrNoData) {
|
if errors.Is(err, merrors.ErrNoData) {
|
||||||
p.logger.Debug("pair not found", zap.String("base", pair.Base), zap.String("quote", pair.Quote))
|
p.logger.Debug("Pair not found", zap.String("base", pair.Base), zap.String("quote", pair.Quote))
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
p.logger.Debug("pair loaded", zap.String("base", pair.Base), zap.String("quote", pair.Quote))
|
p.logger.Debug("Pair loaded", zap.String("base", pair.Base), zap.String("quote", pair.Quote))
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *pairStore) Upsert(ctx context.Context, pair *model.Pair) error {
|
func (p *pairStore) Upsert(ctx context.Context, pair *model.Pair) error {
|
||||||
if pair == nil {
|
if pair == nil {
|
||||||
p.logger.Warn("attempt to upsert nil pair")
|
p.logger.Warn("Attempt to upsert nil pair")
|
||||||
return merrors.InvalidArgument("pairStore: nil pair")
|
return merrors.InvalidArgument("pairStore: nil pair")
|
||||||
}
|
}
|
||||||
if pair.Pair.Base == "" || pair.Pair.Quote == "" {
|
if pair.Pair.Base == "" || pair.Pair.Quote == "" {
|
||||||
p.logger.Warn("attempt to upsert pair with empty currency", zap.String("base", pair.Pair.Base), zap.String("quote", pair.Pair.Quote))
|
p.logger.Warn("Attempt to upsert pair with empty currency", zap.String("base", pair.Pair.Base), zap.String("quote", pair.Pair.Quote))
|
||||||
return merrors.InvalidArgument("pairStore: incomplete pair")
|
return merrors.InvalidArgument("pairStore: incomplete pair")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -96,16 +96,16 @@ func (p *pairStore) Upsert(ctx context.Context, pair *model.Pair) error {
|
|||||||
err := p.repo.FindOneByFilter(ctx, query, existing)
|
err := p.repo.FindOneByFilter(ctx, query, existing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, merrors.ErrNoData) {
|
if errors.Is(err, merrors.ErrNoData) {
|
||||||
p.logger.Debug("inserting new pair", zap.String("base", pair.Pair.Base), zap.String("quote", pair.Pair.Quote))
|
p.logger.Debug("Inserting new pair", zap.String("base", pair.Pair.Base), zap.String("quote", pair.Pair.Quote))
|
||||||
return p.repo.Insert(ctx, pair, query)
|
return p.repo.Insert(ctx, pair, query)
|
||||||
}
|
}
|
||||||
p.logger.Error("failed to fetch pair", zap.Error(err), zap.String("base", pair.Pair.Base), zap.String("quote", pair.Pair.Quote))
|
p.logger.Warn("Failed to fetch pair", zap.Error(err), zap.String("base", pair.Pair.Base), zap.String("quote", pair.Pair.Quote))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if existing.GetID() != nil {
|
if existing.GetID() != nil {
|
||||||
pair.SetID(*existing.GetID())
|
pair.SetID(*existing.GetID())
|
||||||
}
|
}
|
||||||
p.logger.Debug("updating pair", zap.String("base", pair.Pair.Base), zap.String("quote", pair.Pair.Quote))
|
p.logger.Debug("Updating pair", zap.String("base", pair.Pair.Base), zap.String("quote", pair.Pair.Quote))
|
||||||
return p.repo.Update(ctx, pair)
|
return p.repo.Update(ctx, pair)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,12 +56,12 @@ func NewQuotes(logger mlogger.Logger, db *mongo.Database, txFactory transaction.
|
|||||||
|
|
||||||
for _, def := range indexes {
|
for _, def := range indexes {
|
||||||
if err := repo.CreateIndex(def); err != nil {
|
if err := repo.CreateIndex(def); err != nil {
|
||||||
logger.Error("failed to ensure quotes index", zap.Error(err))
|
logger.Error("Failed to ensure quotes index", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
childLogger := logger.Named(model.QuotesCollection)
|
childLogger := logger.Named(model.QuotesCollection)
|
||||||
childLogger.Debug("quotes store initialised", zap.String("collection", model.QuotesCollection))
|
childLogger.Debug("Quotes store initialised", zap.String("collection", model.QuotesCollection))
|
||||||
|
|
||||||
return "esStore{
|
return "esStore{
|
||||||
logger: childLogger,
|
logger: childLogger,
|
||||||
@@ -72,11 +72,11 @@ func NewQuotes(logger mlogger.Logger, db *mongo.Database, txFactory transaction.
|
|||||||
|
|
||||||
func (q *quotesStore) Issue(ctx context.Context, quote *model.Quote) error {
|
func (q *quotesStore) Issue(ctx context.Context, quote *model.Quote) error {
|
||||||
if quote == nil {
|
if quote == nil {
|
||||||
q.logger.Warn("attempt to issue nil quote")
|
q.logger.Warn("Attempt to issue nil quote")
|
||||||
return merrors.InvalidArgument("quotesStore: nil quote")
|
return merrors.InvalidArgument("quotesStore: nil quote")
|
||||||
}
|
}
|
||||||
if quote.QuoteRef == "" {
|
if quote.QuoteRef == "" {
|
||||||
q.logger.Warn("attempt to issue quote with empty ref")
|
q.logger.Warn("Attempt to issue quote with empty ref")
|
||||||
return merrors.InvalidArgument("quotesStore: empty quoteRef")
|
return merrors.InvalidArgument("quotesStore: empty quoteRef")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -89,32 +89,32 @@ func (q *quotesStore) Issue(ctx context.Context, quote *model.Quote) error {
|
|||||||
quote.ConsumedByLedgerTxnRef = ""
|
quote.ConsumedByLedgerTxnRef = ""
|
||||||
quote.ConsumedAtUnixMs = nil
|
quote.ConsumedAtUnixMs = nil
|
||||||
if err := q.repo.Insert(ctx, quote, repository.Filter("quoteRef", quote.QuoteRef)); err != nil {
|
if err := q.repo.Insert(ctx, quote, repository.Filter("quoteRef", quote.QuoteRef)); err != nil {
|
||||||
q.logger.Error("failed to insert quote", zap.Error(err), zap.String("quote_ref", quote.QuoteRef))
|
q.logger.Warn("Failed to insert quote", zap.Error(err), zap.String("quote_ref", quote.QuoteRef))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
q.logger.Debug("quote issued", zap.String("quote_ref", quote.QuoteRef), zap.Bool("firm", quote.Firm))
|
q.logger.Debug("Quote issued", zap.String("quote_ref", quote.QuoteRef), zap.Bool("firm", quote.Firm))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *quotesStore) GetByRef(ctx context.Context, quoteRef string) (*model.Quote, error) {
|
func (q *quotesStore) GetByRef(ctx context.Context, quoteRef string) (*model.Quote, error) {
|
||||||
if quoteRef == "" {
|
if quoteRef == "" {
|
||||||
q.logger.Warn("attempt to fetch quote with empty ref")
|
q.logger.Warn("Attempt to fetch quote with empty ref")
|
||||||
return nil, merrors.InvalidArgument("quotesStore: empty quoteRef")
|
return nil, merrors.InvalidArgument("quotesStore: empty quoteRef")
|
||||||
}
|
}
|
||||||
quote := &model.Quote{}
|
quote := &model.Quote{}
|
||||||
if err := q.repo.FindOneByFilter(ctx, repository.Filter("quoteRef", quoteRef), quote); err != nil {
|
if err := q.repo.FindOneByFilter(ctx, repository.Filter("quoteRef", quoteRef), quote); err != nil {
|
||||||
if errors.Is(err, merrors.ErrNoData) {
|
if errors.Is(err, merrors.ErrNoData) {
|
||||||
q.logger.Debug("quote not found", zap.String("quote_ref", quoteRef))
|
q.logger.Debug("Quote not found", zap.String("quote_ref", quoteRef))
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
q.logger.Debug("quote loaded", zap.String("quote_ref", quoteRef), zap.String("status", string(quote.Status)))
|
q.logger.Debug("Quote loaded", zap.String("quote_ref", quoteRef), zap.String("status", string(quote.Status)))
|
||||||
return quote, nil
|
return quote, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *quotesStore) Consume(ctx context.Context, quoteRef, ledgerTxnRef string, when time.Time) (*model.Quote, error) {
|
func (q *quotesStore) Consume(ctx context.Context, quoteRef, ledgerTxnRef string, when time.Time) (*model.Quote, error) {
|
||||||
if quoteRef == "" || ledgerTxnRef == "" {
|
if quoteRef == "" || ledgerTxnRef == "" {
|
||||||
q.logger.Warn("attempt to consume quote with missing identifiers", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef))
|
q.logger.Warn("Attempt to consume quote with missing identifiers", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef))
|
||||||
return nil, merrors.InvalidArgument("quotesStore: missing identifiers")
|
return nil, merrors.InvalidArgument("quotesStore: missing identifiers")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -122,7 +122,7 @@ func (q *quotesStore) Consume(ctx context.Context, quoteRef, ledgerTxnRef string
|
|||||||
when = time.Now()
|
when = time.Now()
|
||||||
}
|
}
|
||||||
|
|
||||||
q.logger.Debug("consuming quote", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef))
|
q.logger.Debug("Consuming quote", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef))
|
||||||
txn := q.txFactory.CreateTransaction()
|
txn := q.txFactory.CreateTransaction()
|
||||||
result, err := txn.Execute(ctx, func(txCtx context.Context) (any, error) {
|
result, err := txn.Execute(ctx, func(txCtx context.Context) (any, error) {
|
||||||
quote := &model.Quote{}
|
quote := &model.Quote{}
|
||||||
@@ -131,7 +131,7 @@ func (q *quotesStore) Consume(ctx context.Context, quoteRef, ledgerTxnRef string
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !quote.Firm {
|
if !quote.Firm {
|
||||||
q.logger.Warn("quote not firm", zap.String("quote_ref", quoteRef))
|
q.logger.Warn("Quote not firm", zap.String("quote_ref", quoteRef))
|
||||||
return nil, storage.ErrQuoteNotFirm
|
return nil, storage.ErrQuoteNotFirm
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -140,16 +140,16 @@ func (q *quotesStore) Consume(ctx context.Context, quoteRef, ledgerTxnRef string
|
|||||||
if err := q.repo.Update(txCtx, quote); err != nil {
|
if err := q.repo.Update(txCtx, quote); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
q.logger.Info("quote expired during consume", zap.String("quote_ref", quoteRef))
|
q.logger.Info("Quote expired during consume", zap.String("quote_ref", quoteRef))
|
||||||
return nil, storage.ErrQuoteExpired
|
return nil, storage.ErrQuoteExpired
|
||||||
}
|
}
|
||||||
|
|
||||||
if quote.Status == model.QuoteStatusConsumed {
|
if quote.Status == model.QuoteStatusConsumed {
|
||||||
if quote.ConsumedByLedgerTxnRef == ledgerTxnRef {
|
if quote.ConsumedByLedgerTxnRef == ledgerTxnRef {
|
||||||
q.logger.Debug("quote already consumed by ledger", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef))
|
q.logger.Debug("Quote already consumed by ledger", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef))
|
||||||
return quote, nil
|
return quote, nil
|
||||||
}
|
}
|
||||||
q.logger.Warn("quote consumed by different ledger", zap.String("quote_ref", quoteRef), zap.String("existing_ledger_ref", quote.ConsumedByLedgerTxnRef))
|
q.logger.Warn("Quote consumed by different ledger", zap.String("quote_ref", quoteRef), zap.String("existing_ledger_ref", quote.ConsumedByLedgerTxnRef))
|
||||||
return nil, storage.ErrQuoteConsumed
|
return nil, storage.ErrQuoteConsumed
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -157,11 +157,11 @@ func (q *quotesStore) Consume(ctx context.Context, quoteRef, ledgerTxnRef string
|
|||||||
if err := q.repo.Update(txCtx, quote); err != nil {
|
if err := q.repo.Update(txCtx, quote); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
q.logger.Info("quote consumed", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef))
|
q.logger.Info("Quote consumed", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef))
|
||||||
return quote, nil
|
return quote, nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
q.logger.Error("quote consumption failed", zap.Error(err), zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef))
|
q.logger.Warn("Quote consumption failed", zap.Error(err), zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
quote, _ := result.(*model.Quote)
|
quote, _ := result.(*model.Quote)
|
||||||
@@ -173,7 +173,7 @@ func (q *quotesStore) Consume(ctx context.Context, quoteRef, ledgerTxnRef string
|
|||||||
|
|
||||||
func (q *quotesStore) ExpireIssuedBefore(ctx context.Context, cutoff time.Time) (int, error) {
|
func (q *quotesStore) ExpireIssuedBefore(ctx context.Context, cutoff time.Time) (int, error) {
|
||||||
if cutoff.IsZero() {
|
if cutoff.IsZero() {
|
||||||
q.logger.Warn("attempt to expire quotes with zero cutoff")
|
q.logger.Warn("Attempt to expire quotes with zero cutoff")
|
||||||
return 0, merrors.InvalidArgument("quotesStore: cutoff time is zero")
|
return 0, merrors.InvalidArgument("quotesStore: cutoff time is zero")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -188,11 +188,11 @@ func (q *quotesStore) ExpireIssuedBefore(ctx context.Context, cutoff time.Time)
|
|||||||
|
|
||||||
updated, err := q.repo.PatchMany(ctx, filter, patch)
|
updated, err := q.repo.PatchMany(ctx, filter, patch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
q.logger.Error("failed to expire quotes", zap.Error(err))
|
q.logger.Warn("Failed to expire quotes", zap.Error(err))
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
if updated > 0 {
|
if updated > 0 {
|
||||||
q.logger.Info("quotes expired", zap.Int("count", updated))
|
q.logger.Info("Quotes expired", zap.Int("count", updated))
|
||||||
}
|
}
|
||||||
return updated, nil
|
return updated, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -51,11 +51,11 @@ func NewRates(logger mlogger.Logger, db *mongo.Database) (storage.RatesStore, er
|
|||||||
|
|
||||||
for _, def := range indexes {
|
for _, def := range indexes {
|
||||||
if err := repo.CreateIndex(def); err != nil {
|
if err := repo.CreateIndex(def); err != nil {
|
||||||
logger.Error("failed to ensure rates index", zap.Error(err))
|
logger.Error("Failed to ensure rates index", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.Debug("rates store initialised", zap.String("collection", model.RatesCollection))
|
logger.Debug("Rates store initialised", zap.String("collection", model.RatesCollection))
|
||||||
return &ratesStore{
|
return &ratesStore{
|
||||||
logger: logger.Named(model.RatesCollection),
|
logger: logger.Named(model.RatesCollection),
|
||||||
repo: repo,
|
repo: repo,
|
||||||
@@ -64,11 +64,11 @@ func NewRates(logger mlogger.Logger, db *mongo.Database) (storage.RatesStore, er
|
|||||||
|
|
||||||
func (r *ratesStore) UpsertSnapshot(ctx context.Context, snapshot *model.RateSnapshot) error {
|
func (r *ratesStore) UpsertSnapshot(ctx context.Context, snapshot *model.RateSnapshot) error {
|
||||||
if snapshot == nil {
|
if snapshot == nil {
|
||||||
r.logger.Warn("attempt to upsert nil snapshot")
|
r.logger.Warn("Attempt to upsert nil snapshot")
|
||||||
return merrors.InvalidArgument("ratesStore: nil snapshot")
|
return merrors.InvalidArgument("ratesStore: nil snapshot")
|
||||||
}
|
}
|
||||||
if snapshot.RateRef == "" {
|
if snapshot.RateRef == "" {
|
||||||
r.logger.Warn("attempt to upsert snapshot with empty rate_ref")
|
r.logger.Warn("Attempt to upsert snapshot with empty rate_ref")
|
||||||
return merrors.InvalidArgument("ratesStore: empty rateRef")
|
return merrors.InvalidArgument("ratesStore: empty rateRef")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -82,17 +82,17 @@ func (r *ratesStore) UpsertSnapshot(ctx context.Context, snapshot *model.RateSna
|
|||||||
err := r.repo.FindOneByFilter(ctx, filter, existing)
|
err := r.repo.FindOneByFilter(ctx, filter, existing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, merrors.ErrNoData) {
|
if errors.Is(err, merrors.ErrNoData) {
|
||||||
r.logger.Debug("inserting new rate snapshot", zap.String("rate_ref", snapshot.RateRef))
|
r.logger.Debug("Inserting new rate snapshot", zap.String("rate_ref", snapshot.RateRef))
|
||||||
return r.repo.Insert(ctx, snapshot, filter)
|
return r.repo.Insert(ctx, snapshot, filter)
|
||||||
}
|
}
|
||||||
r.logger.Error("failed to query rate snapshot", zap.Error(err), zap.String("rate_ref", snapshot.RateRef))
|
r.logger.Warn("Failed to query rate snapshot", zap.Error(err), zap.String("rate_ref", snapshot.RateRef))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if existing.GetID() != nil {
|
if existing.GetID() != nil {
|
||||||
snapshot.SetID(*existing.GetID())
|
snapshot.SetID(*existing.GetID())
|
||||||
}
|
}
|
||||||
r.logger.Debug("updating rate snapshot", zap.String("rate_ref", snapshot.RateRef))
|
r.logger.Debug("Updating rate snapshot", zap.String("rate_ref", snapshot.RateRef))
|
||||||
return r.repo.Update(ctx, snapshot)
|
return r.repo.Update(ctx, snapshot)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user