set 10 min quotations timeout
This commit is contained in:
@@ -11,6 +11,8 @@ grpc:
|
||||
metrics:
|
||||
address: ":9400"
|
||||
|
||||
max_quote_ttl_ms: 600000
|
||||
|
||||
database:
|
||||
driver: mongodb
|
||||
settings:
|
||||
|
||||
@@ -11,6 +11,8 @@ grpc:
|
||||
metrics:
|
||||
address: ":9400"
|
||||
|
||||
max_quote_ttl_ms: 600000
|
||||
|
||||
database:
|
||||
driver: mongodb
|
||||
settings:
|
||||
|
||||
@@ -22,11 +22,28 @@ type Imp struct {
|
||||
file string
|
||||
debug bool
|
||||
|
||||
config *grpcapp.Config
|
||||
config *config
|
||||
app *grpcapp.App[storage.Repository]
|
||||
service *oracle.Service
|
||||
}
|
||||
|
||||
type config struct {
|
||||
*grpcapp.Config `yaml:",inline"`
|
||||
MaxQuoteTTLMs int64 `yaml:"max_quote_ttl_ms"`
|
||||
}
|
||||
|
||||
const (
|
||||
defaultMaxQuoteTTL = 10 * time.Minute
|
||||
defaultMaxQuoteTTLMillis = int64(defaultMaxQuoteTTL / time.Millisecond)
|
||||
)
|
||||
|
||||
func (c *config) maxQuoteTTLMillis() int64 {
|
||||
if c == nil || c.MaxQuoteTTLMs <= 0 {
|
||||
return defaultMaxQuoteTTLMillis
|
||||
}
|
||||
return c.MaxQuoteTTLMs
|
||||
}
|
||||
|
||||
func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) {
|
||||
return &Imp{
|
||||
logger: logger.Named("server"),
|
||||
@@ -63,12 +80,18 @@ func (i *Imp) Start() error {
|
||||
}
|
||||
|
||||
serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) {
|
||||
svc := oracle.NewService(logger, repo, producer, cfg.GRPC.DiscoveryInvokeURI())
|
||||
svc := oracle.NewService(
|
||||
logger,
|
||||
repo,
|
||||
producer,
|
||||
cfg.GRPC.DiscoveryInvokeURI(),
|
||||
oracle.WithMaxQuoteTTLMillis(cfg.maxQuoteTTLMillis()),
|
||||
)
|
||||
i.service = svc
|
||||
return svc, nil
|
||||
}
|
||||
|
||||
app, err := grpcapp.NewApp(i.logger, "fx", cfg, i.debug, repoFactory, serviceFactory)
|
||||
app, err := grpcapp.NewApp(i.logger, "fx", cfg.Config, i.debug, repoFactory, serviceFactory)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -77,14 +100,14 @@ func (i *Imp) Start() error {
|
||||
return i.app.Start()
|
||||
}
|
||||
|
||||
func (i *Imp) loadConfig() (*grpcapp.Config, error) {
|
||||
func (i *Imp) loadConfig() (*config, error) {
|
||||
data, err := os.ReadFile(i.file)
|
||||
if err != nil {
|
||||
i.logger.Error("Could not read configuration file", zap.String("config_file", i.file), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg := &grpcapp.Config{}
|
||||
cfg := &config{Config: &grpcapp.Config{}}
|
||||
if err := yaml.Unmarshal(data, cfg); err != nil {
|
||||
i.logger.Error("Failed to parse configuration", zap.Error(err))
|
||||
return nil, err
|
||||
|
||||
@@ -28,6 +28,14 @@ func (e serviceError) Error() string {
|
||||
return string(e)
|
||||
}
|
||||
|
||||
const (
|
||||
defaultMaxQuoteTTL = 10 * time.Minute
|
||||
defaultMaxQuoteTTLMillis = int64(defaultMaxQuoteTTL / time.Millisecond)
|
||||
)
|
||||
|
||||
// Option configures oracle service behavior.
|
||||
type Option func(*Service)
|
||||
|
||||
var (
|
||||
errSideRequired = serviceError("oracle: side is required")
|
||||
errAmountsMutuallyExclusive = serviceError("oracle: exactly one amount must be provided")
|
||||
@@ -38,21 +46,40 @@ var (
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
logger mlogger.Logger
|
||||
storage storage.Repository
|
||||
producer pmessaging.Producer
|
||||
announcer *discovery.Announcer
|
||||
invokeURI string
|
||||
logger mlogger.Logger
|
||||
storage storage.Repository
|
||||
producer pmessaging.Producer
|
||||
announcer *discovery.Announcer
|
||||
invokeURI string
|
||||
maxQuoteTTLMillis int64
|
||||
oraclev1.UnimplementedOracleServer
|
||||
}
|
||||
|
||||
func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer, invokeURI string) *Service {
|
||||
// WithMaxQuoteTTLMillis caps firm quote TTL requests to the supplied number of milliseconds.
|
||||
func WithMaxQuoteTTLMillis(value int64) Option {
|
||||
return func(s *Service) {
|
||||
if value > 0 {
|
||||
s.maxQuoteTTLMillis = value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer, invokeURI string, opts ...Option) *Service {
|
||||
initMetrics()
|
||||
svc := &Service{
|
||||
logger: logger.Named("oracle"),
|
||||
storage: repo,
|
||||
producer: prod,
|
||||
invokeURI: strings.TrimSpace(invokeURI),
|
||||
logger: logger.Named("oracle"),
|
||||
storage: repo,
|
||||
producer: prod,
|
||||
invokeURI: strings.TrimSpace(invokeURI),
|
||||
maxQuoteTTLMillis: defaultMaxQuoteTTLMillis,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
if opt != nil {
|
||||
opt(svc)
|
||||
}
|
||||
}
|
||||
if svc.maxQuoteTTLMillis <= 0 {
|
||||
svc.maxQuoteTTLMillis = defaultMaxQuoteTTLMillis
|
||||
}
|
||||
svc.startDiscoveryAnnouncer()
|
||||
return svc
|
||||
@@ -222,7 +249,16 @@ func (s *Service) getQuoteResponder(ctx context.Context, req *oraclev1.GetQuoteR
|
||||
|
||||
expiresAt := int64(0)
|
||||
if req.GetFirm() {
|
||||
expiry, err := computeExpiry(now, req.GetTtlMs())
|
||||
ttlMs := req.GetTtlMs()
|
||||
if ttlMs > s.maxQuoteTTLMillis {
|
||||
logger.Info(
|
||||
"Clamping requested firm quote ttl to configured maximum",
|
||||
zap.Int64("requested_ttl_ms", ttlMs),
|
||||
zap.Int64("max_ttl_ms", s.maxQuoteTTLMillis),
|
||||
)
|
||||
ttlMs = s.maxQuoteTTLMillis
|
||||
}
|
||||
expiry, err := computeExpiry(now, ttlMs)
|
||||
if err != nil {
|
||||
return gsresponse.InvalidArgument[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, err)
|
||||
}
|
||||
|
||||
@@ -181,6 +181,62 @@ func TestServiceGetQuoteFirm(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceGetQuoteFirm_ClampsTTLToConfiguredMax(t *testing.T) {
|
||||
const (
|
||||
configuredMaxTTL = 1 * time.Second
|
||||
requestedTTL = 1 * time.Minute
|
||||
)
|
||||
|
||||
repo := &repositoryStub{}
|
||||
repo.pairs = &pairStoreStub{
|
||||
getFn: func(ctx context.Context, pair model.CurrencyPair) (*model.Pair, error) {
|
||||
return &model.Pair{
|
||||
Pair: pair,
|
||||
BaseMeta: model.CurrencySettings{Code: pair.Base, Decimals: 2, Rounding: model.RoundingModeHalfEven},
|
||||
QuoteMeta: model.CurrencySettings{Code: pair.Quote, Decimals: 2, Rounding: model.RoundingModeHalfEven},
|
||||
}, nil
|
||||
},
|
||||
}
|
||||
repo.rates = &ratesStoreStub{
|
||||
latestFn: func(ctx context.Context, pair model.CurrencyPair, provider string) (*model.RateSnapshot, error) {
|
||||
return &model.RateSnapshot{
|
||||
Pair: pair,
|
||||
Provider: provider,
|
||||
Ask: "1.10",
|
||||
Bid: "1.08",
|
||||
RateRef: "rate#1",
|
||||
AsOfUnixMs: time.Now().UnixMilli(),
|
||||
}, nil
|
||||
},
|
||||
}
|
||||
repo.quotes = "esStoreStub{}
|
||||
repo.currencies = currencyStoreStub{}
|
||||
|
||||
svc := NewService(zap.NewNop(), repo, nil, "", WithMaxQuoteTTLMillis(int64(configuredMaxTTL/time.Millisecond)))
|
||||
start := time.Now()
|
||||
|
||||
resp, err := svc.GetQuote(context.Background(), &oraclev1.GetQuoteRequest{
|
||||
Pair: &fxv1.CurrencyPair{Base: "USD", Quote: "EUR"},
|
||||
Side: fxv1.Side_BUY_BASE_SELL_QUOTE,
|
||||
AmountInput: &oraclev1.GetQuoteRequest_BaseAmount{BaseAmount: &moneyv1.Money{
|
||||
Currency: "USD",
|
||||
Amount: "100",
|
||||
}},
|
||||
Firm: true,
|
||||
TtlMs: int64(requestedTTL / time.Millisecond),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
expiry := time.UnixMilli(resp.GetQuote().GetExpiresAtUnixMs())
|
||||
if expiry.Before(start) {
|
||||
t.Fatalf("expected expiry after request start, got %s", expiry)
|
||||
}
|
||||
if expiry.After(start.Add(5 * time.Second)) {
|
||||
t.Fatalf("expected clamped expiry close to 1s max ttl, got %s", expiry)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceGetQuoteRateNotFound(t *testing.T) {
|
||||
repo := &repositoryStub{
|
||||
pairs: &pairStoreStub{
|
||||
|
||||
Reference in New Issue
Block a user