From 51514159f523daaa37802baa1832d6b80495e828 Mon Sep 17 00:00:00 2001 From: Stephan D Date: Fri, 20 Feb 2026 17:19:31 +0100 Subject: [PATCH] set 10 min quotations timeout --- api/fx/oracle/config.dev.yml | 2 + api/fx/oracle/config.yml | 2 + .../internal/server/internal/serverimp.go | 33 ++++++-- .../oracle/internal/service/oracle/service.go | 58 +++++++++++--- .../internal/service/oracle/service_test.go | 56 +++++++++++++ api/payments/orchestrator/config.dev.yml | 2 + api/payments/orchestrator/config.yml | 2 + .../internal/server/internal/config.go | 13 ++++ .../internal/server/internal/dependencies.go | 1 + .../internal/service/orchestrator/options.go | 9 +++ .../internal/service/orchestrator/service.go | 23 ++++-- .../service/orchestrator/service_helpers.go | 22 +++++- .../orchestrator/service_helpers_test.go | 78 +++++++++++++++++++ .../internal/service/quotation/service.go | 4 +- 14 files changed, 279 insertions(+), 26 deletions(-) diff --git a/api/fx/oracle/config.dev.yml b/api/fx/oracle/config.dev.yml index a5b8aa57..b8b3b7c8 100644 --- a/api/fx/oracle/config.dev.yml +++ b/api/fx/oracle/config.dev.yml @@ -11,6 +11,8 @@ grpc: metrics: address: ":9400" +max_quote_ttl_ms: 600000 + database: driver: mongodb settings: diff --git a/api/fx/oracle/config.yml b/api/fx/oracle/config.yml index 452e6006..8f86ade4 100644 --- a/api/fx/oracle/config.yml +++ b/api/fx/oracle/config.yml @@ -11,6 +11,8 @@ grpc: metrics: address: ":9400" +max_quote_ttl_ms: 600000 + database: driver: mongodb settings: diff --git a/api/fx/oracle/internal/server/internal/serverimp.go b/api/fx/oracle/internal/server/internal/serverimp.go index 1b39de72..5d6313a5 100644 --- a/api/fx/oracle/internal/server/internal/serverimp.go +++ b/api/fx/oracle/internal/server/internal/serverimp.go @@ -22,11 +22,28 @@ type Imp struct { file string debug bool - config *grpcapp.Config + config *config app *grpcapp.App[storage.Repository] service *oracle.Service } +type config struct { + *grpcapp.Config `yaml:",inline"` + MaxQuoteTTLMs int64 `yaml:"max_quote_ttl_ms"` +} + +const ( + defaultMaxQuoteTTL = 10 * time.Minute + defaultMaxQuoteTTLMillis = int64(defaultMaxQuoteTTL / time.Millisecond) +) + +func (c *config) maxQuoteTTLMillis() int64 { + if c == nil || c.MaxQuoteTTLMs <= 0 { + return defaultMaxQuoteTTLMillis + } + return c.MaxQuoteTTLMs +} + func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) { return &Imp{ logger: logger.Named("server"), @@ -63,12 +80,18 @@ func (i *Imp) Start() error { } serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) { - svc := oracle.NewService(logger, repo, producer, cfg.GRPC.DiscoveryInvokeURI()) + svc := oracle.NewService( + logger, + repo, + producer, + cfg.GRPC.DiscoveryInvokeURI(), + oracle.WithMaxQuoteTTLMillis(cfg.maxQuoteTTLMillis()), + ) i.service = svc return svc, nil } - app, err := grpcapp.NewApp(i.logger, "fx", cfg, i.debug, repoFactory, serviceFactory) + app, err := grpcapp.NewApp(i.logger, "fx", cfg.Config, i.debug, repoFactory, serviceFactory) if err != nil { return err } @@ -77,14 +100,14 @@ func (i *Imp) Start() error { return i.app.Start() } -func (i *Imp) loadConfig() (*grpcapp.Config, error) { +func (i *Imp) loadConfig() (*config, error) { data, err := os.ReadFile(i.file) if err != nil { i.logger.Error("Could not read configuration file", zap.String("config_file", i.file), zap.Error(err)) return nil, err } - cfg := &grpcapp.Config{} + cfg := &config{Config: &grpcapp.Config{}} if err := yaml.Unmarshal(data, cfg); err != nil { i.logger.Error("Failed to parse configuration", zap.Error(err)) return nil, err diff --git a/api/fx/oracle/internal/service/oracle/service.go b/api/fx/oracle/internal/service/oracle/service.go index 496d889a..195df8aa 100644 --- a/api/fx/oracle/internal/service/oracle/service.go +++ b/api/fx/oracle/internal/service/oracle/service.go @@ -28,6 +28,14 @@ func (e serviceError) Error() string { return string(e) } +const ( + defaultMaxQuoteTTL = 10 * time.Minute + defaultMaxQuoteTTLMillis = int64(defaultMaxQuoteTTL / time.Millisecond) +) + +// Option configures oracle service behavior. +type Option func(*Service) + var ( errSideRequired = serviceError("oracle: side is required") errAmountsMutuallyExclusive = serviceError("oracle: exactly one amount must be provided") @@ -38,21 +46,40 @@ var ( ) type Service struct { - logger mlogger.Logger - storage storage.Repository - producer pmessaging.Producer - announcer *discovery.Announcer - invokeURI string + logger mlogger.Logger + storage storage.Repository + producer pmessaging.Producer + announcer *discovery.Announcer + invokeURI string + maxQuoteTTLMillis int64 oraclev1.UnimplementedOracleServer } -func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer, invokeURI string) *Service { +// WithMaxQuoteTTLMillis caps firm quote TTL requests to the supplied number of milliseconds. +func WithMaxQuoteTTLMillis(value int64) Option { + return func(s *Service) { + if value > 0 { + s.maxQuoteTTLMillis = value + } + } +} + +func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer, invokeURI string, opts ...Option) *Service { initMetrics() svc := &Service{ - logger: logger.Named("oracle"), - storage: repo, - producer: prod, - invokeURI: strings.TrimSpace(invokeURI), + logger: logger.Named("oracle"), + storage: repo, + producer: prod, + invokeURI: strings.TrimSpace(invokeURI), + maxQuoteTTLMillis: defaultMaxQuoteTTLMillis, + } + for _, opt := range opts { + if opt != nil { + opt(svc) + } + } + if svc.maxQuoteTTLMillis <= 0 { + svc.maxQuoteTTLMillis = defaultMaxQuoteTTLMillis } svc.startDiscoveryAnnouncer() return svc @@ -222,7 +249,16 @@ func (s *Service) getQuoteResponder(ctx context.Context, req *oraclev1.GetQuoteR expiresAt := int64(0) if req.GetFirm() { - expiry, err := computeExpiry(now, req.GetTtlMs()) + ttlMs := req.GetTtlMs() + if ttlMs > s.maxQuoteTTLMillis { + logger.Info( + "Clamping requested firm quote ttl to configured maximum", + zap.Int64("requested_ttl_ms", ttlMs), + zap.Int64("max_ttl_ms", s.maxQuoteTTLMillis), + ) + ttlMs = s.maxQuoteTTLMillis + } + expiry, err := computeExpiry(now, ttlMs) if err != nil { return gsresponse.InvalidArgument[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, err) } diff --git a/api/fx/oracle/internal/service/oracle/service_test.go b/api/fx/oracle/internal/service/oracle/service_test.go index a8c5f2de..edddd0c4 100644 --- a/api/fx/oracle/internal/service/oracle/service_test.go +++ b/api/fx/oracle/internal/service/oracle/service_test.go @@ -181,6 +181,62 @@ func TestServiceGetQuoteFirm(t *testing.T) { } } +func TestServiceGetQuoteFirm_ClampsTTLToConfiguredMax(t *testing.T) { + const ( + configuredMaxTTL = 1 * time.Second + requestedTTL = 1 * time.Minute + ) + + repo := &repositoryStub{} + repo.pairs = &pairStoreStub{ + getFn: func(ctx context.Context, pair model.CurrencyPair) (*model.Pair, error) { + return &model.Pair{ + Pair: pair, + BaseMeta: model.CurrencySettings{Code: pair.Base, Decimals: 2, Rounding: model.RoundingModeHalfEven}, + QuoteMeta: model.CurrencySettings{Code: pair.Quote, Decimals: 2, Rounding: model.RoundingModeHalfEven}, + }, nil + }, + } + repo.rates = &ratesStoreStub{ + latestFn: func(ctx context.Context, pair model.CurrencyPair, provider string) (*model.RateSnapshot, error) { + return &model.RateSnapshot{ + Pair: pair, + Provider: provider, + Ask: "1.10", + Bid: "1.08", + RateRef: "rate#1", + AsOfUnixMs: time.Now().UnixMilli(), + }, nil + }, + } + repo.quotes = "esStoreStub{} + repo.currencies = currencyStoreStub{} + + svc := NewService(zap.NewNop(), repo, nil, "", WithMaxQuoteTTLMillis(int64(configuredMaxTTL/time.Millisecond))) + start := time.Now() + + resp, err := svc.GetQuote(context.Background(), &oraclev1.GetQuoteRequest{ + Pair: &fxv1.CurrencyPair{Base: "USD", Quote: "EUR"}, + Side: fxv1.Side_BUY_BASE_SELL_QUOTE, + AmountInput: &oraclev1.GetQuoteRequest_BaseAmount{BaseAmount: &moneyv1.Money{ + Currency: "USD", + Amount: "100", + }}, + Firm: true, + TtlMs: int64(requestedTTL / time.Millisecond), + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + expiry := time.UnixMilli(resp.GetQuote().GetExpiresAtUnixMs()) + if expiry.Before(start) { + t.Fatalf("expected expiry after request start, got %s", expiry) + } + if expiry.After(start.Add(5 * time.Second)) { + t.Fatalf("expected clamped expiry close to 1s max ttl, got %s", expiry) + } +} + func TestServiceGetQuoteRateNotFound(t *testing.T) { repo := &repositoryStub{ pairs: &pairStoreStub{ diff --git a/api/payments/orchestrator/config.dev.yml b/api/payments/orchestrator/config.dev.yml index 1ba39798..e75ea97e 100644 --- a/api/payments/orchestrator/config.dev.yml +++ b/api/payments/orchestrator/config.dev.yml @@ -38,6 +38,8 @@ messaging: # Retain quote records after expiry to allow long-running payments to complete. quote_retention_hours: 72 +max_fx_quote_ttl_ms: 600000 + # Service endpoints are sourced from discovery; no static overrides. card_gateways: monetix: diff --git a/api/payments/orchestrator/config.yml b/api/payments/orchestrator/config.yml index 72c47f58..e7fb1e14 100644 --- a/api/payments/orchestrator/config.yml +++ b/api/payments/orchestrator/config.yml @@ -38,6 +38,8 @@ messaging: # Retain quote records after expiry to allow long-running payments to complete. quote_retention_hours: 72 +max_fx_quote_ttl_ms: 600000 + # Service endpoints are sourced from discovery; no static overrides. card_gateways: monetix: diff --git a/api/payments/orchestrator/internal/server/internal/config.go b/api/payments/orchestrator/internal/server/internal/config.go index 267e95ec..5ca78f2c 100644 --- a/api/payments/orchestrator/internal/server/internal/config.go +++ b/api/payments/orchestrator/internal/server/internal/config.go @@ -23,6 +23,7 @@ type config struct { FeeAccounts map[string]string `yaml:"fee_ledger_accounts"` GatewayInstances []gatewayInstanceConfig `yaml:"gateway_instances"` QuoteRetentionHrs int `yaml:"quote_retention_hours"` + MaxFXQuoteTTLMs int64 `yaml:"max_fx_quote_ttl_ms"` } type clientConfig struct { @@ -78,6 +79,11 @@ type limitsOverrideCfg struct { MaxOps int `yaml:"max_ops"` } +const ( + defaultMaxFXQuoteTTL = 10 * time.Minute + defaultMaxFXQuoteTTLMillis = int64(defaultMaxFXQuoteTTL / time.Millisecond) +) + func (c clientConfig) callTimeout() time.Duration { if c.CallTimeoutSecs <= 0 { return 3 * time.Second @@ -92,6 +98,13 @@ func (c *config) quoteRetention() time.Duration { return time.Duration(c.QuoteRetentionHrs) * time.Hour } +func (c *config) maxFXQuoteTTLMillis() int64 { + if c == nil || c.MaxFXQuoteTTLMs <= 0 { + return defaultMaxFXQuoteTTLMillis + } + return c.MaxFXQuoteTTLMs +} + func (i *Imp) loadConfig() (*config, error) { data, err := os.ReadFile(i.file) if err != nil { diff --git a/api/payments/orchestrator/internal/server/internal/dependencies.go b/api/payments/orchestrator/internal/server/internal/dependencies.go index 2bccd833..ad919c0c 100644 --- a/api/payments/orchestrator/internal/server/internal/dependencies.go +++ b/api/payments/orchestrator/internal/server/internal/dependencies.go @@ -55,6 +55,7 @@ func (i *Imp) buildServiceOptions(cfg *config, deps *orchestratorDeps) []orchest if deps.quotationClient != nil { opts = append(opts, orchestrator.WithQuotationService(deps.quotationClient)) } + opts = append(opts, orchestrator.WithMaxFXQuoteTTLMillis(cfg.maxFXQuoteTTLMillis())) if deps.gatewayInvokeResolver != nil { opts = append(opts, orchestrator.WithGatewayInvokeResolver(deps.gatewayInvokeResolver)) } diff --git a/api/payments/orchestrator/internal/service/orchestrator/options.go b/api/payments/orchestrator/internal/service/orchestrator/options.go index 691e62bf..f404b9d2 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/options.go +++ b/api/payments/orchestrator/internal/service/orchestrator/options.go @@ -422,6 +422,15 @@ func WithClock(clock clockpkg.Clock) Option { } } +// WithMaxFXQuoteTTLMillis caps forwarded FX quote TTL requests. +func WithMaxFXQuoteTTLMillis(value int64) Option { + return func(s *Service) { + if value > 0 { + s.maxFXQuoteTTLMillis = value + } + } +} + func buildRailGatewayDependency(gateways map[string]rail.RailGateway, registry GatewayRegistry, chainResolver GatewayInvokeResolver, providerResolver GatewayInvokeResolver, logger mlogger.Logger) railGatewayDependency { result := railGatewayDependency{ byID: map[string]rail.RailGateway{}, diff --git a/api/payments/orchestrator/internal/service/orchestrator/service.go b/api/payments/orchestrator/internal/service/orchestrator/service.go index e7314f6f..ace88660 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/service.go +++ b/api/payments/orchestrator/internal/service/orchestrator/service.go @@ -2,6 +2,7 @@ package orchestrator import ( "context" + "time" "github.com/tech/sendico/payments/orchestrator/internal/service/plan_builder" "github.com/tech/sendico/payments/storage" @@ -23,6 +24,11 @@ func (e serviceError) Error() string { return string(e) } +const ( + defaultMaxFXQuoteTTL = 10 * time.Minute + defaultMaxFXQuoteTTLMillis = int64(defaultMaxFXQuoteTTL / time.Millisecond) +) + var ( errStorageUnavailable = serviceError("payments.orchestrator: storage not initialised") errQuotationUnavailable = serviceError("payments.orchestrator: quotation service not configured") @@ -30,9 +36,10 @@ var ( // Service orchestrates payments across ledger, billing, FX, and chain domains. type Service struct { - logger mlogger.Logger - storage storage.Repository - clock clockpkg.Clock + logger mlogger.Logger + storage storage.Repository + clock clockpkg.Clock + maxFXQuoteTTLMillis int64 deps serviceDependencies h handlerSet @@ -72,9 +79,10 @@ type componentSet struct { // NewService constructs a payment orchestrator service. func NewService(logger mlogger.Logger, repo storage.Repository, opts ...Option) *Service { svc := &Service{ - logger: logger.Named("payment_orchestrator"), - storage: repo, - clock: clockpkg.NewSystem(), + logger: logger.Named("payment_orchestrator"), + storage: repo, + clock: clockpkg.NewSystem(), + maxFXQuoteTTLMillis: defaultMaxFXQuoteTTLMillis, } initMetrics() @@ -88,6 +96,9 @@ func NewService(logger mlogger.Logger, repo storage.Repository, opts ...Option) if svc.clock == nil { svc.clock = clockpkg.NewSystem() } + if svc.maxFXQuoteTTLMillis <= 0 { + svc.maxFXQuoteTTLMillis = defaultMaxFXQuoteTTLMillis + } engine := defaultPaymentEngine{svc: svc} svc.h.commands = newPaymentCommandFactory(engine, svc.logger) diff --git a/api/payments/orchestrator/internal/service/orchestrator/service_helpers.go b/api/payments/orchestrator/internal/service/orchestrator/service_helpers.go index 0b84a04f..ea3ebefa 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/service_helpers.go +++ b/api/payments/orchestrator/internal/service/orchestrator/service_helpers.go @@ -108,6 +108,23 @@ type quoteResolutionError struct { func (e quoteResolutionError) Error() string { return e.err.Error() } +func (s *Service) clampFXIntentTTL(intent *sharedv1.PaymentIntent) *sharedv1.PaymentIntent { + if intent == nil { + return nil + } + cloned, ok := proto.Clone(intent).(*sharedv1.PaymentIntent) + if !ok || cloned == nil { + return intent + } + if s == nil || s.maxFXQuoteTTLMillis <= 0 { + return cloned + } + if fx := cloned.GetFx(); fx != nil && fx.GetTtlMs() > s.maxFXQuoteTTLMillis { + fx.TtlMs = s.maxFXQuoteTTLMillis + } + return cloned +} + func (s *Service) resolvePaymentQuote(ctx context.Context, in quoteResolutionInput) (*sharedv1.PaymentQuote, *sharedv1.PaymentIntent, *model.PaymentPlan, error) { if ref := strings.TrimSpace(in.QuoteRef); ref != "" { quotesStore, err := ensureQuotesStore(s.storage) @@ -149,10 +166,11 @@ func (s *Service) resolvePaymentQuote(ctx context.Context, in quoteResolutionInp if in.Intent == nil { return nil, nil, nil, merrors.InvalidArgument("intent is required") } + intent := s.clampFXIntentTTL(in.Intent) req := "ationv1.QuotePaymentRequest{ Meta: in.Meta, IdempotencyKey: in.IdempotencyKey, - Intent: in.Intent, + Intent: intent, PreviewOnly: false, } if !s.deps.quotation.available() { @@ -175,7 +193,7 @@ func (s *Service) resolvePaymentQuote(ctx context.Context, in quoteResolutionInp OrgRef: in.OrgRef, OrgID: in.OrgID, Meta: in.Meta, - Intent: in.Intent, + Intent: intent, QuoteRef: ref, IdempotencyKey: in.IdempotencyKey, }) diff --git a/api/payments/orchestrator/internal/service/orchestrator/service_helpers_test.go b/api/payments/orchestrator/internal/service/orchestrator/service_helpers_test.go index 2f0208d1..3461166e 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/service_helpers_test.go +++ b/api/payments/orchestrator/internal/service/orchestrator/service_helpers_test.go @@ -13,6 +13,7 @@ import ( mloggerfactory "github.com/tech/sendico/pkg/mlogger/factory" "github.com/tech/sendico/pkg/model/account_role" paymenttypes "github.com/tech/sendico/pkg/payments/types" + fxv1 "github.com/tech/sendico/pkg/proto/common/fx/v1" moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1" paymentv1 "github.com/tech/sendico/pkg/proto/common/payment/v1" connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1" @@ -219,6 +220,83 @@ func TestResolvePaymentQuote_QuoteRefSkipsQuoteRecompute(t *testing.T) { } } +func TestResolvePaymentQuote_ClampsForwardedFXTTL(t *testing.T) { + const ( + requestedTTL = int64((15 * time.Minute) / time.Millisecond) + maxTTL = int64((10 * time.Minute) / time.Millisecond) + ) + + org := bson.NewObjectID() + intent := &sharedv1.PaymentIntent{ + Ref: "ref-1", + Amount: &moneyv1.Money{Currency: "USD", Amount: "1"}, + SettlementCurrency: "EUR", + Fx: &sharedv1.FXIntent{ + Pair: &fxv1.CurrencyPair{Base: "USD", Quote: "EUR"}, + Side: fxv1.Side_SELL_BASE_BUY_QUOTE, + Firm: true, + TtlMs: requestedTTL, + }, + } + intent = protoIntentFromModel(intentFromProto(intent)) + intent.Fx.TtlMs = requestedTTL + + recordIntent := protoIntentFromModel(intentFromProto(intent)) + recordIntent.Fx.TtlMs = maxTTL + + var capturedTTLMs int64 + svc := &Service{ + storage: stubRepo{ + quotes: &helperQuotesStore{ + records: map[string]*model.PaymentQuoteRecord{ + "q1": { + QuoteRef: "q1", + Intent: intentFromProto(recordIntent), + Quote: &model.PaymentQuoteSnapshot{QuoteRef: "q1"}, + }, + }, + }, + }, + clock: clockpkg.NewSystem(), + maxFXQuoteTTLMillis: maxTTL, + deps: serviceDependencies{ + quotation: quotationDependency{ + client: &helperQuotationClient{ + quotePaymentFn: func(ctx context.Context, req *quotationv1.QuotePaymentRequest, opts ...grpc.CallOption) (*quotationv1.QuotePaymentResponse, error) { + capturedTTLMs = req.GetIntent().GetFx().GetTtlMs() + return "ationv1.QuotePaymentResponse{ + Quote: &sharedv1.PaymentQuote{ + QuoteRef: "q1", + }, + IdempotencyKey: req.GetIdempotencyKey(), + }, nil + }, + }, + }, + }, + } + + _, resolvedIntent, _, err := svc.resolvePaymentQuote(context.Background(), quoteResolutionInput{ + OrgRef: org.Hex(), + OrgID: org, + Meta: &sharedv1.RequestMeta{OrganizationRef: org.Hex()}, + Intent: intent, + IdempotencyKey: "idem-1", + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if capturedTTLMs != maxTTL { + t.Fatalf("expected forwarded ttl_ms to be clamped to %d, got %d", maxTTL, capturedTTLMs) + } + if intent.GetFx().GetTtlMs() != requestedTTL { + t.Fatalf("expected original intent ttl to stay unchanged, got %d", intent.GetFx().GetTtlMs()) + } + if resolvedIntent == nil || resolvedIntent.GetFx().GetTtlMs() != maxTTL { + t.Fatalf("expected resolved intent ttl to match stored clamped value") + } +} + func TestInitiatePaymentIdempotency(t *testing.T) { logger := mloggerfactory.NewLogger(false) org := bson.NewObjectID() diff --git a/api/payments/quotation/internal/service/quotation/service.go b/api/payments/quotation/internal/service/quotation/service.go index 418f9aba..90512bac 100644 --- a/api/payments/quotation/internal/service/quotation/service.go +++ b/api/payments/quotation/internal/service/quotation/service.go @@ -21,8 +21,8 @@ func (e serviceError) Error() string { } const ( - defaultFeeQuoteTTLMillis int64 = 120000 - defaultOracleTTLMillis int64 = 60000 + defaultFeeQuoteTTLMillis int64 = 600000 + defaultOracleTTLMillis int64 = 600000 ) var (