diff --git a/api/discovery/.golangci.yml b/api/discovery/.golangci.yml index b7c11550..f489facb 100644 --- a/api/discovery/.golangci.yml +++ b/api/discovery/.golangci.yml @@ -116,7 +116,6 @@ linters: - varnamelen - wastedassign - whitespace - - wrapcheck - wsl_v5 - zerologlint # Disable specific linters. @@ -126,6 +125,7 @@ linters: - gochecknoglobals - gomoddirectives - wsl + - wrapcheck # All available settings of specific linters. # See the dedicated "linters.settings" documentation section. settings: diff --git a/api/fx/ingestor/.golangci.yml b/api/fx/ingestor/.golangci.yml new file mode 100644 index 00000000..0780108d --- /dev/null +++ b/api/fx/ingestor/.golangci.yml @@ -0,0 +1,195 @@ +# See the dedicated "version" documentation section. +version: "2" +linters: + # Default set of linters. + # The value can be: + # - `standard`: https://golangci-lint.run/docs/linters/#enabled-by-default + # - `all`: enables all linters by default. + # - `none`: disables all linters by default. + # - `fast`: enables only linters considered as "fast" (`golangci-lint help linters --json | jq '[ .[] | select(.fast==true) ] | map(.name)'`). + # Default: standard + default: all + # Enable specific linter. + enable: + - arangolint + - asasalint + - asciicheck + - bidichk + - bodyclose + - canonicalheader + - containedctx + - contextcheck + - copyloopvar + - cyclop + - decorder + - dogsled + - dupl + - dupword + - durationcheck + - embeddedstructfieldcheck + - err113 + - errcheck + - errchkjson + - errname + - errorlint + - exhaustive + - exptostd + - fatcontext + - forbidigo + - forcetypeassert + - funcorder + - funlen + - ginkgolinter + - gocheckcompilerdirectives + - gochecknoglobals + - gochecknoinits + - gochecksumtype + - gocognit + - goconst + - gocritic + - gocyclo + - godoclint + - godot + - godox + - goheader + - gomodguard + - goprintffuncname + - gosec + - gosmopolitan + - govet + - grouper + - iface + - importas + - inamedparam + - ineffassign + - interfacebloat + - intrange + - iotamixing + - ireturn + - lll + - loggercheck + - maintidx + - makezero + - mirror + - misspell + - mnd + - modernize + - musttag + - nakedret + - nestif + - nilerr + - nilnesserr + - nilnil + - nlreturn + - noctx + - noinlineerr + - nolintlint + - nonamedreturns + - nosprintfhostport + - paralleltest + - perfsprint + - prealloc + - predeclared + - promlinter + - protogetter + - reassign + - recvcheck + - revive + - rowserrcheck + - sloglint + - spancheck + - sqlclosecheck + - staticcheck + - tagalign + - tagliatelle + - testableexamples + - testifylint + - testpackage + - thelper + - tparallel + - unconvert + - unparam + - unqueryvet + - unused + - usestdlibvars + - usetesting + - varnamelen + - wastedassign + - whitespace + - wsl_v5 + - zerologlint + # Disable specific linters. + disable: + - depguard + - exhaustruct + - gochecknoglobals + - gomoddirectives + - wrapcheck + - wsl + # All available settings of specific linters. + # See the dedicated "linters.settings" documentation section. + settings: + wsl_v5: + allow-first-in-block: true + allow-whole-block: false + branch-max-lines: 2 + + # Defines a set of rules to ignore issues. + # It does not skip the analysis, and so does not ignore "typecheck" errors. + exclusions: + # Mode of the generated files analysis. + # + # - `strict`: sources are excluded by strictly following the Go generated file convention. + # Source files that have lines matching only the following regular expression will be excluded: `^// Code generated .* DO NOT EDIT\.$` + # This line must appear before the first non-comment, non-blank text in the file. + # https://go.dev/s/generatedcode + # - `lax`: sources are excluded if they contain lines like `autogenerated file`, `code generated`, `do not edit`, etc. + # - `disable`: disable the generated files exclusion. + # + # Default: strict + generated: lax + # Log a warning if an exclusion rule is unused. + # Default: false + warn-unused: true + # Predefined exclusion rules. + # Default: [] + presets: + - comments + - std-error-handling + - common-false-positives + - legacy + # Excluding configuration per-path, per-linter, per-text and per-source. + rules: + # Exclude some linters from running on tests files. + - path: _test\.go + linters: + - gocyclo + - errcheck + - dupl + - gosec + # Run some linter only for test files by excluding its issues for everything else. + - path-except: _test\.go + linters: + - forbidigo + # Exclude known linters from partially hard-vendored code, + # which is impossible to exclude via `nolint` comments. + # `/` will be replaced by the current OS file path separator to properly work on Windows. + - path: internal/hmac/ + text: "weak cryptographic primitive" + linters: + - gosec + # Exclude some `staticcheck` messages. + - linters: + - staticcheck + text: "SA9003:" + # Exclude `lll` issues for long lines with `go:generate`. + - linters: + - lll + source: "^//go:generate " + # Which file paths to exclude: they will be analyzed, but issues from them won't be reported. + # "/" will be replaced by the current OS file path separator to properly work on Windows. + # Default: [] + paths: [] + # Which file paths to not exclude. + # Default: [] + paths-except: [] diff --git a/api/fx/ingestor/internal/app/app.go b/api/fx/ingestor/internal/app/app.go index 98361358..5a0bac26 100644 --- a/api/fx/ingestor/internal/app/app.go +++ b/api/fx/ingestor/internal/app/app.go @@ -51,16 +51,17 @@ func (a *App) Run(ctx context.Context) error { return err } a.logger.Debug("Metrics server initialised") - defer metricsSrv.Close(context.Background()) + defer metricsSrv.Close(context.Background()) //nolint:contextcheck - conn, err := db.ConnectMongo(a.logger, a.cfg.Database) + conn, err := db.ConnectMongo(a.logger, a.cfg.Database) //nolint:contextcheck if err != nil { return err } - defer conn.Disconnect(context.Background()) + defer conn.Disconnect(context.Background()) //nolint:errcheck,contextcheck + a.logger.Debug("MongoDB connection established") - repo, err := mongostorage.New(a.logger, conn) + repo, err := mongostorage.New(a.logger, conn) //nolint:contextcheck if err != nil { return err } @@ -72,6 +73,7 @@ func (a *App) Run(ctx context.Context) error { } var announcer *discovery.Announcer + if cfg := a.cfg.Messaging; cfg != nil && cfg.Driver != "" { broker, err := msg.CreateMessagingBroker(a.logger.Named("discovery_bus"), cfg) if err != nil { @@ -84,6 +86,7 @@ func (a *App) Run(ctx context.Context) error { Version: appversion.Create().Short(), } announcer = discovery.NewAnnouncer(a.logger, producer, "fx_ingestor", announce) + announcer.Start() defer announcer.Stop() } @@ -98,6 +101,8 @@ func (a *App) Run(ctx context.Context) error { } return err } + a.logger.Info("Ingestor service stopped") + return nil } diff --git a/api/fx/ingestor/internal/appversion/version.go b/api/fx/ingestor/internal/appversion/version.go index 030c39ff..0170d7ed 100644 --- a/api/fx/ingestor/internal/appversion/version.go +++ b/api/fx/ingestor/internal/appversion/version.go @@ -14,8 +14,9 @@ var ( BuildDate string ) +//nolint:ireturn func Create() version.Printer { - vi := version.Info{ + info := version.Info{ Program: "Sendico FX Ingestor Service", Revision: Revision, Branch: Branch, @@ -23,5 +24,6 @@ func Create() version.Printer { BuildDate: BuildDate, Version: Version, } - return vf.Create(&vi) + + return vf.Create(&info) } diff --git a/api/fx/ingestor/internal/config/config.go b/api/fx/ingestor/internal/config/config.go index 1b0fdc76..5891272f 100644 --- a/api/fx/ingestor/internal/config/config.go +++ b/api/fx/ingestor/internal/config/config.go @@ -25,6 +25,7 @@ type Config struct { pairsBySource map[mmodel.Driver][]PairConfig } +//nolint:cyclop func Load(path string) (*Config, error) { if path == "" { return nil, merrors.InvalidArgument("config: path is empty") @@ -36,19 +37,23 @@ func Load(path string) (*Config, error) { } cfg := &Config{} - if err := yaml.Unmarshal(data, cfg); err != nil { + + err = yaml.Unmarshal(data, cfg) + if err != nil { return nil, merrors.InternalWrap(err, "config: failed to parse yaml") } if len(cfg.Market.Sources) == 0 { return nil, merrors.InvalidArgument("config: no market sources configured") } + sourceSet := make(map[mmodel.Driver]struct{}, len(cfg.Market.Sources)) for idx := range cfg.Market.Sources { src := &cfg.Market.Sources[idx] if src.Driver.IsEmpty() { return nil, merrors.InvalidArgument("config: market source driver is empty") } + sourceSet[src.Driver] = struct{}{} } @@ -65,6 +70,7 @@ func Load(path string) (*Config, error) { if driver.IsEmpty() { return nil, merrors.InvalidArgument("config: pair source is empty") } + if _, ok := sourceSet[driver]; !ok { return nil, merrors.InvalidArgument("config: pair references unknown source: "+driver.String(), "pairs."+driver.String()) } @@ -74,10 +80,12 @@ func Load(path string) (*Config, error) { pair := pairList[idx] pair.Base = strings.ToUpper(strings.TrimSpace(pair.Base)) pair.Quote = strings.ToUpper(strings.TrimSpace(pair.Quote)) + pair.Symbol = strings.TrimSpace(pair.Symbol) if pair.Base == "" || pair.Quote == "" || pair.Symbol == "" { return nil, merrors.InvalidArgument("config: pair entries must define base, quote, and symbol", "pairs."+driver.String()) } + if strings.TrimSpace(pair.Provider) == "" { pair.Provider = strings.ToLower(driver.String()) } @@ -87,6 +95,7 @@ func Load(path string) (*Config, error) { Source: driver, }) } + pairsBySource[driver] = processed normalizedPairs[driver.String()] = processed } @@ -94,6 +103,7 @@ func Load(path string) (*Config, error) { cfg.Market.Pairs = normalizedPairs cfg.pairsBySource = pairsBySource cfg.pairs = flattened + if cfg.Database == nil { return nil, merrors.InvalidArgument("config: database configuration is required") } @@ -101,7 +111,7 @@ func Load(path string) (*Config, error) { if cfg.Metrics != nil && cfg.Metrics.Enabled { cfg.Metrics.Address = strings.TrimSpace(cfg.Metrics.Address) if cfg.Metrics.Address == "" { - cfg.Metrics.Address = ":9102" + cfg.Metrics.Address = ":9102" //nolint:mnd } } @@ -112,9 +122,11 @@ func (c *Config) PollInterval() time.Duration { if c == nil { return defaultPollInterval } + if c.PollIntervalSeconds <= 0 { return defaultPollInterval } + return time.Duration(c.PollIntervalSeconds) * time.Second } @@ -122,8 +134,10 @@ func (c *Config) Pairs() []Pair { if c == nil { return nil } + out := make([]Pair, len(c.pairs)) copy(out, c.pairs) + return out } @@ -131,12 +145,14 @@ func (c *Config) PairsBySource() map[mmodel.Driver][]PairConfig { if c == nil { return nil } + out := make(map[mmodel.Driver][]PairConfig, len(c.pairsBySource)) for driver, pairs := range c.pairsBySource { cp := make([]PairConfig, len(pairs)) copy(cp, pairs) out[driver] = cp } + return out } @@ -144,6 +160,8 @@ func (c *Config) MetricsConfig() *MetricsConfig { if c == nil || c.Metrics == nil { return nil } + cp := *c.Metrics + return &cp } diff --git a/api/fx/ingestor/internal/config/market.go b/api/fx/ingestor/internal/config/market.go index af53285f..6b92e7e2 100644 --- a/api/fx/ingestor/internal/config/market.go +++ b/api/fx/ingestor/internal/config/market.go @@ -15,7 +15,8 @@ type PairConfig struct { type Pair struct { PairConfig `yaml:",inline"` - Source mmodel.Driver `yaml:"-"` + + Source mmodel.Driver `yaml:"-"` } type MarketConfig struct { diff --git a/api/fx/ingestor/internal/ingestor/service.go b/api/fx/ingestor/internal/ingestor/service.go index 8adfff9b..e5539a02 100644 --- a/api/fx/ingestor/internal/ingestor/service.go +++ b/api/fx/ingestor/internal/ingestor/service.go @@ -28,9 +28,11 @@ func New(logger mlogger.Logger, cfg *config.Config, repo storage.Repository) (*S if logger == nil { return nil, merrors.InvalidArgument("ingestor: nil logger") } + if cfg == nil { return nil, merrors.InvalidArgument("ingestor: nil config") } + if repo == nil { return nil, merrors.InvalidArgument("ingestor: nil repository") } @@ -52,6 +54,7 @@ func New(logger mlogger.Logger, cfg *config.Config, repo storage.Repository) (*S func (s *Service) Run(ctx context.Context) error { interval := s.cfg.PollInterval() + ticker := time.NewTicker(interval) defer ticker.Stop() @@ -65,6 +68,7 @@ func (s *Service) Run(ctx context.Context) error { select { case <-ctx.Done(): s.logger.Info("Context cancelled, stopping ingestor") + return ctx.Err() case <-ticker.C: if err := s.executePoll(ctx); err != nil { @@ -77,27 +81,34 @@ func (s *Service) Run(ctx context.Context) error { func (s *Service) executePoll(ctx context.Context) error { start := time.Now() err := s.pollOnce(ctx) + if s.metrics != nil { s.metrics.observePoll(time.Since(start), err) } + return err } func (s *Service) pollOnce(ctx context.Context) error { var firstErr error failures := 0 + for _, pair := range s.pairs { start := time.Now() err := s.upsertPair(ctx, pair) elapsed := time.Since(start) + if s.metrics != nil { s.metrics.observePair(pair, elapsed, err) } + if err != nil { if firstErr == nil { firstErr = err } + failures++ + s.logger.Warn("Failed to ingest pair", zap.String("symbol", pair.Symbol), zap.String("source", pair.Source.String()), @@ -110,14 +121,17 @@ func (s *Service) pollOnce(ctx context.Context) error { ) } } + if failures > 0 { s.logger.Warn("Ingestion poll completed with failures", zap.Int("failures", failures), zap.Int("total", len(s.pairs))) } else { s.logger.Debug("Ingestion poll completed", zap.Int("total", len(s.pairs))) } + return firstErr } +//nolint:funlen func (s *Service) upsertPair(ctx context.Context, pair config.Pair) error { connector, ok := s.connectors[pair.Source] if !ok { @@ -133,6 +147,7 @@ func (s *Service) upsertPair(ctx context.Context, pair config.Pair) error { if err != nil { return merrors.InvalidArgumentWrap(err, "parse bid price", "bid") } + ask, err := parseDecimal(ticker.AskPrice) if err != nil { return merrors.InvalidArgumentWrap(err, "parse ask price", "ask") @@ -148,16 +163,18 @@ func (s *Service) upsertPair(ctx context.Context, pair config.Pair) error { } mid := new(big.Rat).Add(bid, ask) - mid.Quo(mid, big.NewRat(2, 1)) + mid.Quo(mid, big.NewRat(2, 1)) //nolint:mnd spread := big.NewRat(0, 1) if mid.Sign() != 0 { spread.Sub(ask, bid) + if spread.Sign() < 0 { spread.Neg(spread) } + spread.Quo(spread, mid) - spread.Mul(spread, big.NewRat(10000, 1)) // basis points + spread.Mul(spread, big.NewRat(10000, 1)) //nolint:mnd // basis points } now := time.Now().UTC() @@ -201,6 +218,7 @@ func parseDecimal(value string) (*big.Rat, error) { if _, ok := r.SetString(value); !ok { return nil, merrors.InvalidArgument("invalid decimal \""+value+"\"", "value") } + return r, nil } @@ -208,9 +226,11 @@ func invertPrices(bid, ask *big.Rat) (*big.Rat, *big.Rat) { if bid.Sign() == 0 || ask.Sign() == 0 { return bid, ask } + one := big.NewRat(1, 1) invBid := new(big.Rat).Quo(one, ask) // invert ask to get bid invAsk := new(big.Rat).Quo(one, bid) // invert bid to get ask + return invBid, invAsk } @@ -218,6 +238,7 @@ func formatDecimal(r *big.Rat) string { if r == nil { return "0" } + // Format with 8 decimal places, trimming trailing zeros. return r.FloatString(8) } diff --git a/api/fx/ingestor/internal/market/binance/connector.go b/api/fx/ingestor/internal/market/binance/connector.go index 809d89dd..26ec146e 100644 --- a/api/fx/ingestor/internal/market/binance/connector.go +++ b/api/fx/ingestor/internal/market/binance/connector.go @@ -27,30 +27,33 @@ type binanceConnector struct { } const defaultBinanceBaseURL = "https://api.binance.com" + const ( - defaultDialTimeoutSeconds = 5 * time.Second - defaultDialKeepAliveSeconds = 30 * time.Second - defaultTLSHandshakeTimeoutSeconds = 5 * time.Second - defaultResponseHeaderTimeoutSeconds = 10 * time.Second - defaultRequestTimeoutSeconds = 10 * time.Second + defaultDialTimeout = 5 * time.Second + defaultDialKeepAlive = 30 * time.Second + defaultTLSHandshakeTimeout = 5 * time.Second + defaultResponseHeaderTimeout = 10 * time.Second + defaultRequestTimeout = 10 * time.Second ) -func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Connector, error) { +func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Connector, error) { //nolint:ireturn baseURL := defaultBinanceBaseURL provider := strings.ToLower(mmodel.DriverBinance.String()) - dialTimeout := defaultDialTimeoutSeconds - dialKeepAlive := defaultDialKeepAliveSeconds - tlsHandshakeTimeout := defaultTLSHandshakeTimeoutSeconds - responseHeaderTimeout := defaultResponseHeaderTimeoutSeconds - requestTimeout := defaultRequestTimeoutSeconds + dialTimeout := defaultDialTimeout + dialKeepAlive := defaultDialKeepAlive + tlsHandshakeTimeout := defaultTLSHandshakeTimeout + responseHeaderTimeout := defaultResponseHeaderTimeout + requestTimeout := defaultRequestTimeout if settings != nil { if value, ok := settings["base_url"].(string); ok && strings.TrimSpace(value) != "" { baseURL = strings.TrimSpace(value) } + if value, ok := settings["provider"].(string); ok && strings.TrimSpace(value) != "" { provider = strings.TrimSpace(value) } + dialTimeout = common.DurationSetting(settings, "dial_timeout_seconds", dialTimeout) dialKeepAlive = common.DurationSetting(settings, "dial_keep_alive_seconds", dialKeepAlive) tlsHandshakeTimeout = common.DurationSetting(settings, "tls_handshake_timeout_seconds", tlsHandshakeTimeout) @@ -96,6 +99,7 @@ func (c *binanceConnector) FetchTicker(ctx context.Context, symbol string) (*mmo if err != nil { return nil, merrors.InternalWrap(err, "binance: parse base url") } + endpoint.Path = "/api/v3/ticker/bookTicker" query := endpoint.Query() query.Set("symbol", strings.ToUpper(strings.TrimSpace(symbol))) @@ -109,12 +113,14 @@ func (c *binanceConnector) FetchTicker(ctx context.Context, symbol string) (*mmo resp, err := c.client.Do(req) if err != nil { c.logger.Warn("Binance request failed", zap.String("symbol", symbol), zap.Error(err)) + return nil, merrors.InternalWrap(err, "binance: request failed") } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { c.logger.Warn("Binance returned non-OK status", zap.String("symbol", symbol), zap.Int("status", resp.StatusCode)) + return nil, merrors.Internal("binance: unexpected status " + strconv.Itoa(resp.StatusCode)) } @@ -124,9 +130,11 @@ func (c *binanceConnector) FetchTicker(ctx context.Context, symbol string) (*mmo AskPrice string `json:"askPrice"` } - if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil { - c.logger.Warn("Binance decode failed", zap.String("symbol", symbol), zap.Error(err)) - return nil, merrors.InternalWrap(err, "binance: decode response") + decodeErr := json.NewDecoder(resp.Body).Decode(&payload) + if decodeErr != nil { + c.logger.Warn("Binance decode failed", zap.String("symbol", symbol), zap.Error(decodeErr)) + + return nil, merrors.InternalWrap(decodeErr, "binance: decode response") } return &mmodel.Ticker{ diff --git a/api/fx/ingestor/internal/market/cbr/connector.go b/api/fx/ingestor/internal/market/cbr/connector.go index d7e03faa..f9008d29 100644 --- a/api/fx/ingestor/internal/market/cbr/connector.go +++ b/api/fx/ingestor/internal/market/cbr/connector.go @@ -49,7 +49,7 @@ const ( defaultRequestTimeoutSeconds = 10 * time.Second ) -func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Connector, error) { +func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Connector, error) { //nolint:cyclop,ireturn baseURL := defaultCBRBaseURL provider := strings.ToLower(mmodel.DriverCBR.String()) dialTimeout := defaultDialTimeoutSeconds @@ -284,7 +284,7 @@ func (c *cbrConnector) fetchDailyRate(ctx context.Context, valute valuteInfo) (s return computePrice(entry.Value, entry.Nominal) } -func (c *cbrConnector) fetchHistoricalRate(ctx context.Context, valute valuteInfo, date time.Time) (string, error) { +func (c *cbrConnector) fetchHistoricalRate(ctx context.Context, valute valuteInfo, date time.Time) (string, error) { //nolint:funlen query := map[string]string{ "date_req1": date.Format("02/01/2006"), "date_req2": date.Format("02/01/2006"), @@ -366,6 +366,7 @@ func (c *cbrConnector) buildURL(path string, query map[string]string) (string, e return "", merrors.InternalWrap(err, "cbr: parse base url") } base.Path = strings.TrimRight(base.Path, "/") + path + q := base.Query() for key, value := range query { q.Set(key, value) @@ -401,7 +402,7 @@ type valuteMapping struct { byID map[string]valuteInfo } -func buildValuteMapping(logger *zap.Logger, items []valuteItem) (*valuteMapping, error) { +func buildValuteMapping(logger *zap.Logger, items []valuteItem) (*valuteMapping, error) { //nolint:gocognit,nestif byISO := make(map[string]valuteInfo, len(items)) byID := make(map[string]valuteInfo, len(items)) byNum := make(map[string]string, len(items)) @@ -453,11 +454,12 @@ func buildValuteMapping(logger *zap.Logger, items []valuteItem) (*valuteMapping, // 2) Otherwise prefer smaller nominal keepExisting := true - if existing.Nominal != 1 && info.Nominal == 1 { + switch { + case existing.Nominal != 1 && info.Nominal == 1: keepExisting = false - } else if existing.Nominal == 1 && info.Nominal != 1 { + case existing.Nominal == 1 && info.Nominal != 1: keepExisting = true - } else if info.Nominal < existing.Nominal { + case info.Nominal < existing.Nominal: keepExisting = false } @@ -513,7 +515,9 @@ func buildValuteMapping(logger *zap.Logger, items []valuteItem) (*valuteMapping, byNum[isoNum] = id } - logger.Info("Installing currency code", zap.String("iso_code", isoChar), zap.String("id", id), zap.Int64("nominal", nominal)) + logger.Info("Installing currency code", + zap.String("iso_code", isoChar), zap.String("id", id), zap.Int64("nominal", nominal), + ) byISO[isoChar] = info byID[id] = info @@ -546,6 +550,7 @@ func (d *dailyRates) find(id string) *dailyValute { if d == nil { return nil } + for idx := range d.Valutes { if strings.EqualFold(strings.TrimSpace(d.Valutes[idx].ID), id) { return &d.Valutes[idx] @@ -569,7 +574,9 @@ func (d *dynamicRates) find(id string, date time.Time) *dynamicRecord { if d == nil { return nil } + target := date.Format("02.01.2006") + for idx := range d.Records { rec := &d.Records[idx] if !strings.EqualFold(strings.TrimSpace(rec.ID), id) { @@ -663,7 +670,7 @@ func computePrice(value string, nominalStr string) (string, error) { den := big.NewRat(nominal, 1) price := new(big.Rat).Quo(r, den) - return price.FloatString(8), nil + return price.FloatString(8), nil //nolint:mnd } func formatSymbol(iso string, asOf *time.Time) string { diff --git a/api/fx/ingestor/internal/market/coingecko/connector.go b/api/fx/ingestor/internal/market/coingecko/connector.go index 36ecb462..52234a5b 100644 --- a/api/fx/ingestor/internal/market/coingecko/connector.go +++ b/api/fx/ingestor/internal/market/coingecko/connector.go @@ -29,29 +29,36 @@ type coingeckoConnector struct { const defaultCoinGeckoBaseURL = "https://api.coingecko.com/api/v3" const ( - defaultDialTimeoutSeconds = 5 * time.Second - defaultDialKeepAliveSeconds = 30 * time.Second - defaultTLSHandshakeTimeoutSeconds = 5 * time.Second - defaultResponseHeaderTimeoutSeconds = 10 * time.Second - defaultRequestTimeoutSeconds = 10 * time.Second + defaultDialTimeout = 5 * time.Second + defaultDialKeepAlive = 30 * time.Second + defaultTLSHandshakeTimeout = 5 * time.Second + defaultResponseHeaderTimeout = 10 * time.Second + defaultRequestTimeout = 10 * time.Second ) -func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Connector, error) { +const ( + expectedSymbolParts = 2 + tsToMillis = 1000 +) + +func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Connector, error) { //nolint:ireturn baseURL := defaultCoinGeckoBaseURL provider := strings.ToLower(mmodel.DriverCoinGecko.String()) - dialTimeout := defaultDialTimeoutSeconds - dialKeepAlive := defaultDialKeepAliveSeconds - tlsHandshakeTimeout := defaultTLSHandshakeTimeoutSeconds - responseHeaderTimeout := defaultResponseHeaderTimeoutSeconds - requestTimeout := defaultRequestTimeoutSeconds + dialTimeout := defaultDialTimeout + dialKeepAlive := defaultDialKeepAlive + tlsHandshakeTimeout := defaultTLSHandshakeTimeout + responseHeaderTimeout := defaultResponseHeaderTimeout + requestTimeout := defaultRequestTimeout if settings != nil { if value, ok := settings["base_url"].(string); ok && strings.TrimSpace(value) != "" { baseURL = strings.TrimSpace(value) } + if value, ok := settings["provider"].(string); ok && strings.TrimSpace(value) != "" { provider = strings.TrimSpace(value) } + dialTimeout = common.DurationSetting(settings, "dial_timeout_seconds", dialTimeout) dialKeepAlive = common.DurationSetting(settings, "dial_keep_alive_seconds", dialKeepAlive) tlsHandshakeTimeout = common.DurationSetting(settings, "tls_handshake_timeout_seconds", tlsHandshakeTimeout) @@ -88,6 +95,7 @@ func (c *coingeckoConnector) ID() mmodel.Driver { return c.id } +//nolint:cyclop,funlen func (c *coingeckoConnector) FetchTicker(ctx context.Context, symbol string) (*mmodel.Ticker, error) { coinID, vsCurrency, err := parseSymbol(symbol) if err != nil { @@ -98,6 +106,7 @@ func (c *coingeckoConnector) FetchTicker(ctx context.Context, symbol string) (*m if err != nil { return nil, merrors.InternalWrap(err, "coingecko: parse base url") } + endpoint.Path = strings.TrimRight(endpoint.Path, "/") + "/simple/price" query := endpoint.Query() query.Set("ids", coinID) @@ -113,44 +122,51 @@ func (c *coingeckoConnector) FetchTicker(ctx context.Context, symbol string) (*m resp, err := c.client.Do(req) if err != nil { c.logger.Warn("CoinGecko request failed", zap.String("symbol", symbol), zap.Error(err)) + return nil, merrors.InternalWrap(err, "coingecko: request failed") } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { c.logger.Warn("CoinGecko returned non-OK status", zap.String("symbol", symbol), zap.Int("status", resp.StatusCode)) + return nil, merrors.Internal("coingecko: unexpected status " + strconv.Itoa(resp.StatusCode)) } decoder := json.NewDecoder(resp.Body) decoder.UseNumber() - var payload map[string]map[string]interface{} - if err := decoder.Decode(&payload); err != nil { - c.logger.Warn("CoinGecko decode failed", zap.String("symbol", symbol), zap.Error(err)) - return nil, merrors.InternalWrap(err, "coingecko: decode response") + var payload map[string]map[string]any + + decodeErr := decoder.Decode(&payload) + if decodeErr != nil { + c.logger.Warn("CoinGecko decode failed", zap.String("symbol", symbol), zap.Error(decodeErr)) + + return nil, merrors.InternalWrap(decodeErr, "coingecko: decode response") } - coinData, ok := payload[coinID] - if !ok { + coinData, coinFound := payload[coinID] + if !coinFound { return nil, merrors.Internal("coingecko: coin id not found in response") } - priceValue, ok := coinData[vsCurrency] - if !ok { + + priceValue, priceFound := coinData[vsCurrency] + if !priceFound { return nil, merrors.Internal("coingecko: vs currency not found in response") } - price, ok := toFloat(priceValue) - if !ok || price <= 0 { + price, priceOk := toFloat(priceValue) + if !priceOk || price <= 0 { return nil, merrors.Internal("coingecko: invalid price value in response") } priceStr := strconv.FormatFloat(price, 'f', -1, 64) timestamp := time.Now().UnixMilli() - if tsValue, ok := coinData["last_updated_at"]; ok { - if tsFloat, ok := toFloat(tsValue); ok && tsFloat > 0 { - tsMillis := int64(tsFloat * 1000) + + if tsValue, tsFound := coinData["last_updated_at"]; tsFound { + if tsFloat, tsOk := toFloat(tsValue); tsOk && tsFloat > 0 { + tsMillis := int64(tsFloat * tsToMillis) if tsMillis > 0 { timestamp = tsMillis } @@ -179,14 +195,16 @@ func parseSymbol(symbol string) (string, string, error) { case ':', '/', '-', '_': return true } + return false }) - if len(parts) != 2 { + if len(parts) != expectedSymbolParts { return "", "", merrors.InvalidArgument("coingecko: symbol must be /", "symbol") } coinID := strings.TrimSpace(parts[0]) + vsCurrency := strings.TrimSpace(parts[1]) if coinID == "" || vsCurrency == "" { return "", "", merrors.InvalidArgument("coingecko: symbol contains empty segments", "symbol") @@ -195,28 +213,31 @@ func parseSymbol(symbol string) (string, string, error) { return coinID, vsCurrency, nil } -func toFloat(value interface{}) (float64, bool) { - switch v := value.(type) { +func toFloat(value any) (float64, bool) { + switch val := value.(type) { case json.Number: - f, err := v.Float64() + f, err := val.Float64() if err != nil { return 0, false } + return f, true case float64: - return v, true + return val, true case float32: - return float64(v), true + return float64(val), true case int: - return float64(v), true + return float64(val), true case int64: - return float64(v), true + return float64(val), true case uint64: - return float64(v), true + return float64(val), true case string: - if parsed, err := strconv.ParseFloat(v, 64); err == nil { + parsed, parseErr := strconv.ParseFloat(val, 64) + if parseErr == nil { return parsed, true } } + return 0, false } diff --git a/api/fx/ingestor/internal/market/common/settings.go b/api/fx/ingestor/internal/market/common/settings.go index e58c450e..26951a62 100644 --- a/api/fx/ingestor/internal/market/common/settings.go +++ b/api/fx/ingestor/internal/market/common/settings.go @@ -1,4 +1,4 @@ -package common +package common //nolint:revive // package provides shared market connector utilities import ( "strconv" @@ -8,39 +8,46 @@ import ( ) // DurationSetting reads a positive duration override from settings or returns def when the value is missing or invalid. +// +//nolint:cyclop func DurationSetting(settings model.SettingsT, key string, def time.Duration) time.Duration { if settings == nil { return def } + value, ok := settings[key] if !ok { return def } - switch v := value.(type) { + switch val := value.(type) { case time.Duration: - if v > 0 { - return v + if val > 0 { + return val } case int: - if v > 0 { - return time.Duration(v) * time.Second + if val > 0 { + return time.Duration(val) * time.Second } case int64: - if v > 0 { - return time.Duration(v) * time.Second + if val > 0 { + return time.Duration(val) * time.Second } case float64: - if v > 0 { - return time.Duration(v * float64(time.Second)) + if val > 0 { + return time.Duration(val * float64(time.Second)) } case string: - if parsed, err := time.ParseDuration(v); err == nil && parsed > 0 { + parsed, parseErr := time.ParseDuration(val) + if parseErr == nil && parsed > 0 { return parsed } - if seconds, err := strconv.ParseFloat(v, 64); err == nil && seconds > 0 { + + seconds, floatErr := strconv.ParseFloat(val, 64) + if floatErr == nil && seconds > 0 { return time.Duration(seconds * float64(time.Second)) } } + return def } diff --git a/api/fx/ingestor/internal/metrics/server.go b/api/fx/ingestor/internal/metrics/server.go index 12f04c5d..85aa1ce4 100644 --- a/api/fx/ingestor/internal/metrics/server.go +++ b/api/fx/ingestor/internal/metrics/server.go @@ -24,16 +24,19 @@ const ( ) type Server interface { - SetStatus(health.ServiceStatus) - Close(context.Context) + SetStatus(status health.ServiceStatus) + Close(ctx context.Context) } +//nolint:ireturn func NewServer(logger mlogger.Logger, cfg *config.MetricsConfig) (Server, error) { if logger == nil { return nil, merrors.InvalidArgument("metrics: logger is nil") } + if cfg == nil || !cfg.Enabled { logger.Debug("Metrics disabled; using noop server") + return noopServer{}, nil } @@ -47,7 +50,9 @@ func NewServer(logger mlogger.Logger, cfg *config.MetricsConfig) (Server, error) router.Handle("/metrics", promhttp.Handler()) var healthRouter routers.Health - if hr, err := routers.NewHealthRouter(metricsLogger, router, ""); err != nil { + + hr, err := routers.NewHealthRouter(metricsLogger, router, "") + if err != nil { metricsLogger.Warn("Failed to initialise health router", zap.Error(err)) } else { hr.SetStatus(health.SSStarting) @@ -60,7 +65,7 @@ func NewServer(logger mlogger.Logger, cfg *config.MetricsConfig) (Server, error) ReadHeaderTimeout: readHeaderTimeout, } - ms := &httpServerWrapper{ + wrapper := &httpServerWrapper{ logger: metricsLogger, server: httpServer, health: healthRouter, @@ -69,7 +74,9 @@ func NewServer(logger mlogger.Logger, cfg *config.MetricsConfig) (Server, error) go func() { metricsLogger.Info("Prometheus endpoint listening", zap.String("address", address)) - if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + + err := httpServer.ListenAndServe() + if err != nil && !errors.Is(err, http.ErrServerClosed) { metricsLogger.Error("Prometheus endpoint stopped unexpectedly", zap.Error(err)) if healthRouter != nil { healthRouter.SetStatus(health.SSTerminating) @@ -77,7 +84,7 @@ func NewServer(logger mlogger.Logger, cfg *config.MetricsConfig) (Server, error) } }() - return ms, nil + return wrapper, nil } type httpServerWrapper struct { @@ -91,6 +98,7 @@ func (s *httpServerWrapper) SetStatus(status health.ServiceStatus) { if s == nil || s.health == nil { return } + s.logger.Debug("Updating metrics health status", zap.String("status", string(status))) s.health.SetStatus(status) } @@ -110,10 +118,12 @@ func (s *httpServerWrapper) Close(ctx context.Context) { return } + //nolint:contextcheck shutdownCtx := ctx if shutdownCtx == nil { shutdownCtx = context.Background() } + if s.timeout > 0 { var cancel context.CancelFunc shutdownCtx, cancel = context.WithTimeout(shutdownCtx, s.timeout) @@ -129,6 +139,6 @@ func (s *httpServerWrapper) Close(ctx context.Context) { type noopServer struct{} -func (noopServer) SetStatus(health.ServiceStatus) {} +func (noopServer) SetStatus(_ health.ServiceStatus) {} -func (noopServer) Close(context.Context) {} +func (noopServer) Close(_ context.Context) {} diff --git a/api/fx/ingestor/main.go b/api/fx/ingestor/main.go index 7f477080..468ae6d1 100644 --- a/api/fx/ingestor/main.go +++ b/api/fx/ingestor/main.go @@ -26,16 +26,18 @@ func main() { flag.Parse() logger := lf.NewLogger(*debugFlag).Named("fx_ingestor") - logger = logger.With(zap.String("instance_id", discovery.InstanceID())) - defer logger.Sync() - av := appversion.Create() + logger = logger.With(zap.String("instance_id", discovery.InstanceID())) + defer logger.Sync() //nolint:errcheck + + appVersion := appversion.Create() if *versionFlag { - fmt.Fprintln(os.Stdout, av.Print()) + fmt.Fprintln(os.Stdout, appVersion.Print()) + return } - logger.Info(fmt.Sprintf("Starting %s", av.Program()), zap.String("version", av.Info())) + logger.Info("Starting "+appVersion.Program(), zap.String("version", appVersion.Info())) ctx, cancel := signalctx.WithSignals(context.Background(), os.Interrupt, syscall.SIGTERM) defer cancel() @@ -47,8 +49,10 @@ func main() { if err := application.Run(ctx); err != nil { if errors.Is(err, context.Canceled) { logger.Info("FX ingestor stopped") + return } + logger.Error("Ingestor terminated with error", zap.Error(err)) } } diff --git a/api/gateway/chain/go.mod b/api/gateway/chain/go.mod index 0de5737e..70ac074a 100644 --- a/api/gateway/chain/go.mod +++ b/api/gateway/chain/go.mod @@ -22,7 +22,7 @@ require ( require ( github.com/Microsoft/go-winio v0.6.2 // indirect - github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260208002143-2551aa251e34 // indirect + github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260212005555-3a7e5700f354 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.24.4 // indirect github.com/bmatcuk/doublestar/v4 v4.10.0 // indirect diff --git a/api/gateway/chain/go.sum b/api/gateway/chain/go.sum index a976f0d5..ac0b2ebb 100644 --- a/api/gateway/chain/go.sum +++ b/api/gateway/chain/go.sum @@ -6,8 +6,8 @@ github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= -github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260208002143-2551aa251e34 h1:AyAPL6pTcPPpfZsNtOTFhxyOokKBLnrbbaV42g6Z9v0= -github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260208002143-2551aa251e34/go.mod h1:ioLG6R+5bUSO1oeGSDxOV3FADARuMoytZCSX6MEMQkI= +github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260212005555-3a7e5700f354 h1:BgaMXBpcqcW74afzqI3iKo07K3tC+VuyWU3/FIvLlNI= +github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260212005555-3a7e5700f354/go.mod h1:ioLG6R+5bUSO1oeGSDxOV3FADARuMoytZCSX6MEMQkI= github.com/VictoriaMetrics/fastcache v1.13.0 h1:AW4mheMR5Vd9FkAPUv+NH6Nhw+fmbTMGMsNAoA/+4G0= github.com/VictoriaMetrics/fastcache v1.13.0/go.mod h1:hHXhl4DA2fTL2HTZDJFXWgW0LNjo6B+4aj2Wmng3TjU= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/api/gateway/chain/internal/service/gateway/service.go b/api/gateway/chain/internal/service/gateway/service.go index db2e8de8..dba70a8b 100644 --- a/api/gateway/chain/internal/service/gateway/service.go +++ b/api/gateway/chain/internal/service/gateway/service.go @@ -199,16 +199,22 @@ func (s *Service) startDiscoveryAnnouncers() { } version := appversion.Create().Short() for _, network := range s.networks { - currencies := []string{shared.NativeCurrency(network)} + currencies := []discovery.CurrencyAnnouncement{{ + Currency: shared.NativeCurrency(network), + Network: string(network.Name), + }} for _, token := range network.TokenConfigs { if token.Symbol != "" { - currencies = append(currencies, token.Symbol) + currencies = append(currencies, discovery.CurrencyAnnouncement{ + Currency: token.Symbol, + Network: string(network.Name), + ContractAddress: token.ContractAddress, + }) } } announce := discovery.Announcement{ Service: "CRYPTO_RAIL_GATEWAY", Rail: "CRYPTO", - Network: string(network.Name), Operations: []string{"balance.read", "payin.crypto", "payout.crypto", "fee.send", "observe.confirm"}, Currencies: currencies, InvokeURI: s.invokeURI, diff --git a/api/gateway/mntx/README.md b/api/gateway/mntx/README.md index eff7d1d8..0dc2a9e6 100644 --- a/api/gateway/mntx/README.md +++ b/api/gateway/mntx/README.md @@ -13,7 +13,7 @@ This service now supports Monetix “payout by card”. - `MONETIX_PROJECT_ID` – integer project ID - `MONETIX_SECRET_KEY` – signature secret - Optional: `allowed_currencies`, `require_customer_address`, `request_timeout_seconds` -- Gateway descriptor: `gateway.id`, optional `gateway.currencies`, `gateway.limits` +- Gateway descriptor: `gateway.id`, optional `gateway.currencies`, `gateway.limits` (for per-payout minimum use `gateway.limits.per_tx_min_amount`) - Callback server: `MNTX_GATEWAY_HTTP_PORT` (exposed as 8084), `http.callback.path`, optional `allowed_cidrs` ## Outbound request (CreateCardPayout) diff --git a/api/gateway/mntx/config.dev.yml b/api/gateway/mntx/config.dev.yml index 06bd17dd..a9b5261c 100644 --- a/api/gateway/mntx/config.dev.yml +++ b/api/gateway/mntx/config.dev.yml @@ -51,7 +51,7 @@ gateway: network: "MIR" currencies: ["RUB"] limits: - min_amount: "0" + per_tx_min_amount: "0" http: callback: diff --git a/api/gateway/mntx/config.yml b/api/gateway/mntx/config.yml index ae265c35..48d98d51 100644 --- a/api/gateway/mntx/config.yml +++ b/api/gateway/mntx/config.yml @@ -51,7 +51,7 @@ gateway: network: "MIR" currencies: ["RUB"] limits: - min_amount: "0" + per_tx_min_amount: "100.00" http: callback: diff --git a/api/gateway/mntx/internal/service/gateway/card_processor.go b/api/gateway/mntx/internal/service/gateway/card_processor.go index db8a9c0a..29027ff4 100644 --- a/api/gateway/mntx/internal/service/gateway/card_processor.go +++ b/api/gateway/mntx/internal/service/gateway/card_processor.go @@ -4,9 +4,11 @@ import ( "context" "encoding/json" "errors" + "fmt" "net/http" "strings" + "github.com/shopspring/decimal" "github.com/tech/sendico/gateway/mntx/internal/service/monetix" "github.com/tech/sendico/gateway/mntx/storage" "github.com/tech/sendico/gateway/mntx/storage/model" @@ -15,6 +17,7 @@ import ( "github.com/tech/sendico/pkg/merrors" msg "github.com/tech/sendico/pkg/messaging" "github.com/tech/sendico/pkg/mlogger" + gatewayv1 "github.com/tech/sendico/pkg/proto/common/gateway/v1" mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1" "go.mongodb.org/mongo-driver/v2/bson" "go.uber.org/zap" @@ -27,6 +30,9 @@ type cardPayoutProcessor struct { store storage.Repository httpClient *http.Client producer msg.Producer + + perTxMinAmountMinor int64 + perTxMinAmountMinorByCurrency map[string]int64 } func mergePayoutStateWithExisting(state, existing *model.CardPayout) { @@ -118,6 +124,90 @@ func newCardPayoutProcessor( } } +func (p *cardPayoutProcessor) applyGatewayDescriptor(descriptor *gatewayv1.GatewayInstanceDescriptor) { + if p == nil { + return + } + minAmountMinor, perCurrency := perTxMinAmountPolicy(descriptor) + p.perTxMinAmountMinor = minAmountMinor + p.perTxMinAmountMinorByCurrency = perCurrency +} + +func perTxMinAmountPolicy(descriptor *gatewayv1.GatewayInstanceDescriptor) (int64, map[string]int64) { + if descriptor == nil || descriptor.GetLimits() == nil { + return 0, nil + } + limits := descriptor.GetLimits() + globalMin, _ := decimalAmountToMinor(firstNonEmpty(limits.GetPerTxMinAmount(), limits.GetMinAmount())) + perCurrency := map[string]int64{} + for currency, override := range limits.GetCurrencyLimits() { + if override == nil { + continue + } + minor, ok := decimalAmountToMinor(override.GetMinAmount()) + if !ok { + continue + } + code := strings.ToUpper(strings.TrimSpace(currency)) + if code == "" { + continue + } + perCurrency[code] = minor + } + if len(perCurrency) == 0 { + perCurrency = nil + } + return globalMin, perCurrency +} + +func decimalAmountToMinor(raw string) (int64, bool) { + raw = strings.TrimSpace(raw) + if raw == "" { + return 0, false + } + value, err := decimal.NewFromString(raw) + if err != nil || !value.IsPositive() { + return 0, false + } + minor := value.Mul(decimal.NewFromInt(100)).Ceil().IntPart() + if minor <= 0 { + return 0, false + } + return minor, true +} + +func (p *cardPayoutProcessor) validatePerTxMinimum(amountMinor int64, currency string) error { + if p == nil { + return nil + } + minAmountMinor := p.perTxMinimum(currency) + if minAmountMinor <= 0 || amountMinor >= minAmountMinor { + return nil + } + return newPayoutError("amount_below_minimum", merrors.InvalidArgument( + fmt.Sprintf("amount_minor must be at least %d", minAmountMinor), + "amount_minor", + )) +} + +func (p *cardPayoutProcessor) perTxMinimum(currency string) int64 { + if p == nil { + return 0 + } + minAmountMinor := p.perTxMinAmountMinor + if len(p.perTxMinAmountMinorByCurrency) == 0 { + return minAmountMinor + } + code := strings.ToUpper(strings.TrimSpace(currency)) + if code == "" { + return minAmountMinor + } + if override, ok := p.perTxMinAmountMinorByCurrency[code]; ok && override > 0 { + return override + } + return minAmountMinor +} + func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) { if p == nil { return nil, merrors.Internal("card payout processor not initialised") @@ -147,6 +237,17 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout ) return nil, err } + if err := p.validatePerTxMinimum(req.GetAmountMinor(), req.GetCurrency()); err != nil { + p.logger.Warn("Card payout amount below configured minimum", + zap.String("payout_id", req.GetPayoutId()), + zap.String("customer_id", req.GetCustomerId()), + zap.Int64("amount_minor", req.GetAmountMinor()), + zap.String("currency", strings.ToUpper(strings.TrimSpace(req.GetCurrency()))), + zap.Int64("configured_min_amount_minor", p.perTxMinimum(req.GetCurrency())), + zap.Error(err), + ) + return nil, err + } projectID, err := p.resolveProjectID(req.GetProjectId(), "payout_id", req.GetPayoutId()) if err != nil { @@ -257,6 +358,17 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT ) return nil, err } + if err := p.validatePerTxMinimum(req.GetAmountMinor(), req.GetCurrency()); err != nil { + p.logger.Warn("Card token payout amount below configured minimum", + zap.String("payout_id", req.GetPayoutId()), + zap.String("customer_id", req.GetCustomerId()), + zap.Int64("amount_minor", req.GetAmountMinor()), + zap.String("currency", strings.ToUpper(strings.TrimSpace(req.GetCurrency()))), + zap.Int64("configured_min_amount_minor", p.perTxMinimum(req.GetCurrency())), + zap.Error(err), + ) + return nil, err + } projectID, err := p.resolveProjectID(req.GetProjectId(), "payout_id", req.GetPayoutId()) if err != nil { diff --git a/api/gateway/mntx/internal/service/gateway/card_processor_test.go b/api/gateway/mntx/internal/service/gateway/card_processor_test.go index e1342ab1..5879f4cf 100644 --- a/api/gateway/mntx/internal/service/gateway/card_processor_test.go +++ b/api/gateway/mntx/internal/service/gateway/card_processor_test.go @@ -14,6 +14,7 @@ import ( "github.com/tech/sendico/gateway/mntx/storage/model" clockpkg "github.com/tech/sendico/pkg/clock" "github.com/tech/sendico/pkg/merrors" + gatewayv1 "github.com/tech/sendico/pkg/proto/common/gateway/v1" mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1" "go.uber.org/zap" ) @@ -119,6 +120,63 @@ func TestCardPayoutProcessor_Submit_MissingConfig(t *testing.T) { } } +func TestCardPayoutProcessor_Submit_RejectsAmountBelowConfiguredMinimum(t *testing.T) { + cfg := monetix.Config{ + BaseURL: "https://monetix.test", + SecretKey: "secret", + AllowedCurrencies: []string{"RUB"}, + } + + repo := newMockRepository() + processor := newCardPayoutProcessor( + zap.NewNop(), + cfg, + staticClock{now: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)}, + repo, + &http.Client{}, + nil, + ) + processor.applyGatewayDescriptor(&gatewayv1.GatewayInstanceDescriptor{ + Limits: &gatewayv1.Limits{ + PerTxMinAmount: "20.00", + }, + }) + + req := validCardPayoutRequest() // 15.00 RUB + _, err := processor.Submit(context.Background(), req) + requireReason(t, err, "amount_below_minimum") +} + +func TestCardPayoutProcessor_SubmitToken_RejectsAmountBelowCurrencyMinimum(t *testing.T) { + cfg := monetix.Config{ + BaseURL: "https://monetix.test", + SecretKey: "secret", + AllowedCurrencies: []string{"USD"}, + } + + repo := newMockRepository() + processor := newCardPayoutProcessor( + zap.NewNop(), + cfg, + staticClock{now: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)}, + repo, + &http.Client{}, + nil, + ) + processor.applyGatewayDescriptor(&gatewayv1.GatewayInstanceDescriptor{ + Limits: &gatewayv1.Limits{ + PerTxMinAmount: "20.00", + CurrencyLimits: map[string]*gatewayv1.LimitsOverride{ + "USD": {MinAmount: "30.00"}, + }, + }, + }) + + req := validCardTokenPayoutRequest() // 25.00 USD + _, err := processor.SubmitToken(context.Background(), req) + requireReason(t, err, "amount_below_minimum") +} + func TestCardPayoutProcessor_ProcessCallback(t *testing.T) { cfg := monetix.Config{ SecretKey: "secret", diff --git a/api/gateway/mntx/internal/service/gateway/service.go b/api/gateway/mntx/internal/service/gateway/service.go index 37a90425..9331d2bf 100644 --- a/api/gateway/mntx/internal/service/gateway/service.go +++ b/api/gateway/mntx/internal/service/gateway/service.go @@ -85,6 +85,7 @@ func NewService(logger mlogger.Logger, opts ...Option) *Service { } svc.card = newCardPayoutProcessor(svc.logger, svc.config, svc.clock, svc.storage, svc.httpClient, svc.producer) + svc.card.applyGatewayDescriptor(svc.gatewayDescriptor) svc.startDiscoveryAnnouncer() return svc @@ -149,44 +150,132 @@ func (s *Service) startDiscoveryAnnouncer() { if id := strings.TrimSpace(s.gatewayDescriptor.GetId()); id != "" { announce.ID = id } - announce.Network = strings.TrimSpace(s.gatewayDescriptor.GetNetwork()) - announce.Currencies = append([]string(nil), s.gatewayDescriptor.GetCurrencies()...) - announce.Limits = limitsFromDescriptor(s.gatewayDescriptor.GetLimits()) + announce.Currencies = currenciesFromDescriptor(s.gatewayDescriptor) } s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.MntxGateway), announce) s.announcer.Start() } -func limitsFromDescriptor(src *gatewayv1.Limits) *discovery.Limits { +func currenciesFromDescriptor(src *gatewayv1.GatewayInstanceDescriptor) []discovery.CurrencyAnnouncement { if src == nil { return nil } - limits := &discovery.Limits{ - MinAmount: strings.TrimSpace(src.GetMinAmount()), - MaxAmount: strings.TrimSpace(src.GetMaxAmount()), - VolumeLimit: map[string]string{}, - VelocityLimit: map[string]int{}, + network := strings.TrimSpace(src.GetNetwork()) + limitsCfg := src.GetLimits() + values := src.GetCurrencies() + if len(values) == 0 { + return nil } - for key, value := range src.GetVolumeLimit() { - k := strings.TrimSpace(key) - v := strings.TrimSpace(value) - if k == "" || v == "" { + seen := map[string]bool{} + result := make([]discovery.CurrencyAnnouncement, 0, len(values)) + for _, value := range values { + currency := strings.ToUpper(strings.TrimSpace(value)) + if currency == "" || seen[currency] { continue } - limits.VolumeLimit[k] = v + seen[currency] = true + result = append(result, discovery.CurrencyAnnouncement{ + Currency: currency, + Network: network, + Limits: currencyLimitsFromDescriptor(limitsCfg, currency), + }) } - for key, value := range src.GetVelocityLimit() { - k := strings.TrimSpace(key) - if k == "" { + if len(result) == 0 { + return nil + } + return result +} + +func currencyLimitsFromDescriptor(src *gatewayv1.Limits, currency string) *discovery.CurrencyLimits { + if src == nil { + return nil + } + amountMin := firstNonEmpty(src.GetPerTxMinAmount(), src.GetMinAmount()) + amountMax := firstNonEmpty(src.GetPerTxMaxAmount(), src.GetMaxAmount()) + + limits := &discovery.CurrencyLimits{} + if amountMin != "" || amountMax != "" { + limits.Amount = &discovery.CurrencyAmount{ + Min: amountMin, + Max: amountMax, + } + } + + running := &discovery.CurrencyRunningLimits{} + for bucket, max := range src.GetVolumeLimit() { + bucket = strings.TrimSpace(bucket) + max = strings.TrimSpace(max) + if bucket == "" || max == "" { continue } - limits.VelocityLimit[k] = int(value) + running.Volume = append(running.Volume, discovery.VolumeLimit{ + Window: discovery.Window{ + Raw: bucket, + Named: bucket, + }, + Max: max, + }) } - if len(limits.VolumeLimit) == 0 { - limits.VolumeLimit = nil + for bucket, max := range src.GetVelocityLimit() { + bucket = strings.TrimSpace(bucket) + if bucket == "" || max <= 0 { + continue + } + running.Velocity = append(running.Velocity, discovery.VelocityLimit{ + Window: discovery.Window{ + Raw: bucket, + Named: bucket, + }, + Max: int(max), + }) } - if len(limits.VelocityLimit) == 0 { - limits.VelocityLimit = nil + if override := src.GetCurrencyLimits()[strings.ToUpper(strings.TrimSpace(currency))]; override != nil { + if min := strings.TrimSpace(override.GetMinAmount()); min != "" { + if limits.Amount == nil { + limits.Amount = &discovery.CurrencyAmount{} + } + limits.Amount.Min = min + } + if max := strings.TrimSpace(override.GetMaxAmount()); max != "" { + if limits.Amount == nil { + limits.Amount = &discovery.CurrencyAmount{} + } + limits.Amount.Max = max + } + if maxVolume := strings.TrimSpace(override.GetMaxVolume()); maxVolume != "" { + running.Volume = append(running.Volume, discovery.VolumeLimit{ + Window: discovery.Window{ + Raw: "default", + Named: "default", + }, + Max: maxVolume, + }) + } + if maxOps := int(override.GetMaxOps()); maxOps > 0 { + running.Velocity = append(running.Velocity, discovery.VelocityLimit{ + Window: discovery.Window{ + Raw: "default", + Named: "default", + }, + Max: maxOps, + }) + } + } + if len(running.Volume) > 0 || len(running.Velocity) > 0 { + limits.Running = running + } + if limits.Amount == nil && limits.Running == nil { + return nil } return limits } + +func firstNonEmpty(values ...string) string { + for _, value := range values { + clean := strings.TrimSpace(value) + if clean != "" { + return clean + } + } + return "" +} diff --git a/api/gateway/tron/go.mod b/api/gateway/tron/go.mod index fe4ea9a2..f78dcb1b 100644 --- a/api/gateway/tron/go.mod +++ b/api/gateway/tron/go.mod @@ -24,7 +24,7 @@ require ( require ( github.com/Microsoft/go-winio v0.6.2 // indirect - github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260208002143-2551aa251e34 // indirect + github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260212005555-3a7e5700f354 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.24.4 // indirect github.com/bmatcuk/doublestar/v4 v4.10.0 // indirect diff --git a/api/gateway/tron/go.sum b/api/gateway/tron/go.sum index 40a6d5f7..4e88971e 100644 --- a/api/gateway/tron/go.sum +++ b/api/gateway/tron/go.sum @@ -6,8 +6,8 @@ github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= -github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260208002143-2551aa251e34 h1:AyAPL6pTcPPpfZsNtOTFhxyOokKBLnrbbaV42g6Z9v0= -github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260208002143-2551aa251e34/go.mod h1:ioLG6R+5bUSO1oeGSDxOV3FADARuMoytZCSX6MEMQkI= +github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260212005555-3a7e5700f354 h1:BgaMXBpcqcW74afzqI3iKo07K3tC+VuyWU3/FIvLlNI= +github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260212005555-3a7e5700f354/go.mod h1:ioLG6R+5bUSO1oeGSDxOV3FADARuMoytZCSX6MEMQkI= github.com/VictoriaMetrics/fastcache v1.13.0 h1:AW4mheMR5Vd9FkAPUv+NH6Nhw+fmbTMGMsNAoA/+4G0= github.com/VictoriaMetrics/fastcache v1.13.0/go.mod h1:hHXhl4DA2fTL2HTZDJFXWgW0LNjo6B+4aj2Wmng3TjU= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/api/gateway/tron/internal/service/gateway/service.go b/api/gateway/tron/internal/service/gateway/service.go index 9947accb..5284bf67 100644 --- a/api/gateway/tron/internal/service/gateway/service.go +++ b/api/gateway/tron/internal/service/gateway/service.go @@ -203,16 +203,22 @@ func (s *Service) startDiscoveryAnnouncers() { } version := appversion.Create().Short() for _, network := range s.networks { - currencies := []string{shared.NativeCurrency(network)} + currencies := []discovery.CurrencyAnnouncement{{ + Currency: shared.NativeCurrency(network), + Network: network.Name.String(), + }} for _, token := range network.TokenConfigs { if token.Symbol != "" { - currencies = append(currencies, token.Symbol) + currencies = append(currencies, discovery.CurrencyAnnouncement{ + Currency: token.Symbol, + Network: network.Name.String(), + ContractAddress: token.ContractAddress, + }) } } announce := discovery.Announcement{ Service: "CRYPTO_RAIL_GATEWAY", Rail: "CRYPTO", - Network: network.Name.String(), Operations: []string{"balance.read", "payin.crypto", "payout.crypto", "fee.send", "observe.confirm"}, Currencies: currencies, InvokeURI: s.invokeURI, diff --git a/api/payments/orchestrator/internal/service/orchestrator/discovery_gateway_registry.go b/api/payments/orchestrator/internal/service/orchestrator/discovery_gateway_registry.go index e5afe437..9d2792e8 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/discovery_gateway_registry.go +++ b/api/payments/orchestrator/internal/service/orchestrator/discovery_gateway_registry.go @@ -51,7 +51,7 @@ func (r *discoveryGatewayRegistry) List(_ context.Context) ([]*model.GatewayInst InvokeURI: strings.TrimSpace(entry.InvokeURI), Currencies: normalizeCurrencies(entry.Currencies), Capabilities: capabilitiesFromOps(entry.Operations), - Limits: limitsFromDiscovery(entry.Limits), + Limits: limitsFromDiscovery(entry.Limits, entry.CurrencyMeta), Version: entry.Version, IsEnabled: entry.Healthy, }) @@ -102,36 +102,111 @@ func capabilitiesFromOps(ops []string) model.RailCapabilities { return cap } -func limitsFromDiscovery(src *discovery.Limits) model.Limits { - if src == nil { - return model.Limits{} - } +func limitsFromDiscovery(src *discovery.Limits, currencies []discovery.CurrencyAnnouncement) model.Limits { limits := model.Limits{ - MinAmount: strings.TrimSpace(src.MinAmount), - MaxAmount: strings.TrimSpace(src.MaxAmount), - VolumeLimit: map[string]string{}, - VelocityLimit: map[string]int{}, + VolumeLimit: map[string]string{}, + VelocityLimit: map[string]int{}, + CurrencyLimits: map[string]model.LimitsOverride{}, } - for key, value := range src.VolumeLimit { - k := strings.TrimSpace(key) - v := strings.TrimSpace(value) - if k == "" || v == "" { - continue + if src != nil { + limits.MinAmount = strings.TrimSpace(src.MinAmount) + limits.MaxAmount = strings.TrimSpace(src.MaxAmount) + for key, value := range src.VolumeLimit { + k := strings.TrimSpace(key) + v := strings.TrimSpace(value) + if k == "" || v == "" { + continue + } + limits.VolumeLimit[k] = v } - limits.VolumeLimit[k] = v - } - for key, value := range src.VelocityLimit { - k := strings.TrimSpace(key) - if k == "" { - continue + for key, value := range src.VelocityLimit { + k := strings.TrimSpace(key) + if k == "" { + continue + } + limits.VelocityLimit[k] = value } - limits.VelocityLimit[k] = value } + applyCurrencyTransferLimits(&limits, currencies) if len(limits.VolumeLimit) == 0 { limits.VolumeLimit = nil } if len(limits.VelocityLimit) == 0 { limits.VelocityLimit = nil } + if len(limits.CurrencyLimits) == 0 { + limits.CurrencyLimits = nil + } return limits } + +func applyCurrencyTransferLimits(dst *model.Limits, currencies []discovery.CurrencyAnnouncement) { + if dst == nil || len(currencies) == 0 { + return + } + var ( + commonMin string + commonMax string + commonMinInit bool + commonMaxInit bool + commonMinConsistent = true + commonMaxConsistent = true + ) + + for _, currency := range currencies { + code := strings.ToUpper(strings.TrimSpace(currency.Currency)) + if code == "" || currency.Limits == nil || currency.Limits.Amount == nil { + commonMinConsistent = false + commonMaxConsistent = false + continue + } + min := strings.TrimSpace(currency.Limits.Amount.Min) + max := strings.TrimSpace(currency.Limits.Amount.Max) + + if min != "" || max != "" { + override := dst.CurrencyLimits[code] + if min != "" { + override.MinAmount = min + } + if max != "" { + override.MaxAmount = max + } + if override.MinAmount != "" || override.MaxAmount != "" || override.MaxFee != "" || override.MaxOps > 0 || override.MaxVolume != "" { + dst.CurrencyLimits[code] = override + } + } + + if min == "" { + commonMinConsistent = false + } else if !commonMinInit { + commonMin = min + commonMinInit = true + } else if commonMin != min { + commonMinConsistent = false + } + + if max == "" { + commonMaxConsistent = false + } else if !commonMaxInit { + commonMax = max + commonMaxInit = true + } else if commonMax != max { + commonMaxConsistent = false + } + } + + if commonMinInit && commonMinConsistent { + dst.PerTxMinAmount = firstDiscoveryLimitValue(dst.PerTxMinAmount, commonMin) + } + if commonMaxInit && commonMaxConsistent { + dst.PerTxMaxAmount = firstDiscoveryLimitValue(dst.PerTxMaxAmount, commonMax) + } +} + +func firstDiscoveryLimitValue(primary, fallback string) string { + primary = strings.TrimSpace(primary) + if primary != "" { + return primary + } + return strings.TrimSpace(fallback) +} diff --git a/api/payments/orchestrator/internal/service/orchestrator/discovery_gateway_registry_test.go b/api/payments/orchestrator/internal/service/orchestrator/discovery_gateway_registry_test.go new file mode 100644 index 00000000..d5a15ef1 --- /dev/null +++ b/api/payments/orchestrator/internal/service/orchestrator/discovery_gateway_registry_test.go @@ -0,0 +1,62 @@ +package orchestrator + +import ( + "testing" + + "github.com/tech/sendico/pkg/discovery" +) + +func TestLimitsFromDiscovery_MapsPerTxMinimumFromCurrencyMeta(t *testing.T) { + limits := limitsFromDiscovery(nil, []discovery.CurrencyAnnouncement{ + { + Currency: "RUB", + Limits: &discovery.CurrencyLimits{ + Amount: &discovery.CurrencyAmount{ + Min: "100.00", + Max: "10000.00", + }, + }, + }, + }) + + if limits.PerTxMinAmount != "100.00" { + t.Fatalf("expected per tx min 100.00, got %q", limits.PerTxMinAmount) + } + if limits.PerTxMaxAmount != "10000.00" { + t.Fatalf("expected per tx max 10000.00, got %q", limits.PerTxMaxAmount) + } + override, ok := limits.CurrencyLimits["RUB"] + if !ok { + t.Fatalf("expected RUB currency override") + } + if override.MinAmount != "100.00" { + t.Fatalf("expected RUB min override 100.00, got %q", override.MinAmount) + } +} + +func TestLimitsFromDiscovery_DropsCommonPerTxMinimumWhenCurrenciesDiffer(t *testing.T) { + limits := limitsFromDiscovery(nil, []discovery.CurrencyAnnouncement{ + { + Currency: "USD", + Limits: &discovery.CurrencyLimits{ + Amount: &discovery.CurrencyAmount{Min: "10.00"}, + }, + }, + { + Currency: "EUR", + Limits: &discovery.CurrencyLimits{ + Amount: &discovery.CurrencyAmount{Min: "20.00"}, + }, + }, + }) + + if limits.PerTxMinAmount != "" { + t.Fatalf("expected empty common per tx min, got %q", limits.PerTxMinAmount) + } + if limits.CurrencyLimits["USD"].MinAmount != "10.00" { + t.Fatalf("expected USD min override 10.00, got %q", limits.CurrencyLimits["USD"].MinAmount) + } + if limits.CurrencyLimits["EUR"].MinAmount != "20.00" { + t.Fatalf("expected EUR min override 20.00, got %q", limits.CurrencyLimits["EUR"].MinAmount) + } +} diff --git a/api/payments/orchestrator/internal/service/orchestrator/handlers_commands.go b/api/payments/orchestrator/internal/service/orchestrator/handlers_commands.go index 36b6f394..6a72d9aa 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/handlers_commands.go +++ b/api/payments/orchestrator/internal/service/orchestrator/handlers_commands.go @@ -55,6 +55,9 @@ func (h *initiatePaymentsCommand) Execute(ctx context.Context, req *orchestrator } return gsresponse.Auto[orchestratorv1.InitiatePaymentsResponse](h.logger, mservice.PaymentOrchestrator, err) } + if note := strings.TrimSpace(record.ExecutionNote); note != "" { + return gsresponse.FailedPrecondition[orchestratorv1.InitiatePaymentsResponse](h.logger, mservice.PaymentOrchestrator, "quote_not_executable", merrors.InvalidArgument(note)) + } intents := record.Intents quotes := record.Quotes @@ -209,6 +212,8 @@ func (h *initiatePaymentCommand) Execute(ctx context.Context, req *orchestratorv return gsresponse.FailedPrecondition[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, qerr.code, qerr.err) case "quote_expired": return gsresponse.FailedPrecondition[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, qerr.code, qerr.err) + case "quote_not_executable": + return gsresponse.FailedPrecondition[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, qerr.code, qerr.err) case "quote_intent_mismatch": return gsresponse.InvalidArgument[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, qerr.err) default: diff --git a/api/payments/orchestrator/internal/service/orchestrator/service_helpers.go b/api/payments/orchestrator/internal/service/orchestrator/service_helpers.go index ed92c95f..38052c9f 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/service_helpers.go +++ b/api/payments/orchestrator/internal/service/orchestrator/service_helpers.go @@ -123,6 +123,9 @@ func (s *Service) resolvePaymentQuote(ctx context.Context, in quoteResolutionInp if !record.ExpiresAt.IsZero() && s.clock.Now().After(record.ExpiresAt) { return nil, nil, nil, quoteResolutionError{code: "quote_expired", err: merrors.InvalidArgument("quote_ref expired")} } + if note := strings.TrimSpace(record.ExecutionNote); note != "" { + return nil, nil, nil, quoteResolutionError{code: "quote_not_executable", err: merrors.InvalidArgument(note)} + } intent, err := recordIntentFromQuote(record) if err != nil { return nil, nil, nil, err diff --git a/api/payments/orchestrator/internal/service/orchestrator/service_helpers_test.go b/api/payments/orchestrator/internal/service/orchestrator/service_helpers_test.go index d5efac9d..770152bc 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/service_helpers_test.go +++ b/api/payments/orchestrator/internal/service/orchestrator/service_helpers_test.go @@ -125,6 +125,38 @@ func TestResolvePaymentQuote_Expired(t *testing.T) { } } +func TestResolvePaymentQuote_NotExecutable(t *testing.T) { + org := bson.NewObjectID() + intent := &sharedv1.PaymentIntent{ + Ref: "ref-1", + Amount: &moneyv1.Money{Currency: "USD", Amount: "1"}, + SettlementCurrency: "USD", + } + record := &model.PaymentQuoteRecord{ + QuoteRef: "q1", + Intent: intentFromProto(intent), + Quote: &model.PaymentQuoteSnapshot{}, + ExecutionNote: "quote will not be executed: amount 1 USD below per-tx min limit 10", + ExpiresAt: time.Now().Add(time.Minute), + IdempotencyKey: "idem-1", + } + svc := &Service{ + storage: stubRepo{quotes: &helperQuotesStore{records: map[string]*model.PaymentQuoteRecord{"q1": record}}}, + clock: clockpkg.NewSystem(), + } + + _, _, _, err := svc.resolvePaymentQuote(context.Background(), quoteResolutionInput{ + OrgRef: org.Hex(), + OrgID: org, + Meta: &sharedv1.RequestMeta{OrganizationRef: org.Hex()}, + Intent: intent, + QuoteRef: "q1", + }) + if qerr, ok := err.(quoteResolutionError); !ok || qerr.code != "quote_not_executable" { + t.Fatalf("expected quote_not_executable, got %v", err) + } +} + func TestResolvePaymentQuote_QuoteRefUsesStoredIntent(t *testing.T) { org := bson.NewObjectID() intent := &sharedv1.PaymentIntent{ diff --git a/api/payments/quotation/internal/service/quotation/discovery_gateway_registry.go b/api/payments/quotation/internal/service/quotation/discovery_gateway_registry.go index 0a6fd864..129f0131 100644 --- a/api/payments/quotation/internal/service/quotation/discovery_gateway_registry.go +++ b/api/payments/quotation/internal/service/quotation/discovery_gateway_registry.go @@ -51,7 +51,7 @@ func (r *discoveryGatewayRegistry) List(_ context.Context) ([]*model.GatewayInst InvokeURI: strings.TrimSpace(entry.InvokeURI), Currencies: normalizeCurrencies(entry.Currencies), Capabilities: capabilitiesFromOps(entry.Operations), - Limits: limitsFromDiscovery(entry.Limits), + Limits: limitsFromDiscovery(entry.Limits, entry.CurrencyMeta), Version: entry.Version, IsEnabled: entry.Healthy, }) @@ -102,36 +102,111 @@ func capabilitiesFromOps(ops []string) model.RailCapabilities { return cap } -func limitsFromDiscovery(src *discovery.Limits) model.Limits { - if src == nil { - return model.Limits{} - } +func limitsFromDiscovery(src *discovery.Limits, currencies []discovery.CurrencyAnnouncement) model.Limits { limits := model.Limits{ - MinAmount: strings.TrimSpace(src.MinAmount), - MaxAmount: strings.TrimSpace(src.MaxAmount), - VolumeLimit: map[string]string{}, - VelocityLimit: map[string]int{}, + VolumeLimit: map[string]string{}, + VelocityLimit: map[string]int{}, + CurrencyLimits: map[string]model.LimitsOverride{}, } - for key, value := range src.VolumeLimit { - k := strings.TrimSpace(key) - v := strings.TrimSpace(value) - if k == "" || v == "" { - continue + if src != nil { + limits.MinAmount = strings.TrimSpace(src.MinAmount) + limits.MaxAmount = strings.TrimSpace(src.MaxAmount) + for key, value := range src.VolumeLimit { + k := strings.TrimSpace(key) + v := strings.TrimSpace(value) + if k == "" || v == "" { + continue + } + limits.VolumeLimit[k] = v } - limits.VolumeLimit[k] = v - } - for key, value := range src.VelocityLimit { - k := strings.TrimSpace(key) - if k == "" { - continue + for key, value := range src.VelocityLimit { + k := strings.TrimSpace(key) + if k == "" { + continue + } + limits.VelocityLimit[k] = value } - limits.VelocityLimit[k] = value } + applyCurrencyTransferLimits(&limits, currencies) if len(limits.VolumeLimit) == 0 { limits.VolumeLimit = nil } if len(limits.VelocityLimit) == 0 { limits.VelocityLimit = nil } + if len(limits.CurrencyLimits) == 0 { + limits.CurrencyLimits = nil + } return limits } + +func applyCurrencyTransferLimits(dst *model.Limits, currencies []discovery.CurrencyAnnouncement) { + if dst == nil || len(currencies) == 0 { + return + } + var ( + commonMin string + commonMax string + commonMinInit bool + commonMaxInit bool + commonMinConsistent = true + commonMaxConsistent = true + ) + + for _, currency := range currencies { + code := strings.ToUpper(strings.TrimSpace(currency.Currency)) + if code == "" || currency.Limits == nil || currency.Limits.Amount == nil { + commonMinConsistent = false + commonMaxConsistent = false + continue + } + min := strings.TrimSpace(currency.Limits.Amount.Min) + max := strings.TrimSpace(currency.Limits.Amount.Max) + + if min != "" || max != "" { + override := dst.CurrencyLimits[code] + if min != "" { + override.MinAmount = min + } + if max != "" { + override.MaxAmount = max + } + if override.MinAmount != "" || override.MaxAmount != "" || override.MaxFee != "" || override.MaxOps > 0 || override.MaxVolume != "" { + dst.CurrencyLimits[code] = override + } + } + + if min == "" { + commonMinConsistent = false + } else if !commonMinInit { + commonMin = min + commonMinInit = true + } else if commonMin != min { + commonMinConsistent = false + } + + if max == "" { + commonMaxConsistent = false + } else if !commonMaxInit { + commonMax = max + commonMaxInit = true + } else if commonMax != max { + commonMaxConsistent = false + } + } + + if commonMinInit && commonMinConsistent { + dst.PerTxMinAmount = firstLimitValue(dst.PerTxMinAmount, commonMin) + } + if commonMaxInit && commonMaxConsistent { + dst.PerTxMaxAmount = firstLimitValue(dst.PerTxMaxAmount, commonMax) + } +} + +func firstLimitValue(primary, fallback string) string { + primary = strings.TrimSpace(primary) + if primary != "" { + return primary + } + return strings.TrimSpace(fallback) +} diff --git a/api/payments/quotation/internal/service/quotation/discovery_gateway_registry_test.go b/api/payments/quotation/internal/service/quotation/discovery_gateway_registry_test.go new file mode 100644 index 00000000..91ff51e9 --- /dev/null +++ b/api/payments/quotation/internal/service/quotation/discovery_gateway_registry_test.go @@ -0,0 +1,62 @@ +package quotation + +import ( + "testing" + + "github.com/tech/sendico/pkg/discovery" +) + +func TestLimitsFromDiscovery_MapsPerTxMinimumFromCurrencyMeta(t *testing.T) { + limits := limitsFromDiscovery(nil, []discovery.CurrencyAnnouncement{ + { + Currency: "RUB", + Limits: &discovery.CurrencyLimits{ + Amount: &discovery.CurrencyAmount{ + Min: "100.00", + Max: "10000.00", + }, + }, + }, + }) + + if limits.PerTxMinAmount != "100.00" { + t.Fatalf("expected per tx min 100.00, got %q", limits.PerTxMinAmount) + } + if limits.PerTxMaxAmount != "10000.00" { + t.Fatalf("expected per tx max 10000.00, got %q", limits.PerTxMaxAmount) + } + override, ok := limits.CurrencyLimits["RUB"] + if !ok { + t.Fatalf("expected RUB currency override") + } + if override.MinAmount != "100.00" { + t.Fatalf("expected RUB min override 100.00, got %q", override.MinAmount) + } +} + +func TestLimitsFromDiscovery_DropsCommonPerTxMinimumWhenCurrenciesDiffer(t *testing.T) { + limits := limitsFromDiscovery(nil, []discovery.CurrencyAnnouncement{ + { + Currency: "USD", + Limits: &discovery.CurrencyLimits{ + Amount: &discovery.CurrencyAmount{Min: "10.00"}, + }, + }, + { + Currency: "EUR", + Limits: &discovery.CurrencyLimits{ + Amount: &discovery.CurrencyAmount{Min: "20.00"}, + }, + }, + }) + + if limits.PerTxMinAmount != "" { + t.Fatalf("expected empty common per tx min, got %q", limits.PerTxMinAmount) + } + if limits.CurrencyLimits["USD"].MinAmount != "10.00" { + t.Fatalf("expected USD min override 10.00, got %q", limits.CurrencyLimits["USD"].MinAmount) + } + if limits.CurrencyLimits["EUR"].MinAmount != "20.00" { + t.Fatalf("expected EUR min override 20.00, got %q", limits.CurrencyLimits["EUR"].MinAmount) + } +} diff --git a/api/payments/quotation/internal/service/quotation/handlers_commands.go b/api/payments/quotation/internal/service/quotation/handlers_commands.go index 7a1b8fab..86d3c5ce 100644 --- a/api/payments/quotation/internal/service/quotation/handlers_commands.go +++ b/api/payments/quotation/internal/service/quotation/handlers_commands.go @@ -43,6 +43,11 @@ type quoteCtx struct { hash string } +type quotePaymentResult struct { + quote *sharedv1.PaymentQuote + executionNote string +} + func (h *quotePaymentCommand) Execute( ctx context.Context, req *quotationv1.QuotePaymentRequest, @@ -65,14 +70,15 @@ func (h *quotePaymentCommand) Execute( return gsresponse.Unavailable[quotationv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err) } - quoteProto, err := h.quotePayment(ctx, quotesStore, qc, req) + result, err := h.quotePayment(ctx, quotesStore, qc, req) if err != nil { return h.mapQuoteErr(err) } return gsresponse.Success("ationv1.QuotePaymentResponse{ IdempotencyKey: req.GetIdempotencyKey(), - Quote: quoteProto, + Quote: result.quote, + ExecutionNote: result.executionNote, }) } @@ -111,7 +117,7 @@ func (h *quotePaymentCommand) quotePayment( quotesStore storage.QuotesStore, qc *quoteCtx, req *quotationv1.QuotePaymentRequest, -) (*sharedv1.PaymentQuote, error) { +) (*quotePaymentResult, error) { if qc.previewOnly { quote, _, err := h.engine.BuildPaymentQuote(ctx, qc.orgID, req) @@ -120,7 +126,7 @@ func (h *quotePaymentCommand) quotePayment( return nil, err } quote.QuoteRef = bson.NewObjectID().Hex() - return quote, nil + return "ePaymentResult{quote: quote}, nil } existing, err := quotesStore.GetByIdempotencyKey(ctx, qc.orgRef, qc.idempotencyKey) @@ -140,7 +146,10 @@ func (h *quotePaymentCommand) quotePayment( zap.String("idempotency_key", qc.idempotencyKey), zap.String("quote_ref", existing.QuoteRef), ) - return modelQuoteToProto(existing.Quote), nil + return "ePaymentResult{ + quote: modelQuoteToProto(existing.Quote), + executionNote: strings.TrimSpace(existing.ExecutionNote), + }, nil } quote, expiresAt, err := h.engine.BuildPaymentQuote(ctx, qc.orgID, req) @@ -157,17 +166,28 @@ func (h *quotePaymentCommand) quotePayment( quoteRef := bson.NewObjectID().Hex() quote.QuoteRef = quoteRef + executionNote := "" plan, err := h.engine.BuildPaymentPlan(ctx, qc.orgRef, qc.intent, qc.idempotencyKey, quote) if err != nil { - h.logger.Warn( - "Failed to build payment plan", - zap.Error(err), - mzap.ObjRef("org_ref", qc.orgRef), - zap.String("idempotency_key", qc.idempotencyKey), - ) - return nil, err + if errors.Is(err, merrors.ErrInvalidArg) { + executionNote = quoteNonExecutableNote(err) + h.logger.Info( + "Payment quote marked as non-executable", + mzap.ObjRef("org_ref", qc.orgRef), + zap.String("idempotency_key", qc.idempotencyKey), + zap.String("quote_ref", quoteRef), + zap.String("execution_note", executionNote), + ) + } else { + h.logger.Warn( + "Failed to build payment plan", + zap.Error(err), + mzap.ObjRef("org_ref", qc.orgRef), + zap.String("idempotency_key", qc.idempotencyKey), + ) + return nil, err + } } - record := &model.PaymentQuoteRecord{ QuoteRef: quoteRef, IdempotencyKey: qc.idempotencyKey, @@ -175,6 +195,7 @@ func (h *quotePaymentCommand) quotePayment( Intent: intentFromProto(qc.intent), Quote: quoteSnapshotToModel(quote), Plan: cloneStoredPaymentPlan(plan), + ExecutionNote: executionNote, ExpiresAt: expiresAt, } record.SetID(bson.NewObjectID()) @@ -187,7 +208,10 @@ func (h *quotePaymentCommand) quotePayment( if existing.Hash != qc.hash { return nil, errIdempotencyParamMismatch } - return modelQuoteToProto(existing.Quote), nil + return "ePaymentResult{ + quote: modelQuoteToProto(existing.Quote), + executionNote: strings.TrimSpace(existing.ExecutionNote), + }, nil } } return nil, err @@ -201,7 +225,10 @@ func (h *quotePaymentCommand) quotePayment( zap.String("kind", qc.intent.GetKind().String()), ) - return quote, nil + return "ePaymentResult{ + quote: quote, + executionNote: executionNote, + }, nil } func (h *quotePaymentCommand) mapQuoteErr(err error) gsresponse.Responder[quotationv1.QuotePaymentResponse] { @@ -213,6 +240,16 @@ func (h *quotePaymentCommand) mapQuoteErr(err error) gsresponse.Responder[quotat return gsresponse.Auto[quotationv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err) } +func quoteNonExecutableNote(err error) string { + reason := strings.TrimSpace(err.Error()) + reason = strings.TrimPrefix(reason, merrors.ErrInvalidArg.Error()+":") + reason = strings.TrimSpace(reason) + if reason == "" { + return "quote will not be executed" + } + return "quote will not be executed: " + reason +} + // TODO: temprorarary hashing function, replace with a proper solution later func hashQuoteRequest(req *quotationv1.QuotePaymentRequest) string { cloned := proto.Clone(req).(*quotationv1.QuotePaymentRequest) diff --git a/api/payments/quotation/internal/service/quotation/handlers_commands_test.go b/api/payments/quotation/internal/service/quotation/handlers_commands_test.go new file mode 100644 index 00000000..59bfb1fc --- /dev/null +++ b/api/payments/quotation/internal/service/quotation/handlers_commands_test.go @@ -0,0 +1,193 @@ +package quotation + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/tech/sendico/payments/storage" + "github.com/tech/sendico/payments/storage/model" + "github.com/tech/sendico/pkg/merrors" + mloggerfactory "github.com/tech/sendico/pkg/mlogger/factory" + moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1" + quotationv1 "github.com/tech/sendico/pkg/proto/payments/quotation/v1" + sharedv1 "github.com/tech/sendico/pkg/proto/payments/shared/v1" + "go.mongodb.org/mongo-driver/v2/bson" +) + +func TestQuotePaymentStoresNonExecutableQuoteWhenPlanInvalid(t *testing.T) { + org := bson.NewObjectID() + req := "ationv1.QuotePaymentRequest{ + Meta: &sharedv1.RequestMeta{OrganizationRef: org.Hex()}, + IdempotencyKey: "idem-1", + Intent: &sharedv1.PaymentIntent{ + Kind: sharedv1.PaymentKind_PAYMENT_KIND_PAYOUT, + Amount: &moneyv1.Money{Currency: "USD", Amount: "1"}, + SettlementCurrency: "USD", + }, + } + + quotesStore := "eCommandTestQuotesStore{ + byID: make(map[string]*model.PaymentQuoteRecord), + } + engine := "eCommandTestEngine{ + repo: quoteCommandTestRepo{quotes: quotesStore}, + buildQuoteFn: func(context.Context, string, *quotationv1.QuotePaymentRequest) (*sharedv1.PaymentQuote, time.Time, error) { + return &sharedv1.PaymentQuote{ + DebitAmount: &moneyv1.Money{Currency: "USD", Amount: "1"}, + }, time.Now().Add(time.Hour), nil + }, + buildPlanFn: func(context.Context, bson.ObjectID, *sharedv1.PaymentIntent, string, *sharedv1.PaymentQuote) (*model.PaymentPlan, error) { + return nil, merrors.InvalidArgument("plan builder: no eligible gateway instance found, last error: gateway mntx eligibility check error: amount 1 USD below per-tx min limit 10") + }, + } + cmd := "ePaymentCommand{ + engine: engine, + logger: mloggerfactory.NewLogger(false), + } + + resp, err := cmd.Execute(context.Background(), req)(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp == nil || resp.GetQuote() == nil { + t.Fatalf("expected quote response, got %#v", resp) + } + if note := resp.GetExecutionNote(); !strings.Contains(note, "quote will not be executed") { + t.Fatalf("expected non-executable note, got %q", note) + } + + stored := quotesStore.byID[req.GetIdempotencyKey()] + if stored == nil { + t.Fatalf("expected stored quote record") + } + if stored.Plan != nil { + t.Fatalf("expected no stored payment plan for non-executable quote") + } + if stored.ExecutionNote != resp.GetExecutionNote() { + t.Fatalf("expected stored execution note %q, got %q", resp.GetExecutionNote(), stored.ExecutionNote) + } +} + +func TestQuotePaymentReuseReturnsStoredExecutionNote(t *testing.T) { + org := bson.NewObjectID() + req := "ationv1.QuotePaymentRequest{ + Meta: &sharedv1.RequestMeta{OrganizationRef: org.Hex()}, + IdempotencyKey: "idem-1", + Intent: &sharedv1.PaymentIntent{ + Kind: sharedv1.PaymentKind_PAYMENT_KIND_PAYOUT, + Amount: &moneyv1.Money{Currency: "USD", Amount: "1"}, + SettlementCurrency: "USD", + }, + } + + existing := &model.PaymentQuoteRecord{ + QuoteRef: "q1", + IdempotencyKey: req.GetIdempotencyKey(), + Hash: hashQuoteRequest(req), + Quote: &model.PaymentQuoteSnapshot{QuoteRef: "q1"}, + ExecutionNote: "quote will not be executed: amount 1 USD below per-tx min limit 10", + } + quotesStore := "eCommandTestQuotesStore{ + byID: map[string]*model.PaymentQuoteRecord{ + req.GetIdempotencyKey(): existing, + }, + } + engine := "eCommandTestEngine{ + repo: quoteCommandTestRepo{quotes: quotesStore}, + buildQuoteFn: func(context.Context, string, *quotationv1.QuotePaymentRequest) (*sharedv1.PaymentQuote, time.Time, error) { + t.Fatalf("build quote should not be called on idempotent reuse") + return nil, time.Time{}, nil + }, + buildPlanFn: func(context.Context, bson.ObjectID, *sharedv1.PaymentIntent, string, *sharedv1.PaymentQuote) (*model.PaymentPlan, error) { + t.Fatalf("build plan should not be called on idempotent reuse") + return nil, nil + }, + } + cmd := "ePaymentCommand{ + engine: engine, + logger: mloggerfactory.NewLogger(false), + } + + resp, err := cmd.Execute(context.Background(), req)(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if resp == nil { + t.Fatalf("expected response") + } + if got, want := resp.GetExecutionNote(), existing.ExecutionNote; got != want { + t.Fatalf("expected execution note %q, got %q", want, got) + } + if resp.GetQuote().GetQuoteRef() != "q1" { + t.Fatalf("expected quote_ref q1, got %q", resp.GetQuote().GetQuoteRef()) + } +} + +type quoteCommandTestEngine struct { + repo storage.Repository + ensureErr error + buildQuoteFn func(ctx context.Context, orgRef string, req *quotationv1.QuotePaymentRequest) (*sharedv1.PaymentQuote, time.Time, error) + buildPlanFn func(ctx context.Context, orgID bson.ObjectID, intent *sharedv1.PaymentIntent, idempotencyKey string, quote *sharedv1.PaymentQuote) (*model.PaymentPlan, error) +} + +func (e *quoteCommandTestEngine) EnsureRepository(context.Context) error { return e.ensureErr } + +func (e *quoteCommandTestEngine) BuildPaymentQuote(ctx context.Context, orgRef string, req *quotationv1.QuotePaymentRequest) (*sharedv1.PaymentQuote, time.Time, error) { + if e.buildQuoteFn == nil { + return nil, time.Time{}, nil + } + return e.buildQuoteFn(ctx, orgRef, req) +} + +func (e *quoteCommandTestEngine) BuildPaymentPlan(ctx context.Context, orgID bson.ObjectID, intent *sharedv1.PaymentIntent, idempotencyKey string, quote *sharedv1.PaymentQuote) (*model.PaymentPlan, error) { + if e.buildPlanFn == nil { + return nil, nil + } + return e.buildPlanFn(ctx, orgID, intent, idempotencyKey, quote) +} + +func (e *quoteCommandTestEngine) ResolvePaymentQuote(context.Context, quoteResolutionInput) (*sharedv1.PaymentQuote, *sharedv1.PaymentIntent, *model.PaymentPlan, error) { + return nil, nil, nil, nil +} + +func (e *quoteCommandTestEngine) Repository() storage.Repository { return e.repo } + +type quoteCommandTestRepo struct { + quotes storage.QuotesStore +} + +func (r quoteCommandTestRepo) Ping(context.Context) error { return nil } +func (r quoteCommandTestRepo) Payments() storage.PaymentsStore { return nil } +func (r quoteCommandTestRepo) Quotes() storage.QuotesStore { return r.quotes } +func (r quoteCommandTestRepo) Routes() storage.RoutesStore { return nil } +func (r quoteCommandTestRepo) PlanTemplates() storage.PlanTemplatesStore { return nil } + +type quoteCommandTestQuotesStore struct { + byID map[string]*model.PaymentQuoteRecord +} + +func (s *quoteCommandTestQuotesStore) Create(_ context.Context, rec *model.PaymentQuoteRecord) error { + if s.byID == nil { + s.byID = make(map[string]*model.PaymentQuoteRecord) + } + s.byID[rec.IdempotencyKey] = rec + return nil +} + +func (s *quoteCommandTestQuotesStore) GetByRef(_ context.Context, _ bson.ObjectID, quoteRef string) (*model.PaymentQuoteRecord, error) { + for _, rec := range s.byID { + if rec != nil && rec.QuoteRef == quoteRef { + return rec, nil + } + } + return nil, storage.ErrQuoteNotFound +} + +func (s *quoteCommandTestQuotesStore) GetByIdempotencyKey(_ context.Context, _ bson.ObjectID, idempotencyKey string) (*model.PaymentQuoteRecord, error) { + if rec, ok := s.byID[idempotencyKey]; ok { + return rec, nil + } + return nil, storage.ErrQuoteNotFound +} diff --git a/api/payments/quotation/internal/service/quotation/service_helpers.go b/api/payments/quotation/internal/service/quotation/service_helpers.go index eefde7fa..b738a813 100644 --- a/api/payments/quotation/internal/service/quotation/service_helpers.go +++ b/api/payments/quotation/internal/service/quotation/service_helpers.go @@ -85,6 +85,9 @@ func (s *Service) resolvePaymentQuote(ctx context.Context, in quoteResolutionInp if !record.ExpiresAt.IsZero() && s.clock.Now().After(record.ExpiresAt) { return nil, nil, nil, quoteResolutionError{code: "quote_expired", err: merrors.InvalidArgument("quote_ref expired")} } + if note := strings.TrimSpace(record.ExecutionNote); note != "" { + return nil, nil, nil, quoteResolutionError{code: "quote_not_executable", err: merrors.InvalidArgument(note)} + } intent, err := recordIntentFromQuote(record) if err != nil { return nil, nil, nil, err diff --git a/api/payments/storage/model/quote.go b/api/payments/storage/model/quote.go index 73f19e1f..43222af2 100644 --- a/api/payments/storage/model/quote.go +++ b/api/payments/storage/model/quote.go @@ -20,6 +20,7 @@ type PaymentQuoteRecord struct { Quotes []*PaymentQuoteSnapshot `bson:"quotes,omitempty" json:"quotes,omitempty"` Plan *PaymentPlan `bson:"plan,omitempty" json:"plan,omitempty"` Plans []*PaymentPlan `bson:"plans,omitempty" json:"plans,omitempty"` + ExecutionNote string `bson:"executionNote,omitempty" json:"executionNote,omitempty"` ExpiresAt time.Time `bson:"expiresAt" json:"expiresAt"` PurgeAt time.Time `bson:"purgeAt,omitempty" json:"purgeAt,omitempty"` Hash string `bson:"hash" json:"hash"` diff --git a/api/payments/storage/quote/mongo/store/quotes.go b/api/payments/storage/quote/mongo/store/quotes.go index 9312d747..a4f4ec12 100644 --- a/api/payments/storage/quote/mongo/store/quotes.go +++ b/api/payments/storage/quote/mongo/store/quotes.go @@ -90,6 +90,7 @@ func (q *Quotes) Create(ctx context.Context, quote *model.PaymentQuoteRecord) er if quote.IdempotencyKey == "" { return merrors.InvalidArgument("quotesStore: idempotency key is required") } + quote.ExecutionNote = strings.TrimSpace(quote.ExecutionNote) if quote.ExpiresAt.IsZero() { return merrors.InvalidArgument("quotesStore: expires_at is required") } diff --git a/api/pkg/discovery/logging.go b/api/pkg/discovery/logging.go index f750532a..0fa8396e 100644 --- a/api/pkg/discovery/logging.go +++ b/api/pkg/discovery/logging.go @@ -21,8 +21,8 @@ func announcementFields(announce Announcement) []zap.Field { if announce.Rail != "" { fields = append(fields, zap.String("rail", announce.Rail)) } - if announce.Network != "" { - fields = append(fields, zap.String("network", announce.Network)) + if network := legacyNetworkFromCurrencies(announce.Currencies); network != "" { + fields = append(fields, zap.String("network", network)) } if announce.InvokeURI != "" { fields = append(fields, zap.String("invoke_uri", announce.InvokeURI)) diff --git a/api/pkg/discovery/lookup.go b/api/pkg/discovery/lookup.go index 879e70f8..2094759b 100644 --- a/api/pkg/discovery/lookup.go +++ b/api/pkg/discovery/lookup.go @@ -23,17 +23,18 @@ type ServiceSummary struct { } type GatewaySummary struct { - ID string `json:"id"` - InstanceID string `json:"instanceId"` - Rail string `json:"rail"` - Network string `json:"network,omitempty"` - Currencies []string `json:"currencies,omitempty"` - Ops []string `json:"ops,omitempty"` - Limits *Limits `json:"limits,omitempty"` - Version string `json:"version,omitempty"` - Healthy bool `json:"healthy,omitempty"` - RoutingPriority int `json:"routingPriority,omitempty"` - InvokeURI string `json:"invokeURI,omitempty"` + ID string `json:"id"` + InstanceID string `json:"instanceId"` + Rail string `json:"rail"` + Network string `json:"network,omitempty"` + Currencies []string `json:"currencies,omitempty"` + CurrencyMeta []CurrencyAnnouncement `json:"currencyMeta,omitempty"` + Ops []string `json:"ops,omitempty"` + Limits *Limits `json:"limits,omitempty"` + Version string `json:"version,omitempty"` + Healthy bool `json:"healthy,omitempty"` + RoutingPriority int `json:"routingPriority,omitempty"` + InvokeURI string `json:"invokeURI,omitempty"` } func (r *Registry) Lookup(now time.Time) LookupResponse { @@ -51,6 +52,7 @@ func (r *Registry) Lookup(now time.Time) LookupResponse { Rail: entry.Rail, Network: entry.Network, Currencies: cloneStrings(entry.Currencies), + CurrencyMeta: cloneCurrencyAnnouncements(entry.CurrencyMeta), Ops: cloneStrings(entry.Operations), Limits: cloneLimits(entry.Limits), Version: entry.Version, diff --git a/api/pkg/discovery/registry.go b/api/pkg/discovery/registry.go index 4f6c0cdd..e4e60281 100644 --- a/api/pkg/discovery/registry.go +++ b/api/pkg/discovery/registry.go @@ -1,6 +1,7 @@ package discovery import ( + "fmt" "strings" "sync" "time" @@ -12,21 +13,22 @@ const ( ) type RegistryEntry struct { - ID string `json:"id"` - InstanceID string `bson:"instanceId" json:"instanceId"` - Service string `json:"service"` - Rail string `json:"rail,omitempty"` - Network string `json:"network,omitempty"` - Operations []string `json:"operations,omitempty"` - Currencies []string `json:"currencies,omitempty"` - Limits *Limits `json:"limits,omitempty"` - InvokeURI string `json:"invokeURI,omitempty"` - RoutingPriority int `json:"routingPriority,omitempty"` - Version string `json:"version,omitempty"` - Health HealthParams `json:"health,omitempty"` - LastHeartbeat time.Time `json:"lastHeartbeat,omitempty"` - Status string `json:"status,omitempty"` - Healthy bool `json:"healthy,omitempty"` + ID string `json:"id"` + InstanceID string `bson:"instanceId" json:"instanceId"` + Service string `json:"service"` + Rail string `json:"rail,omitempty"` + Network string `json:"network,omitempty"` + Operations []string `json:"operations,omitempty"` + Currencies []string `json:"currencies,omitempty"` + CurrencyMeta []CurrencyAnnouncement `json:"currencyMeta,omitempty"` + Limits *Limits `json:"limits,omitempty"` + InvokeURI string `json:"invokeURI,omitempty"` + RoutingPriority int `json:"routingPriority,omitempty"` + Version string `json:"version,omitempty"` + Health HealthParams `json:"health,omitempty"` + LastHeartbeat time.Time `json:"lastHeartbeat,omitempty"` + Status string `json:"status,omitempty"` + Healthy bool `json:"healthy,omitempty"` } type Registry struct { @@ -200,15 +202,17 @@ func (r *Registry) List(now time.Time, onlyHealthy bool) []RegistryEntry { func registryEntryFromAnnouncement(announce Announcement, now time.Time) RegistryEntry { status := "ok" + currencies := cloneCurrencyAnnouncements(announce.Currencies) return RegistryEntry{ ID: strings.TrimSpace(announce.ID), InstanceID: strings.TrimSpace(announce.InstanceID), Service: strings.TrimSpace(announce.Service), Rail: strings.ToUpper(strings.TrimSpace(announce.Rail)), - Network: strings.ToUpper(strings.TrimSpace(announce.Network)), + Network: legacyNetworkFromCurrencies(currencies), Operations: cloneStrings(announce.Operations), - Currencies: cloneStrings(announce.Currencies), - Limits: cloneLimits(announce.Limits), + Currencies: legacyCurrencyCodes(currencies), + CurrencyMeta: currencies, + Limits: legacyLimitsFromCurrencies(currencies), InvokeURI: strings.TrimSpace(announce.InvokeURI), RoutingPriority: announce.RoutingPriority, Version: strings.TrimSpace(announce.Version), @@ -228,12 +232,21 @@ func normalizeEntry(entry RegistryEntry) RegistryEntry { entry.Rail = strings.ToUpper(strings.TrimSpace(entry.Rail)) entry.Network = strings.ToUpper(strings.TrimSpace(entry.Network)) entry.Operations = normalizeStrings(entry.Operations, false) - entry.Currencies = normalizeStrings(entry.Currencies, true) + entry.CurrencyMeta = normalizeCurrencyAnnouncements(entry.CurrencyMeta) + if len(entry.CurrencyMeta) > 0 { + entry.Currencies = legacyCurrencyCodes(entry.CurrencyMeta) + if derivedNetwork := legacyNetworkFromCurrencies(entry.CurrencyMeta); derivedNetwork != "" { + entry.Network = derivedNetwork + } + entry.Limits = legacyLimitsFromCurrencies(entry.CurrencyMeta) + } else { + entry.Currencies = normalizeStrings(entry.Currencies, true) + } entry.InvokeURI = strings.TrimSpace(entry.InvokeURI) entry.Version = strings.TrimSpace(entry.Version) entry.Status = strings.TrimSpace(entry.Status) entry.Health = normalizeHealth(entry.Health) - if entry.Limits != nil { + if len(entry.CurrencyMeta) == 0 && entry.Limits != nil { entry.Limits = normalizeLimits(*entry.Limits) } return entry @@ -247,15 +260,11 @@ func normalizeAnnouncement(announce Announcement) Announcement { } announce.Service = strings.TrimSpace(announce.Service) announce.Rail = strings.ToUpper(strings.TrimSpace(announce.Rail)) - announce.Network = strings.ToUpper(strings.TrimSpace(announce.Network)) announce.Operations = normalizeStrings(announce.Operations, false) - announce.Currencies = normalizeStrings(announce.Currencies, true) + announce.Currencies = normalizeCurrencyAnnouncements(announce.Currencies) announce.InvokeURI = strings.TrimSpace(announce.InvokeURI) announce.Version = strings.TrimSpace(announce.Version) announce.Health = normalizeHealth(announce.Health) - if announce.Limits != nil { - announce.Limits = normalizeLimits(*announce.Limits) - } return announce } @@ -309,6 +318,364 @@ func cloneLimits(src *Limits) *Limits { return &dst } +func normalizeCurrencyAnnouncements(values []CurrencyAnnouncement) []CurrencyAnnouncement { + if len(values) == 0 { + return nil + } + seen := map[string]bool{} + result := make([]CurrencyAnnouncement, 0, len(values)) + for _, value := range values { + clean, ok := normalizeCurrencyAnnouncement(value) + if !ok { + continue + } + key := strings.Join([]string{ + clean.Currency, + clean.Network, + clean.ProviderID, + clean.ContractAddress, + }, "|") + if seen[key] { + continue + } + seen[key] = true + result = append(result, clean) + } + if len(result) == 0 { + return nil + } + return result +} + +func normalizeCurrencyAnnouncement(src CurrencyAnnouncement) (CurrencyAnnouncement, bool) { + src.Currency = strings.ToUpper(strings.TrimSpace(src.Currency)) + if src.Currency == "" { + return CurrencyAnnouncement{}, false + } + src.Network = strings.ToUpper(strings.TrimSpace(src.Network)) + src.ProviderID = strings.TrimSpace(src.ProviderID) + src.ContractAddress = strings.ToLower(strings.TrimSpace(src.ContractAddress)) + if src.Decimals != nil && *src.Decimals < 0 { + src.Decimals = nil + } + src.Limits = normalizeCurrencyLimits(src.Limits) + return src, true +} + +func normalizeCurrencyLimits(src *CurrencyLimits) *CurrencyLimits { + if src == nil { + return nil + } + dst := &CurrencyLimits{} + if src.Amount != nil { + amount := &CurrencyAmount{ + Min: strings.TrimSpace(src.Amount.Min), + Max: strings.TrimSpace(src.Amount.Max), + } + if amount.Min != "" || amount.Max != "" { + dst.Amount = amount + } + } + if src.Running != nil { + running := &CurrencyRunningLimits{} + for _, limit := range src.Running.Volume { + max := strings.TrimSpace(limit.Max) + if max == "" { + continue + } + window := normalizeWindow(limit.Window) + if legacyWindowKey(window) == "" { + continue + } + running.Volume = append(running.Volume, VolumeLimit{ + Window: window, + Max: max, + }) + } + for _, limit := range src.Running.Velocity { + if limit.Max <= 0 { + continue + } + window := normalizeWindow(limit.Window) + if legacyWindowKey(window) == "" { + continue + } + running.Velocity = append(running.Velocity, VelocityLimit{ + Window: window, + Max: limit.Max, + }) + } + if len(running.Volume) > 0 || len(running.Velocity) > 0 { + dst.Running = running + } + } + if dst.Amount == nil && dst.Running == nil { + return nil + } + return dst +} + +func normalizeWindow(src Window) Window { + src.Raw = strings.TrimSpace(src.Raw) + src.Duration = strings.TrimSpace(src.Duration) + src.Named = strings.TrimSpace(src.Named) + if src.Calendar != nil { + cal := &CalendarWindow{ + Unit: CalendarUnit(strings.ToLower(strings.TrimSpace(string(src.Calendar.Unit)))), + Count: src.Calendar.Count, + } + if cal.Count <= 0 { + cal.Count = 1 + } + if cal.Unit == CalendarUnitUnspecified { + cal = nil + } + src.Calendar = cal + } + return src +} + +func cloneCurrencyAnnouncements(values []CurrencyAnnouncement) []CurrencyAnnouncement { + if len(values) == 0 { + return nil + } + out := make([]CurrencyAnnouncement, 0, len(values)) + for _, value := range values { + cp := CurrencyAnnouncement{ + Currency: value.Currency, + Network: value.Network, + ProviderID: value.ProviderID, + ContractAddress: value.ContractAddress, + } + if value.Decimals != nil { + decimals := *value.Decimals + cp.Decimals = &decimals + } + cp.Limits = cloneCurrencyLimits(value.Limits) + out = append(out, cp) + } + return out +} + +func cloneCurrencyLimits(src *CurrencyLimits) *CurrencyLimits { + if src == nil { + return nil + } + dst := &CurrencyLimits{} + if src.Amount != nil { + dst.Amount = &CurrencyAmount{ + Min: src.Amount.Min, + Max: src.Amount.Max, + } + } + if src.Running != nil { + running := &CurrencyRunningLimits{} + if len(src.Running.Volume) > 0 { + running.Volume = make([]VolumeLimit, 0, len(src.Running.Volume)) + for _, item := range src.Running.Volume { + running.Volume = append(running.Volume, VolumeLimit{ + Window: cloneWindow(item.Window), + Max: item.Max, + }) + } + } + if len(src.Running.Velocity) > 0 { + running.Velocity = make([]VelocityLimit, 0, len(src.Running.Velocity)) + for _, item := range src.Running.Velocity { + running.Velocity = append(running.Velocity, VelocityLimit{ + Window: cloneWindow(item.Window), + Max: item.Max, + }) + } + } + if len(running.Volume) > 0 || len(running.Velocity) > 0 { + dst.Running = running + } + } + if dst.Amount == nil && dst.Running == nil { + return nil + } + return dst +} + +func cloneWindow(src Window) Window { + dst := Window{ + Raw: src.Raw, + Duration: src.Duration, + Named: src.Named, + } + if src.Calendar != nil { + dst.Calendar = &CalendarWindow{ + Unit: src.Calendar.Unit, + Count: src.Calendar.Count, + } + } + return dst +} + +func legacyCurrencyCodes(values []CurrencyAnnouncement) []string { + if len(values) == 0 { + return nil + } + seen := map[string]bool{} + out := make([]string, 0, len(values)) + for _, value := range values { + currency := strings.ToUpper(strings.TrimSpace(value.Currency)) + if currency == "" || seen[currency] { + continue + } + seen[currency] = true + out = append(out, currency) + } + if len(out) == 0 { + return nil + } + return out +} + +func legacyNetworkFromCurrencies(values []CurrencyAnnouncement) string { + if len(values) == 0 { + return "" + } + network := "" + for _, value := range values { + current := strings.ToUpper(strings.TrimSpace(value.Network)) + if current == "" { + continue + } + if network == "" { + network = current + continue + } + if network != current { + return "" + } + } + return network +} + +func legacyLimitsFromCurrencies(values []CurrencyAnnouncement) *Limits { + if len(values) == 0 { + return nil + } + var merged *Limits + for _, value := range values { + current := legacyLimitsFromCurrency(value.Limits) + if current == nil { + continue + } + if merged == nil { + merged = current + continue + } + if !strings.EqualFold(strings.TrimSpace(merged.MinAmount), strings.TrimSpace(current.MinAmount)) { + merged.MinAmount = "" + } + if !strings.EqualFold(strings.TrimSpace(merged.MaxAmount), strings.TrimSpace(current.MaxAmount)) { + merged.MaxAmount = "" + } + merged.VolumeLimit = intersectStringMaps(merged.VolumeLimit, current.VolumeLimit) + merged.VelocityLimit = intersectIntMaps(merged.VelocityLimit, current.VelocityLimit) + } + if merged == nil { + return nil + } + if merged.MinAmount == "" && merged.MaxAmount == "" && len(merged.VolumeLimit) == 0 && len(merged.VelocityLimit) == 0 { + return nil + } + return merged +} + +func legacyLimitsFromCurrency(src *CurrencyLimits) *Limits { + if src == nil { + return nil + } + out := &Limits{} + if src.Amount != nil { + out.MinAmount = strings.TrimSpace(src.Amount.Min) + out.MaxAmount = strings.TrimSpace(src.Amount.Max) + } + if src.Running != nil { + if len(src.Running.Volume) > 0 { + out.VolumeLimit = map[string]string{} + for _, item := range src.Running.Volume { + key := legacyWindowKey(item.Window) + max := strings.TrimSpace(item.Max) + if key == "" || max == "" { + continue + } + out.VolumeLimit[key] = max + } + } + if len(src.Running.Velocity) > 0 { + out.VelocityLimit = map[string]int{} + for _, item := range src.Running.Velocity { + key := legacyWindowKey(item.Window) + if key == "" || item.Max <= 0 { + continue + } + out.VelocityLimit[key] = item.Max + } + } + } + if out.MinAmount == "" && out.MaxAmount == "" && len(out.VolumeLimit) == 0 && len(out.VelocityLimit) == 0 { + return nil + } + return out +} + +func intersectStringMaps(left, right map[string]string) map[string]string { + if len(left) == 0 || len(right) == 0 { + return nil + } + out := map[string]string{} + for key, value := range left { + if rightValue, ok := right[key]; ok && strings.EqualFold(strings.TrimSpace(value), strings.TrimSpace(rightValue)) { + out[key] = value + } + } + if len(out) == 0 { + return nil + } + return out +} + +func intersectIntMaps(left, right map[string]int) map[string]int { + if len(left) == 0 || len(right) == 0 { + return nil + } + out := map[string]int{} + for key, value := range left { + if rightValue, ok := right[key]; ok && value == rightValue { + out[key] = value + } + } + if len(out) == 0 { + return nil + } + return out +} + +func legacyWindowKey(window Window) string { + if raw := strings.TrimSpace(window.Raw); raw != "" { + return raw + } + if named := strings.TrimSpace(window.Named); named != "" { + return named + } + if duration := strings.TrimSpace(window.Duration); duration != "" { + return duration + } + if window.Calendar != nil && window.Calendar.Unit != CalendarUnitUnspecified { + count := window.Calendar.Count + if count <= 0 { + count = 1 + } + return fmt.Sprintf("%d%s", count, strings.ToLower(strings.TrimSpace(string(window.Calendar.Unit)))) + } + return "" +} + func normalizeStrings(values []string, upper bool) []string { if len(values) == 0 { return nil diff --git a/api/pkg/discovery/types.go b/api/pkg/discovery/types.go index cb2fde71..1f904d63 100644 --- a/api/pkg/discovery/types.go +++ b/api/pkg/discovery/types.go @@ -5,6 +5,78 @@ type HealthParams struct { TimeoutSec int `json:"timeoutSec"` } +// CurrencyAmount defines per-transfer min/max for a unit (fiat currency or crypto asset). +// Values are decimals in string form to avoid float issues. +type CurrencyAmount struct { + Min string `json:"min,omitempty"` + Max string `json:"max,omitempty"` +} + +// CalendarUnit is used for calendar-aligned windows (e.g. 1 calendar month). +type CalendarUnit string + +const ( + CalendarUnitUnspecified CalendarUnit = "" + CalendarUnitHour CalendarUnit = "hour" + CalendarUnitDay CalendarUnit = "day" + CalendarUnitWeek CalendarUnit = "week" + CalendarUnitMonth CalendarUnit = "month" +) + +// CalendarWindow represents a calendar-aligned window. +// Example: {unit:"month", count:1} means "1 calendar month". +type CalendarWindow struct { + Unit CalendarUnit `json:"unit"` + Count int `json:"count"` +} + +// Window is a typed description of a running-limit window. +// Gateways/providers can express windows differently, so we keep: +// - Raw: original provider token/string (optional but recommended) +// and one of the normalized forms: +// - Duration: fixed window expressed as a Go-duration string (e.g. "24h", "168h") +// - Calendar: calendar-aligned window (e.g. 1 month) +// - Named: opaque provider token when you do not want/cannot normalize (e.g. "daily") +type Window struct { + // Raw may be set alongside the normalized form. + // Normalized form: exactly one of Duration/Calendar/Named SHOULD be set. + Raw string `json:"raw,omitempty"` + + // Duration uses Go time.Duration format and MUST NOT use "d". + // Use "24h", "168h", "720h", etc. + Duration string `json:"duration,omitempty"` // e.g. "24h" + Calendar *CalendarWindow `json:"calendar,omitempty"` // e.g. month x1 + Named string `json:"named,omitempty"` // e.g. "daily" +} + +// VolumeLimit is the max total amount allowed within a window. +// Amount is a decimal string in the same unit as the announcement (currency/asset). +type VolumeLimit struct { + Window Window `json:"window"` + Max string `json:"max"` +} + +// VelocityLimit is the max number of operations allowed within a window. +type VelocityLimit struct { + Window Window `json:"window"` + Max int `json:"max"` +} + +// CurrencyRunningLimits groups windowed limits. +// Slices are used instead of maps to avoid relying on untyped map keys. +type CurrencyRunningLimits struct { + Volume []VolumeLimit `json:"volume,omitempty"` + Velocity []VelocityLimit `json:"velocity,omitempty"` +} + +// CurrencyLimits combines per-transfer and running limits for a unit. +type CurrencyLimits struct { + Amount *CurrencyAmount `json:"amount,omitempty"` + Running *CurrencyRunningLimits `json:"running,omitempty"` +} + +// Limits is a legacy flat limits shape used by lookup summaries. +// Deprecated: use CurrencyAnnouncement.Limits for announcement payloads. type Limits struct { MinAmount string `json:"minAmount,omitempty"` MaxAmount string `json:"maxAmount,omitempty"` @@ -12,19 +84,33 @@ type Limits struct { VelocityLimit map[string]int `json:"velocityLimit,omitempty"` } +// CurrencyAnnouncement declares limits for a unit. +// For fiat: Currency is ISO 4217 ("EUR", "USD"). +// For crypto: Currency is the ticker ("USDT", "BTC") and you SHOULD also fill Network, +// and for tokens ContractAddress + Decimals. +type CurrencyAnnouncement struct { + // Unit identity + Currency string `json:"currency"` // fiat code or crypto ticker + Network string `json:"network,omitempty"` // crypto chain or fiat rail, e.g. "tron", "ethereum", "sepa", "swift", "visa" + ProviderID string `json:"providerId,omitempty"` // specific provider identifier providing currency over network/rail (e.g. "binance", "circle") + ContractAddress string `json:"contractAddress,omitempty"` // for tokens only + Decimals *int `json:"decimals,omitempty"` // for crypto/token precision + + // Limits for this unit + Limits *CurrencyLimits `json:"limits,omitempty"` +} + type Announcement struct { - ID string `json:"id"` - InstanceID string `bson:"instanceId" json:"instanceId"` - Service string `json:"service"` - Rail string `json:"rail,omitempty"` - Network string `json:"network,omitempty"` - Operations []string `json:"operations,omitempty"` - Currencies []string `json:"currencies,omitempty"` - Limits *Limits `json:"limits,omitempty"` - InvokeURI string `json:"invokeURI,omitempty"` - RoutingPriority int `json:"routingPriority,omitempty"` - Version string `json:"version,omitempty"` - Health HealthParams `json:"health,omitempty"` + ID string `json:"id"` + InstanceID string `bson:"instanceId" json:"instanceId"` + Service string `json:"service"` + Rail string `json:"rail,omitempty"` + Operations []string `json:"operations,omitempty"` + Currencies []CurrencyAnnouncement `json:"currencies,omitempty"` + InvokeURI string `json:"invokeURI,omitempty"` + RoutingPriority int `json:"routingPriority,omitempty"` + Version string `json:"version,omitempty"` + Health HealthParams `json:"health,omitempty"` } type Heartbeat struct { diff --git a/api/proto/payments/quotation/v1/quotation.proto b/api/proto/payments/quotation/v1/quotation.proto index b733ca6d..093eebc0 100644 --- a/api/proto/payments/quotation/v1/quotation.proto +++ b/api/proto/payments/quotation/v1/quotation.proto @@ -16,6 +16,8 @@ message QuotePaymentRequest { message QuotePaymentResponse { payments.shared.v1.PaymentQuote quote = 1; string idempotency_key = 2; + // Non-empty when quote is valid for pricing but cannot be executed. + string execution_note = 3; } message QuotePaymentsRequest {