199 lines
6.3 KiB
Go
199 lines
6.3 KiB
Go
package store
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
|
|
"github.com/tech/sendico/fx/storage"
|
|
"github.com/tech/sendico/fx/storage/model"
|
|
"github.com/tech/sendico/pkg/db/repository"
|
|
"github.com/tech/sendico/pkg/db/repository/builder"
|
|
ri "github.com/tech/sendico/pkg/db/repository/index"
|
|
"github.com/tech/sendico/pkg/db/transaction"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type quotesStore struct {
|
|
logger mlogger.Logger
|
|
repo repository.Repository
|
|
txFactory transaction.Factory
|
|
}
|
|
|
|
func NewQuotes(logger mlogger.Logger, db *mongo.Database, txFactory transaction.Factory) (storage.QuotesStore, error) {
|
|
repo := repository.CreateMongoRepository(db, model.QuotesCollection)
|
|
indexes := []*ri.Definition{
|
|
{
|
|
Keys: []ri.Key{
|
|
{Field: "quoteRef", Sort: ri.Asc},
|
|
},
|
|
Unique: true,
|
|
},
|
|
{
|
|
Keys: []ri.Key{
|
|
{Field: "status", Sort: ri.Asc},
|
|
{Field: "expiresAtUnixMs", Sort: ri.Asc},
|
|
},
|
|
},
|
|
{
|
|
Keys: []ri.Key{
|
|
{Field: "consumedByLedgerTxnRef", Sort: ri.Asc},
|
|
},
|
|
},
|
|
}
|
|
|
|
ttlSeconds := int32(0)
|
|
indexes = append(indexes, &ri.Definition{
|
|
Keys: []ri.Key{
|
|
{Field: "expiresAt", Sort: ri.Asc},
|
|
},
|
|
TTL: &ttlSeconds,
|
|
Name: "quotes_expires_at_ttl",
|
|
})
|
|
|
|
for _, def := range indexes {
|
|
if err := repo.CreateIndex(def); err != nil {
|
|
logger.Error("Failed to ensure quotes index", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
}
|
|
childLogger := logger.Named(model.QuotesCollection)
|
|
childLogger.Debug("Quotes store initialised", zap.String("collection", model.QuotesCollection))
|
|
|
|
return "esStore{
|
|
logger: childLogger,
|
|
repo: repo,
|
|
txFactory: txFactory,
|
|
}, nil
|
|
}
|
|
|
|
func (q *quotesStore) Issue(ctx context.Context, quote *model.Quote) error {
|
|
if quote == nil {
|
|
q.logger.Warn("Attempt to issue nil quote")
|
|
return merrors.InvalidArgument("quotesStore: nil quote")
|
|
}
|
|
if quote.QuoteRef == "" {
|
|
q.logger.Warn("Attempt to issue quote with empty ref")
|
|
return merrors.InvalidArgument("quotesStore: empty quoteRef")
|
|
}
|
|
|
|
if quote.ExpiresAtUnixMs > 0 && quote.ExpiresAt == nil {
|
|
expiry := time.UnixMilli(quote.ExpiresAtUnixMs)
|
|
quote.ExpiresAt = &expiry
|
|
}
|
|
|
|
quote.Status = model.QuoteStatusIssued
|
|
quote.ConsumedByLedgerTxnRef = ""
|
|
quote.ConsumedAtUnixMs = nil
|
|
if err := q.repo.Insert(ctx, quote, repository.Filter("quoteRef", quote.QuoteRef)); err != nil {
|
|
q.logger.Warn("Failed to insert quote", zap.Error(err), zap.String("quote_ref", quote.QuoteRef))
|
|
return err
|
|
}
|
|
q.logger.Debug("Quote issued", zap.String("quote_ref", quote.QuoteRef), zap.Bool("firm", quote.Firm))
|
|
return nil
|
|
}
|
|
|
|
func (q *quotesStore) GetByRef(ctx context.Context, quoteRef string) (*model.Quote, error) {
|
|
if quoteRef == "" {
|
|
q.logger.Warn("Attempt to fetch quote with empty ref")
|
|
return nil, merrors.InvalidArgument("quotesStore: empty quoteRef")
|
|
}
|
|
quote := &model.Quote{}
|
|
if err := q.repo.FindOneByFilter(ctx, repository.Filter("quoteRef", quoteRef), quote); err != nil {
|
|
if errors.Is(err, merrors.ErrNoData) {
|
|
q.logger.Debug("Quote not found", zap.String("quote_ref", quoteRef))
|
|
}
|
|
return nil, err
|
|
}
|
|
q.logger.Debug("Quote loaded", zap.String("quote_ref", quoteRef), zap.String("status", string(quote.Status)))
|
|
return quote, nil
|
|
}
|
|
|
|
func (q *quotesStore) Consume(ctx context.Context, quoteRef, ledgerTxnRef string, when time.Time) (*model.Quote, error) {
|
|
if quoteRef == "" || ledgerTxnRef == "" {
|
|
q.logger.Warn("Attempt to consume quote with missing identifiers", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef))
|
|
return nil, merrors.InvalidArgument("quotesStore: missing identifiers")
|
|
}
|
|
|
|
if when.IsZero() {
|
|
when = time.Now()
|
|
}
|
|
|
|
q.logger.Debug("Consuming quote", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef))
|
|
txn := q.txFactory.CreateTransaction()
|
|
result, err := txn.Execute(ctx, func(txCtx context.Context) (any, error) {
|
|
quote := &model.Quote{}
|
|
if err := q.repo.FindOneByFilter(txCtx, repository.Filter("quoteRef", quoteRef), quote); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if !quote.Firm {
|
|
q.logger.Warn("Quote not firm", zap.String("quote_ref", quoteRef))
|
|
return nil, storage.ErrQuoteNotFirm
|
|
}
|
|
|
|
if quote.Status == model.QuoteStatusExpired || quote.IsExpired(when) {
|
|
quote.MarkExpired()
|
|
if err := q.repo.Update(txCtx, quote); err != nil {
|
|
return nil, err
|
|
}
|
|
q.logger.Info("Quote expired during consume", zap.String("quote_ref", quoteRef))
|
|
return nil, storage.ErrQuoteExpired
|
|
}
|
|
|
|
if quote.Status == model.QuoteStatusConsumed {
|
|
if quote.ConsumedByLedgerTxnRef == ledgerTxnRef {
|
|
q.logger.Debug("Quote already consumed by ledger", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef))
|
|
return quote, nil
|
|
}
|
|
q.logger.Warn("Quote consumed by different ledger", zap.String("quote_ref", quoteRef), zap.String("existing_ledger_ref", quote.ConsumedByLedgerTxnRef))
|
|
return nil, storage.ErrQuoteConsumed
|
|
}
|
|
|
|
quote.MarkConsumed(ledgerTxnRef, when)
|
|
if err := q.repo.Update(txCtx, quote); err != nil {
|
|
return nil, err
|
|
}
|
|
q.logger.Info("Quote consumed", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef))
|
|
return quote, nil
|
|
})
|
|
if err != nil {
|
|
q.logger.Warn("Quote consumption failed", zap.Error(err), zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef))
|
|
return nil, err
|
|
}
|
|
quote, _ := result.(*model.Quote)
|
|
if quote == nil {
|
|
return nil, merrors.Internal("quotesStore: transaction returned nil quote")
|
|
}
|
|
return quote, nil
|
|
}
|
|
|
|
func (q *quotesStore) ExpireIssuedBefore(ctx context.Context, cutoff time.Time) (int, error) {
|
|
if cutoff.IsZero() {
|
|
q.logger.Warn("Attempt to expire quotes with zero cutoff")
|
|
return 0, merrors.InvalidArgument("quotesStore: cutoff time is zero")
|
|
}
|
|
|
|
filter := repository.Query().
|
|
Filter(repository.Field("status"), model.QuoteStatusIssued).
|
|
Comparison(repository.Field("expiresAtUnixMs"), builder.Lt, cutoff.UnixMilli())
|
|
|
|
patch := repository.Patch().
|
|
Set(repository.Field("status"), model.QuoteStatusExpired).
|
|
Unset(repository.Field("consumedByLedgerTxnRef")).
|
|
Unset(repository.Field("consumedAtUnixMs"))
|
|
|
|
updated, err := q.repo.PatchMany(ctx, filter, patch)
|
|
if err != nil {
|
|
q.logger.Warn("Failed to expire quotes", zap.Error(err))
|
|
return 0, err
|
|
}
|
|
if updated > 0 {
|
|
q.logger.Info("Quotes expired", zap.Int("count", updated))
|
|
}
|
|
return updated, nil
|
|
}
|