package store import ( "context" "errors" "time" "github.com/tech/sendico/fx/storage" "github.com/tech/sendico/fx/storage/model" "github.com/tech/sendico/pkg/db/repository" "github.com/tech/sendico/pkg/db/repository/builder" ri "github.com/tech/sendico/pkg/db/repository/index" "github.com/tech/sendico/pkg/db/transaction" "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mlogger" "go.mongodb.org/mongo-driver/mongo" "go.uber.org/zap" ) type quotesStore struct { logger mlogger.Logger repo repository.Repository txFactory transaction.Factory } func NewQuotes(logger mlogger.Logger, db *mongo.Database, txFactory transaction.Factory) (storage.QuotesStore, error) { repo := repository.CreateMongoRepository(db, model.QuotesCollection) indexes := []*ri.Definition{ { Keys: []ri.Key{ {Field: "quoteRef", Sort: ri.Asc}, }, Unique: true, }, { Keys: []ri.Key{ {Field: "status", Sort: ri.Asc}, {Field: "expiresAtUnixMs", Sort: ri.Asc}, }, }, { Keys: []ri.Key{ {Field: "consumedByLedgerTxnRef", Sort: ri.Asc}, }, }, } ttlSeconds := int32(0) indexes = append(indexes, &ri.Definition{ Keys: []ri.Key{ {Field: "expiresAt", Sort: ri.Asc}, }, TTL: &ttlSeconds, Name: "quotes_expires_at_ttl", }) for _, def := range indexes { if err := repo.CreateIndex(def); err != nil { logger.Error("Failed to ensure quotes index", zap.Error(err)) return nil, err } } childLogger := logger.Named(model.QuotesCollection) childLogger.Debug("Quotes store initialised", zap.String("collection", model.QuotesCollection)) return "esStore{ logger: childLogger, repo: repo, txFactory: txFactory, }, nil } func (q *quotesStore) Issue(ctx context.Context, quote *model.Quote) error { if quote == nil { q.logger.Warn("Attempt to issue nil quote") return merrors.InvalidArgument("quotesStore: nil quote") } if quote.QuoteRef == "" { q.logger.Warn("Attempt to issue quote with empty ref") return merrors.InvalidArgument("quotesStore: empty quoteRef") } if quote.ExpiresAtUnixMs > 0 && quote.ExpiresAt == nil { expiry := time.UnixMilli(quote.ExpiresAtUnixMs) quote.ExpiresAt = &expiry } quote.Status = model.QuoteStatusIssued quote.ConsumedByLedgerTxnRef = "" quote.ConsumedAtUnixMs = nil if err := q.repo.Insert(ctx, quote, repository.Filter("quoteRef", quote.QuoteRef)); err != nil { q.logger.Warn("Failed to insert quote", zap.Error(err), zap.String("quote_ref", quote.QuoteRef)) return err } q.logger.Debug("Quote issued", zap.String("quote_ref", quote.QuoteRef), zap.Bool("firm", quote.Firm)) return nil } func (q *quotesStore) GetByRef(ctx context.Context, quoteRef string) (*model.Quote, error) { if quoteRef == "" { q.logger.Warn("Attempt to fetch quote with empty ref") return nil, merrors.InvalidArgument("quotesStore: empty quoteRef") } quote := &model.Quote{} if err := q.repo.FindOneByFilter(ctx, repository.Filter("quoteRef", quoteRef), quote); err != nil { if errors.Is(err, merrors.ErrNoData) { q.logger.Debug("Quote not found", zap.String("quote_ref", quoteRef)) } return nil, err } q.logger.Debug("Quote loaded", zap.String("quote_ref", quoteRef), zap.String("status", string(quote.Status))) return quote, nil } func (q *quotesStore) Consume(ctx context.Context, quoteRef, ledgerTxnRef string, when time.Time) (*model.Quote, error) { if quoteRef == "" || ledgerTxnRef == "" { q.logger.Warn("Attempt to consume quote with missing identifiers", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef)) return nil, merrors.InvalidArgument("quotesStore: missing identifiers") } if when.IsZero() { when = time.Now() } q.logger.Debug("Consuming quote", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef)) txn := q.txFactory.CreateTransaction() result, err := txn.Execute(ctx, func(txCtx context.Context) (any, error) { quote := &model.Quote{} if err := q.repo.FindOneByFilter(txCtx, repository.Filter("quoteRef", quoteRef), quote); err != nil { return nil, err } if !quote.Firm { q.logger.Warn("Quote not firm", zap.String("quote_ref", quoteRef)) return nil, storage.ErrQuoteNotFirm } if quote.Status == model.QuoteStatusExpired || quote.IsExpired(when) { quote.MarkExpired() if err := q.repo.Update(txCtx, quote); err != nil { return nil, err } q.logger.Info("Quote expired during consume", zap.String("quote_ref", quoteRef)) return nil, storage.ErrQuoteExpired } if quote.Status == model.QuoteStatusConsumed { if quote.ConsumedByLedgerTxnRef == ledgerTxnRef { q.logger.Debug("Quote already consumed by ledger", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef)) return quote, nil } q.logger.Warn("Quote consumed by different ledger", zap.String("quote_ref", quoteRef), zap.String("existing_ledger_ref", quote.ConsumedByLedgerTxnRef)) return nil, storage.ErrQuoteConsumed } quote.MarkConsumed(ledgerTxnRef, when) if err := q.repo.Update(txCtx, quote); err != nil { return nil, err } q.logger.Info("Quote consumed", zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef)) return quote, nil }) if err != nil { q.logger.Warn("Quote consumption failed", zap.Error(err), zap.String("quote_ref", quoteRef), zap.String("ledger_ref", ledgerTxnRef)) return nil, err } quote, _ := result.(*model.Quote) if quote == nil { return nil, merrors.Internal("quotesStore: transaction returned nil quote") } return quote, nil } func (q *quotesStore) ExpireIssuedBefore(ctx context.Context, cutoff time.Time) (int, error) { if cutoff.IsZero() { q.logger.Warn("Attempt to expire quotes with zero cutoff") return 0, merrors.InvalidArgument("quotesStore: cutoff time is zero") } filter := repository.Query(). Filter(repository.Field("status"), model.QuoteStatusIssued). Comparison(repository.Field("expiresAtUnixMs"), builder.Lt, cutoff.UnixMilli()) patch := repository.Patch(). Set(repository.Field("status"), model.QuoteStatusExpired). Unset(repository.Field("consumedByLedgerTxnRef")). Unset(repository.Field("consumedAtUnixMs")) updated, err := q.repo.PatchMany(ctx, filter, patch) if err != nil { q.logger.Warn("Failed to expire quotes", zap.Error(err)) return 0, err } if updated > 0 { q.logger.Info("Quotes expired", zap.Int("count", updated)) } return updated, nil }