added payment orchestrator optimizer tweaks #716
2
.gitignore
vendored
2
.gitignore
vendored
@@ -14,7 +14,9 @@ test.sh
|
|||||||
.gocache/
|
.gocache/
|
||||||
.golangci-cache/
|
.golangci-cache/
|
||||||
.cache/
|
.cache/
|
||||||
|
|
||||||
.claude/
|
.claude/
|
||||||
|
.codex/
|
||||||
|
|
||||||
# Air hot reload build artifacts
|
# Air hot reload build artifacts
|
||||||
**/tmp/
|
**/tmp/
|
||||||
|
|||||||
@@ -44,4 +44,17 @@ card_gateways:
|
|||||||
funding_address: "TUaWaCkiXwYPKm5qjcB27Lhwv976vPvedE"
|
funding_address: "TUaWaCkiXwYPKm5qjcB27Lhwv976vPvedE"
|
||||||
fee_wallet_ref: "697a062a248dc785125ccb9e"
|
fee_wallet_ref: "697a062a248dc785125ccb9e"
|
||||||
|
|
||||||
|
# Batch optimizer settings:
|
||||||
|
# - default_mode disables aggregation for unmatched traffic
|
||||||
|
# - crypto rule enables destination aggregation for crypto rail
|
||||||
|
optimizer:
|
||||||
|
aggregation:
|
||||||
|
default_mode: "no_optimization"
|
||||||
|
rules:
|
||||||
|
- id: "crypto_by_destination"
|
||||||
|
priority: 100
|
||||||
|
mode: "merge_by_destination"
|
||||||
|
match:
|
||||||
|
rail: "CRYPTO"
|
||||||
|
|
||||||
# Gateway instances and capabilities are sourced from service discovery.
|
# Gateway instances and capabilities are sourced from service discovery.
|
||||||
|
|||||||
@@ -44,4 +44,17 @@ card_gateways:
|
|||||||
funding_address: "TGBDXEg9rxSqGFJDcb889zqTjDwx1bmLRF"
|
funding_address: "TGBDXEg9rxSqGFJDcb889zqTjDwx1bmLRF"
|
||||||
fee_wallet_ref: "694c124ed76f9f811ac57133"
|
fee_wallet_ref: "694c124ed76f9f811ac57133"
|
||||||
|
|
||||||
|
# Batch optimizer settings:
|
||||||
|
# - default_mode disables aggregation for unmatched traffic
|
||||||
|
# - crypto rule enables destination aggregation for crypto rail
|
||||||
|
optimizer:
|
||||||
|
aggregation:
|
||||||
|
default_mode: "no_optimization"
|
||||||
|
rules:
|
||||||
|
- id: "crypto_by_destination"
|
||||||
|
priority: 100
|
||||||
|
mode: "merge_by_destination"
|
||||||
|
match:
|
||||||
|
rail: "CRYPTO"
|
||||||
|
|
||||||
# Gateway instances and capabilities are sourced from service discovery.
|
# Gateway instances and capabilities are sourced from service discovery.
|
||||||
|
|||||||
@@ -3,7 +3,9 @@ package serverimp
|
|||||||
import (
|
import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc"
|
||||||
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator"
|
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator"
|
||||||
|
"github.com/tech/sendico/payments/storage/model"
|
||||||
"github.com/tech/sendico/pkg/discovery"
|
"github.com/tech/sendico/pkg/discovery"
|
||||||
"github.com/tech/sendico/pkg/mlogger"
|
"github.com/tech/sendico/pkg/mlogger"
|
||||||
)
|
)
|
||||||
@@ -42,3 +44,52 @@ func buildGatewayRegistry(logger mlogger.Logger, src []gatewayInstanceConfig, re
|
|||||||
}
|
}
|
||||||
return orchestrator.NewDiscoveryGatewayRegistry(logger, registry)
|
return orchestrator.NewDiscoveryGatewayRegistry(logger, registry)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func buildBatchOptimizationPolicy(cfg optimizerConfig) psvc.BatchOptimizationPolicy {
|
||||||
|
rules := make([]psvc.BatchOptimizationRule, 0, len(cfg.Aggregation.Rules))
|
||||||
|
for i := range cfg.Aggregation.Rules {
|
||||||
|
ruleCfg := cfg.Aggregation.Rules[i]
|
||||||
|
rule := psvc.BatchOptimizationRule{
|
||||||
|
ID: strings.TrimSpace(ruleCfg.ID),
|
||||||
|
Enabled: ruleCfg.Enabled,
|
||||||
|
Priority: ruleCfg.Priority,
|
||||||
|
Mode: psvc.BatchOptimizationMode(strings.TrimSpace(ruleCfg.Mode)),
|
||||||
|
Match: psvc.BatchOptimizationMatch{
|
||||||
|
Rail: model.ParseRail(ruleCfg.Match.Rail),
|
||||||
|
Providers: cloneTrimmedSlice(ruleCfg.Match.Providers),
|
||||||
|
Networks: cloneTrimmedSlice(ruleCfg.Match.Networks),
|
||||||
|
Currencies: cloneTrimmedSlice(ruleCfg.Match.Currencies),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if ruleCfg.Match.Amount != nil {
|
||||||
|
rule.Match.Amount = &psvc.BatchOptimizationAmountRange{
|
||||||
|
Min: strings.TrimSpace(ruleCfg.Match.Amount.Min),
|
||||||
|
Max: strings.TrimSpace(ruleCfg.Match.Amount.Max),
|
||||||
|
Currency: strings.TrimSpace(ruleCfg.Match.Amount.Currency),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rules = append(rules, rule)
|
||||||
|
}
|
||||||
|
return psvc.BatchOptimizationPolicy{
|
||||||
|
DefaultMode: psvc.BatchOptimizationMode(strings.TrimSpace(cfg.Aggregation.DefaultMode)),
|
||||||
|
Rules: rules,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func cloneTrimmedSlice(values []string) []string {
|
||||||
|
if len(values) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
out := make([]string, 0, len(values))
|
||||||
|
for i := range values {
|
||||||
|
token := strings.TrimSpace(values[i])
|
||||||
|
if token == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
out = append(out, token)
|
||||||
|
}
|
||||||
|
if len(out) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|||||||
@@ -15,9 +15,41 @@ type config struct {
|
|||||||
*grpcapp.Config `yaml:",inline"`
|
*grpcapp.Config `yaml:",inline"`
|
||||||
CardGateways map[string]cardGatewayRouteConfig `yaml:"card_gateways"`
|
CardGateways map[string]cardGatewayRouteConfig `yaml:"card_gateways"`
|
||||||
GatewayInstances []gatewayInstanceConfig `yaml:"gateway_instances"`
|
GatewayInstances []gatewayInstanceConfig `yaml:"gateway_instances"`
|
||||||
|
Optimizer optimizerConfig `yaml:"optimizer"`
|
||||||
QuoteRetentionHrs int `yaml:"quote_retention_hours"`
|
QuoteRetentionHrs int `yaml:"quote_retention_hours"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type optimizerConfig struct {
|
||||||
|
Aggregation aggregationConfig `yaml:"aggregation"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type aggregationConfig struct {
|
||||||
|
DefaultMode string `yaml:"default_mode"`
|
||||||
|
Rules []aggregationRuleConfig `yaml:"rules"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type aggregationRuleConfig struct {
|
||||||
|
ID string `yaml:"id"`
|
||||||
|
Enabled *bool `yaml:"enabled"`
|
||||||
|
Priority int `yaml:"priority"`
|
||||||
|
Mode string `yaml:"mode"`
|
||||||
|
Match aggregationMatchConfig `yaml:"match"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type aggregationMatchConfig struct {
|
||||||
|
Rail string `yaml:"rail"`
|
||||||
|
Providers []string `yaml:"providers"`
|
||||||
|
Networks []string `yaml:"networks"`
|
||||||
|
Currencies []string `yaml:"currencies"`
|
||||||
|
Amount *aggregationAmountConfig `yaml:"amount"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type aggregationAmountConfig struct {
|
||||||
|
Min string `yaml:"min"`
|
||||||
|
Max string `yaml:"max"`
|
||||||
|
Currency string `yaml:"currency"`
|
||||||
|
}
|
||||||
|
|
||||||
type cardGatewayRouteConfig struct {
|
type cardGatewayRouteConfig struct {
|
||||||
FundingAddress string `yaml:"funding_address"`
|
FundingAddress string `yaml:"funding_address"`
|
||||||
FeeAddress string `yaml:"fee_address"`
|
FeeAddress string `yaml:"fee_address"`
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ func (i *Imp) buildServiceOptions(cfg *config, deps *orchestratorDeps) []orchest
|
|||||||
if routes := buildCardGatewayRoutes(cfg.CardGateways); len(routes) > 0 {
|
if routes := buildCardGatewayRoutes(cfg.CardGateways); len(routes) > 0 {
|
||||||
opts = append(opts, orchestrator.WithCardGatewayRoutes(routes))
|
opts = append(opts, orchestrator.WithCardGatewayRoutes(routes))
|
||||||
}
|
}
|
||||||
|
opts = append(opts, orchestrator.WithBatchOptimizationPolicy(buildBatchOptimizationPolicy(cfg.Optimizer)))
|
||||||
if registry := buildGatewayRegistry(i.logger, cfg.GatewayInstances, i.discoveryReg); registry != nil {
|
if registry := buildGatewayRegistry(i.logger, cfg.GatewayInstances, i.discoveryReg); registry != nil {
|
||||||
opts = append(opts, orchestrator.WithGatewayRegistry(registry))
|
opts = append(opts, orchestrator.WithGatewayRegistry(registry))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,11 +16,22 @@ type Input struct {
|
|||||||
Items []Item
|
Items []Item
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MergeMode controls whether an item can be compacted with compatible peers.
|
||||||
|
type MergeMode string
|
||||||
|
|
||||||
|
const (
|
||||||
|
MergeModeUnspecified MergeMode = ""
|
||||||
|
MergeModeByRecipient MergeMode = "merge_by_recipient"
|
||||||
|
MergeModeNever MergeMode = "never_merge"
|
||||||
|
)
|
||||||
|
|
||||||
// Item is one quote-intent pair candidate for aggregation.
|
// Item is one quote-intent pair candidate for aggregation.
|
||||||
type Item struct {
|
type Item struct {
|
||||||
IntentRef string
|
IntentRef string
|
||||||
IntentSnapshot model.PaymentIntent
|
IntentSnapshot model.PaymentIntent
|
||||||
QuoteSnapshot *model.PaymentQuoteSnapshot
|
QuoteSnapshot *model.PaymentQuoteSnapshot
|
||||||
|
MergeMode MergeMode
|
||||||
|
PolicyTag string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Group is one aggregated recipient operation group.
|
// Group is one aggregated recipient operation group.
|
||||||
|
|||||||
@@ -64,6 +64,12 @@ func (s *svc) Aggregate(in Input) (out *Output, err error) {
|
|||||||
if !isBatchingEligible(item.QuoteSnapshot) {
|
if !isBatchingEligible(item.QuoteSnapshot) {
|
||||||
key = key + keySep + "non_batching=" + itoa(i)
|
key = key + keySep + "non_batching=" + itoa(i)
|
||||||
}
|
}
|
||||||
|
if normalizeMergeMode(item.MergeMode) == MergeModeNever {
|
||||||
|
key = key + keySep + "merge_mode=never:" + itoa(i)
|
||||||
|
}
|
||||||
|
if policyTag := strings.TrimSpace(item.PolicyTag); policyTag != "" {
|
||||||
|
key = key + keySep + "policy_tag=" + policyTag
|
||||||
|
}
|
||||||
|
|
||||||
intentRef := firstNonEmpty(
|
intentRef := firstNonEmpty(
|
||||||
strings.TrimSpace(item.IntentRef),
|
strings.TrimSpace(item.IntentRef),
|
||||||
@@ -190,3 +196,12 @@ func isBatchingEligible(quote *model.PaymentQuoteSnapshot) bool {
|
|||||||
}
|
}
|
||||||
return quote.ExecutionConditions.BatchingEligible
|
return quote.ExecutionConditions.BatchingEligible
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func normalizeMergeMode(mode MergeMode) MergeMode {
|
||||||
|
switch strings.ToLower(strings.TrimSpace(string(mode))) {
|
||||||
|
case strings.ToLower(string(MergeModeNever)):
|
||||||
|
return MergeModeNever
|
||||||
|
default:
|
||||||
|
return MergeModeByRecipient
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,184 @@
|
|||||||
|
package psvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/tech/sendico/pkg/discovery"
|
||||||
|
|
||||||
|
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/opagg"
|
||||||
|
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/qsnap"
|
||||||
|
"github.com/tech/sendico/payments/storage/model"
|
||||||
|
"github.com/tech/sendico/pkg/merrors"
|
||||||
|
"github.com/tech/sendico/pkg/mlogger"
|
||||||
|
paymenttypes "github.com/tech/sendico/pkg/payments/types"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BatchOptimizeInput is optimizer payload for one ExecuteBatchPayment call.
|
||||||
|
type BatchOptimizeInput struct {
|
||||||
|
Items []qsnap.ResolvedItem
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchOptimizeOutput is optimizer result payload.
|
||||||
|
type BatchOptimizeOutput struct {
|
||||||
|
Groups []opagg.Group
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchOptimizer encapsulates batch item compaction policy and aggregation.
|
||||||
|
type BatchOptimizer interface {
|
||||||
|
Optimize(in BatchOptimizeInput) (*BatchOptimizeOutput, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PolicyBatchOptimizerDependencies configures policy-based optimizer implementation.
|
||||||
|
type PolicyBatchOptimizerDependencies struct {
|
||||||
|
Logger mlogger.Logger
|
||||||
|
Aggregator opagg.Aggregator
|
||||||
|
Policy BatchOptimizationPolicy
|
||||||
|
}
|
||||||
|
|
||||||
|
type policyBatchOptimizer struct {
|
||||||
|
logger mlogger.Logger
|
||||||
|
aggregator opagg.Aggregator
|
||||||
|
policy BatchOptimizationPolicy
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPolicyBatchOptimizer creates default rule-based optimizer implementation.
|
||||||
|
func NewPolicyBatchOptimizer(deps PolicyBatchOptimizerDependencies) BatchOptimizer {
|
||||||
|
logger := deps.Logger
|
||||||
|
if logger == nil {
|
||||||
|
logger = zap.NewNop()
|
||||||
|
}
|
||||||
|
aggregator := deps.Aggregator
|
||||||
|
if aggregator == nil {
|
||||||
|
aggregator = opagg.New(opagg.Dependencies{Logger: logger})
|
||||||
|
}
|
||||||
|
return &policyBatchOptimizer{
|
||||||
|
logger: logger.Named("batch_optimizer"),
|
||||||
|
aggregator: aggregator,
|
||||||
|
policy: normalizeBatchOptimizationPolicy(deps.Policy),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *policyBatchOptimizer) Optimize(in BatchOptimizeInput) (*BatchOptimizeOutput, error) {
|
||||||
|
if len(in.Items) == 0 {
|
||||||
|
return nil, merrors.InvalidArgument("items are required")
|
||||||
|
}
|
||||||
|
|
||||||
|
aggItems := make([]opagg.Item, 0, len(in.Items))
|
||||||
|
for i := range in.Items {
|
||||||
|
item := in.Items[i]
|
||||||
|
selection := o.policy.Select(batchOptimizationContextFromItem(item))
|
||||||
|
intentSnapshot := applyBatchOptimizationSelection(item.IntentSnapshot, selection)
|
||||||
|
aggItems = append(aggItems, opagg.Item{
|
||||||
|
IntentRef: item.IntentRef,
|
||||||
|
IntentSnapshot: intentSnapshot,
|
||||||
|
QuoteSnapshot: item.QuoteSnapshot,
|
||||||
|
MergeMode: mergeModeFromBatchOptimization(selection.Mode),
|
||||||
|
PolicyTag: batchOptimizationSelectionTag(selection),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
aggOutput, err := o.aggregator.Aggregate(opagg.Input{Items: aggItems})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if aggOutput == nil || len(aggOutput.Groups) == 0 {
|
||||||
|
return nil, merrors.InvalidArgument("aggregation produced no groups")
|
||||||
|
}
|
||||||
|
return &BatchOptimizeOutput{Groups: aggOutput.Groups}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func batchOptimizationContextFromItem(item qsnap.ResolvedItem) BatchOptimizationContext {
|
||||||
|
rail, provider, network := destinationHopSignature(item.QuoteSnapshot)
|
||||||
|
money := selectSettlementMoney(item)
|
||||||
|
ctx := BatchOptimizationContext{
|
||||||
|
Rail: rail,
|
||||||
|
Provider: provider,
|
||||||
|
Network: network,
|
||||||
|
}
|
||||||
|
if money != nil {
|
||||||
|
ctx.Amount = strings.TrimSpace(money.Amount)
|
||||||
|
ctx.Currency = strings.TrimSpace(money.Currency)
|
||||||
|
}
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
func selectSettlementMoney(item qsnap.ResolvedItem) *paymenttypes.Money {
|
||||||
|
if item.QuoteSnapshot != nil && item.QuoteSnapshot.ExpectedSettlementAmount != nil {
|
||||||
|
return item.QuoteSnapshot.ExpectedSettlementAmount
|
||||||
|
}
|
||||||
|
return item.IntentSnapshot.Amount
|
||||||
|
}
|
||||||
|
|
||||||
|
func destinationHopSignature(snapshot *model.PaymentQuoteSnapshot) (model.Rail, string, string) {
|
||||||
|
if snapshot == nil || snapshot.Route == nil {
|
||||||
|
return discovery.RailUnspecified, "", ""
|
||||||
|
}
|
||||||
|
route := snapshot.Route
|
||||||
|
rail := model.ParseRail(route.Rail)
|
||||||
|
provider := strings.TrimSpace(route.Provider)
|
||||||
|
network := strings.TrimSpace(route.Network)
|
||||||
|
|
||||||
|
var destinationHop *paymenttypes.QuoteRouteHop
|
||||||
|
for i := range route.Hops {
|
||||||
|
hop := route.Hops[i]
|
||||||
|
if hop == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
destinationHop = hop
|
||||||
|
if hop.Role == paymenttypes.QuoteRouteHopRoleDestination {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if destinationHop != nil {
|
||||||
|
if parsed := model.ParseRail(destinationHop.Rail); parsed != discovery.RailUnspecified {
|
||||||
|
rail = parsed
|
||||||
|
}
|
||||||
|
if provider == "" {
|
||||||
|
provider = strings.TrimSpace(destinationHop.Gateway)
|
||||||
|
}
|
||||||
|
if network == "" {
|
||||||
|
network = strings.TrimSpace(destinationHop.Network)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return rail, provider, network
|
||||||
|
}
|
||||||
|
|
||||||
|
func mergeModeFromBatchOptimization(mode BatchOptimizationMode) opagg.MergeMode {
|
||||||
|
switch normalizeBatchOptimizationMode(mode) {
|
||||||
|
case BatchOptimizationModeMergeByDestination:
|
||||||
|
return opagg.MergeModeByRecipient
|
||||||
|
default:
|
||||||
|
return opagg.MergeModeNever
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func batchOptimizationSelectionTag(sel BatchOptimizationSelection) string {
|
||||||
|
mode := normalizeBatchOptimizationMode(sel.Mode)
|
||||||
|
matched := strconv.FormatBool(sel.Matched)
|
||||||
|
ruleID := strings.TrimSpace(sel.RuleID)
|
||||||
|
if ruleID == "" {
|
||||||
|
ruleID = "default"
|
||||||
|
}
|
||||||
|
return string(mode) + "|" + matched + "|" + ruleID
|
||||||
|
}
|
||||||
|
|
||||||
|
func applyBatchOptimizationSelection(intent model.PaymentIntent, sel BatchOptimizationSelection) model.PaymentIntent {
|
||||||
|
if intent.Attributes == nil {
|
||||||
|
intent.Attributes = map[string]string{}
|
||||||
|
}
|
||||||
|
mode := normalizeBatchOptimizationMode(sel.Mode)
|
||||||
|
if mode == BatchOptimizationModeUnspecified {
|
||||||
|
mode = BatchOptimizationModeNoOptimization
|
||||||
|
}
|
||||||
|
intent.Attributes[attrBatchOptimizerMode] = string(mode)
|
||||||
|
intent.Attributes[attrBatchOptimizerRuleMatch] = strconv.FormatBool(sel.Matched)
|
||||||
|
ruleID := strings.TrimSpace(sel.RuleID)
|
||||||
|
if ruleID == "" {
|
||||||
|
delete(intent.Attributes, attrBatchOptimizerRuleID)
|
||||||
|
} else {
|
||||||
|
intent.Attributes[attrBatchOptimizerRuleID] = ruleID
|
||||||
|
}
|
||||||
|
return intent
|
||||||
|
}
|
||||||
@@ -28,8 +28,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
attrAggregatedByRecipient = "orchestrator.v2.aggregated_by_recipient"
|
attrAggregatedByRecipient = "orchestrator.v2.aggregated_by_recipient"
|
||||||
attrAggregatedItems = "orchestrator.v2.aggregated_items"
|
attrAggregatedItems = "orchestrator.v2.aggregated_items"
|
||||||
|
attrBatchOptimizerMode = "orchestrator.v2.batch_optimizer_mode"
|
||||||
|
attrBatchOptimizerRuleID = "orchestrator.v2.batch_optimizer_rule_id"
|
||||||
|
attrBatchOptimizerRuleMatch = "orchestrator.v2.batch_optimizer_rule_matched"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *svc) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (resp *orchestrationv2.ExecuteBatchPaymentResponse, err error) {
|
func (s *svc) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (resp *orchestrationv2.ExecuteBatchPaymentResponse, err error) {
|
||||||
@@ -71,31 +74,27 @@ func (s *svc) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.Exec
|
|||||||
return nil, merrors.InvalidArgument("quotation has no executable items")
|
return nil, merrors.InvalidArgument("quotation has no executable items")
|
||||||
}
|
}
|
||||||
|
|
||||||
aggItems := make([]opagg.Item, 0, len(resolved.Items))
|
optimized, err := s.batchOptimizer.Optimize(BatchOptimizeInput{Items: resolved.Items})
|
||||||
for _, item := range resolved.Items {
|
|
||||||
aggItems = append(aggItems, opagg.Item{
|
|
||||||
IntentRef: item.IntentRef,
|
|
||||||
IntentSnapshot: item.IntentSnapshot,
|
|
||||||
QuoteSnapshot: item.QuoteSnapshot,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
aggOutput, err := s.aggregator.Aggregate(opagg.Input{Items: aggItems})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
group, err := s.buildBatchOperationGroup(aggOutput.Groups)
|
groups, err := s.buildBatchOperationGroups(optimized.Groups)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
payment, err := s.executeGroup(ctx, requestCtx, resolved.QuotationRef, group)
|
payments := make([]*orchestrationv2.Payment, 0, len(groups))
|
||||||
if err != nil {
|
for i := range groups {
|
||||||
return nil, err
|
payment, groupErr := s.executeGroup(ctx, requestCtx, resolved.QuotationRef, groups[i])
|
||||||
|
if groupErr != nil {
|
||||||
|
return nil, groupErr
|
||||||
|
}
|
||||||
|
protoPayment, mapErr := s.mapPayment(payment)
|
||||||
|
if mapErr != nil {
|
||||||
|
return nil, mapErr
|
||||||
|
}
|
||||||
|
payments = append(payments, protoPayment)
|
||||||
}
|
}
|
||||||
protoPayment, err := s.mapPayment(payment)
|
return &orchestrationv2.ExecuteBatchPaymentResponse{Payments: payments}, nil
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &orchestrationv2.ExecuteBatchPaymentResponse{Payments: []*orchestrationv2.Payment{protoPayment}}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *svc) prepareBatchExecute(req *orchestrationv2.ExecuteBatchPaymentRequest) (*reqval.Ctx, error) {
|
func (s *svc) prepareBatchExecute(req *orchestrationv2.ExecuteBatchPaymentRequest) (*reqval.Ctx, error) {
|
||||||
@@ -270,56 +269,44 @@ func normalizeIntentRefs(values []string) []string {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *svc) buildBatchOperationGroup(groups []opagg.Group) (opagg.Group, error) {
|
func (s *svc) buildBatchOperationGroups(groups []opagg.Group) ([]opagg.Group, error) {
|
||||||
if len(groups) == 0 {
|
if len(groups) == 0 {
|
||||||
return opagg.Group{}, merrors.InvalidArgument("aggregation produced no groups")
|
return nil, merrors.InvalidArgument("aggregation produced no groups")
|
||||||
}
|
}
|
||||||
|
|
||||||
anchorDestination := groups[0].IntentSnapshot.Destination
|
out := make([]opagg.Group, 0, len(groups))
|
||||||
synthetic := make([]opagg.Item, 0, len(groups))
|
|
||||||
allIntentRefs := make([]string, 0, len(groups))
|
|
||||||
for i := range groups {
|
for i := range groups {
|
||||||
group := groups[i]
|
group := groups[i]
|
||||||
intent := group.IntentSnapshot
|
group.IntentRefs = normalizeIntentRefs(group.IntentRefs)
|
||||||
intent.Destination = anchorDestination
|
if len(group.IntentRefs) == 0 {
|
||||||
synthetic = append(synthetic, opagg.Item{
|
return nil, merrors.InvalidArgument("aggregated group has no intent refs")
|
||||||
IntentRef: strings.Join(normalizeIntentRefs(group.IntentRefs), ","),
|
|
||||||
IntentSnapshot: intent,
|
|
||||||
QuoteSnapshot: group.QuoteSnapshot,
|
|
||||||
})
|
|
||||||
allIntentRefs = append(allIntentRefs, group.IntentRefs...)
|
|
||||||
}
|
|
||||||
|
|
||||||
collapsed, err := s.aggregator.Aggregate(opagg.Input{Items: synthetic})
|
|
||||||
if err != nil {
|
|
||||||
return opagg.Group{}, err
|
|
||||||
}
|
|
||||||
if collapsed == nil || len(collapsed.Groups) != 1 {
|
|
||||||
return opagg.Group{}, merrors.InvalidArgument("batch quotation contains incompatible operation groups")
|
|
||||||
}
|
|
||||||
|
|
||||||
out := collapsed.Groups[0]
|
|
||||||
out.IntentRefs = normalizeIntentRefs(allIntentRefs)
|
|
||||||
if len(out.IntentRefs) == 0 {
|
|
||||||
return opagg.Group{}, merrors.InvalidArgument("aggregated group has no intent refs")
|
|
||||||
}
|
|
||||||
if out.IntentSnapshot.Attributes == nil {
|
|
||||||
out.IntentSnapshot.Attributes = map[string]string{}
|
|
||||||
}
|
|
||||||
if len(groups) > 1 {
|
|
||||||
out.IntentSnapshot.Attributes[attrAggregatedByRecipient] = "true"
|
|
||||||
}
|
|
||||||
out.IntentSnapshot.Attributes[attrAggregatedItems] = strconv.Itoa(len(out.IntentRefs))
|
|
||||||
|
|
||||||
targets := buildBatchPayoutTargets(groups)
|
|
||||||
if routeContainsCardPayout(out.QuoteSnapshot) && len(targets) > 0 {
|
|
||||||
raw, err := batchmeta.EncodePayoutTargets(targets)
|
|
||||||
if err != nil {
|
|
||||||
return opagg.Group{}, err
|
|
||||||
}
|
}
|
||||||
if strings.TrimSpace(raw) != "" {
|
if group.IntentSnapshot.Attributes == nil {
|
||||||
out.IntentSnapshot.Attributes[batchmeta.AttrPayoutTargets] = raw
|
group.IntentSnapshot.Attributes = map[string]string{}
|
||||||
}
|
}
|
||||||
|
if len(group.IntentRefs) > 1 {
|
||||||
|
group.IntentSnapshot.Attributes[attrAggregatedByRecipient] = "true"
|
||||||
|
} else {
|
||||||
|
delete(group.IntentSnapshot.Attributes, attrAggregatedByRecipient)
|
||||||
|
}
|
||||||
|
group.IntentSnapshot.Attributes[attrAggregatedItems] = strconv.Itoa(len(group.IntentRefs))
|
||||||
|
|
||||||
|
targets := buildBatchPayoutTargets([]opagg.Group{group})
|
||||||
|
if routeContainsCardPayout(group.QuoteSnapshot) && len(targets) > 0 {
|
||||||
|
raw, err := batchmeta.EncodePayoutTargets(targets)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(raw) != "" {
|
||||||
|
group.IntentSnapshot.Attributes[batchmeta.AttrPayoutTargets] = raw
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
delete(group.IntentSnapshot.Attributes, batchmeta.AttrPayoutTargets)
|
||||||
|
}
|
||||||
|
out = append(out, group)
|
||||||
|
}
|
||||||
|
if len(out) == 0 {
|
||||||
|
return nil, merrors.InvalidArgument("aggregation produced no groups")
|
||||||
}
|
}
|
||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,13 +10,14 @@ import (
|
|||||||
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/batchmeta"
|
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/batchmeta"
|
||||||
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec"
|
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec"
|
||||||
"github.com/tech/sendico/payments/storage/model"
|
"github.com/tech/sendico/payments/storage/model"
|
||||||
|
"github.com/tech/sendico/pkg/discovery"
|
||||||
pm "github.com/tech/sendico/pkg/model"
|
pm "github.com/tech/sendico/pkg/model"
|
||||||
paymenttypes "github.com/tech/sendico/pkg/payments/types"
|
paymenttypes "github.com/tech/sendico/pkg/payments/types"
|
||||||
orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2"
|
orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2"
|
||||||
"go.mongodb.org/mongo-driver/v2/bson"
|
"go.mongodb.org/mongo-driver/v2/bson"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestExecuteBatchPayment_SameDestinationMerges(t *testing.T) {
|
func TestExecuteBatchPayment_DefaultNoOptimization_DoesNotMergeSameDestination(t *testing.T) {
|
||||||
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
|
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
|
||||||
step := req.StepExecution
|
step := req.StepExecution
|
||||||
step.State = agg.StepStateCompleted
|
step.State = agg.StepStateCompleted
|
||||||
@@ -37,15 +38,17 @@ func TestExecuteBatchPayment_SameDestinationMerges(t *testing.T) {
|
|||||||
if resp == nil {
|
if resp == nil {
|
||||||
t.Fatal("expected response")
|
t.Fatal("expected response")
|
||||||
}
|
}
|
||||||
if got, want := len(resp.GetPayments()), 1; got != want {
|
if got, want := len(resp.GetPayments()), 2; got != want {
|
||||||
t.Fatalf("expected %d payment(s) for same-destination merge, got=%d", want, got)
|
t.Fatalf("expected %d payment(s) with default no optimization, got=%d", want, got)
|
||||||
}
|
}
|
||||||
if got, want := resp.GetPayments()[0].GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
|
for i, p := range resp.GetPayments() {
|
||||||
t.Fatalf("state mismatch: got=%s want=%s", got, want)
|
if got, want := p.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
|
||||||
|
t.Fatalf("payments[%d] state mismatch: got=%s want=%s", i, got, want)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestExecuteBatchPayment_DifferentDestinationsCompactsIntoSinglePayment(t *testing.T) {
|
func TestExecuteBatchPayment_DefaultNoOptimization_DoesNotMergeDifferentDestinations(t *testing.T) {
|
||||||
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
|
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
|
||||||
step := req.StepExecution
|
step := req.StepExecution
|
||||||
step.State = agg.StepStateCompleted
|
step.State = agg.StepStateCompleted
|
||||||
@@ -63,8 +66,8 @@ func TestExecuteBatchPayment_DifferentDestinationsCompactsIntoSinglePayment(t *t
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("ExecuteBatchPayment returned error: %v", err)
|
t.Fatalf("ExecuteBatchPayment returned error: %v", err)
|
||||||
}
|
}
|
||||||
if got, want := len(resp.GetPayments()), 1; got != want {
|
if got, want := len(resp.GetPayments()), 2; got != want {
|
||||||
t.Fatalf("expected %d payment for batched execution, got=%d", want, got)
|
t.Fatalf("expected %d payments for batched execution without optimization, got=%d", want, got)
|
||||||
}
|
}
|
||||||
for i, p := range resp.GetPayments() {
|
for i, p := range resp.GetPayments() {
|
||||||
if got, want := p.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
|
if got, want := p.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
|
||||||
@@ -73,7 +76,7 @@ func TestExecuteBatchPayment_DifferentDestinationsCompactsIntoSinglePayment(t *t
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsInSinglePayment(t *testing.T) {
|
func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsWithoutMerging(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
targetRefs []string
|
targetRefs []string
|
||||||
amounts []string
|
amounts []string
|
||||||
@@ -99,11 +102,13 @@ func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsInSinglePayment
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("ExecuteBatchPayment returned error: %v", err)
|
t.Fatalf("ExecuteBatchPayment returned error: %v", err)
|
||||||
}
|
}
|
||||||
if got, want := len(resp.GetPayments()), 1; got != want {
|
if got, want := len(resp.GetPayments()), 2; got != want {
|
||||||
t.Fatalf("expected %d payment for card batch, got=%d", want, got)
|
t.Fatalf("expected %d payments for card batch without optimization, got=%d", want, got)
|
||||||
}
|
}
|
||||||
if got, want := resp.GetPayments()[0].GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
|
for i, payment := range resp.GetPayments() {
|
||||||
t.Fatalf("state mismatch: got=%s want=%s", got, want)
|
if got, want := payment.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
|
||||||
|
t.Fatalf("payments[%d] state mismatch: got=%s want=%s", i, got, want)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if got, want := len(targetRefs), 2; got != want {
|
if got, want := len(targetRefs), 2; got != want {
|
||||||
t.Fatalf("expected %d card payout send calls, got=%d", want, got)
|
t.Fatalf("expected %d card payout send calls, got=%d", want, got)
|
||||||
@@ -122,6 +127,41 @@ func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsInSinglePayment
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestExecuteBatchPayment_CryptoPolicyMergesByDestination(t *testing.T) {
|
||||||
|
env := newTestEnvWithPolicy(t, BatchOptimizationPolicy{
|
||||||
|
DefaultMode: BatchOptimizationModeNoOptimization,
|
||||||
|
Rules: []BatchOptimizationRule{
|
||||||
|
{
|
||||||
|
ID: "crypto-merge",
|
||||||
|
Priority: 100,
|
||||||
|
Mode: BatchOptimizationModeMergeByDestination,
|
||||||
|
Match: BatchOptimizationMatch{
|
||||||
|
Rail: discovery.RailCrypto,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
|
||||||
|
step := req.StepExecution
|
||||||
|
step.State = agg.StepStateCompleted
|
||||||
|
return &sexec.ExecuteOutput{StepExecution: step}, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
quote := newExecutableBatchCryptoQuoteSameDestination(env.orgID, "quote-batch-crypto-merge")
|
||||||
|
env.quotes.Put(quote)
|
||||||
|
|
||||||
|
resp, err := env.svc.ExecuteBatchPayment(context.Background(), &orchestrationv2.ExecuteBatchPaymentRequest{
|
||||||
|
Meta: testMeta(env.orgID, "idem-batch-crypto-merge"),
|
||||||
|
QuotationRef: "quote-batch-crypto-merge",
|
||||||
|
ClientPaymentRef: "client-batch-crypto-merge",
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ExecuteBatchPayment returned error: %v", err)
|
||||||
|
}
|
||||||
|
if got, want := len(resp.GetPayments()), 1; got != want {
|
||||||
|
t.Fatalf("expected %d merged payment for crypto policy, got=%d", want, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestExecuteBatchPayment_IdempotentRetry(t *testing.T) {
|
func TestExecuteBatchPayment_IdempotentRetry(t *testing.T) {
|
||||||
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
|
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
|
||||||
step := req.StepExecution
|
step := req.StepExecution
|
||||||
@@ -150,8 +190,23 @@ func TestExecuteBatchPayment_IdempotentRetry(t *testing.T) {
|
|||||||
if got, want := len(resp2.GetPayments()), len(resp1.GetPayments()); got != want {
|
if got, want := len(resp2.GetPayments()), len(resp1.GetPayments()); got != want {
|
||||||
t.Fatalf("expected same number of payments on retry: got=%d want=%d", got, want)
|
t.Fatalf("expected same number of payments on retry: got=%d want=%d", got, want)
|
||||||
}
|
}
|
||||||
if got, want := resp2.GetPayments()[0].GetPaymentRef(), resp1.GetPayments()[0].GetPaymentRef(); got != want {
|
refs1 := make([]string, 0, len(resp1.GetPayments()))
|
||||||
t.Fatalf("expected same payment_ref on retry: got=%q want=%q", got, want)
|
refs2 := make([]string, 0, len(resp2.GetPayments()))
|
||||||
|
for i := range resp1.GetPayments() {
|
||||||
|
refs1 = append(refs1, resp1.GetPayments()[i].GetPaymentRef())
|
||||||
|
}
|
||||||
|
for i := range resp2.GetPayments() {
|
||||||
|
refs2 = append(refs2, resp2.GetPayments()[i].GetPaymentRef())
|
||||||
|
}
|
||||||
|
sort.Strings(refs1)
|
||||||
|
sort.Strings(refs2)
|
||||||
|
if len(refs1) != len(refs2) {
|
||||||
|
t.Fatalf("payment refs count mismatch: got=%d want=%d", len(refs2), len(refs1))
|
||||||
|
}
|
||||||
|
for i := range refs1 {
|
||||||
|
if refs1[i] != refs2[i] {
|
||||||
|
t.Fatalf("payment_ref mismatch on retry at index %d: got=%q want=%q", i, refs2[i], refs1[i])
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -307,3 +362,81 @@ func newExecutableBatchCardQuoteDiffDest(orgRef bson.ObjectID, quoteRef string)
|
|||||||
ExpiresAt: now.Add(1 * time.Hour),
|
ExpiresAt: now.Add(1 * time.Hour),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newExecutableBatchCryptoQuoteSameDestination(orgRef bson.ObjectID, quoteRef string) *model.PaymentQuoteRecord {
|
||||||
|
now := time.Now().UTC()
|
||||||
|
return &model.PaymentQuoteRecord{
|
||||||
|
Base: modelBase(now),
|
||||||
|
OrganizationBoundBase: pm.OrganizationBoundBase{
|
||||||
|
OrganizationRef: orgRef,
|
||||||
|
},
|
||||||
|
QuoteRef: quoteRef,
|
||||||
|
RequestShape: model.QuoteRequestShapeBatch,
|
||||||
|
Items: []*model.PaymentQuoteItemV2{
|
||||||
|
{
|
||||||
|
Intent: &model.PaymentIntent{
|
||||||
|
Ref: "intent-a",
|
||||||
|
Kind: model.PaymentKindPayout,
|
||||||
|
Source: testLedgerEndpoint("ledger-src"),
|
||||||
|
Destination: testExternalChainEndpoint("TQfCWxU4ERWstZZcMzEVB9MnnCSzE4agj8"),
|
||||||
|
Amount: &paymenttypes.Money{Amount: "12", Currency: "USDT"},
|
||||||
|
SettlementCurrency: "USDT",
|
||||||
|
},
|
||||||
|
Quote: &model.PaymentQuoteSnapshot{
|
||||||
|
QuoteRef: quoteRef,
|
||||||
|
DebitAmount: &paymenttypes.Money{Amount: "12", Currency: "USDT"},
|
||||||
|
ExpectedSettlementAmount: &paymenttypes.Money{Amount: "12", Currency: "USDT"},
|
||||||
|
Route: &paymenttypes.QuoteRouteSpecification{
|
||||||
|
Rail: "CRYPTO",
|
||||||
|
Provider: "gw-crypto",
|
||||||
|
Network: "TRON_NILE",
|
||||||
|
Hops: []*paymenttypes.QuoteRouteHop{
|
||||||
|
{Index: 10, Rail: "LEDGER", Role: paymenttypes.QuoteRouteHopRoleSource},
|
||||||
|
{Index: 20, Rail: "CRYPTO", Gateway: "gw-crypto", Network: "TRON_NILE", Role: paymenttypes.QuoteRouteHopRoleDestination},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Intent: &model.PaymentIntent{
|
||||||
|
Ref: "intent-b",
|
||||||
|
Kind: model.PaymentKindPayout,
|
||||||
|
Source: testLedgerEndpoint("ledger-src"),
|
||||||
|
Destination: testExternalChainEndpoint("TQfCWxU4ERWstZZcMzEVB9MnnCSzE4agj8"),
|
||||||
|
Amount: &paymenttypes.Money{Amount: "7", Currency: "USDT"},
|
||||||
|
SettlementCurrency: "USDT",
|
||||||
|
},
|
||||||
|
Quote: &model.PaymentQuoteSnapshot{
|
||||||
|
QuoteRef: quoteRef,
|
||||||
|
DebitAmount: &paymenttypes.Money{Amount: "7", Currency: "USDT"},
|
||||||
|
ExpectedSettlementAmount: &paymenttypes.Money{Amount: "7", Currency: "USDT"},
|
||||||
|
Route: &paymenttypes.QuoteRouteSpecification{
|
||||||
|
Rail: "CRYPTO",
|
||||||
|
Provider: "gw-crypto",
|
||||||
|
Network: "TRON_NILE",
|
||||||
|
Hops: []*paymenttypes.QuoteRouteHop{
|
||||||
|
{Index: 10, Rail: "LEDGER", Role: paymenttypes.QuoteRouteHopRoleSource},
|
||||||
|
{Index: 20, Rail: "CRYPTO", Gateway: "gw-crypto", Network: "TRON_NILE", Role: paymenttypes.QuoteRouteHopRoleDestination},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
ExpiresAt: now.Add(1 * time.Hour),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testExternalChainEndpoint(address string) model.PaymentEndpoint {
|
||||||
|
return model.PaymentEndpoint{
|
||||||
|
Type: model.EndpointTypeExternalChain,
|
||||||
|
ExternalChain: &model.ExternalChainEndpoint{
|
||||||
|
Address: address,
|
||||||
|
Asset: &paymenttypes.Asset{
|
||||||
|
Chain: "TRON",
|
||||||
|
TokenSymbol: "USDT",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -51,25 +51,27 @@ type Dependencies struct {
|
|||||||
|
|
||||||
QuoteStore qsnap.Store
|
QuoteStore qsnap.Store
|
||||||
|
|
||||||
Validator reqval.Validator
|
Validator reqval.Validator
|
||||||
Idempotency idem.Service
|
Idempotency idem.Service
|
||||||
Quote qsnap.Resolver
|
Quote qsnap.Resolver
|
||||||
Aggregator opagg.Aggregator
|
BatchOptimizer BatchOptimizer
|
||||||
Aggregate agg.Factory
|
Aggregator opagg.Aggregator
|
||||||
Planner xplan.Compiler
|
Aggregate agg.Factory
|
||||||
State ostate.StateMachine
|
Planner xplan.Compiler
|
||||||
Scheduler ssched.Runtime
|
State ostate.StateMachine
|
||||||
Executors sexec.Registry
|
Scheduler ssched.Runtime
|
||||||
Reconciler erecon.Reconciler
|
Executors sexec.Registry
|
||||||
Repository prepo.Repository
|
Reconciler erecon.Reconciler
|
||||||
Query pquery.Service
|
Repository prepo.Repository
|
||||||
Mapper prmap.Mapper
|
Query pquery.Service
|
||||||
Observer oobs.Observer
|
Mapper prmap.Mapper
|
||||||
Producer msg.Producer
|
Observer oobs.Observer
|
||||||
|
Producer msg.Producer
|
||||||
|
|
||||||
RetryPolicy ssched.RetryPolicy
|
RetryPolicy ssched.RetryPolicy
|
||||||
Now func() time.Time
|
BatchOptimizationPolicy BatchOptimizationPolicy
|
||||||
MaxTicks int
|
Now func() time.Time
|
||||||
|
MaxTicks int
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(deps Dependencies) (Service, error) {
|
func New(deps Dependencies) (Service, error) {
|
||||||
|
|||||||
@@ -0,0 +1,361 @@
|
|||||||
|
package psvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/shopspring/decimal"
|
||||||
|
"github.com/tech/sendico/payments/storage/model"
|
||||||
|
"github.com/tech/sendico/pkg/discovery"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BatchOptimizationMode defines how ExecuteBatchPayment compacts batch operations.
|
||||||
|
type BatchOptimizationMode string
|
||||||
|
|
||||||
|
const (
|
||||||
|
BatchOptimizationModeUnspecified BatchOptimizationMode = ""
|
||||||
|
BatchOptimizationModeMergeByDestination BatchOptimizationMode = "merge_by_destination"
|
||||||
|
BatchOptimizationModeNoOptimization BatchOptimizationMode = "no_optimization"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BatchOptimizationPolicy configures batch operation compaction behavior.
|
||||||
|
//
|
||||||
|
// Rules are evaluated by "best match":
|
||||||
|
// 1. all specified match fields must match the item context;
|
||||||
|
// 2. winner is highest specificity (number of optional selectors present);
|
||||||
|
// 3. ties are resolved by larger Priority;
|
||||||
|
// 4. remaining ties keep the first rule order from config.
|
||||||
|
//
|
||||||
|
// Rail is mandatory per rule. Other match fields are optional.
|
||||||
|
type BatchOptimizationPolicy struct {
|
||||||
|
DefaultMode BatchOptimizationMode
|
||||||
|
Rules []BatchOptimizationRule
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchOptimizationRule defines one matching rule.
|
||||||
|
type BatchOptimizationRule struct {
|
||||||
|
ID string
|
||||||
|
Enabled *bool
|
||||||
|
Priority int
|
||||||
|
Mode BatchOptimizationMode
|
||||||
|
Match BatchOptimizationMatch
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchOptimizationMatch defines selectors for one rule.
|
||||||
|
type BatchOptimizationMatch struct {
|
||||||
|
Rail model.Rail
|
||||||
|
Providers []string
|
||||||
|
Networks []string
|
||||||
|
Currencies []string
|
||||||
|
Amount *BatchOptimizationAmountRange
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchOptimizationAmountRange defines optional amount matching boundaries.
|
||||||
|
type BatchOptimizationAmountRange struct {
|
||||||
|
Min string
|
||||||
|
Max string
|
||||||
|
Currency string
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchOptimizationContext is runtime data used for matching one batch item.
|
||||||
|
type BatchOptimizationContext struct {
|
||||||
|
Rail model.Rail
|
||||||
|
Provider string
|
||||||
|
Network string
|
||||||
|
Currency string
|
||||||
|
Amount string
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchOptimizationSelection is the selected optimization decision.
|
||||||
|
type BatchOptimizationSelection struct {
|
||||||
|
Mode BatchOptimizationMode
|
||||||
|
RuleID string
|
||||||
|
Matched bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultBatchOptimizationPolicy() BatchOptimizationPolicy {
|
||||||
|
return BatchOptimizationPolicy{
|
||||||
|
DefaultMode: BatchOptimizationModeNoOptimization,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeBatchOptimizationPolicy(in BatchOptimizationPolicy) BatchOptimizationPolicy {
|
||||||
|
out := defaultBatchOptimizationPolicy()
|
||||||
|
if normalizedDefault := normalizeBatchOptimizationMode(in.DefaultMode); normalizedDefault != BatchOptimizationModeUnspecified {
|
||||||
|
out.DefaultMode = normalizedDefault
|
||||||
|
}
|
||||||
|
if len(in.Rules) == 0 {
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
rules := make([]BatchOptimizationRule, 0, len(in.Rules))
|
||||||
|
for i := range in.Rules {
|
||||||
|
rule, ok := normalizeBatchOptimizationRule(in.Rules[i])
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rules = append(rules, rule)
|
||||||
|
}
|
||||||
|
if len(rules) > 0 {
|
||||||
|
out.Rules = rules
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeBatchOptimizationRule(in BatchOptimizationRule) (BatchOptimizationRule, bool) {
|
||||||
|
out := BatchOptimizationRule{
|
||||||
|
ID: strings.TrimSpace(in.ID),
|
||||||
|
Enabled: in.Enabled,
|
||||||
|
Priority: in.Priority,
|
||||||
|
Mode: normalizeBatchOptimizationMode(in.Mode),
|
||||||
|
}
|
||||||
|
if out.Mode == BatchOptimizationModeUnspecified {
|
||||||
|
return BatchOptimizationRule{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
match, ok := normalizeBatchOptimizationMatch(in.Match)
|
||||||
|
if !ok {
|
||||||
|
return BatchOptimizationRule{}, false
|
||||||
|
}
|
||||||
|
out.Match = match
|
||||||
|
return out, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeBatchOptimizationMatch(in BatchOptimizationMatch) (BatchOptimizationMatch, bool) {
|
||||||
|
out := BatchOptimizationMatch{
|
||||||
|
Rail: model.ParseRail(string(in.Rail)),
|
||||||
|
Providers: normalizeOptimizationTokens(in.Providers),
|
||||||
|
Networks: normalizeOptimizationTokens(in.Networks),
|
||||||
|
Currencies: normalizeOptimizationCurrencies(in.Currencies),
|
||||||
|
}
|
||||||
|
if out.Rail == discovery.RailUnspecified {
|
||||||
|
return BatchOptimizationMatch{}, false
|
||||||
|
}
|
||||||
|
if in.Amount != nil {
|
||||||
|
normalizedAmount, ok := normalizeBatchOptimizationAmountRange(*in.Amount)
|
||||||
|
if !ok {
|
||||||
|
return BatchOptimizationMatch{}, false
|
||||||
|
}
|
||||||
|
out.Amount = &normalizedAmount
|
||||||
|
}
|
||||||
|
return out, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeBatchOptimizationAmountRange(in BatchOptimizationAmountRange) (BatchOptimizationAmountRange, bool) {
|
||||||
|
out := BatchOptimizationAmountRange{
|
||||||
|
Min: strings.TrimSpace(in.Min),
|
||||||
|
Max: strings.TrimSpace(in.Max),
|
||||||
|
Currency: normalizeOptimizationCurrency(in.Currency),
|
||||||
|
}
|
||||||
|
if out.Min == "" && out.Max == "" {
|
||||||
|
return BatchOptimizationAmountRange{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
min decimal.Decimal
|
||||||
|
max decimal.Decimal
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
if out.Min != "" {
|
||||||
|
min, err = decimal.NewFromString(out.Min)
|
||||||
|
if err != nil {
|
||||||
|
return BatchOptimizationAmountRange{}, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if out.Max != "" {
|
||||||
|
max, err = decimal.NewFromString(out.Max)
|
||||||
|
if err != nil {
|
||||||
|
return BatchOptimizationAmountRange{}, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if out.Min != "" && out.Max != "" && min.GreaterThan(max) {
|
||||||
|
return BatchOptimizationAmountRange{}, false
|
||||||
|
}
|
||||||
|
return out, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p BatchOptimizationPolicy) Select(in BatchOptimizationContext) BatchOptimizationSelection {
|
||||||
|
policy := normalizeBatchOptimizationPolicy(p)
|
||||||
|
ctx := normalizeBatchOptimizationContext(in)
|
||||||
|
|
||||||
|
selected := BatchOptimizationSelection{
|
||||||
|
Mode: policy.DefaultMode,
|
||||||
|
}
|
||||||
|
|
||||||
|
bestSpecificity := -1
|
||||||
|
bestPriority := 0
|
||||||
|
bestOrder := len(policy.Rules) + 1
|
||||||
|
for i := range policy.Rules {
|
||||||
|
rule := policy.Rules[i]
|
||||||
|
if rule.Enabled != nil && !*rule.Enabled {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
specificity, ok := rule.matches(ctx)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if specificity > bestSpecificity ||
|
||||||
|
(specificity == bestSpecificity && rule.Priority > bestPriority) ||
|
||||||
|
(specificity == bestSpecificity && rule.Priority == bestPriority && i < bestOrder) {
|
||||||
|
selected = BatchOptimizationSelection{
|
||||||
|
Mode: rule.Mode,
|
||||||
|
RuleID: rule.ID,
|
||||||
|
Matched: true,
|
||||||
|
}
|
||||||
|
bestSpecificity = specificity
|
||||||
|
bestPriority = rule.Priority
|
||||||
|
bestOrder = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return selected
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r BatchOptimizationRule) matches(ctx BatchOptimizationContext) (int, bool) {
|
||||||
|
match := r.Match
|
||||||
|
if match.Rail == discovery.RailUnspecified || ctx.Rail == discovery.RailUnspecified {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
if match.Rail != ctx.Rail {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
specificity := 0
|
||||||
|
if len(match.Providers) > 0 {
|
||||||
|
specificity++
|
||||||
|
if !containsOptimizationToken(match.Providers, ctx.Provider) {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(match.Networks) > 0 {
|
||||||
|
specificity++
|
||||||
|
if !containsOptimizationToken(match.Networks, ctx.Network) {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(match.Currencies) > 0 {
|
||||||
|
specificity++
|
||||||
|
if !containsOptimizationToken(match.Currencies, ctx.Currency) {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if match.Amount != nil {
|
||||||
|
specificity++
|
||||||
|
if !amountRangeMatches(*match.Amount, ctx.Amount, ctx.Currency) {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return specificity, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func amountRangeMatches(rule BatchOptimizationAmountRange, amountRaw string, currencyRaw string) bool {
|
||||||
|
value, err := decimal.NewFromString(strings.TrimSpace(amountRaw))
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if ruleCurrency := normalizeOptimizationCurrency(rule.Currency); ruleCurrency != "" && ruleCurrency != normalizeOptimizationCurrency(currencyRaw) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if rule.Min != "" {
|
||||||
|
min, parseErr := decimal.NewFromString(rule.Min)
|
||||||
|
if parseErr != nil || value.LessThan(min) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if rule.Max != "" {
|
||||||
|
max, parseErr := decimal.NewFromString(rule.Max)
|
||||||
|
if parseErr != nil || value.GreaterThan(max) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeBatchOptimizationContext(in BatchOptimizationContext) BatchOptimizationContext {
|
||||||
|
return BatchOptimizationContext{
|
||||||
|
Rail: model.ParseRail(string(in.Rail)),
|
||||||
|
Provider: normalizeOptimizationToken(in.Provider),
|
||||||
|
Network: normalizeOptimizationToken(in.Network),
|
||||||
|
Currency: normalizeOptimizationCurrency(in.Currency),
|
||||||
|
Amount: strings.TrimSpace(in.Amount),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeBatchOptimizationMode(raw BatchOptimizationMode) BatchOptimizationMode {
|
||||||
|
switch strings.ToLower(strings.TrimSpace(string(raw))) {
|
||||||
|
case "merge_by_destination", "merge-by-destination", "by_destination", "by-destination":
|
||||||
|
return BatchOptimizationModeMergeByDestination
|
||||||
|
case "no_optimization", "no-optimization", "none", "never_merge", "never-merge", "never":
|
||||||
|
return BatchOptimizationModeNoOptimization
|
||||||
|
default:
|
||||||
|
return BatchOptimizationModeUnspecified
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeOptimizationTokens(values []string) []string {
|
||||||
|
if len(values) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
seen := make(map[string]struct{}, len(values))
|
||||||
|
out := make([]string, 0, len(values))
|
||||||
|
for i := range values {
|
||||||
|
token := normalizeOptimizationToken(values[i])
|
||||||
|
if token == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, exists := seen[token]; exists {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
seen[token] = struct{}{}
|
||||||
|
out = append(out, token)
|
||||||
|
}
|
||||||
|
if len(out) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeOptimizationCurrencies(values []string) []string {
|
||||||
|
if len(values) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
seen := make(map[string]struct{}, len(values))
|
||||||
|
out := make([]string, 0, len(values))
|
||||||
|
for i := range values {
|
||||||
|
token := normalizeOptimizationCurrency(values[i])
|
||||||
|
if token == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, exists := seen[token]; exists {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
seen[token] = struct{}{}
|
||||||
|
out = append(out, token)
|
||||||
|
}
|
||||||
|
if len(out) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeOptimizationToken(value string) string {
|
||||||
|
return strings.ToUpper(strings.TrimSpace(value))
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizeOptimizationCurrency(value string) string {
|
||||||
|
return strings.ToUpper(strings.TrimSpace(value))
|
||||||
|
}
|
||||||
|
|
||||||
|
func containsOptimizationToken(values []string, value string) bool {
|
||||||
|
if len(values) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
normalized := normalizeOptimizationToken(value)
|
||||||
|
if normalized == "" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for i := range values {
|
||||||
|
if values[i] == normalized {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
@@ -0,0 +1,167 @@
|
|||||||
|
package psvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/tech/sendico/pkg/discovery"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNormalizeBatchOptimizationPolicy_DefaultMode_NoOptimization(t *testing.T) {
|
||||||
|
got := normalizeBatchOptimizationPolicy(BatchOptimizationPolicy{})
|
||||||
|
if got.DefaultMode != BatchOptimizationModeNoOptimization {
|
||||||
|
t.Fatalf("default_mode mismatch: got=%q want=%q", got.DefaultMode, BatchOptimizationModeNoOptimization)
|
||||||
|
}
|
||||||
|
if got.Rules != nil {
|
||||||
|
t.Fatalf("expected no rules, got=%d", len(got.Rules))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNormalizeBatchOptimizationPolicy_DropsInvalidRules(t *testing.T) {
|
||||||
|
got := normalizeBatchOptimizationPolicy(BatchOptimizationPolicy{
|
||||||
|
DefaultMode: "merge_by_destination",
|
||||||
|
Rules: []BatchOptimizationRule{
|
||||||
|
{
|
||||||
|
ID: "missing-rail",
|
||||||
|
Mode: BatchOptimizationModeMergeByDestination,
|
||||||
|
Match: BatchOptimizationMatch{
|
||||||
|
Rail: "unknown",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "invalid-mode",
|
||||||
|
Mode: "x",
|
||||||
|
Match: BatchOptimizationMatch{
|
||||||
|
Rail: discovery.RailCrypto,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "valid",
|
||||||
|
Mode: BatchOptimizationModeMergeByDestination,
|
||||||
|
Match: BatchOptimizationMatch{
|
||||||
|
Rail: discovery.RailCrypto,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
if got.DefaultMode != BatchOptimizationModeMergeByDestination {
|
||||||
|
t.Fatalf("default_mode mismatch: got=%q want=%q", got.DefaultMode, BatchOptimizationModeMergeByDestination)
|
||||||
|
}
|
||||||
|
if len(got.Rules) != 1 {
|
||||||
|
t.Fatalf("rules count mismatch: got=%d want=%d", len(got.Rules), 1)
|
||||||
|
}
|
||||||
|
if got.Rules[0].ID != "valid" {
|
||||||
|
t.Fatalf("rule id mismatch: got=%q want=%q", got.Rules[0].ID, "valid")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBatchOptimizationPolicy_Select_UsesBestSpecificityThenPriority(t *testing.T) {
|
||||||
|
policy := BatchOptimizationPolicy{
|
||||||
|
DefaultMode: BatchOptimizationModeNoOptimization,
|
||||||
|
Rules: []BatchOptimizationRule{
|
||||||
|
{
|
||||||
|
ID: "crypto-generic",
|
||||||
|
Priority: 10,
|
||||||
|
Mode: BatchOptimizationModeMergeByDestination,
|
||||||
|
Match: BatchOptimizationMatch{
|
||||||
|
Rail: discovery.RailCrypto,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "crypto-network",
|
||||||
|
Priority: 5,
|
||||||
|
Mode: BatchOptimizationModeNoOptimization,
|
||||||
|
Match: BatchOptimizationMatch{
|
||||||
|
Rail: discovery.RailCrypto,
|
||||||
|
Networks: []string{"tron_nile"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "crypto-network-priority",
|
||||||
|
Priority: 20,
|
||||||
|
Mode: BatchOptimizationModeMergeByDestination,
|
||||||
|
Match: BatchOptimizationMatch{
|
||||||
|
Rail: discovery.RailCrypto,
|
||||||
|
Networks: []string{"tron_nile"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
out := policy.Select(BatchOptimizationContext{
|
||||||
|
Rail: discovery.RailCrypto,
|
||||||
|
Network: "TRON_NILE",
|
||||||
|
})
|
||||||
|
if !out.Matched {
|
||||||
|
t.Fatal("expected matched rule")
|
||||||
|
}
|
||||||
|
if out.RuleID != "crypto-network-priority" {
|
||||||
|
t.Fatalf("rule id mismatch: got=%q want=%q", out.RuleID, "crypto-network-priority")
|
||||||
|
}
|
||||||
|
if out.Mode != BatchOptimizationModeMergeByDestination {
|
||||||
|
t.Fatalf("mode mismatch: got=%q want=%q", out.Mode, BatchOptimizationModeMergeByDestination)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBatchOptimizationPolicy_Select_FallsBackToDefault(t *testing.T) {
|
||||||
|
policy := BatchOptimizationPolicy{
|
||||||
|
DefaultMode: BatchOptimizationModeNoOptimization,
|
||||||
|
Rules: []BatchOptimizationRule{
|
||||||
|
{
|
||||||
|
ID: "crypto-only",
|
||||||
|
Mode: BatchOptimizationModeMergeByDestination,
|
||||||
|
Match: BatchOptimizationMatch{
|
||||||
|
Rail: discovery.RailCrypto,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
out := policy.Select(BatchOptimizationContext{Rail: discovery.RailCardPayout})
|
||||||
|
if out.Matched {
|
||||||
|
t.Fatalf("unexpected matched rule: %q", out.RuleID)
|
||||||
|
}
|
||||||
|
if out.Mode != BatchOptimizationModeNoOptimization {
|
||||||
|
t.Fatalf("mode mismatch: got=%q want=%q", out.Mode, BatchOptimizationModeNoOptimization)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBatchOptimizationPolicy_Select_MatchesAmountRange(t *testing.T) {
|
||||||
|
policy := BatchOptimizationPolicy{
|
||||||
|
DefaultMode: BatchOptimizationModeNoOptimization,
|
||||||
|
Rules: []BatchOptimizationRule{
|
||||||
|
{
|
||||||
|
ID: "crypto-large-usdt",
|
||||||
|
Mode: BatchOptimizationModeMergeByDestination,
|
||||||
|
Match: BatchOptimizationMatch{
|
||||||
|
Rail: discovery.RailCrypto,
|
||||||
|
Amount: &BatchOptimizationAmountRange{
|
||||||
|
Min: "100",
|
||||||
|
Currency: "USDT",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
matched := policy.Select(BatchOptimizationContext{
|
||||||
|
Rail: discovery.RailCrypto,
|
||||||
|
Amount: "120",
|
||||||
|
Currency: "USDT",
|
||||||
|
})
|
||||||
|
if !matched.Matched {
|
||||||
|
t.Fatal("expected amount rule match")
|
||||||
|
}
|
||||||
|
|
||||||
|
notMatched := policy.Select(BatchOptimizationContext{
|
||||||
|
Rail: discovery.RailCrypto,
|
||||||
|
Amount: "50",
|
||||||
|
Currency: "USDT",
|
||||||
|
})
|
||||||
|
if notMatched.Matched {
|
||||||
|
t.Fatalf("expected no amount rule match, got rule=%q", notMatched.RuleID)
|
||||||
|
}
|
||||||
|
if notMatched.Mode != BatchOptimizationModeNoOptimization {
|
||||||
|
t.Fatalf("default mode mismatch: got=%q want=%q", notMatched.Mode, BatchOptimizationModeNoOptimization)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -32,21 +32,21 @@ type svc struct {
|
|||||||
|
|
||||||
quoteStore qsnap.Store
|
quoteStore qsnap.Store
|
||||||
|
|
||||||
validator reqval.Validator
|
validator reqval.Validator
|
||||||
idempotency idem.Service
|
idempotency idem.Service
|
||||||
quote qsnap.Resolver
|
quote qsnap.Resolver
|
||||||
aggregator opagg.Aggregator
|
batchOptimizer BatchOptimizer
|
||||||
aggregate agg.Factory
|
aggregate agg.Factory
|
||||||
planner xplan.Compiler
|
planner xplan.Compiler
|
||||||
state ostate.StateMachine
|
state ostate.StateMachine
|
||||||
scheduler ssched.Runtime
|
scheduler ssched.Runtime
|
||||||
executors sexec.Registry
|
executors sexec.Registry
|
||||||
reconciler erecon.Reconciler
|
reconciler erecon.Reconciler
|
||||||
repository prepo.Repository
|
repository prepo.Repository
|
||||||
query pquery.Service
|
query pquery.Service
|
||||||
mapper prmap.Mapper
|
mapper prmap.Mapper
|
||||||
observer oobs.Observer
|
observer oobs.Observer
|
||||||
statuses paymentStatusPublisher
|
statuses paymentStatusPublisher
|
||||||
|
|
||||||
retryPolicy ssched.RetryPolicy
|
retryPolicy ssched.RetryPolicy
|
||||||
now func() time.Time
|
now func() time.Time
|
||||||
@@ -93,21 +93,21 @@ func newService(deps Dependencies) (Service, error) {
|
|||||||
|
|
||||||
quoteStore: deps.QuoteStore,
|
quoteStore: deps.QuoteStore,
|
||||||
|
|
||||||
validator: firstValidator(deps.Validator, logger),
|
validator: firstValidator(deps.Validator, logger),
|
||||||
idempotency: firstIdempotency(deps.Idempotency, logger),
|
idempotency: firstIdempotency(deps.Idempotency, logger),
|
||||||
quote: firstQuoteResolver(deps.Quote, logger),
|
quote: firstQuoteResolver(deps.Quote, logger),
|
||||||
aggregator: firstAggregator(deps.Aggregator, logger),
|
batchOptimizer: firstBatchOptimizer(deps.BatchOptimizer, deps.BatchOptimizationPolicy, deps.Aggregator, logger),
|
||||||
aggregate: firstAggregateFactory(deps.Aggregate, logger),
|
aggregate: firstAggregateFactory(deps.Aggregate, logger),
|
||||||
planner: firstPlanCompiler(deps.Planner, logger),
|
planner: firstPlanCompiler(deps.Planner, logger),
|
||||||
state: firstStateMachine(deps.State, logger),
|
state: firstStateMachine(deps.State, logger),
|
||||||
scheduler: firstScheduler(deps.Scheduler, logger),
|
scheduler: firstScheduler(deps.Scheduler, logger),
|
||||||
executors: firstExecutors(deps.Executors, logger),
|
executors: firstExecutors(deps.Executors, logger),
|
||||||
reconciler: firstReconciler(deps.Reconciler, logger),
|
reconciler: firstReconciler(deps.Reconciler, logger),
|
||||||
repository: deps.Repository,
|
repository: deps.Repository,
|
||||||
query: query,
|
query: query,
|
||||||
mapper: firstMapper(deps.Mapper, logger),
|
mapper: firstMapper(deps.Mapper, logger),
|
||||||
observer: observer,
|
observer: observer,
|
||||||
statuses: newPaymentStatusPublisher(logger, deps.Producer),
|
statuses: newPaymentStatusPublisher(logger, deps.Producer),
|
||||||
|
|
||||||
retryPolicy: deps.RetryPolicy,
|
retryPolicy: deps.RetryPolicy,
|
||||||
now: deps.Now,
|
now: deps.Now,
|
||||||
@@ -153,6 +153,17 @@ func firstAggregateFactory(v agg.Factory, logger mlogger.Logger) agg.Factory {
|
|||||||
return agg.New(agg.Dependencies{Logger: logger})
|
return agg.New(agg.Dependencies{Logger: logger})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func firstBatchOptimizer(v BatchOptimizer, policy BatchOptimizationPolicy, aggregator opagg.Aggregator, logger mlogger.Logger) BatchOptimizer {
|
||||||
|
if v != nil {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
return NewPolicyBatchOptimizer(PolicyBatchOptimizerDependencies{
|
||||||
|
Logger: logger,
|
||||||
|
Aggregator: firstAggregator(aggregator, logger),
|
||||||
|
Policy: policy,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func firstAggregator(v opagg.Aggregator, logger mlogger.Logger) opagg.Aggregator {
|
func firstAggregator(v opagg.Aggregator, logger mlogger.Logger) opagg.Aggregator {
|
||||||
if v != nil {
|
if v != nil {
|
||||||
return v
|
return v
|
||||||
|
|||||||
@@ -366,6 +366,14 @@ type testEnv struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newTestEnv(t *testing.T, handler func(kind string, req sexec.StepRequest) (*sexec.ExecuteOutput, error)) *testEnv {
|
func newTestEnv(t *testing.T, handler func(kind string, req sexec.StepRequest) (*sexec.ExecuteOutput, error)) *testEnv {
|
||||||
|
return newTestEnvWithPolicy(t, BatchOptimizationPolicy{}, handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestEnvWithPolicy(
|
||||||
|
t *testing.T,
|
||||||
|
policy BatchOptimizationPolicy,
|
||||||
|
handler func(kind string, req sexec.StepRequest) (*sexec.ExecuteOutput, error),
|
||||||
|
) *testEnv {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
repo := newMemoryRepo(func() time.Time {
|
repo := newMemoryRepo(func() time.Time {
|
||||||
return time.Now().UTC()
|
return time.Now().UTC()
|
||||||
@@ -388,13 +396,14 @@ func newTestEnv(t *testing.T, handler func(kind string, req sexec.StepRequest) (
|
|||||||
|
|
||||||
producer := &capturingProducer{}
|
producer := &capturingProducer{}
|
||||||
svc, err := New(Dependencies{
|
svc, err := New(Dependencies{
|
||||||
QuoteStore: quotes,
|
QuoteStore: quotes,
|
||||||
Repository: repo,
|
Repository: repo,
|
||||||
Executors: registry,
|
Executors: registry,
|
||||||
Observer: observer,
|
Observer: observer,
|
||||||
Producer: producer,
|
Producer: producer,
|
||||||
RetryPolicy: ssched.RetryPolicy{MaxAttempts: 2},
|
RetryPolicy: ssched.RetryPolicy{MaxAttempts: 2},
|
||||||
MaxTicks: 20,
|
BatchOptimizationPolicy: policy,
|
||||||
|
MaxTicks: 20,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("New returned error: %v", err)
|
t.Fatalf("New returned error: %v", err)
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
|
|
||||||
chainclient "github.com/tech/sendico/gateway/chain/client"
|
chainclient "github.com/tech/sendico/gateway/chain/client"
|
||||||
ledgerclient "github.com/tech/sendico/ledger/client"
|
ledgerclient "github.com/tech/sendico/ledger/client"
|
||||||
|
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc"
|
||||||
"github.com/tech/sendico/payments/storage/model"
|
"github.com/tech/sendico/payments/storage/model"
|
||||||
clockpkg "github.com/tech/sendico/pkg/clock"
|
clockpkg "github.com/tech/sendico/pkg/clock"
|
||||||
"github.com/tech/sendico/pkg/discovery"
|
"github.com/tech/sendico/pkg/discovery"
|
||||||
@@ -108,6 +109,16 @@ func WithGatewayRegistry(registry GatewayRegistry) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithBatchOptimizationPolicy configures policy-based batch optimizer behavior.
|
||||||
|
func WithBatchOptimizationPolicy(policy psvc.BatchOptimizationPolicy) Option {
|
||||||
|
return func(s *Service) {
|
||||||
|
if s == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.batchOptimizationPolicy = policy
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type discoveryGatewayRegistry struct {
|
type discoveryGatewayRegistry struct {
|
||||||
registry *discovery.Registry
|
registry *discovery.Registry
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,13 +24,14 @@ type Service struct {
|
|||||||
paymentRepo prepo.Repository
|
paymentRepo prepo.Repository
|
||||||
producer msg.Producer
|
producer msg.Producer
|
||||||
|
|
||||||
ledgerClient ledgerclient.Client
|
ledgerClient ledgerclient.Client
|
||||||
gatewayInvokeResolver GatewayInvokeResolver
|
gatewayInvokeResolver GatewayInvokeResolver
|
||||||
gatewayRegistry GatewayRegistry
|
gatewayRegistry GatewayRegistry
|
||||||
cardGatewayRoutes map[string]CardGatewayRoute
|
cardGatewayRoutes map[string]CardGatewayRoute
|
||||||
paymentGatewayBroker mb.Broker
|
batchOptimizationPolicy psvc.BatchOptimizationPolicy
|
||||||
gatewayConsumers []msg.Consumer
|
paymentGatewayBroker mb.Broker
|
||||||
stopExternalWorkers context.CancelFunc
|
gatewayConsumers []msg.Consumer
|
||||||
|
stopExternalWorkers context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewService constructs the v2 orchestrator service.
|
// NewService constructs the v2 orchestrator service.
|
||||||
@@ -53,11 +54,12 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro
|
|||||||
|
|
||||||
var err error
|
var err error
|
||||||
svc.v2, svc.paymentRepo, err = newOrchestrationV2Service(svc.logger, repo, v2RuntimeDeps{
|
svc.v2, svc.paymentRepo, err = newOrchestrationV2Service(svc.logger, repo, v2RuntimeDeps{
|
||||||
LedgerClient: svc.ledgerClient,
|
LedgerClient: svc.ledgerClient,
|
||||||
GatewayInvokeResolver: svc.gatewayInvokeResolver,
|
GatewayInvokeResolver: svc.gatewayInvokeResolver,
|
||||||
GatewayRegistry: svc.gatewayRegistry,
|
GatewayRegistry: svc.gatewayRegistry,
|
||||||
CardGatewayRoutes: svc.cardGatewayRoutes,
|
CardGatewayRoutes: svc.cardGatewayRoutes,
|
||||||
Producer: svc.producer,
|
BatchOptimizationPolicy: svc.batchOptimizationPolicy,
|
||||||
|
Producer: svc.producer,
|
||||||
})
|
})
|
||||||
svc.startExternalRuntime()
|
svc.startExternalRuntime()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -25,11 +25,12 @@ type v2MongoDBProvider interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type v2RuntimeDeps struct {
|
type v2RuntimeDeps struct {
|
||||||
LedgerClient ledgerclient.Client
|
LedgerClient ledgerclient.Client
|
||||||
GatewayInvokeResolver GatewayInvokeResolver
|
GatewayInvokeResolver GatewayInvokeResolver
|
||||||
GatewayRegistry GatewayRegistry
|
GatewayRegistry GatewayRegistry
|
||||||
CardGatewayRoutes map[string]CardGatewayRoute
|
CardGatewayRoutes map[string]CardGatewayRoute
|
||||||
Producer msg.Producer
|
BatchOptimizationPolicy psvc.BatchOptimizationPolicy
|
||||||
|
Producer msg.Producer
|
||||||
}
|
}
|
||||||
|
|
||||||
func newOrchestrationV2Service(logger mlogger.Logger, repo storage.Repository, runtimeDeps v2RuntimeDeps) (psvc.Service, prepo.Repository, error) {
|
func newOrchestrationV2Service(logger mlogger.Logger, repo storage.Repository, runtimeDeps v2RuntimeDeps) (psvc.Service, prepo.Repository, error) {
|
||||||
@@ -70,13 +71,14 @@ func newOrchestrationV2Service(logger mlogger.Logger, repo storage.Repository, r
|
|||||||
executors := buildOrchestrationV2Executors(logger, runtimeDeps)
|
executors := buildOrchestrationV2Executors(logger, runtimeDeps)
|
||||||
|
|
||||||
svc, err := psvc.New(psvc.Dependencies{
|
svc, err := psvc.New(psvc.Dependencies{
|
||||||
Logger: logger.Named("v2"),
|
Logger: logger.Named("v2"),
|
||||||
QuoteStore: repo.Quotes(),
|
QuoteStore: repo.Quotes(),
|
||||||
Repository: paymentRepo,
|
Repository: paymentRepo,
|
||||||
Query: query,
|
Query: query,
|
||||||
Observer: observer,
|
Observer: observer,
|
||||||
Executors: executors,
|
Executors: executors,
|
||||||
Producer: runtimeDeps.Producer,
|
BatchOptimizationPolicy: runtimeDeps.BatchOptimizationPolicy,
|
||||||
|
Producer: runtimeDeps.Producer,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("Orchestration v2 disabled: service init failed", zap.Error(err))
|
logger.Error("Orchestration v2 disabled: service init failed", zap.Error(err))
|
||||||
|
|||||||
Reference in New Issue
Block a user