added payment orchestrator optimizer tweaks #716

Merged
tech merged 1 commits from po-714 into main 2026-03-11 14:58:54 +00:00
19 changed files with 1164 additions and 157 deletions

2
.gitignore vendored
View File

@@ -14,7 +14,9 @@ test.sh
.gocache/ .gocache/
.golangci-cache/ .golangci-cache/
.cache/ .cache/
.claude/ .claude/
.codex/
# Air hot reload build artifacts # Air hot reload build artifacts
**/tmp/ **/tmp/

View File

@@ -44,4 +44,17 @@ card_gateways:
funding_address: "TUaWaCkiXwYPKm5qjcB27Lhwv976vPvedE" funding_address: "TUaWaCkiXwYPKm5qjcB27Lhwv976vPvedE"
fee_wallet_ref: "697a062a248dc785125ccb9e" fee_wallet_ref: "697a062a248dc785125ccb9e"
# Batch optimizer settings:
# - default_mode disables aggregation for unmatched traffic
# - crypto rule enables destination aggregation for crypto rail
optimizer:
aggregation:
default_mode: "no_optimization"
rules:
- id: "crypto_by_destination"
priority: 100
mode: "merge_by_destination"
match:
rail: "CRYPTO"
# Gateway instances and capabilities are sourced from service discovery. # Gateway instances and capabilities are sourced from service discovery.

View File

@@ -44,4 +44,17 @@ card_gateways:
funding_address: "TGBDXEg9rxSqGFJDcb889zqTjDwx1bmLRF" funding_address: "TGBDXEg9rxSqGFJDcb889zqTjDwx1bmLRF"
fee_wallet_ref: "694c124ed76f9f811ac57133" fee_wallet_ref: "694c124ed76f9f811ac57133"
# Batch optimizer settings:
# - default_mode disables aggregation for unmatched traffic
# - crypto rule enables destination aggregation for crypto rail
optimizer:
aggregation:
default_mode: "no_optimization"
rules:
- id: "crypto_by_destination"
priority: 100
mode: "merge_by_destination"
match:
rail: "CRYPTO"
# Gateway instances and capabilities are sourced from service discovery. # Gateway instances and capabilities are sourced from service discovery.

View File

@@ -3,7 +3,9 @@ package serverimp
import ( import (
"strings" "strings"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/discovery" "github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mlogger"
) )
@@ -42,3 +44,52 @@ func buildGatewayRegistry(logger mlogger.Logger, src []gatewayInstanceConfig, re
} }
return orchestrator.NewDiscoveryGatewayRegistry(logger, registry) return orchestrator.NewDiscoveryGatewayRegistry(logger, registry)
} }
func buildBatchOptimizationPolicy(cfg optimizerConfig) psvc.BatchOptimizationPolicy {
rules := make([]psvc.BatchOptimizationRule, 0, len(cfg.Aggregation.Rules))
for i := range cfg.Aggregation.Rules {
ruleCfg := cfg.Aggregation.Rules[i]
rule := psvc.BatchOptimizationRule{
ID: strings.TrimSpace(ruleCfg.ID),
Enabled: ruleCfg.Enabled,
Priority: ruleCfg.Priority,
Mode: psvc.BatchOptimizationMode(strings.TrimSpace(ruleCfg.Mode)),
Match: psvc.BatchOptimizationMatch{
Rail: model.ParseRail(ruleCfg.Match.Rail),
Providers: cloneTrimmedSlice(ruleCfg.Match.Providers),
Networks: cloneTrimmedSlice(ruleCfg.Match.Networks),
Currencies: cloneTrimmedSlice(ruleCfg.Match.Currencies),
},
}
if ruleCfg.Match.Amount != nil {
rule.Match.Amount = &psvc.BatchOptimizationAmountRange{
Min: strings.TrimSpace(ruleCfg.Match.Amount.Min),
Max: strings.TrimSpace(ruleCfg.Match.Amount.Max),
Currency: strings.TrimSpace(ruleCfg.Match.Amount.Currency),
}
}
rules = append(rules, rule)
}
return psvc.BatchOptimizationPolicy{
DefaultMode: psvc.BatchOptimizationMode(strings.TrimSpace(cfg.Aggregation.DefaultMode)),
Rules: rules,
}
}
func cloneTrimmedSlice(values []string) []string {
if len(values) == 0 {
return nil
}
out := make([]string, 0, len(values))
for i := range values {
token := strings.TrimSpace(values[i])
if token == "" {
continue
}
out = append(out, token)
}
if len(out) == 0 {
return nil
}
return out
}

View File

@@ -15,9 +15,41 @@ type config struct {
*grpcapp.Config `yaml:",inline"` *grpcapp.Config `yaml:",inline"`
CardGateways map[string]cardGatewayRouteConfig `yaml:"card_gateways"` CardGateways map[string]cardGatewayRouteConfig `yaml:"card_gateways"`
GatewayInstances []gatewayInstanceConfig `yaml:"gateway_instances"` GatewayInstances []gatewayInstanceConfig `yaml:"gateway_instances"`
Optimizer optimizerConfig `yaml:"optimizer"`
QuoteRetentionHrs int `yaml:"quote_retention_hours"` QuoteRetentionHrs int `yaml:"quote_retention_hours"`
} }
type optimizerConfig struct {
Aggregation aggregationConfig `yaml:"aggregation"`
}
type aggregationConfig struct {
DefaultMode string `yaml:"default_mode"`
Rules []aggregationRuleConfig `yaml:"rules"`
}
type aggregationRuleConfig struct {
ID string `yaml:"id"`
Enabled *bool `yaml:"enabled"`
Priority int `yaml:"priority"`
Mode string `yaml:"mode"`
Match aggregationMatchConfig `yaml:"match"`
}
type aggregationMatchConfig struct {
Rail string `yaml:"rail"`
Providers []string `yaml:"providers"`
Networks []string `yaml:"networks"`
Currencies []string `yaml:"currencies"`
Amount *aggregationAmountConfig `yaml:"amount"`
}
type aggregationAmountConfig struct {
Min string `yaml:"min"`
Max string `yaml:"max"`
Currency string `yaml:"currency"`
}
type cardGatewayRouteConfig struct { type cardGatewayRouteConfig struct {
FundingAddress string `yaml:"funding_address"` FundingAddress string `yaml:"funding_address"`
FeeAddress string `yaml:"fee_address"` FeeAddress string `yaml:"fee_address"`

View File

@@ -40,6 +40,7 @@ func (i *Imp) buildServiceOptions(cfg *config, deps *orchestratorDeps) []orchest
if routes := buildCardGatewayRoutes(cfg.CardGateways); len(routes) > 0 { if routes := buildCardGatewayRoutes(cfg.CardGateways); len(routes) > 0 {
opts = append(opts, orchestrator.WithCardGatewayRoutes(routes)) opts = append(opts, orchestrator.WithCardGatewayRoutes(routes))
} }
opts = append(opts, orchestrator.WithBatchOptimizationPolicy(buildBatchOptimizationPolicy(cfg.Optimizer)))
if registry := buildGatewayRegistry(i.logger, cfg.GatewayInstances, i.discoveryReg); registry != nil { if registry := buildGatewayRegistry(i.logger, cfg.GatewayInstances, i.discoveryReg); registry != nil {
opts = append(opts, orchestrator.WithGatewayRegistry(registry)) opts = append(opts, orchestrator.WithGatewayRegistry(registry))
} }

View File

@@ -16,11 +16,22 @@ type Input struct {
Items []Item Items []Item
} }
// MergeMode controls whether an item can be compacted with compatible peers.
type MergeMode string
const (
MergeModeUnspecified MergeMode = ""
MergeModeByRecipient MergeMode = "merge_by_recipient"
MergeModeNever MergeMode = "never_merge"
)
// Item is one quote-intent pair candidate for aggregation. // Item is one quote-intent pair candidate for aggregation.
type Item struct { type Item struct {
IntentRef string IntentRef string
IntentSnapshot model.PaymentIntent IntentSnapshot model.PaymentIntent
QuoteSnapshot *model.PaymentQuoteSnapshot QuoteSnapshot *model.PaymentQuoteSnapshot
MergeMode MergeMode
PolicyTag string
} }
// Group is one aggregated recipient operation group. // Group is one aggregated recipient operation group.

View File

@@ -64,6 +64,12 @@ func (s *svc) Aggregate(in Input) (out *Output, err error) {
if !isBatchingEligible(item.QuoteSnapshot) { if !isBatchingEligible(item.QuoteSnapshot) {
key = key + keySep + "non_batching=" + itoa(i) key = key + keySep + "non_batching=" + itoa(i)
} }
if normalizeMergeMode(item.MergeMode) == MergeModeNever {
key = key + keySep + "merge_mode=never:" + itoa(i)
}
if policyTag := strings.TrimSpace(item.PolicyTag); policyTag != "" {
key = key + keySep + "policy_tag=" + policyTag
}
intentRef := firstNonEmpty( intentRef := firstNonEmpty(
strings.TrimSpace(item.IntentRef), strings.TrimSpace(item.IntentRef),
@@ -190,3 +196,12 @@ func isBatchingEligible(quote *model.PaymentQuoteSnapshot) bool {
} }
return quote.ExecutionConditions.BatchingEligible return quote.ExecutionConditions.BatchingEligible
} }
func normalizeMergeMode(mode MergeMode) MergeMode {
switch strings.ToLower(strings.TrimSpace(string(mode))) {
case strings.ToLower(string(MergeModeNever)):
return MergeModeNever
default:
return MergeModeByRecipient
}
}

View File

@@ -0,0 +1,184 @@
package psvc
import (
"strconv"
"strings"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/opagg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/qsnap"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.uber.org/zap"
)
// BatchOptimizeInput is optimizer payload for one ExecuteBatchPayment call.
type BatchOptimizeInput struct {
Items []qsnap.ResolvedItem
}
// BatchOptimizeOutput is optimizer result payload.
type BatchOptimizeOutput struct {
Groups []opagg.Group
}
// BatchOptimizer encapsulates batch item compaction policy and aggregation.
type BatchOptimizer interface {
Optimize(in BatchOptimizeInput) (*BatchOptimizeOutput, error)
}
// PolicyBatchOptimizerDependencies configures policy-based optimizer implementation.
type PolicyBatchOptimizerDependencies struct {
Logger mlogger.Logger
Aggregator opagg.Aggregator
Policy BatchOptimizationPolicy
}
type policyBatchOptimizer struct {
logger mlogger.Logger
aggregator opagg.Aggregator
policy BatchOptimizationPolicy
}
// NewPolicyBatchOptimizer creates default rule-based optimizer implementation.
func NewPolicyBatchOptimizer(deps PolicyBatchOptimizerDependencies) BatchOptimizer {
logger := deps.Logger
if logger == nil {
logger = zap.NewNop()
}
aggregator := deps.Aggregator
if aggregator == nil {
aggregator = opagg.New(opagg.Dependencies{Logger: logger})
}
return &policyBatchOptimizer{
logger: logger.Named("batch_optimizer"),
aggregator: aggregator,
policy: normalizeBatchOptimizationPolicy(deps.Policy),
}
}
func (o *policyBatchOptimizer) Optimize(in BatchOptimizeInput) (*BatchOptimizeOutput, error) {
if len(in.Items) == 0 {
return nil, merrors.InvalidArgument("items are required")
}
aggItems := make([]opagg.Item, 0, len(in.Items))
for i := range in.Items {
item := in.Items[i]
selection := o.policy.Select(batchOptimizationContextFromItem(item))
intentSnapshot := applyBatchOptimizationSelection(item.IntentSnapshot, selection)
aggItems = append(aggItems, opagg.Item{
IntentRef: item.IntentRef,
IntentSnapshot: intentSnapshot,
QuoteSnapshot: item.QuoteSnapshot,
MergeMode: mergeModeFromBatchOptimization(selection.Mode),
PolicyTag: batchOptimizationSelectionTag(selection),
})
}
aggOutput, err := o.aggregator.Aggregate(opagg.Input{Items: aggItems})
if err != nil {
return nil, err
}
if aggOutput == nil || len(aggOutput.Groups) == 0 {
return nil, merrors.InvalidArgument("aggregation produced no groups")
}
return &BatchOptimizeOutput{Groups: aggOutput.Groups}, nil
}
func batchOptimizationContextFromItem(item qsnap.ResolvedItem) BatchOptimizationContext {
rail, provider, network := destinationHopSignature(item.QuoteSnapshot)
money := selectSettlementMoney(item)
ctx := BatchOptimizationContext{
Rail: rail,
Provider: provider,
Network: network,
}
if money != nil {
ctx.Amount = strings.TrimSpace(money.Amount)
ctx.Currency = strings.TrimSpace(money.Currency)
}
return ctx
}
func selectSettlementMoney(item qsnap.ResolvedItem) *paymenttypes.Money {
if item.QuoteSnapshot != nil && item.QuoteSnapshot.ExpectedSettlementAmount != nil {
return item.QuoteSnapshot.ExpectedSettlementAmount
}
return item.IntentSnapshot.Amount
}
func destinationHopSignature(snapshot *model.PaymentQuoteSnapshot) (model.Rail, string, string) {
if snapshot == nil || snapshot.Route == nil {
return discovery.RailUnspecified, "", ""
}
route := snapshot.Route
rail := model.ParseRail(route.Rail)
provider := strings.TrimSpace(route.Provider)
network := strings.TrimSpace(route.Network)
var destinationHop *paymenttypes.QuoteRouteHop
for i := range route.Hops {
hop := route.Hops[i]
if hop == nil {
continue
}
destinationHop = hop
if hop.Role == paymenttypes.QuoteRouteHopRoleDestination {
break
}
}
if destinationHop != nil {
if parsed := model.ParseRail(destinationHop.Rail); parsed != discovery.RailUnspecified {
rail = parsed
}
if provider == "" {
provider = strings.TrimSpace(destinationHop.Gateway)
}
if network == "" {
network = strings.TrimSpace(destinationHop.Network)
}
}
return rail, provider, network
}
func mergeModeFromBatchOptimization(mode BatchOptimizationMode) opagg.MergeMode {
switch normalizeBatchOptimizationMode(mode) {
case BatchOptimizationModeMergeByDestination:
return opagg.MergeModeByRecipient
default:
return opagg.MergeModeNever
}
}
func batchOptimizationSelectionTag(sel BatchOptimizationSelection) string {
mode := normalizeBatchOptimizationMode(sel.Mode)
matched := strconv.FormatBool(sel.Matched)
ruleID := strings.TrimSpace(sel.RuleID)
if ruleID == "" {
ruleID = "default"
}
return string(mode) + "|" + matched + "|" + ruleID
}
func applyBatchOptimizationSelection(intent model.PaymentIntent, sel BatchOptimizationSelection) model.PaymentIntent {
if intent.Attributes == nil {
intent.Attributes = map[string]string{}
}
mode := normalizeBatchOptimizationMode(sel.Mode)
if mode == BatchOptimizationModeUnspecified {
mode = BatchOptimizationModeNoOptimization
}
intent.Attributes[attrBatchOptimizerMode] = string(mode)
intent.Attributes[attrBatchOptimizerRuleMatch] = strconv.FormatBool(sel.Matched)
ruleID := strings.TrimSpace(sel.RuleID)
if ruleID == "" {
delete(intent.Attributes, attrBatchOptimizerRuleID)
} else {
intent.Attributes[attrBatchOptimizerRuleID] = ruleID
}
return intent
}

View File

@@ -30,6 +30,9 @@ import (
const ( const (
attrAggregatedByRecipient = "orchestrator.v2.aggregated_by_recipient" attrAggregatedByRecipient = "orchestrator.v2.aggregated_by_recipient"
attrAggregatedItems = "orchestrator.v2.aggregated_items" attrAggregatedItems = "orchestrator.v2.aggregated_items"
attrBatchOptimizerMode = "orchestrator.v2.batch_optimizer_mode"
attrBatchOptimizerRuleID = "orchestrator.v2.batch_optimizer_rule_id"
attrBatchOptimizerRuleMatch = "orchestrator.v2.batch_optimizer_rule_matched"
) )
func (s *svc) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (resp *orchestrationv2.ExecuteBatchPaymentResponse, err error) { func (s *svc) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (resp *orchestrationv2.ExecuteBatchPaymentResponse, err error) {
@@ -71,31 +74,27 @@ func (s *svc) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.Exec
return nil, merrors.InvalidArgument("quotation has no executable items") return nil, merrors.InvalidArgument("quotation has no executable items")
} }
aggItems := make([]opagg.Item, 0, len(resolved.Items)) optimized, err := s.batchOptimizer.Optimize(BatchOptimizeInput{Items: resolved.Items})
for _, item := range resolved.Items {
aggItems = append(aggItems, opagg.Item{
IntentRef: item.IntentRef,
IntentSnapshot: item.IntentSnapshot,
QuoteSnapshot: item.QuoteSnapshot,
})
}
aggOutput, err := s.aggregator.Aggregate(opagg.Input{Items: aggItems})
if err != nil { if err != nil {
return nil, err return nil, err
} }
group, err := s.buildBatchOperationGroup(aggOutput.Groups) groups, err := s.buildBatchOperationGroups(optimized.Groups)
if err != nil { if err != nil {
return nil, err return nil, err
} }
payment, err := s.executeGroup(ctx, requestCtx, resolved.QuotationRef, group) payments := make([]*orchestrationv2.Payment, 0, len(groups))
if err != nil { for i := range groups {
return nil, err payment, groupErr := s.executeGroup(ctx, requestCtx, resolved.QuotationRef, groups[i])
if groupErr != nil {
return nil, groupErr
} }
protoPayment, err := s.mapPayment(payment) protoPayment, mapErr := s.mapPayment(payment)
if err != nil { if mapErr != nil {
return nil, err return nil, mapErr
} }
return &orchestrationv2.ExecuteBatchPaymentResponse{Payments: []*orchestrationv2.Payment{protoPayment}}, nil payments = append(payments, protoPayment)
}
return &orchestrationv2.ExecuteBatchPaymentResponse{Payments: payments}, nil
} }
func (s *svc) prepareBatchExecute(req *orchestrationv2.ExecuteBatchPaymentRequest) (*reqval.Ctx, error) { func (s *svc) prepareBatchExecute(req *orchestrationv2.ExecuteBatchPaymentRequest) (*reqval.Ctx, error) {
@@ -270,56 +269,44 @@ func normalizeIntentRefs(values []string) []string {
return out return out
} }
func (s *svc) buildBatchOperationGroup(groups []opagg.Group) (opagg.Group, error) { func (s *svc) buildBatchOperationGroups(groups []opagg.Group) ([]opagg.Group, error) {
if len(groups) == 0 { if len(groups) == 0 {
return opagg.Group{}, merrors.InvalidArgument("aggregation produced no groups") return nil, merrors.InvalidArgument("aggregation produced no groups")
} }
anchorDestination := groups[0].IntentSnapshot.Destination out := make([]opagg.Group, 0, len(groups))
synthetic := make([]opagg.Item, 0, len(groups))
allIntentRefs := make([]string, 0, len(groups))
for i := range groups { for i := range groups {
group := groups[i] group := groups[i]
intent := group.IntentSnapshot group.IntentRefs = normalizeIntentRefs(group.IntentRefs)
intent.Destination = anchorDestination if len(group.IntentRefs) == 0 {
synthetic = append(synthetic, opagg.Item{ return nil, merrors.InvalidArgument("aggregated group has no intent refs")
IntentRef: strings.Join(normalizeIntentRefs(group.IntentRefs), ","),
IntentSnapshot: intent,
QuoteSnapshot: group.QuoteSnapshot,
})
allIntentRefs = append(allIntentRefs, group.IntentRefs...)
} }
if group.IntentSnapshot.Attributes == nil {
group.IntentSnapshot.Attributes = map[string]string{}
}
if len(group.IntentRefs) > 1 {
group.IntentSnapshot.Attributes[attrAggregatedByRecipient] = "true"
} else {
delete(group.IntentSnapshot.Attributes, attrAggregatedByRecipient)
}
group.IntentSnapshot.Attributes[attrAggregatedItems] = strconv.Itoa(len(group.IntentRefs))
collapsed, err := s.aggregator.Aggregate(opagg.Input{Items: synthetic}) targets := buildBatchPayoutTargets([]opagg.Group{group})
if err != nil { if routeContainsCardPayout(group.QuoteSnapshot) && len(targets) > 0 {
return opagg.Group{}, err
}
if collapsed == nil || len(collapsed.Groups) != 1 {
return opagg.Group{}, merrors.InvalidArgument("batch quotation contains incompatible operation groups")
}
out := collapsed.Groups[0]
out.IntentRefs = normalizeIntentRefs(allIntentRefs)
if len(out.IntentRefs) == 0 {
return opagg.Group{}, merrors.InvalidArgument("aggregated group has no intent refs")
}
if out.IntentSnapshot.Attributes == nil {
out.IntentSnapshot.Attributes = map[string]string{}
}
if len(groups) > 1 {
out.IntentSnapshot.Attributes[attrAggregatedByRecipient] = "true"
}
out.IntentSnapshot.Attributes[attrAggregatedItems] = strconv.Itoa(len(out.IntentRefs))
targets := buildBatchPayoutTargets(groups)
if routeContainsCardPayout(out.QuoteSnapshot) && len(targets) > 0 {
raw, err := batchmeta.EncodePayoutTargets(targets) raw, err := batchmeta.EncodePayoutTargets(targets)
if err != nil { if err != nil {
return opagg.Group{}, err return nil, err
} }
if strings.TrimSpace(raw) != "" { if strings.TrimSpace(raw) != "" {
out.IntentSnapshot.Attributes[batchmeta.AttrPayoutTargets] = raw group.IntentSnapshot.Attributes[batchmeta.AttrPayoutTargets] = raw
} }
} else {
delete(group.IntentSnapshot.Attributes, batchmeta.AttrPayoutTargets)
}
out = append(out, group)
}
if len(out) == 0 {
return nil, merrors.InvalidArgument("aggregation produced no groups")
} }
return out, nil return out, nil
} }

View File

@@ -10,13 +10,14 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/batchmeta" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/batchmeta"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec"
"github.com/tech/sendico/payments/storage/model" "github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/discovery"
pm "github.com/tech/sendico/pkg/model" pm "github.com/tech/sendico/pkg/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types" paymenttypes "github.com/tech/sendico/pkg/payments/types"
orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2" orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2"
"go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/bson"
) )
func TestExecuteBatchPayment_SameDestinationMerges(t *testing.T) { func TestExecuteBatchPayment_DefaultNoOptimization_DoesNotMergeSameDestination(t *testing.T) {
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) { env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution step := req.StepExecution
step.State = agg.StepStateCompleted step.State = agg.StepStateCompleted
@@ -37,15 +38,17 @@ func TestExecuteBatchPayment_SameDestinationMerges(t *testing.T) {
if resp == nil { if resp == nil {
t.Fatal("expected response") t.Fatal("expected response")
} }
if got, want := len(resp.GetPayments()), 1; got != want { if got, want := len(resp.GetPayments()), 2; got != want {
t.Fatalf("expected %d payment(s) for same-destination merge, got=%d", want, got) t.Fatalf("expected %d payment(s) with default no optimization, got=%d", want, got)
}
for i, p := range resp.GetPayments() {
if got, want := p.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
t.Fatalf("payments[%d] state mismatch: got=%s want=%s", i, got, want)
} }
if got, want := resp.GetPayments()[0].GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
t.Fatalf("state mismatch: got=%s want=%s", got, want)
} }
} }
func TestExecuteBatchPayment_DifferentDestinationsCompactsIntoSinglePayment(t *testing.T) { func TestExecuteBatchPayment_DefaultNoOptimization_DoesNotMergeDifferentDestinations(t *testing.T) {
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) { env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution step := req.StepExecution
step.State = agg.StepStateCompleted step.State = agg.StepStateCompleted
@@ -63,8 +66,8 @@ func TestExecuteBatchPayment_DifferentDestinationsCompactsIntoSinglePayment(t *t
if err != nil { if err != nil {
t.Fatalf("ExecuteBatchPayment returned error: %v", err) t.Fatalf("ExecuteBatchPayment returned error: %v", err)
} }
if got, want := len(resp.GetPayments()), 1; got != want { if got, want := len(resp.GetPayments()), 2; got != want {
t.Fatalf("expected %d payment for batched execution, got=%d", want, got) t.Fatalf("expected %d payments for batched execution without optimization, got=%d", want, got)
} }
for i, p := range resp.GetPayments() { for i, p := range resp.GetPayments() {
if got, want := p.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want { if got, want := p.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
@@ -73,7 +76,7 @@ func TestExecuteBatchPayment_DifferentDestinationsCompactsIntoSinglePayment(t *t
} }
} }
func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsInSinglePayment(t *testing.T) { func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsWithoutMerging(t *testing.T) {
var ( var (
targetRefs []string targetRefs []string
amounts []string amounts []string
@@ -99,11 +102,13 @@ func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsInSinglePayment
if err != nil { if err != nil {
t.Fatalf("ExecuteBatchPayment returned error: %v", err) t.Fatalf("ExecuteBatchPayment returned error: %v", err)
} }
if got, want := len(resp.GetPayments()), 1; got != want { if got, want := len(resp.GetPayments()), 2; got != want {
t.Fatalf("expected %d payment for card batch, got=%d", want, got) t.Fatalf("expected %d payments for card batch without optimization, got=%d", want, got)
}
for i, payment := range resp.GetPayments() {
if got, want := payment.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
t.Fatalf("payments[%d] state mismatch: got=%s want=%s", i, got, want)
} }
if got, want := resp.GetPayments()[0].GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
t.Fatalf("state mismatch: got=%s want=%s", got, want)
} }
if got, want := len(targetRefs), 2; got != want { if got, want := len(targetRefs), 2; got != want {
t.Fatalf("expected %d card payout send calls, got=%d", want, got) t.Fatalf("expected %d card payout send calls, got=%d", want, got)
@@ -122,6 +127,41 @@ func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsInSinglePayment
} }
} }
func TestExecuteBatchPayment_CryptoPolicyMergesByDestination(t *testing.T) {
env := newTestEnvWithPolicy(t, BatchOptimizationPolicy{
DefaultMode: BatchOptimizationModeNoOptimization,
Rules: []BatchOptimizationRule{
{
ID: "crypto-merge",
Priority: 100,
Mode: BatchOptimizationModeMergeByDestination,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
},
},
},
}, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution
step.State = agg.StepStateCompleted
return &sexec.ExecuteOutput{StepExecution: step}, nil
})
quote := newExecutableBatchCryptoQuoteSameDestination(env.orgID, "quote-batch-crypto-merge")
env.quotes.Put(quote)
resp, err := env.svc.ExecuteBatchPayment(context.Background(), &orchestrationv2.ExecuteBatchPaymentRequest{
Meta: testMeta(env.orgID, "idem-batch-crypto-merge"),
QuotationRef: "quote-batch-crypto-merge",
ClientPaymentRef: "client-batch-crypto-merge",
})
if err != nil {
t.Fatalf("ExecuteBatchPayment returned error: %v", err)
}
if got, want := len(resp.GetPayments()), 1; got != want {
t.Fatalf("expected %d merged payment for crypto policy, got=%d", want, got)
}
}
func TestExecuteBatchPayment_IdempotentRetry(t *testing.T) { func TestExecuteBatchPayment_IdempotentRetry(t *testing.T) {
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) { env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution step := req.StepExecution
@@ -150,8 +190,23 @@ func TestExecuteBatchPayment_IdempotentRetry(t *testing.T) {
if got, want := len(resp2.GetPayments()), len(resp1.GetPayments()); got != want { if got, want := len(resp2.GetPayments()), len(resp1.GetPayments()); got != want {
t.Fatalf("expected same number of payments on retry: got=%d want=%d", got, want) t.Fatalf("expected same number of payments on retry: got=%d want=%d", got, want)
} }
if got, want := resp2.GetPayments()[0].GetPaymentRef(), resp1.GetPayments()[0].GetPaymentRef(); got != want { refs1 := make([]string, 0, len(resp1.GetPayments()))
t.Fatalf("expected same payment_ref on retry: got=%q want=%q", got, want) refs2 := make([]string, 0, len(resp2.GetPayments()))
for i := range resp1.GetPayments() {
refs1 = append(refs1, resp1.GetPayments()[i].GetPaymentRef())
}
for i := range resp2.GetPayments() {
refs2 = append(refs2, resp2.GetPayments()[i].GetPaymentRef())
}
sort.Strings(refs1)
sort.Strings(refs2)
if len(refs1) != len(refs2) {
t.Fatalf("payment refs count mismatch: got=%d want=%d", len(refs2), len(refs1))
}
for i := range refs1 {
if refs1[i] != refs2[i] {
t.Fatalf("payment_ref mismatch on retry at index %d: got=%q want=%q", i, refs2[i], refs1[i])
}
} }
} }
@@ -307,3 +362,81 @@ func newExecutableBatchCardQuoteDiffDest(orgRef bson.ObjectID, quoteRef string)
ExpiresAt: now.Add(1 * time.Hour), ExpiresAt: now.Add(1 * time.Hour),
} }
} }
func newExecutableBatchCryptoQuoteSameDestination(orgRef bson.ObjectID, quoteRef string) *model.PaymentQuoteRecord {
now := time.Now().UTC()
return &model.PaymentQuoteRecord{
Base: modelBase(now),
OrganizationBoundBase: pm.OrganizationBoundBase{
OrganizationRef: orgRef,
},
QuoteRef: quoteRef,
RequestShape: model.QuoteRequestShapeBatch,
Items: []*model.PaymentQuoteItemV2{
{
Intent: &model.PaymentIntent{
Ref: "intent-a",
Kind: model.PaymentKindPayout,
Source: testLedgerEndpoint("ledger-src"),
Destination: testExternalChainEndpoint("TQfCWxU4ERWstZZcMzEVB9MnnCSzE4agj8"),
Amount: &paymenttypes.Money{Amount: "12", Currency: "USDT"},
SettlementCurrency: "USDT",
},
Quote: &model.PaymentQuoteSnapshot{
QuoteRef: quoteRef,
DebitAmount: &paymenttypes.Money{Amount: "12", Currency: "USDT"},
ExpectedSettlementAmount: &paymenttypes.Money{Amount: "12", Currency: "USDT"},
Route: &paymenttypes.QuoteRouteSpecification{
Rail: "CRYPTO",
Provider: "gw-crypto",
Network: "TRON_NILE",
Hops: []*paymenttypes.QuoteRouteHop{
{Index: 10, Rail: "LEDGER", Role: paymenttypes.QuoteRouteHopRoleSource},
{Index: 20, Rail: "CRYPTO", Gateway: "gw-crypto", Network: "TRON_NILE", Role: paymenttypes.QuoteRouteHopRoleDestination},
},
},
},
Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable},
},
{
Intent: &model.PaymentIntent{
Ref: "intent-b",
Kind: model.PaymentKindPayout,
Source: testLedgerEndpoint("ledger-src"),
Destination: testExternalChainEndpoint("TQfCWxU4ERWstZZcMzEVB9MnnCSzE4agj8"),
Amount: &paymenttypes.Money{Amount: "7", Currency: "USDT"},
SettlementCurrency: "USDT",
},
Quote: &model.PaymentQuoteSnapshot{
QuoteRef: quoteRef,
DebitAmount: &paymenttypes.Money{Amount: "7", Currency: "USDT"},
ExpectedSettlementAmount: &paymenttypes.Money{Amount: "7", Currency: "USDT"},
Route: &paymenttypes.QuoteRouteSpecification{
Rail: "CRYPTO",
Provider: "gw-crypto",
Network: "TRON_NILE",
Hops: []*paymenttypes.QuoteRouteHop{
{Index: 10, Rail: "LEDGER", Role: paymenttypes.QuoteRouteHopRoleSource},
{Index: 20, Rail: "CRYPTO", Gateway: "gw-crypto", Network: "TRON_NILE", Role: paymenttypes.QuoteRouteHopRoleDestination},
},
},
},
Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable},
},
},
ExpiresAt: now.Add(1 * time.Hour),
}
}
func testExternalChainEndpoint(address string) model.PaymentEndpoint {
return model.PaymentEndpoint{
Type: model.EndpointTypeExternalChain,
ExternalChain: &model.ExternalChainEndpoint{
Address: address,
Asset: &paymenttypes.Asset{
Chain: "TRON",
TokenSymbol: "USDT",
},
},
}
}

View File

@@ -54,6 +54,7 @@ type Dependencies struct {
Validator reqval.Validator Validator reqval.Validator
Idempotency idem.Service Idempotency idem.Service
Quote qsnap.Resolver Quote qsnap.Resolver
BatchOptimizer BatchOptimizer
Aggregator opagg.Aggregator Aggregator opagg.Aggregator
Aggregate agg.Factory Aggregate agg.Factory
Planner xplan.Compiler Planner xplan.Compiler
@@ -68,6 +69,7 @@ type Dependencies struct {
Producer msg.Producer Producer msg.Producer
RetryPolicy ssched.RetryPolicy RetryPolicy ssched.RetryPolicy
BatchOptimizationPolicy BatchOptimizationPolicy
Now func() time.Time Now func() time.Time
MaxTicks int MaxTicks int
} }

View File

@@ -0,0 +1,361 @@
package psvc
import (
"strings"
"github.com/shopspring/decimal"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/discovery"
)
// BatchOptimizationMode defines how ExecuteBatchPayment compacts batch operations.
type BatchOptimizationMode string
const (
BatchOptimizationModeUnspecified BatchOptimizationMode = ""
BatchOptimizationModeMergeByDestination BatchOptimizationMode = "merge_by_destination"
BatchOptimizationModeNoOptimization BatchOptimizationMode = "no_optimization"
)
// BatchOptimizationPolicy configures batch operation compaction behavior.
//
// Rules are evaluated by "best match":
// 1. all specified match fields must match the item context;
// 2. winner is highest specificity (number of optional selectors present);
// 3. ties are resolved by larger Priority;
// 4. remaining ties keep the first rule order from config.
//
// Rail is mandatory per rule. Other match fields are optional.
type BatchOptimizationPolicy struct {
DefaultMode BatchOptimizationMode
Rules []BatchOptimizationRule
}
// BatchOptimizationRule defines one matching rule.
type BatchOptimizationRule struct {
ID string
Enabled *bool
Priority int
Mode BatchOptimizationMode
Match BatchOptimizationMatch
}
// BatchOptimizationMatch defines selectors for one rule.
type BatchOptimizationMatch struct {
Rail model.Rail
Providers []string
Networks []string
Currencies []string
Amount *BatchOptimizationAmountRange
}
// BatchOptimizationAmountRange defines optional amount matching boundaries.
type BatchOptimizationAmountRange struct {
Min string
Max string
Currency string
}
// BatchOptimizationContext is runtime data used for matching one batch item.
type BatchOptimizationContext struct {
Rail model.Rail
Provider string
Network string
Currency string
Amount string
}
// BatchOptimizationSelection is the selected optimization decision.
type BatchOptimizationSelection struct {
Mode BatchOptimizationMode
RuleID string
Matched bool
}
func defaultBatchOptimizationPolicy() BatchOptimizationPolicy {
return BatchOptimizationPolicy{
DefaultMode: BatchOptimizationModeNoOptimization,
}
}
func normalizeBatchOptimizationPolicy(in BatchOptimizationPolicy) BatchOptimizationPolicy {
out := defaultBatchOptimizationPolicy()
if normalizedDefault := normalizeBatchOptimizationMode(in.DefaultMode); normalizedDefault != BatchOptimizationModeUnspecified {
out.DefaultMode = normalizedDefault
}
if len(in.Rules) == 0 {
return out
}
rules := make([]BatchOptimizationRule, 0, len(in.Rules))
for i := range in.Rules {
rule, ok := normalizeBatchOptimizationRule(in.Rules[i])
if !ok {
continue
}
rules = append(rules, rule)
}
if len(rules) > 0 {
out.Rules = rules
}
return out
}
func normalizeBatchOptimizationRule(in BatchOptimizationRule) (BatchOptimizationRule, bool) {
out := BatchOptimizationRule{
ID: strings.TrimSpace(in.ID),
Enabled: in.Enabled,
Priority: in.Priority,
Mode: normalizeBatchOptimizationMode(in.Mode),
}
if out.Mode == BatchOptimizationModeUnspecified {
return BatchOptimizationRule{}, false
}
match, ok := normalizeBatchOptimizationMatch(in.Match)
if !ok {
return BatchOptimizationRule{}, false
}
out.Match = match
return out, true
}
func normalizeBatchOptimizationMatch(in BatchOptimizationMatch) (BatchOptimizationMatch, bool) {
out := BatchOptimizationMatch{
Rail: model.ParseRail(string(in.Rail)),
Providers: normalizeOptimizationTokens(in.Providers),
Networks: normalizeOptimizationTokens(in.Networks),
Currencies: normalizeOptimizationCurrencies(in.Currencies),
}
if out.Rail == discovery.RailUnspecified {
return BatchOptimizationMatch{}, false
}
if in.Amount != nil {
normalizedAmount, ok := normalizeBatchOptimizationAmountRange(*in.Amount)
if !ok {
return BatchOptimizationMatch{}, false
}
out.Amount = &normalizedAmount
}
return out, true
}
func normalizeBatchOptimizationAmountRange(in BatchOptimizationAmountRange) (BatchOptimizationAmountRange, bool) {
out := BatchOptimizationAmountRange{
Min: strings.TrimSpace(in.Min),
Max: strings.TrimSpace(in.Max),
Currency: normalizeOptimizationCurrency(in.Currency),
}
if out.Min == "" && out.Max == "" {
return BatchOptimizationAmountRange{}, false
}
var (
min decimal.Decimal
max decimal.Decimal
err error
)
if out.Min != "" {
min, err = decimal.NewFromString(out.Min)
if err != nil {
return BatchOptimizationAmountRange{}, false
}
}
if out.Max != "" {
max, err = decimal.NewFromString(out.Max)
if err != nil {
return BatchOptimizationAmountRange{}, false
}
}
if out.Min != "" && out.Max != "" && min.GreaterThan(max) {
return BatchOptimizationAmountRange{}, false
}
return out, true
}
func (p BatchOptimizationPolicy) Select(in BatchOptimizationContext) BatchOptimizationSelection {
policy := normalizeBatchOptimizationPolicy(p)
ctx := normalizeBatchOptimizationContext(in)
selected := BatchOptimizationSelection{
Mode: policy.DefaultMode,
}
bestSpecificity := -1
bestPriority := 0
bestOrder := len(policy.Rules) + 1
for i := range policy.Rules {
rule := policy.Rules[i]
if rule.Enabled != nil && !*rule.Enabled {
continue
}
specificity, ok := rule.matches(ctx)
if !ok {
continue
}
if specificity > bestSpecificity ||
(specificity == bestSpecificity && rule.Priority > bestPriority) ||
(specificity == bestSpecificity && rule.Priority == bestPriority && i < bestOrder) {
selected = BatchOptimizationSelection{
Mode: rule.Mode,
RuleID: rule.ID,
Matched: true,
}
bestSpecificity = specificity
bestPriority = rule.Priority
bestOrder = i
}
}
return selected
}
func (r BatchOptimizationRule) matches(ctx BatchOptimizationContext) (int, bool) {
match := r.Match
if match.Rail == discovery.RailUnspecified || ctx.Rail == discovery.RailUnspecified {
return 0, false
}
if match.Rail != ctx.Rail {
return 0, false
}
specificity := 0
if len(match.Providers) > 0 {
specificity++
if !containsOptimizationToken(match.Providers, ctx.Provider) {
return 0, false
}
}
if len(match.Networks) > 0 {
specificity++
if !containsOptimizationToken(match.Networks, ctx.Network) {
return 0, false
}
}
if len(match.Currencies) > 0 {
specificity++
if !containsOptimizationToken(match.Currencies, ctx.Currency) {
return 0, false
}
}
if match.Amount != nil {
specificity++
if !amountRangeMatches(*match.Amount, ctx.Amount, ctx.Currency) {
return 0, false
}
}
return specificity, true
}
func amountRangeMatches(rule BatchOptimizationAmountRange, amountRaw string, currencyRaw string) bool {
value, err := decimal.NewFromString(strings.TrimSpace(amountRaw))
if err != nil {
return false
}
if ruleCurrency := normalizeOptimizationCurrency(rule.Currency); ruleCurrency != "" && ruleCurrency != normalizeOptimizationCurrency(currencyRaw) {
return false
}
if rule.Min != "" {
min, parseErr := decimal.NewFromString(rule.Min)
if parseErr != nil || value.LessThan(min) {
return false
}
}
if rule.Max != "" {
max, parseErr := decimal.NewFromString(rule.Max)
if parseErr != nil || value.GreaterThan(max) {
return false
}
}
return true
}
func normalizeBatchOptimizationContext(in BatchOptimizationContext) BatchOptimizationContext {
return BatchOptimizationContext{
Rail: model.ParseRail(string(in.Rail)),
Provider: normalizeOptimizationToken(in.Provider),
Network: normalizeOptimizationToken(in.Network),
Currency: normalizeOptimizationCurrency(in.Currency),
Amount: strings.TrimSpace(in.Amount),
}
}
func normalizeBatchOptimizationMode(raw BatchOptimizationMode) BatchOptimizationMode {
switch strings.ToLower(strings.TrimSpace(string(raw))) {
case "merge_by_destination", "merge-by-destination", "by_destination", "by-destination":
return BatchOptimizationModeMergeByDestination
case "no_optimization", "no-optimization", "none", "never_merge", "never-merge", "never":
return BatchOptimizationModeNoOptimization
default:
return BatchOptimizationModeUnspecified
}
}
func normalizeOptimizationTokens(values []string) []string {
if len(values) == 0 {
return nil
}
seen := make(map[string]struct{}, len(values))
out := make([]string, 0, len(values))
for i := range values {
token := normalizeOptimizationToken(values[i])
if token == "" {
continue
}
if _, exists := seen[token]; exists {
continue
}
seen[token] = struct{}{}
out = append(out, token)
}
if len(out) == 0 {
return nil
}
return out
}
func normalizeOptimizationCurrencies(values []string) []string {
if len(values) == 0 {
return nil
}
seen := make(map[string]struct{}, len(values))
out := make([]string, 0, len(values))
for i := range values {
token := normalizeOptimizationCurrency(values[i])
if token == "" {
continue
}
if _, exists := seen[token]; exists {
continue
}
seen[token] = struct{}{}
out = append(out, token)
}
if len(out) == 0 {
return nil
}
return out
}
func normalizeOptimizationToken(value string) string {
return strings.ToUpper(strings.TrimSpace(value))
}
func normalizeOptimizationCurrency(value string) string {
return strings.ToUpper(strings.TrimSpace(value))
}
func containsOptimizationToken(values []string, value string) bool {
if len(values) == 0 {
return false
}
normalized := normalizeOptimizationToken(value)
if normalized == "" {
return false
}
for i := range values {
if values[i] == normalized {
return true
}
}
return false
}

View File

@@ -0,0 +1,167 @@
package psvc
import (
"testing"
"github.com/tech/sendico/pkg/discovery"
)
func TestNormalizeBatchOptimizationPolicy_DefaultMode_NoOptimization(t *testing.T) {
got := normalizeBatchOptimizationPolicy(BatchOptimizationPolicy{})
if got.DefaultMode != BatchOptimizationModeNoOptimization {
t.Fatalf("default_mode mismatch: got=%q want=%q", got.DefaultMode, BatchOptimizationModeNoOptimization)
}
if got.Rules != nil {
t.Fatalf("expected no rules, got=%d", len(got.Rules))
}
}
func TestNormalizeBatchOptimizationPolicy_DropsInvalidRules(t *testing.T) {
got := normalizeBatchOptimizationPolicy(BatchOptimizationPolicy{
DefaultMode: "merge_by_destination",
Rules: []BatchOptimizationRule{
{
ID: "missing-rail",
Mode: BatchOptimizationModeMergeByDestination,
Match: BatchOptimizationMatch{
Rail: "unknown",
},
},
{
ID: "invalid-mode",
Mode: "x",
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
},
},
{
ID: "valid",
Mode: BatchOptimizationModeMergeByDestination,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
},
},
},
})
if got.DefaultMode != BatchOptimizationModeMergeByDestination {
t.Fatalf("default_mode mismatch: got=%q want=%q", got.DefaultMode, BatchOptimizationModeMergeByDestination)
}
if len(got.Rules) != 1 {
t.Fatalf("rules count mismatch: got=%d want=%d", len(got.Rules), 1)
}
if got.Rules[0].ID != "valid" {
t.Fatalf("rule id mismatch: got=%q want=%q", got.Rules[0].ID, "valid")
}
}
func TestBatchOptimizationPolicy_Select_UsesBestSpecificityThenPriority(t *testing.T) {
policy := BatchOptimizationPolicy{
DefaultMode: BatchOptimizationModeNoOptimization,
Rules: []BatchOptimizationRule{
{
ID: "crypto-generic",
Priority: 10,
Mode: BatchOptimizationModeMergeByDestination,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
},
},
{
ID: "crypto-network",
Priority: 5,
Mode: BatchOptimizationModeNoOptimization,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
Networks: []string{"tron_nile"},
},
},
{
ID: "crypto-network-priority",
Priority: 20,
Mode: BatchOptimizationModeMergeByDestination,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
Networks: []string{"tron_nile"},
},
},
},
}
out := policy.Select(BatchOptimizationContext{
Rail: discovery.RailCrypto,
Network: "TRON_NILE",
})
if !out.Matched {
t.Fatal("expected matched rule")
}
if out.RuleID != "crypto-network-priority" {
t.Fatalf("rule id mismatch: got=%q want=%q", out.RuleID, "crypto-network-priority")
}
if out.Mode != BatchOptimizationModeMergeByDestination {
t.Fatalf("mode mismatch: got=%q want=%q", out.Mode, BatchOptimizationModeMergeByDestination)
}
}
func TestBatchOptimizationPolicy_Select_FallsBackToDefault(t *testing.T) {
policy := BatchOptimizationPolicy{
DefaultMode: BatchOptimizationModeNoOptimization,
Rules: []BatchOptimizationRule{
{
ID: "crypto-only",
Mode: BatchOptimizationModeMergeByDestination,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
},
},
},
}
out := policy.Select(BatchOptimizationContext{Rail: discovery.RailCardPayout})
if out.Matched {
t.Fatalf("unexpected matched rule: %q", out.RuleID)
}
if out.Mode != BatchOptimizationModeNoOptimization {
t.Fatalf("mode mismatch: got=%q want=%q", out.Mode, BatchOptimizationModeNoOptimization)
}
}
func TestBatchOptimizationPolicy_Select_MatchesAmountRange(t *testing.T) {
policy := BatchOptimizationPolicy{
DefaultMode: BatchOptimizationModeNoOptimization,
Rules: []BatchOptimizationRule{
{
ID: "crypto-large-usdt",
Mode: BatchOptimizationModeMergeByDestination,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
Amount: &BatchOptimizationAmountRange{
Min: "100",
Currency: "USDT",
},
},
},
},
}
matched := policy.Select(BatchOptimizationContext{
Rail: discovery.RailCrypto,
Amount: "120",
Currency: "USDT",
})
if !matched.Matched {
t.Fatal("expected amount rule match")
}
notMatched := policy.Select(BatchOptimizationContext{
Rail: discovery.RailCrypto,
Amount: "50",
Currency: "USDT",
})
if notMatched.Matched {
t.Fatalf("expected no amount rule match, got rule=%q", notMatched.RuleID)
}
if notMatched.Mode != BatchOptimizationModeNoOptimization {
t.Fatalf("default mode mismatch: got=%q want=%q", notMatched.Mode, BatchOptimizationModeNoOptimization)
}
}

View File

@@ -35,7 +35,7 @@ type svc struct {
validator reqval.Validator validator reqval.Validator
idempotency idem.Service idempotency idem.Service
quote qsnap.Resolver quote qsnap.Resolver
aggregator opagg.Aggregator batchOptimizer BatchOptimizer
aggregate agg.Factory aggregate agg.Factory
planner xplan.Compiler planner xplan.Compiler
state ostate.StateMachine state ostate.StateMachine
@@ -96,7 +96,7 @@ func newService(deps Dependencies) (Service, error) {
validator: firstValidator(deps.Validator, logger), validator: firstValidator(deps.Validator, logger),
idempotency: firstIdempotency(deps.Idempotency, logger), idempotency: firstIdempotency(deps.Idempotency, logger),
quote: firstQuoteResolver(deps.Quote, logger), quote: firstQuoteResolver(deps.Quote, logger),
aggregator: firstAggregator(deps.Aggregator, logger), batchOptimizer: firstBatchOptimizer(deps.BatchOptimizer, deps.BatchOptimizationPolicy, deps.Aggregator, logger),
aggregate: firstAggregateFactory(deps.Aggregate, logger), aggregate: firstAggregateFactory(deps.Aggregate, logger),
planner: firstPlanCompiler(deps.Planner, logger), planner: firstPlanCompiler(deps.Planner, logger),
state: firstStateMachine(deps.State, logger), state: firstStateMachine(deps.State, logger),
@@ -153,6 +153,17 @@ func firstAggregateFactory(v agg.Factory, logger mlogger.Logger) agg.Factory {
return agg.New(agg.Dependencies{Logger: logger}) return agg.New(agg.Dependencies{Logger: logger})
} }
func firstBatchOptimizer(v BatchOptimizer, policy BatchOptimizationPolicy, aggregator opagg.Aggregator, logger mlogger.Logger) BatchOptimizer {
if v != nil {
return v
}
return NewPolicyBatchOptimizer(PolicyBatchOptimizerDependencies{
Logger: logger,
Aggregator: firstAggregator(aggregator, logger),
Policy: policy,
})
}
func firstAggregator(v opagg.Aggregator, logger mlogger.Logger) opagg.Aggregator { func firstAggregator(v opagg.Aggregator, logger mlogger.Logger) opagg.Aggregator {
if v != nil { if v != nil {
return v return v

View File

@@ -366,6 +366,14 @@ type testEnv struct {
} }
func newTestEnv(t *testing.T, handler func(kind string, req sexec.StepRequest) (*sexec.ExecuteOutput, error)) *testEnv { func newTestEnv(t *testing.T, handler func(kind string, req sexec.StepRequest) (*sexec.ExecuteOutput, error)) *testEnv {
return newTestEnvWithPolicy(t, BatchOptimizationPolicy{}, handler)
}
func newTestEnvWithPolicy(
t *testing.T,
policy BatchOptimizationPolicy,
handler func(kind string, req sexec.StepRequest) (*sexec.ExecuteOutput, error),
) *testEnv {
t.Helper() t.Helper()
repo := newMemoryRepo(func() time.Time { repo := newMemoryRepo(func() time.Time {
return time.Now().UTC() return time.Now().UTC()
@@ -394,6 +402,7 @@ func newTestEnv(t *testing.T, handler func(kind string, req sexec.StepRequest) (
Observer: observer, Observer: observer,
Producer: producer, Producer: producer,
RetryPolicy: ssched.RetryPolicy{MaxAttempts: 2}, RetryPolicy: ssched.RetryPolicy{MaxAttempts: 2},
BatchOptimizationPolicy: policy,
MaxTicks: 20, MaxTicks: 20,
}) })
if err != nil { if err != nil {

View File

@@ -7,6 +7,7 @@ import (
chainclient "github.com/tech/sendico/gateway/chain/client" chainclient "github.com/tech/sendico/gateway/chain/client"
ledgerclient "github.com/tech/sendico/ledger/client" ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc"
"github.com/tech/sendico/payments/storage/model" "github.com/tech/sendico/payments/storage/model"
clockpkg "github.com/tech/sendico/pkg/clock" clockpkg "github.com/tech/sendico/pkg/clock"
"github.com/tech/sendico/pkg/discovery" "github.com/tech/sendico/pkg/discovery"
@@ -108,6 +109,16 @@ func WithGatewayRegistry(registry GatewayRegistry) Option {
} }
} }
// WithBatchOptimizationPolicy configures policy-based batch optimizer behavior.
func WithBatchOptimizationPolicy(policy psvc.BatchOptimizationPolicy) Option {
return func(s *Service) {
if s == nil {
return
}
s.batchOptimizationPolicy = policy
}
}
type discoveryGatewayRegistry struct { type discoveryGatewayRegistry struct {
registry *discovery.Registry registry *discovery.Registry
} }

View File

@@ -28,6 +28,7 @@ type Service struct {
gatewayInvokeResolver GatewayInvokeResolver gatewayInvokeResolver GatewayInvokeResolver
gatewayRegistry GatewayRegistry gatewayRegistry GatewayRegistry
cardGatewayRoutes map[string]CardGatewayRoute cardGatewayRoutes map[string]CardGatewayRoute
batchOptimizationPolicy psvc.BatchOptimizationPolicy
paymentGatewayBroker mb.Broker paymentGatewayBroker mb.Broker
gatewayConsumers []msg.Consumer gatewayConsumers []msg.Consumer
stopExternalWorkers context.CancelFunc stopExternalWorkers context.CancelFunc
@@ -57,6 +58,7 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro
GatewayInvokeResolver: svc.gatewayInvokeResolver, GatewayInvokeResolver: svc.gatewayInvokeResolver,
GatewayRegistry: svc.gatewayRegistry, GatewayRegistry: svc.gatewayRegistry,
CardGatewayRoutes: svc.cardGatewayRoutes, CardGatewayRoutes: svc.cardGatewayRoutes,
BatchOptimizationPolicy: svc.batchOptimizationPolicy,
Producer: svc.producer, Producer: svc.producer,
}) })
svc.startExternalRuntime() svc.startExternalRuntime()

View File

@@ -29,6 +29,7 @@ type v2RuntimeDeps struct {
GatewayInvokeResolver GatewayInvokeResolver GatewayInvokeResolver GatewayInvokeResolver
GatewayRegistry GatewayRegistry GatewayRegistry GatewayRegistry
CardGatewayRoutes map[string]CardGatewayRoute CardGatewayRoutes map[string]CardGatewayRoute
BatchOptimizationPolicy psvc.BatchOptimizationPolicy
Producer msg.Producer Producer msg.Producer
} }
@@ -76,6 +77,7 @@ func newOrchestrationV2Service(logger mlogger.Logger, repo storage.Repository, r
Query: query, Query: query,
Observer: observer, Observer: observer,
Executors: executors, Executors: executors,
BatchOptimizationPolicy: runtimeDeps.BatchOptimizationPolicy,
Producer: runtimeDeps.Producer, Producer: runtimeDeps.Producer,
}) })
if err != nil { if err != nil {