set 10 min quotations timeout #545
@@ -11,6 +11,8 @@ grpc:
|
|||||||
metrics:
|
metrics:
|
||||||
address: ":9400"
|
address: ":9400"
|
||||||
|
|
||||||
|
max_quote_ttl_ms: 600000
|
||||||
|
|
||||||
database:
|
database:
|
||||||
driver: mongodb
|
driver: mongodb
|
||||||
settings:
|
settings:
|
||||||
|
|||||||
@@ -11,6 +11,8 @@ grpc:
|
|||||||
metrics:
|
metrics:
|
||||||
address: ":9400"
|
address: ":9400"
|
||||||
|
|
||||||
|
max_quote_ttl_ms: 600000
|
||||||
|
|
||||||
database:
|
database:
|
||||||
driver: mongodb
|
driver: mongodb
|
||||||
settings:
|
settings:
|
||||||
|
|||||||
@@ -22,11 +22,28 @@ type Imp struct {
|
|||||||
file string
|
file string
|
||||||
debug bool
|
debug bool
|
||||||
|
|
||||||
config *grpcapp.Config
|
config *config
|
||||||
app *grpcapp.App[storage.Repository]
|
app *grpcapp.App[storage.Repository]
|
||||||
service *oracle.Service
|
service *oracle.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type config struct {
|
||||||
|
*grpcapp.Config `yaml:",inline"`
|
||||||
|
MaxQuoteTTLMs int64 `yaml:"max_quote_ttl_ms"`
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultMaxQuoteTTL = 10 * time.Minute
|
||||||
|
defaultMaxQuoteTTLMillis = int64(defaultMaxQuoteTTL / time.Millisecond)
|
||||||
|
)
|
||||||
|
|
||||||
|
func (c *config) maxQuoteTTLMillis() int64 {
|
||||||
|
if c == nil || c.MaxQuoteTTLMs <= 0 {
|
||||||
|
return defaultMaxQuoteTTLMillis
|
||||||
|
}
|
||||||
|
return c.MaxQuoteTTLMs
|
||||||
|
}
|
||||||
|
|
||||||
func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) {
|
func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) {
|
||||||
return &Imp{
|
return &Imp{
|
||||||
logger: logger.Named("server"),
|
logger: logger.Named("server"),
|
||||||
@@ -63,12 +80,18 @@ func (i *Imp) Start() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) {
|
serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) {
|
||||||
svc := oracle.NewService(logger, repo, producer, cfg.GRPC.DiscoveryInvokeURI())
|
svc := oracle.NewService(
|
||||||
|
logger,
|
||||||
|
repo,
|
||||||
|
producer,
|
||||||
|
cfg.GRPC.DiscoveryInvokeURI(),
|
||||||
|
oracle.WithMaxQuoteTTLMillis(cfg.maxQuoteTTLMillis()),
|
||||||
|
)
|
||||||
i.service = svc
|
i.service = svc
|
||||||
return svc, nil
|
return svc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
app, err := grpcapp.NewApp(i.logger, "fx", cfg, i.debug, repoFactory, serviceFactory)
|
app, err := grpcapp.NewApp(i.logger, "fx", cfg.Config, i.debug, repoFactory, serviceFactory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -77,14 +100,14 @@ func (i *Imp) Start() error {
|
|||||||
return i.app.Start()
|
return i.app.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *Imp) loadConfig() (*grpcapp.Config, error) {
|
func (i *Imp) loadConfig() (*config, error) {
|
||||||
data, err := os.ReadFile(i.file)
|
data, err := os.ReadFile(i.file)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
i.logger.Error("Could not read configuration file", zap.String("config_file", i.file), zap.Error(err))
|
i.logger.Error("Could not read configuration file", zap.String("config_file", i.file), zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg := &grpcapp.Config{}
|
cfg := &config{Config: &grpcapp.Config{}}
|
||||||
if err := yaml.Unmarshal(data, cfg); err != nil {
|
if err := yaml.Unmarshal(data, cfg); err != nil {
|
||||||
i.logger.Error("Failed to parse configuration", zap.Error(err))
|
i.logger.Error("Failed to parse configuration", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@@ -28,6 +28,14 @@ func (e serviceError) Error() string {
|
|||||||
return string(e)
|
return string(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultMaxQuoteTTL = 10 * time.Minute
|
||||||
|
defaultMaxQuoteTTLMillis = int64(defaultMaxQuoteTTL / time.Millisecond)
|
||||||
|
)
|
||||||
|
|
||||||
|
// Option configures oracle service behavior.
|
||||||
|
type Option func(*Service)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errSideRequired = serviceError("oracle: side is required")
|
errSideRequired = serviceError("oracle: side is required")
|
||||||
errAmountsMutuallyExclusive = serviceError("oracle: exactly one amount must be provided")
|
errAmountsMutuallyExclusive = serviceError("oracle: exactly one amount must be provided")
|
||||||
@@ -43,16 +51,35 @@ type Service struct {
|
|||||||
producer pmessaging.Producer
|
producer pmessaging.Producer
|
||||||
announcer *discovery.Announcer
|
announcer *discovery.Announcer
|
||||||
invokeURI string
|
invokeURI string
|
||||||
|
maxQuoteTTLMillis int64
|
||||||
oraclev1.UnimplementedOracleServer
|
oraclev1.UnimplementedOracleServer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer, invokeURI string) *Service {
|
// WithMaxQuoteTTLMillis caps firm quote TTL requests to the supplied number of milliseconds.
|
||||||
|
func WithMaxQuoteTTLMillis(value int64) Option {
|
||||||
|
return func(s *Service) {
|
||||||
|
if value > 0 {
|
||||||
|
s.maxQuoteTTLMillis = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer, invokeURI string, opts ...Option) *Service {
|
||||||
initMetrics()
|
initMetrics()
|
||||||
svc := &Service{
|
svc := &Service{
|
||||||
logger: logger.Named("oracle"),
|
logger: logger.Named("oracle"),
|
||||||
storage: repo,
|
storage: repo,
|
||||||
producer: prod,
|
producer: prod,
|
||||||
invokeURI: strings.TrimSpace(invokeURI),
|
invokeURI: strings.TrimSpace(invokeURI),
|
||||||
|
maxQuoteTTLMillis: defaultMaxQuoteTTLMillis,
|
||||||
|
}
|
||||||
|
for _, opt := range opts {
|
||||||
|
if opt != nil {
|
||||||
|
opt(svc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if svc.maxQuoteTTLMillis <= 0 {
|
||||||
|
svc.maxQuoteTTLMillis = defaultMaxQuoteTTLMillis
|
||||||
}
|
}
|
||||||
svc.startDiscoveryAnnouncer()
|
svc.startDiscoveryAnnouncer()
|
||||||
return svc
|
return svc
|
||||||
@@ -222,7 +249,16 @@ func (s *Service) getQuoteResponder(ctx context.Context, req *oraclev1.GetQuoteR
|
|||||||
|
|
||||||
expiresAt := int64(0)
|
expiresAt := int64(0)
|
||||||
if req.GetFirm() {
|
if req.GetFirm() {
|
||||||
expiry, err := computeExpiry(now, req.GetTtlMs())
|
ttlMs := req.GetTtlMs()
|
||||||
|
if ttlMs > s.maxQuoteTTLMillis {
|
||||||
|
logger.Info(
|
||||||
|
"Clamping requested firm quote ttl to configured maximum",
|
||||||
|
zap.Int64("requested_ttl_ms", ttlMs),
|
||||||
|
zap.Int64("max_ttl_ms", s.maxQuoteTTLMillis),
|
||||||
|
)
|
||||||
|
ttlMs = s.maxQuoteTTLMillis
|
||||||
|
}
|
||||||
|
expiry, err := computeExpiry(now, ttlMs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return gsresponse.InvalidArgument[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, err)
|
return gsresponse.InvalidArgument[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -181,6 +181,62 @@ func TestServiceGetQuoteFirm(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestServiceGetQuoteFirm_ClampsTTLToConfiguredMax(t *testing.T) {
|
||||||
|
const (
|
||||||
|
configuredMaxTTL = 1 * time.Second
|
||||||
|
requestedTTL = 1 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
|
repo := &repositoryStub{}
|
||||||
|
repo.pairs = &pairStoreStub{
|
||||||
|
getFn: func(ctx context.Context, pair model.CurrencyPair) (*model.Pair, error) {
|
||||||
|
return &model.Pair{
|
||||||
|
Pair: pair,
|
||||||
|
BaseMeta: model.CurrencySettings{Code: pair.Base, Decimals: 2, Rounding: model.RoundingModeHalfEven},
|
||||||
|
QuoteMeta: model.CurrencySettings{Code: pair.Quote, Decimals: 2, Rounding: model.RoundingModeHalfEven},
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
repo.rates = &ratesStoreStub{
|
||||||
|
latestFn: func(ctx context.Context, pair model.CurrencyPair, provider string) (*model.RateSnapshot, error) {
|
||||||
|
return &model.RateSnapshot{
|
||||||
|
Pair: pair,
|
||||||
|
Provider: provider,
|
||||||
|
Ask: "1.10",
|
||||||
|
Bid: "1.08",
|
||||||
|
RateRef: "rate#1",
|
||||||
|
AsOfUnixMs: time.Now().UnixMilli(),
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
repo.quotes = "esStoreStub{}
|
||||||
|
repo.currencies = currencyStoreStub{}
|
||||||
|
|
||||||
|
svc := NewService(zap.NewNop(), repo, nil, "", WithMaxQuoteTTLMillis(int64(configuredMaxTTL/time.Millisecond)))
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
resp, err := svc.GetQuote(context.Background(), &oraclev1.GetQuoteRequest{
|
||||||
|
Pair: &fxv1.CurrencyPair{Base: "USD", Quote: "EUR"},
|
||||||
|
Side: fxv1.Side_BUY_BASE_SELL_QUOTE,
|
||||||
|
AmountInput: &oraclev1.GetQuoteRequest_BaseAmount{BaseAmount: &moneyv1.Money{
|
||||||
|
Currency: "USD",
|
||||||
|
Amount: "100",
|
||||||
|
}},
|
||||||
|
Firm: true,
|
||||||
|
TtlMs: int64(requestedTTL / time.Millisecond),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
expiry := time.UnixMilli(resp.GetQuote().GetExpiresAtUnixMs())
|
||||||
|
if expiry.Before(start) {
|
||||||
|
t.Fatalf("expected expiry after request start, got %s", expiry)
|
||||||
|
}
|
||||||
|
if expiry.After(start.Add(5 * time.Second)) {
|
||||||
|
t.Fatalf("expected clamped expiry close to 1s max ttl, got %s", expiry)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestServiceGetQuoteRateNotFound(t *testing.T) {
|
func TestServiceGetQuoteRateNotFound(t *testing.T) {
|
||||||
repo := &repositoryStub{
|
repo := &repositoryStub{
|
||||||
pairs: &pairStoreStub{
|
pairs: &pairStoreStub{
|
||||||
|
|||||||
@@ -38,6 +38,8 @@ messaging:
|
|||||||
# Retain quote records after expiry to allow long-running payments to complete.
|
# Retain quote records after expiry to allow long-running payments to complete.
|
||||||
quote_retention_hours: 72
|
quote_retention_hours: 72
|
||||||
|
|
||||||
|
max_fx_quote_ttl_ms: 600000
|
||||||
|
|
||||||
# Service endpoints are sourced from discovery; no static overrides.
|
# Service endpoints are sourced from discovery; no static overrides.
|
||||||
card_gateways:
|
card_gateways:
|
||||||
monetix:
|
monetix:
|
||||||
|
|||||||
@@ -38,6 +38,8 @@ messaging:
|
|||||||
# Retain quote records after expiry to allow long-running payments to complete.
|
# Retain quote records after expiry to allow long-running payments to complete.
|
||||||
quote_retention_hours: 72
|
quote_retention_hours: 72
|
||||||
|
|
||||||
|
max_fx_quote_ttl_ms: 600000
|
||||||
|
|
||||||
# Service endpoints are sourced from discovery; no static overrides.
|
# Service endpoints are sourced from discovery; no static overrides.
|
||||||
card_gateways:
|
card_gateways:
|
||||||
monetix:
|
monetix:
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ type config struct {
|
|||||||
FeeAccounts map[string]string `yaml:"fee_ledger_accounts"`
|
FeeAccounts map[string]string `yaml:"fee_ledger_accounts"`
|
||||||
GatewayInstances []gatewayInstanceConfig `yaml:"gateway_instances"`
|
GatewayInstances []gatewayInstanceConfig `yaml:"gateway_instances"`
|
||||||
QuoteRetentionHrs int `yaml:"quote_retention_hours"`
|
QuoteRetentionHrs int `yaml:"quote_retention_hours"`
|
||||||
|
MaxFXQuoteTTLMs int64 `yaml:"max_fx_quote_ttl_ms"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientConfig struct {
|
type clientConfig struct {
|
||||||
@@ -78,6 +79,11 @@ type limitsOverrideCfg struct {
|
|||||||
MaxOps int `yaml:"max_ops"`
|
MaxOps int `yaml:"max_ops"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultMaxFXQuoteTTL = 10 * time.Minute
|
||||||
|
defaultMaxFXQuoteTTLMillis = int64(defaultMaxFXQuoteTTL / time.Millisecond)
|
||||||
|
)
|
||||||
|
|
||||||
func (c clientConfig) callTimeout() time.Duration {
|
func (c clientConfig) callTimeout() time.Duration {
|
||||||
if c.CallTimeoutSecs <= 0 {
|
if c.CallTimeoutSecs <= 0 {
|
||||||
return 3 * time.Second
|
return 3 * time.Second
|
||||||
@@ -92,6 +98,13 @@ func (c *config) quoteRetention() time.Duration {
|
|||||||
return time.Duration(c.QuoteRetentionHrs) * time.Hour
|
return time.Duration(c.QuoteRetentionHrs) * time.Hour
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *config) maxFXQuoteTTLMillis() int64 {
|
||||||
|
if c == nil || c.MaxFXQuoteTTLMs <= 0 {
|
||||||
|
return defaultMaxFXQuoteTTLMillis
|
||||||
|
}
|
||||||
|
return c.MaxFXQuoteTTLMs
|
||||||
|
}
|
||||||
|
|
||||||
func (i *Imp) loadConfig() (*config, error) {
|
func (i *Imp) loadConfig() (*config, error) {
|
||||||
data, err := os.ReadFile(i.file)
|
data, err := os.ReadFile(i.file)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -55,6 +55,7 @@ func (i *Imp) buildServiceOptions(cfg *config, deps *orchestratorDeps) []orchest
|
|||||||
if deps.quotationClient != nil {
|
if deps.quotationClient != nil {
|
||||||
opts = append(opts, orchestrator.WithQuotationService(deps.quotationClient))
|
opts = append(opts, orchestrator.WithQuotationService(deps.quotationClient))
|
||||||
}
|
}
|
||||||
|
opts = append(opts, orchestrator.WithMaxFXQuoteTTLMillis(cfg.maxFXQuoteTTLMillis()))
|
||||||
if deps.gatewayInvokeResolver != nil {
|
if deps.gatewayInvokeResolver != nil {
|
||||||
opts = append(opts, orchestrator.WithGatewayInvokeResolver(deps.gatewayInvokeResolver))
|
opts = append(opts, orchestrator.WithGatewayInvokeResolver(deps.gatewayInvokeResolver))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -422,6 +422,15 @@ func WithClock(clock clockpkg.Clock) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithMaxFXQuoteTTLMillis caps forwarded FX quote TTL requests.
|
||||||
|
func WithMaxFXQuoteTTLMillis(value int64) Option {
|
||||||
|
return func(s *Service) {
|
||||||
|
if value > 0 {
|
||||||
|
s.maxFXQuoteTTLMillis = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func buildRailGatewayDependency(gateways map[string]rail.RailGateway, registry GatewayRegistry, chainResolver GatewayInvokeResolver, providerResolver GatewayInvokeResolver, logger mlogger.Logger) railGatewayDependency {
|
func buildRailGatewayDependency(gateways map[string]rail.RailGateway, registry GatewayRegistry, chainResolver GatewayInvokeResolver, providerResolver GatewayInvokeResolver, logger mlogger.Logger) railGatewayDependency {
|
||||||
result := railGatewayDependency{
|
result := railGatewayDependency{
|
||||||
byID: map[string]rail.RailGateway{},
|
byID: map[string]rail.RailGateway{},
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package orchestrator
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/tech/sendico/payments/orchestrator/internal/service/plan_builder"
|
"github.com/tech/sendico/payments/orchestrator/internal/service/plan_builder"
|
||||||
"github.com/tech/sendico/payments/storage"
|
"github.com/tech/sendico/payments/storage"
|
||||||
@@ -23,6 +24,11 @@ func (e serviceError) Error() string {
|
|||||||
return string(e)
|
return string(e)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultMaxFXQuoteTTL = 10 * time.Minute
|
||||||
|
defaultMaxFXQuoteTTLMillis = int64(defaultMaxFXQuoteTTL / time.Millisecond)
|
||||||
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errStorageUnavailable = serviceError("payments.orchestrator: storage not initialised")
|
errStorageUnavailable = serviceError("payments.orchestrator: storage not initialised")
|
||||||
errQuotationUnavailable = serviceError("payments.orchestrator: quotation service not configured")
|
errQuotationUnavailable = serviceError("payments.orchestrator: quotation service not configured")
|
||||||
@@ -33,6 +39,7 @@ type Service struct {
|
|||||||
logger mlogger.Logger
|
logger mlogger.Logger
|
||||||
storage storage.Repository
|
storage storage.Repository
|
||||||
clock clockpkg.Clock
|
clock clockpkg.Clock
|
||||||
|
maxFXQuoteTTLMillis int64
|
||||||
|
|
||||||
deps serviceDependencies
|
deps serviceDependencies
|
||||||
h handlerSet
|
h handlerSet
|
||||||
@@ -75,6 +82,7 @@ func NewService(logger mlogger.Logger, repo storage.Repository, opts ...Option)
|
|||||||
logger: logger.Named("payment_orchestrator"),
|
logger: logger.Named("payment_orchestrator"),
|
||||||
storage: repo,
|
storage: repo,
|
||||||
clock: clockpkg.NewSystem(),
|
clock: clockpkg.NewSystem(),
|
||||||
|
maxFXQuoteTTLMillis: defaultMaxFXQuoteTTLMillis,
|
||||||
}
|
}
|
||||||
|
|
||||||
initMetrics()
|
initMetrics()
|
||||||
@@ -88,6 +96,9 @@ func NewService(logger mlogger.Logger, repo storage.Repository, opts ...Option)
|
|||||||
if svc.clock == nil {
|
if svc.clock == nil {
|
||||||
svc.clock = clockpkg.NewSystem()
|
svc.clock = clockpkg.NewSystem()
|
||||||
}
|
}
|
||||||
|
if svc.maxFXQuoteTTLMillis <= 0 {
|
||||||
|
svc.maxFXQuoteTTLMillis = defaultMaxFXQuoteTTLMillis
|
||||||
|
}
|
||||||
|
|
||||||
engine := defaultPaymentEngine{svc: svc}
|
engine := defaultPaymentEngine{svc: svc}
|
||||||
svc.h.commands = newPaymentCommandFactory(engine, svc.logger)
|
svc.h.commands = newPaymentCommandFactory(engine, svc.logger)
|
||||||
|
|||||||
@@ -108,6 +108,23 @@ type quoteResolutionError struct {
|
|||||||
|
|
||||||
func (e quoteResolutionError) Error() string { return e.err.Error() }
|
func (e quoteResolutionError) Error() string { return e.err.Error() }
|
||||||
|
|
||||||
|
func (s *Service) clampFXIntentTTL(intent *sharedv1.PaymentIntent) *sharedv1.PaymentIntent {
|
||||||
|
if intent == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
cloned, ok := proto.Clone(intent).(*sharedv1.PaymentIntent)
|
||||||
|
if !ok || cloned == nil {
|
||||||
|
return intent
|
||||||
|
}
|
||||||
|
if s == nil || s.maxFXQuoteTTLMillis <= 0 {
|
||||||
|
return cloned
|
||||||
|
}
|
||||||
|
if fx := cloned.GetFx(); fx != nil && fx.GetTtlMs() > s.maxFXQuoteTTLMillis {
|
||||||
|
fx.TtlMs = s.maxFXQuoteTTLMillis
|
||||||
|
}
|
||||||
|
return cloned
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Service) resolvePaymentQuote(ctx context.Context, in quoteResolutionInput) (*sharedv1.PaymentQuote, *sharedv1.PaymentIntent, *model.PaymentPlan, error) {
|
func (s *Service) resolvePaymentQuote(ctx context.Context, in quoteResolutionInput) (*sharedv1.PaymentQuote, *sharedv1.PaymentIntent, *model.PaymentPlan, error) {
|
||||||
if ref := strings.TrimSpace(in.QuoteRef); ref != "" {
|
if ref := strings.TrimSpace(in.QuoteRef); ref != "" {
|
||||||
quotesStore, err := ensureQuotesStore(s.storage)
|
quotesStore, err := ensureQuotesStore(s.storage)
|
||||||
@@ -149,10 +166,11 @@ func (s *Service) resolvePaymentQuote(ctx context.Context, in quoteResolutionInp
|
|||||||
if in.Intent == nil {
|
if in.Intent == nil {
|
||||||
return nil, nil, nil, merrors.InvalidArgument("intent is required")
|
return nil, nil, nil, merrors.InvalidArgument("intent is required")
|
||||||
}
|
}
|
||||||
|
intent := s.clampFXIntentTTL(in.Intent)
|
||||||
req := "ationv1.QuotePaymentRequest{
|
req := "ationv1.QuotePaymentRequest{
|
||||||
Meta: in.Meta,
|
Meta: in.Meta,
|
||||||
IdempotencyKey: in.IdempotencyKey,
|
IdempotencyKey: in.IdempotencyKey,
|
||||||
Intent: in.Intent,
|
Intent: intent,
|
||||||
PreviewOnly: false,
|
PreviewOnly: false,
|
||||||
}
|
}
|
||||||
if !s.deps.quotation.available() {
|
if !s.deps.quotation.available() {
|
||||||
@@ -175,7 +193,7 @@ func (s *Service) resolvePaymentQuote(ctx context.Context, in quoteResolutionInp
|
|||||||
OrgRef: in.OrgRef,
|
OrgRef: in.OrgRef,
|
||||||
OrgID: in.OrgID,
|
OrgID: in.OrgID,
|
||||||
Meta: in.Meta,
|
Meta: in.Meta,
|
||||||
Intent: in.Intent,
|
Intent: intent,
|
||||||
QuoteRef: ref,
|
QuoteRef: ref,
|
||||||
IdempotencyKey: in.IdempotencyKey,
|
IdempotencyKey: in.IdempotencyKey,
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import (
|
|||||||
mloggerfactory "github.com/tech/sendico/pkg/mlogger/factory"
|
mloggerfactory "github.com/tech/sendico/pkg/mlogger/factory"
|
||||||
"github.com/tech/sendico/pkg/model/account_role"
|
"github.com/tech/sendico/pkg/model/account_role"
|
||||||
paymenttypes "github.com/tech/sendico/pkg/payments/types"
|
paymenttypes "github.com/tech/sendico/pkg/payments/types"
|
||||||
|
fxv1 "github.com/tech/sendico/pkg/proto/common/fx/v1"
|
||||||
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
|
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
|
||||||
paymentv1 "github.com/tech/sendico/pkg/proto/common/payment/v1"
|
paymentv1 "github.com/tech/sendico/pkg/proto/common/payment/v1"
|
||||||
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
|
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
|
||||||
@@ -219,6 +220,83 @@ func TestResolvePaymentQuote_QuoteRefSkipsQuoteRecompute(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestResolvePaymentQuote_ClampsForwardedFXTTL(t *testing.T) {
|
||||||
|
const (
|
||||||
|
requestedTTL = int64((15 * time.Minute) / time.Millisecond)
|
||||||
|
maxTTL = int64((10 * time.Minute) / time.Millisecond)
|
||||||
|
)
|
||||||
|
|
||||||
|
org := bson.NewObjectID()
|
||||||
|
intent := &sharedv1.PaymentIntent{
|
||||||
|
Ref: "ref-1",
|
||||||
|
Amount: &moneyv1.Money{Currency: "USD", Amount: "1"},
|
||||||
|
SettlementCurrency: "EUR",
|
||||||
|
Fx: &sharedv1.FXIntent{
|
||||||
|
Pair: &fxv1.CurrencyPair{Base: "USD", Quote: "EUR"},
|
||||||
|
Side: fxv1.Side_SELL_BASE_BUY_QUOTE,
|
||||||
|
Firm: true,
|
||||||
|
TtlMs: requestedTTL,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
intent = protoIntentFromModel(intentFromProto(intent))
|
||||||
|
intent.Fx.TtlMs = requestedTTL
|
||||||
|
|
||||||
|
recordIntent := protoIntentFromModel(intentFromProto(intent))
|
||||||
|
recordIntent.Fx.TtlMs = maxTTL
|
||||||
|
|
||||||
|
var capturedTTLMs int64
|
||||||
|
svc := &Service{
|
||||||
|
storage: stubRepo{
|
||||||
|
quotes: &helperQuotesStore{
|
||||||
|
records: map[string]*model.PaymentQuoteRecord{
|
||||||
|
"q1": {
|
||||||
|
QuoteRef: "q1",
|
||||||
|
Intent: intentFromProto(recordIntent),
|
||||||
|
Quote: &model.PaymentQuoteSnapshot{QuoteRef: "q1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
clock: clockpkg.NewSystem(),
|
||||||
|
maxFXQuoteTTLMillis: maxTTL,
|
||||||
|
deps: serviceDependencies{
|
||||||
|
quotation: quotationDependency{
|
||||||
|
client: &helperQuotationClient{
|
||||||
|
quotePaymentFn: func(ctx context.Context, req *quotationv1.QuotePaymentRequest, opts ...grpc.CallOption) (*quotationv1.QuotePaymentResponse, error) {
|
||||||
|
capturedTTLMs = req.GetIntent().GetFx().GetTtlMs()
|
||||||
|
return "ationv1.QuotePaymentResponse{
|
||||||
|
Quote: &sharedv1.PaymentQuote{
|
||||||
|
QuoteRef: "q1",
|
||||||
|
},
|
||||||
|
IdempotencyKey: req.GetIdempotencyKey(),
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
_, resolvedIntent, _, err := svc.resolvePaymentQuote(context.Background(), quoteResolutionInput{
|
||||||
|
OrgRef: org.Hex(),
|
||||||
|
OrgID: org,
|
||||||
|
Meta: &sharedv1.RequestMeta{OrganizationRef: org.Hex()},
|
||||||
|
Intent: intent,
|
||||||
|
IdempotencyKey: "idem-1",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if capturedTTLMs != maxTTL {
|
||||||
|
t.Fatalf("expected forwarded ttl_ms to be clamped to %d, got %d", maxTTL, capturedTTLMs)
|
||||||
|
}
|
||||||
|
if intent.GetFx().GetTtlMs() != requestedTTL {
|
||||||
|
t.Fatalf("expected original intent ttl to stay unchanged, got %d", intent.GetFx().GetTtlMs())
|
||||||
|
}
|
||||||
|
if resolvedIntent == nil || resolvedIntent.GetFx().GetTtlMs() != maxTTL {
|
||||||
|
t.Fatalf("expected resolved intent ttl to match stored clamped value")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestInitiatePaymentIdempotency(t *testing.T) {
|
func TestInitiatePaymentIdempotency(t *testing.T) {
|
||||||
logger := mloggerfactory.NewLogger(false)
|
logger := mloggerfactory.NewLogger(false)
|
||||||
org := bson.NewObjectID()
|
org := bson.NewObjectID()
|
||||||
|
|||||||
@@ -21,8 +21,8 @@ func (e serviceError) Error() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
defaultFeeQuoteTTLMillis int64 = 120000
|
defaultFeeQuoteTTLMillis int64 = 600000
|
||||||
defaultOracleTTLMillis int64 = 60000
|
defaultOracleTTLMillis int64 = 600000
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|||||||
Reference in New Issue
Block a user