From 9f998b8134780f4696123b769b7f4ec7daa7d6d8 Mon Sep 17 00:00:00 2001 From: Stephan D Date: Wed, 11 Mar 2026 15:58:18 +0100 Subject: [PATCH] added payment orchestrator optimizer tweaks --- .gitignore | 2 + api/payments/orchestrator/config.dev.yml | 13 + api/payments/orchestrator/config.yml | 13 + .../internal/server/internal/builders.go | 51 +++ .../internal/server/internal/config.go | 32 ++ .../internal/server/internal/dependencies.go | 1 + .../service/orchestrationv2/opagg/module.go | 11 + .../service/orchestrationv2/opagg/service.go | 15 + .../orchestrationv2/psvc/batch_optimizer.go | 184 +++++++++ .../orchestrationv2/psvc/execute_batch.go | 113 +++--- .../psvc/execute_batch_test.go | 163 +++++++- .../service/orchestrationv2/psvc/module.go | 38 +- .../psvc/optimization_policy.go | 361 ++++++++++++++++++ .../psvc/optimization_policy_test.go | 167 ++++++++ .../service/orchestrationv2/psvc/service.go | 71 ++-- .../orchestrationv2/psvc/service_e2e_test.go | 23 +- .../internal/service/orchestrator/options.go | 11 + .../internal/service/orchestrator/service.go | 26 +- .../service/orchestrator/service_v2.go | 26 +- 19 files changed, 1164 insertions(+), 157 deletions(-) create mode 100644 api/payments/orchestrator/internal/service/orchestrationv2/psvc/batch_optimizer.go create mode 100644 api/payments/orchestrator/internal/service/orchestrationv2/psvc/optimization_policy.go create mode 100644 api/payments/orchestrator/internal/service/orchestrationv2/psvc/optimization_policy_test.go diff --git a/.gitignore b/.gitignore index 7123c9b5..d76e8ac9 100644 --- a/.gitignore +++ b/.gitignore @@ -14,7 +14,9 @@ test.sh .gocache/ .golangci-cache/ .cache/ + .claude/ +.codex/ # Air hot reload build artifacts **/tmp/ diff --git a/api/payments/orchestrator/config.dev.yml b/api/payments/orchestrator/config.dev.yml index 2239b23b..6f270a77 100644 --- a/api/payments/orchestrator/config.dev.yml +++ b/api/payments/orchestrator/config.dev.yml @@ -44,4 +44,17 @@ card_gateways: funding_address: "TUaWaCkiXwYPKm5qjcB27Lhwv976vPvedE" fee_wallet_ref: "697a062a248dc785125ccb9e" +# Batch optimizer settings: +# - default_mode disables aggregation for unmatched traffic +# - crypto rule enables destination aggregation for crypto rail +optimizer: + aggregation: + default_mode: "no_optimization" + rules: + - id: "crypto_by_destination" + priority: 100 + mode: "merge_by_destination" + match: + rail: "CRYPTO" + # Gateway instances and capabilities are sourced from service discovery. diff --git a/api/payments/orchestrator/config.yml b/api/payments/orchestrator/config.yml index e7682871..820a2115 100644 --- a/api/payments/orchestrator/config.yml +++ b/api/payments/orchestrator/config.yml @@ -44,4 +44,17 @@ card_gateways: funding_address: "TGBDXEg9rxSqGFJDcb889zqTjDwx1bmLRF" fee_wallet_ref: "694c124ed76f9f811ac57133" +# Batch optimizer settings: +# - default_mode disables aggregation for unmatched traffic +# - crypto rule enables destination aggregation for crypto rail +optimizer: + aggregation: + default_mode: "no_optimization" + rules: + - id: "crypto_by_destination" + priority: 100 + mode: "merge_by_destination" + match: + rail: "CRYPTO" + # Gateway instances and capabilities are sourced from service discovery. diff --git a/api/payments/orchestrator/internal/server/internal/builders.go b/api/payments/orchestrator/internal/server/internal/builders.go index 9cf701e6..efa2aeae 100644 --- a/api/payments/orchestrator/internal/server/internal/builders.go +++ b/api/payments/orchestrator/internal/server/internal/builders.go @@ -3,7 +3,9 @@ package serverimp import ( "strings" + "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator" + "github.com/tech/sendico/payments/storage/model" "github.com/tech/sendico/pkg/discovery" "github.com/tech/sendico/pkg/mlogger" ) @@ -42,3 +44,52 @@ func buildGatewayRegistry(logger mlogger.Logger, src []gatewayInstanceConfig, re } return orchestrator.NewDiscoveryGatewayRegistry(logger, registry) } + +func buildBatchOptimizationPolicy(cfg optimizerConfig) psvc.BatchOptimizationPolicy { + rules := make([]psvc.BatchOptimizationRule, 0, len(cfg.Aggregation.Rules)) + for i := range cfg.Aggregation.Rules { + ruleCfg := cfg.Aggregation.Rules[i] + rule := psvc.BatchOptimizationRule{ + ID: strings.TrimSpace(ruleCfg.ID), + Enabled: ruleCfg.Enabled, + Priority: ruleCfg.Priority, + Mode: psvc.BatchOptimizationMode(strings.TrimSpace(ruleCfg.Mode)), + Match: psvc.BatchOptimizationMatch{ + Rail: model.ParseRail(ruleCfg.Match.Rail), + Providers: cloneTrimmedSlice(ruleCfg.Match.Providers), + Networks: cloneTrimmedSlice(ruleCfg.Match.Networks), + Currencies: cloneTrimmedSlice(ruleCfg.Match.Currencies), + }, + } + if ruleCfg.Match.Amount != nil { + rule.Match.Amount = &psvc.BatchOptimizationAmountRange{ + Min: strings.TrimSpace(ruleCfg.Match.Amount.Min), + Max: strings.TrimSpace(ruleCfg.Match.Amount.Max), + Currency: strings.TrimSpace(ruleCfg.Match.Amount.Currency), + } + } + rules = append(rules, rule) + } + return psvc.BatchOptimizationPolicy{ + DefaultMode: psvc.BatchOptimizationMode(strings.TrimSpace(cfg.Aggregation.DefaultMode)), + Rules: rules, + } +} + +func cloneTrimmedSlice(values []string) []string { + if len(values) == 0 { + return nil + } + out := make([]string, 0, len(values)) + for i := range values { + token := strings.TrimSpace(values[i]) + if token == "" { + continue + } + out = append(out, token) + } + if len(out) == 0 { + return nil + } + return out +} diff --git a/api/payments/orchestrator/internal/server/internal/config.go b/api/payments/orchestrator/internal/server/internal/config.go index 504797dc..c1cd08aa 100644 --- a/api/payments/orchestrator/internal/server/internal/config.go +++ b/api/payments/orchestrator/internal/server/internal/config.go @@ -15,9 +15,41 @@ type config struct { *grpcapp.Config `yaml:",inline"` CardGateways map[string]cardGatewayRouteConfig `yaml:"card_gateways"` GatewayInstances []gatewayInstanceConfig `yaml:"gateway_instances"` + Optimizer optimizerConfig `yaml:"optimizer"` QuoteRetentionHrs int `yaml:"quote_retention_hours"` } +type optimizerConfig struct { + Aggregation aggregationConfig `yaml:"aggregation"` +} + +type aggregationConfig struct { + DefaultMode string `yaml:"default_mode"` + Rules []aggregationRuleConfig `yaml:"rules"` +} + +type aggregationRuleConfig struct { + ID string `yaml:"id"` + Enabled *bool `yaml:"enabled"` + Priority int `yaml:"priority"` + Mode string `yaml:"mode"` + Match aggregationMatchConfig `yaml:"match"` +} + +type aggregationMatchConfig struct { + Rail string `yaml:"rail"` + Providers []string `yaml:"providers"` + Networks []string `yaml:"networks"` + Currencies []string `yaml:"currencies"` + Amount *aggregationAmountConfig `yaml:"amount"` +} + +type aggregationAmountConfig struct { + Min string `yaml:"min"` + Max string `yaml:"max"` + Currency string `yaml:"currency"` +} + type cardGatewayRouteConfig struct { FundingAddress string `yaml:"funding_address"` FeeAddress string `yaml:"fee_address"` diff --git a/api/payments/orchestrator/internal/server/internal/dependencies.go b/api/payments/orchestrator/internal/server/internal/dependencies.go index 07d4ba6c..211157d1 100644 --- a/api/payments/orchestrator/internal/server/internal/dependencies.go +++ b/api/payments/orchestrator/internal/server/internal/dependencies.go @@ -40,6 +40,7 @@ func (i *Imp) buildServiceOptions(cfg *config, deps *orchestratorDeps) []orchest if routes := buildCardGatewayRoutes(cfg.CardGateways); len(routes) > 0 { opts = append(opts, orchestrator.WithCardGatewayRoutes(routes)) } + opts = append(opts, orchestrator.WithBatchOptimizationPolicy(buildBatchOptimizationPolicy(cfg.Optimizer))) if registry := buildGatewayRegistry(i.logger, cfg.GatewayInstances, i.discoveryReg); registry != nil { opts = append(opts, orchestrator.WithGatewayRegistry(registry)) } diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/opagg/module.go b/api/payments/orchestrator/internal/service/orchestrationv2/opagg/module.go index 1620f5a6..74ba68ba 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/opagg/module.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/opagg/module.go @@ -16,11 +16,22 @@ type Input struct { Items []Item } +// MergeMode controls whether an item can be compacted with compatible peers. +type MergeMode string + +const ( + MergeModeUnspecified MergeMode = "" + MergeModeByRecipient MergeMode = "merge_by_recipient" + MergeModeNever MergeMode = "never_merge" +) + // Item is one quote-intent pair candidate for aggregation. type Item struct { IntentRef string IntentSnapshot model.PaymentIntent QuoteSnapshot *model.PaymentQuoteSnapshot + MergeMode MergeMode + PolicyTag string } // Group is one aggregated recipient operation group. diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/opagg/service.go b/api/payments/orchestrator/internal/service/orchestrationv2/opagg/service.go index 22264e68..3a7fde8e 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/opagg/service.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/opagg/service.go @@ -64,6 +64,12 @@ func (s *svc) Aggregate(in Input) (out *Output, err error) { if !isBatchingEligible(item.QuoteSnapshot) { key = key + keySep + "non_batching=" + itoa(i) } + if normalizeMergeMode(item.MergeMode) == MergeModeNever { + key = key + keySep + "merge_mode=never:" + itoa(i) + } + if policyTag := strings.TrimSpace(item.PolicyTag); policyTag != "" { + key = key + keySep + "policy_tag=" + policyTag + } intentRef := firstNonEmpty( strings.TrimSpace(item.IntentRef), @@ -190,3 +196,12 @@ func isBatchingEligible(quote *model.PaymentQuoteSnapshot) bool { } return quote.ExecutionConditions.BatchingEligible } + +func normalizeMergeMode(mode MergeMode) MergeMode { + switch strings.ToLower(strings.TrimSpace(string(mode))) { + case strings.ToLower(string(MergeModeNever)): + return MergeModeNever + default: + return MergeModeByRecipient + } +} diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/batch_optimizer.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/batch_optimizer.go new file mode 100644 index 00000000..60c0e769 --- /dev/null +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/batch_optimizer.go @@ -0,0 +1,184 @@ +package psvc + +import ( + "strconv" + "strings" + + "github.com/tech/sendico/pkg/discovery" + + "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/opagg" + "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/qsnap" + "github.com/tech/sendico/payments/storage/model" + "github.com/tech/sendico/pkg/merrors" + "github.com/tech/sendico/pkg/mlogger" + paymenttypes "github.com/tech/sendico/pkg/payments/types" + "go.uber.org/zap" +) + +// BatchOptimizeInput is optimizer payload for one ExecuteBatchPayment call. +type BatchOptimizeInput struct { + Items []qsnap.ResolvedItem +} + +// BatchOptimizeOutput is optimizer result payload. +type BatchOptimizeOutput struct { + Groups []opagg.Group +} + +// BatchOptimizer encapsulates batch item compaction policy and aggregation. +type BatchOptimizer interface { + Optimize(in BatchOptimizeInput) (*BatchOptimizeOutput, error) +} + +// PolicyBatchOptimizerDependencies configures policy-based optimizer implementation. +type PolicyBatchOptimizerDependencies struct { + Logger mlogger.Logger + Aggregator opagg.Aggregator + Policy BatchOptimizationPolicy +} + +type policyBatchOptimizer struct { + logger mlogger.Logger + aggregator opagg.Aggregator + policy BatchOptimizationPolicy +} + +// NewPolicyBatchOptimizer creates default rule-based optimizer implementation. +func NewPolicyBatchOptimizer(deps PolicyBatchOptimizerDependencies) BatchOptimizer { + logger := deps.Logger + if logger == nil { + logger = zap.NewNop() + } + aggregator := deps.Aggregator + if aggregator == nil { + aggregator = opagg.New(opagg.Dependencies{Logger: logger}) + } + return &policyBatchOptimizer{ + logger: logger.Named("batch_optimizer"), + aggregator: aggregator, + policy: normalizeBatchOptimizationPolicy(deps.Policy), + } +} + +func (o *policyBatchOptimizer) Optimize(in BatchOptimizeInput) (*BatchOptimizeOutput, error) { + if len(in.Items) == 0 { + return nil, merrors.InvalidArgument("items are required") + } + + aggItems := make([]opagg.Item, 0, len(in.Items)) + for i := range in.Items { + item := in.Items[i] + selection := o.policy.Select(batchOptimizationContextFromItem(item)) + intentSnapshot := applyBatchOptimizationSelection(item.IntentSnapshot, selection) + aggItems = append(aggItems, opagg.Item{ + IntentRef: item.IntentRef, + IntentSnapshot: intentSnapshot, + QuoteSnapshot: item.QuoteSnapshot, + MergeMode: mergeModeFromBatchOptimization(selection.Mode), + PolicyTag: batchOptimizationSelectionTag(selection), + }) + } + + aggOutput, err := o.aggregator.Aggregate(opagg.Input{Items: aggItems}) + if err != nil { + return nil, err + } + if aggOutput == nil || len(aggOutput.Groups) == 0 { + return nil, merrors.InvalidArgument("aggregation produced no groups") + } + return &BatchOptimizeOutput{Groups: aggOutput.Groups}, nil +} + +func batchOptimizationContextFromItem(item qsnap.ResolvedItem) BatchOptimizationContext { + rail, provider, network := destinationHopSignature(item.QuoteSnapshot) + money := selectSettlementMoney(item) + ctx := BatchOptimizationContext{ + Rail: rail, + Provider: provider, + Network: network, + } + if money != nil { + ctx.Amount = strings.TrimSpace(money.Amount) + ctx.Currency = strings.TrimSpace(money.Currency) + } + return ctx +} + +func selectSettlementMoney(item qsnap.ResolvedItem) *paymenttypes.Money { + if item.QuoteSnapshot != nil && item.QuoteSnapshot.ExpectedSettlementAmount != nil { + return item.QuoteSnapshot.ExpectedSettlementAmount + } + return item.IntentSnapshot.Amount +} + +func destinationHopSignature(snapshot *model.PaymentQuoteSnapshot) (model.Rail, string, string) { + if snapshot == nil || snapshot.Route == nil { + return discovery.RailUnspecified, "", "" + } + route := snapshot.Route + rail := model.ParseRail(route.Rail) + provider := strings.TrimSpace(route.Provider) + network := strings.TrimSpace(route.Network) + + var destinationHop *paymenttypes.QuoteRouteHop + for i := range route.Hops { + hop := route.Hops[i] + if hop == nil { + continue + } + destinationHop = hop + if hop.Role == paymenttypes.QuoteRouteHopRoleDestination { + break + } + } + if destinationHop != nil { + if parsed := model.ParseRail(destinationHop.Rail); parsed != discovery.RailUnspecified { + rail = parsed + } + if provider == "" { + provider = strings.TrimSpace(destinationHop.Gateway) + } + if network == "" { + network = strings.TrimSpace(destinationHop.Network) + } + } + return rail, provider, network +} + +func mergeModeFromBatchOptimization(mode BatchOptimizationMode) opagg.MergeMode { + switch normalizeBatchOptimizationMode(mode) { + case BatchOptimizationModeMergeByDestination: + return opagg.MergeModeByRecipient + default: + return opagg.MergeModeNever + } +} + +func batchOptimizationSelectionTag(sel BatchOptimizationSelection) string { + mode := normalizeBatchOptimizationMode(sel.Mode) + matched := strconv.FormatBool(sel.Matched) + ruleID := strings.TrimSpace(sel.RuleID) + if ruleID == "" { + ruleID = "default" + } + return string(mode) + "|" + matched + "|" + ruleID +} + +func applyBatchOptimizationSelection(intent model.PaymentIntent, sel BatchOptimizationSelection) model.PaymentIntent { + if intent.Attributes == nil { + intent.Attributes = map[string]string{} + } + mode := normalizeBatchOptimizationMode(sel.Mode) + if mode == BatchOptimizationModeUnspecified { + mode = BatchOptimizationModeNoOptimization + } + intent.Attributes[attrBatchOptimizerMode] = string(mode) + intent.Attributes[attrBatchOptimizerRuleMatch] = strconv.FormatBool(sel.Matched) + ruleID := strings.TrimSpace(sel.RuleID) + if ruleID == "" { + delete(intent.Attributes, attrBatchOptimizerRuleID) + } else { + intent.Attributes[attrBatchOptimizerRuleID] = ruleID + } + return intent +} diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch.go index fb830661..312d391d 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch.go @@ -28,8 +28,11 @@ import ( ) const ( - attrAggregatedByRecipient = "orchestrator.v2.aggregated_by_recipient" - attrAggregatedItems = "orchestrator.v2.aggregated_items" + attrAggregatedByRecipient = "orchestrator.v2.aggregated_by_recipient" + attrAggregatedItems = "orchestrator.v2.aggregated_items" + attrBatchOptimizerMode = "orchestrator.v2.batch_optimizer_mode" + attrBatchOptimizerRuleID = "orchestrator.v2.batch_optimizer_rule_id" + attrBatchOptimizerRuleMatch = "orchestrator.v2.batch_optimizer_rule_matched" ) func (s *svc) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (resp *orchestrationv2.ExecuteBatchPaymentResponse, err error) { @@ -71,31 +74,27 @@ func (s *svc) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.Exec return nil, merrors.InvalidArgument("quotation has no executable items") } - aggItems := make([]opagg.Item, 0, len(resolved.Items)) - for _, item := range resolved.Items { - aggItems = append(aggItems, opagg.Item{ - IntentRef: item.IntentRef, - IntentSnapshot: item.IntentSnapshot, - QuoteSnapshot: item.QuoteSnapshot, - }) - } - aggOutput, err := s.aggregator.Aggregate(opagg.Input{Items: aggItems}) + optimized, err := s.batchOptimizer.Optimize(BatchOptimizeInput{Items: resolved.Items}) if err != nil { return nil, err } - group, err := s.buildBatchOperationGroup(aggOutput.Groups) + groups, err := s.buildBatchOperationGroups(optimized.Groups) if err != nil { return nil, err } - payment, err := s.executeGroup(ctx, requestCtx, resolved.QuotationRef, group) - if err != nil { - return nil, err + payments := make([]*orchestrationv2.Payment, 0, len(groups)) + for i := range groups { + payment, groupErr := s.executeGroup(ctx, requestCtx, resolved.QuotationRef, groups[i]) + if groupErr != nil { + return nil, groupErr + } + protoPayment, mapErr := s.mapPayment(payment) + if mapErr != nil { + return nil, mapErr + } + payments = append(payments, protoPayment) } - protoPayment, err := s.mapPayment(payment) - if err != nil { - return nil, err - } - return &orchestrationv2.ExecuteBatchPaymentResponse{Payments: []*orchestrationv2.Payment{protoPayment}}, nil + return &orchestrationv2.ExecuteBatchPaymentResponse{Payments: payments}, nil } func (s *svc) prepareBatchExecute(req *orchestrationv2.ExecuteBatchPaymentRequest) (*reqval.Ctx, error) { @@ -270,56 +269,44 @@ func normalizeIntentRefs(values []string) []string { return out } -func (s *svc) buildBatchOperationGroup(groups []opagg.Group) (opagg.Group, error) { +func (s *svc) buildBatchOperationGroups(groups []opagg.Group) ([]opagg.Group, error) { if len(groups) == 0 { - return opagg.Group{}, merrors.InvalidArgument("aggregation produced no groups") + return nil, merrors.InvalidArgument("aggregation produced no groups") } - anchorDestination := groups[0].IntentSnapshot.Destination - synthetic := make([]opagg.Item, 0, len(groups)) - allIntentRefs := make([]string, 0, len(groups)) + out := make([]opagg.Group, 0, len(groups)) for i := range groups { group := groups[i] - intent := group.IntentSnapshot - intent.Destination = anchorDestination - synthetic = append(synthetic, opagg.Item{ - IntentRef: strings.Join(normalizeIntentRefs(group.IntentRefs), ","), - IntentSnapshot: intent, - QuoteSnapshot: group.QuoteSnapshot, - }) - allIntentRefs = append(allIntentRefs, group.IntentRefs...) - } - - collapsed, err := s.aggregator.Aggregate(opagg.Input{Items: synthetic}) - if err != nil { - return opagg.Group{}, err - } - if collapsed == nil || len(collapsed.Groups) != 1 { - return opagg.Group{}, merrors.InvalidArgument("batch quotation contains incompatible operation groups") - } - - out := collapsed.Groups[0] - out.IntentRefs = normalizeIntentRefs(allIntentRefs) - if len(out.IntentRefs) == 0 { - return opagg.Group{}, merrors.InvalidArgument("aggregated group has no intent refs") - } - if out.IntentSnapshot.Attributes == nil { - out.IntentSnapshot.Attributes = map[string]string{} - } - if len(groups) > 1 { - out.IntentSnapshot.Attributes[attrAggregatedByRecipient] = "true" - } - out.IntentSnapshot.Attributes[attrAggregatedItems] = strconv.Itoa(len(out.IntentRefs)) - - targets := buildBatchPayoutTargets(groups) - if routeContainsCardPayout(out.QuoteSnapshot) && len(targets) > 0 { - raw, err := batchmeta.EncodePayoutTargets(targets) - if err != nil { - return opagg.Group{}, err + group.IntentRefs = normalizeIntentRefs(group.IntentRefs) + if len(group.IntentRefs) == 0 { + return nil, merrors.InvalidArgument("aggregated group has no intent refs") } - if strings.TrimSpace(raw) != "" { - out.IntentSnapshot.Attributes[batchmeta.AttrPayoutTargets] = raw + if group.IntentSnapshot.Attributes == nil { + group.IntentSnapshot.Attributes = map[string]string{} } + if len(group.IntentRefs) > 1 { + group.IntentSnapshot.Attributes[attrAggregatedByRecipient] = "true" + } else { + delete(group.IntentSnapshot.Attributes, attrAggregatedByRecipient) + } + group.IntentSnapshot.Attributes[attrAggregatedItems] = strconv.Itoa(len(group.IntentRefs)) + + targets := buildBatchPayoutTargets([]opagg.Group{group}) + if routeContainsCardPayout(group.QuoteSnapshot) && len(targets) > 0 { + raw, err := batchmeta.EncodePayoutTargets(targets) + if err != nil { + return nil, err + } + if strings.TrimSpace(raw) != "" { + group.IntentSnapshot.Attributes[batchmeta.AttrPayoutTargets] = raw + } + } else { + delete(group.IntentSnapshot.Attributes, batchmeta.AttrPayoutTargets) + } + out = append(out, group) + } + if len(out) == 0 { + return nil, merrors.InvalidArgument("aggregation produced no groups") } return out, nil } diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch_test.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch_test.go index 4be95db9..12496423 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch_test.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute_batch_test.go @@ -10,13 +10,14 @@ import ( "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/batchmeta" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec" "github.com/tech/sendico/payments/storage/model" + "github.com/tech/sendico/pkg/discovery" pm "github.com/tech/sendico/pkg/model" paymenttypes "github.com/tech/sendico/pkg/payments/types" orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2" "go.mongodb.org/mongo-driver/v2/bson" ) -func TestExecuteBatchPayment_SameDestinationMerges(t *testing.T) { +func TestExecuteBatchPayment_DefaultNoOptimization_DoesNotMergeSameDestination(t *testing.T) { env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) { step := req.StepExecution step.State = agg.StepStateCompleted @@ -37,15 +38,17 @@ func TestExecuteBatchPayment_SameDestinationMerges(t *testing.T) { if resp == nil { t.Fatal("expected response") } - if got, want := len(resp.GetPayments()), 1; got != want { - t.Fatalf("expected %d payment(s) for same-destination merge, got=%d", want, got) + if got, want := len(resp.GetPayments()), 2; got != want { + t.Fatalf("expected %d payment(s) with default no optimization, got=%d", want, got) } - if got, want := resp.GetPayments()[0].GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want { - t.Fatalf("state mismatch: got=%s want=%s", got, want) + for i, p := range resp.GetPayments() { + if got, want := p.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want { + t.Fatalf("payments[%d] state mismatch: got=%s want=%s", i, got, want) + } } } -func TestExecuteBatchPayment_DifferentDestinationsCompactsIntoSinglePayment(t *testing.T) { +func TestExecuteBatchPayment_DefaultNoOptimization_DoesNotMergeDifferentDestinations(t *testing.T) { env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) { step := req.StepExecution step.State = agg.StepStateCompleted @@ -63,8 +66,8 @@ func TestExecuteBatchPayment_DifferentDestinationsCompactsIntoSinglePayment(t *t if err != nil { t.Fatalf("ExecuteBatchPayment returned error: %v", err) } - if got, want := len(resp.GetPayments()), 1; got != want { - t.Fatalf("expected %d payment for batched execution, got=%d", want, got) + if got, want := len(resp.GetPayments()), 2; got != want { + t.Fatalf("expected %d payments for batched execution without optimization, got=%d", want, got) } for i, p := range resp.GetPayments() { if got, want := p.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want { @@ -73,7 +76,7 @@ func TestExecuteBatchPayment_DifferentDestinationsCompactsIntoSinglePayment(t *t } } -func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsInSinglePayment(t *testing.T) { +func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsWithoutMerging(t *testing.T) { var ( targetRefs []string amounts []string @@ -99,11 +102,13 @@ func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsInSinglePayment if err != nil { t.Fatalf("ExecuteBatchPayment returned error: %v", err) } - if got, want := len(resp.GetPayments()), 1; got != want { - t.Fatalf("expected %d payment for card batch, got=%d", want, got) + if got, want := len(resp.GetPayments()), 2; got != want { + t.Fatalf("expected %d payments for card batch without optimization, got=%d", want, got) } - if got, want := resp.GetPayments()[0].GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want { - t.Fatalf("state mismatch: got=%s want=%s", got, want) + for i, payment := range resp.GetPayments() { + if got, want := payment.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want { + t.Fatalf("payments[%d] state mismatch: got=%s want=%s", i, got, want) + } } if got, want := len(targetRefs), 2; got != want { t.Fatalf("expected %d card payout send calls, got=%d", want, got) @@ -122,6 +127,41 @@ func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsInSinglePayment } } +func TestExecuteBatchPayment_CryptoPolicyMergesByDestination(t *testing.T) { + env := newTestEnvWithPolicy(t, BatchOptimizationPolicy{ + DefaultMode: BatchOptimizationModeNoOptimization, + Rules: []BatchOptimizationRule{ + { + ID: "crypto-merge", + Priority: 100, + Mode: BatchOptimizationModeMergeByDestination, + Match: BatchOptimizationMatch{ + Rail: discovery.RailCrypto, + }, + }, + }, + }, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) { + step := req.StepExecution + step.State = agg.StepStateCompleted + return &sexec.ExecuteOutput{StepExecution: step}, nil + }) + + quote := newExecutableBatchCryptoQuoteSameDestination(env.orgID, "quote-batch-crypto-merge") + env.quotes.Put(quote) + + resp, err := env.svc.ExecuteBatchPayment(context.Background(), &orchestrationv2.ExecuteBatchPaymentRequest{ + Meta: testMeta(env.orgID, "idem-batch-crypto-merge"), + QuotationRef: "quote-batch-crypto-merge", + ClientPaymentRef: "client-batch-crypto-merge", + }) + if err != nil { + t.Fatalf("ExecuteBatchPayment returned error: %v", err) + } + if got, want := len(resp.GetPayments()), 1; got != want { + t.Fatalf("expected %d merged payment for crypto policy, got=%d", want, got) + } +} + func TestExecuteBatchPayment_IdempotentRetry(t *testing.T) { env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) { step := req.StepExecution @@ -150,8 +190,23 @@ func TestExecuteBatchPayment_IdempotentRetry(t *testing.T) { if got, want := len(resp2.GetPayments()), len(resp1.GetPayments()); got != want { t.Fatalf("expected same number of payments on retry: got=%d want=%d", got, want) } - if got, want := resp2.GetPayments()[0].GetPaymentRef(), resp1.GetPayments()[0].GetPaymentRef(); got != want { - t.Fatalf("expected same payment_ref on retry: got=%q want=%q", got, want) + refs1 := make([]string, 0, len(resp1.GetPayments())) + refs2 := make([]string, 0, len(resp2.GetPayments())) + for i := range resp1.GetPayments() { + refs1 = append(refs1, resp1.GetPayments()[i].GetPaymentRef()) + } + for i := range resp2.GetPayments() { + refs2 = append(refs2, resp2.GetPayments()[i].GetPaymentRef()) + } + sort.Strings(refs1) + sort.Strings(refs2) + if len(refs1) != len(refs2) { + t.Fatalf("payment refs count mismatch: got=%d want=%d", len(refs2), len(refs1)) + } + for i := range refs1 { + if refs1[i] != refs2[i] { + t.Fatalf("payment_ref mismatch on retry at index %d: got=%q want=%q", i, refs2[i], refs1[i]) + } } } @@ -307,3 +362,81 @@ func newExecutableBatchCardQuoteDiffDest(orgRef bson.ObjectID, quoteRef string) ExpiresAt: now.Add(1 * time.Hour), } } + +func newExecutableBatchCryptoQuoteSameDestination(orgRef bson.ObjectID, quoteRef string) *model.PaymentQuoteRecord { + now := time.Now().UTC() + return &model.PaymentQuoteRecord{ + Base: modelBase(now), + OrganizationBoundBase: pm.OrganizationBoundBase{ + OrganizationRef: orgRef, + }, + QuoteRef: quoteRef, + RequestShape: model.QuoteRequestShapeBatch, + Items: []*model.PaymentQuoteItemV2{ + { + Intent: &model.PaymentIntent{ + Ref: "intent-a", + Kind: model.PaymentKindPayout, + Source: testLedgerEndpoint("ledger-src"), + Destination: testExternalChainEndpoint("TQfCWxU4ERWstZZcMzEVB9MnnCSzE4agj8"), + Amount: &paymenttypes.Money{Amount: "12", Currency: "USDT"}, + SettlementCurrency: "USDT", + }, + Quote: &model.PaymentQuoteSnapshot{ + QuoteRef: quoteRef, + DebitAmount: &paymenttypes.Money{Amount: "12", Currency: "USDT"}, + ExpectedSettlementAmount: &paymenttypes.Money{Amount: "12", Currency: "USDT"}, + Route: &paymenttypes.QuoteRouteSpecification{ + Rail: "CRYPTO", + Provider: "gw-crypto", + Network: "TRON_NILE", + Hops: []*paymenttypes.QuoteRouteHop{ + {Index: 10, Rail: "LEDGER", Role: paymenttypes.QuoteRouteHopRoleSource}, + {Index: 20, Rail: "CRYPTO", Gateway: "gw-crypto", Network: "TRON_NILE", Role: paymenttypes.QuoteRouteHopRoleDestination}, + }, + }, + }, + Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable}, + }, + { + Intent: &model.PaymentIntent{ + Ref: "intent-b", + Kind: model.PaymentKindPayout, + Source: testLedgerEndpoint("ledger-src"), + Destination: testExternalChainEndpoint("TQfCWxU4ERWstZZcMzEVB9MnnCSzE4agj8"), + Amount: &paymenttypes.Money{Amount: "7", Currency: "USDT"}, + SettlementCurrency: "USDT", + }, + Quote: &model.PaymentQuoteSnapshot{ + QuoteRef: quoteRef, + DebitAmount: &paymenttypes.Money{Amount: "7", Currency: "USDT"}, + ExpectedSettlementAmount: &paymenttypes.Money{Amount: "7", Currency: "USDT"}, + Route: &paymenttypes.QuoteRouteSpecification{ + Rail: "CRYPTO", + Provider: "gw-crypto", + Network: "TRON_NILE", + Hops: []*paymenttypes.QuoteRouteHop{ + {Index: 10, Rail: "LEDGER", Role: paymenttypes.QuoteRouteHopRoleSource}, + {Index: 20, Rail: "CRYPTO", Gateway: "gw-crypto", Network: "TRON_NILE", Role: paymenttypes.QuoteRouteHopRoleDestination}, + }, + }, + }, + Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable}, + }, + }, + ExpiresAt: now.Add(1 * time.Hour), + } +} + +func testExternalChainEndpoint(address string) model.PaymentEndpoint { + return model.PaymentEndpoint{ + Type: model.EndpointTypeExternalChain, + ExternalChain: &model.ExternalChainEndpoint{ + Address: address, + Asset: &paymenttypes.Asset{ + Chain: "TRON", + TokenSymbol: "USDT", + }, + }, + } +} diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/module.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/module.go index 6f1335fb..4534eaca 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/module.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/module.go @@ -51,25 +51,27 @@ type Dependencies struct { QuoteStore qsnap.Store - Validator reqval.Validator - Idempotency idem.Service - Quote qsnap.Resolver - Aggregator opagg.Aggregator - Aggregate agg.Factory - Planner xplan.Compiler - State ostate.StateMachine - Scheduler ssched.Runtime - Executors sexec.Registry - Reconciler erecon.Reconciler - Repository prepo.Repository - Query pquery.Service - Mapper prmap.Mapper - Observer oobs.Observer - Producer msg.Producer + Validator reqval.Validator + Idempotency idem.Service + Quote qsnap.Resolver + BatchOptimizer BatchOptimizer + Aggregator opagg.Aggregator + Aggregate agg.Factory + Planner xplan.Compiler + State ostate.StateMachine + Scheduler ssched.Runtime + Executors sexec.Registry + Reconciler erecon.Reconciler + Repository prepo.Repository + Query pquery.Service + Mapper prmap.Mapper + Observer oobs.Observer + Producer msg.Producer - RetryPolicy ssched.RetryPolicy - Now func() time.Time - MaxTicks int + RetryPolicy ssched.RetryPolicy + BatchOptimizationPolicy BatchOptimizationPolicy + Now func() time.Time + MaxTicks int } func New(deps Dependencies) (Service, error) { diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/optimization_policy.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/optimization_policy.go new file mode 100644 index 00000000..dded7f9f --- /dev/null +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/optimization_policy.go @@ -0,0 +1,361 @@ +package psvc + +import ( + "strings" + + "github.com/shopspring/decimal" + "github.com/tech/sendico/payments/storage/model" + "github.com/tech/sendico/pkg/discovery" +) + +// BatchOptimizationMode defines how ExecuteBatchPayment compacts batch operations. +type BatchOptimizationMode string + +const ( + BatchOptimizationModeUnspecified BatchOptimizationMode = "" + BatchOptimizationModeMergeByDestination BatchOptimizationMode = "merge_by_destination" + BatchOptimizationModeNoOptimization BatchOptimizationMode = "no_optimization" +) + +// BatchOptimizationPolicy configures batch operation compaction behavior. +// +// Rules are evaluated by "best match": +// 1. all specified match fields must match the item context; +// 2. winner is highest specificity (number of optional selectors present); +// 3. ties are resolved by larger Priority; +// 4. remaining ties keep the first rule order from config. +// +// Rail is mandatory per rule. Other match fields are optional. +type BatchOptimizationPolicy struct { + DefaultMode BatchOptimizationMode + Rules []BatchOptimizationRule +} + +// BatchOptimizationRule defines one matching rule. +type BatchOptimizationRule struct { + ID string + Enabled *bool + Priority int + Mode BatchOptimizationMode + Match BatchOptimizationMatch +} + +// BatchOptimizationMatch defines selectors for one rule. +type BatchOptimizationMatch struct { + Rail model.Rail + Providers []string + Networks []string + Currencies []string + Amount *BatchOptimizationAmountRange +} + +// BatchOptimizationAmountRange defines optional amount matching boundaries. +type BatchOptimizationAmountRange struct { + Min string + Max string + Currency string +} + +// BatchOptimizationContext is runtime data used for matching one batch item. +type BatchOptimizationContext struct { + Rail model.Rail + Provider string + Network string + Currency string + Amount string +} + +// BatchOptimizationSelection is the selected optimization decision. +type BatchOptimizationSelection struct { + Mode BatchOptimizationMode + RuleID string + Matched bool +} + +func defaultBatchOptimizationPolicy() BatchOptimizationPolicy { + return BatchOptimizationPolicy{ + DefaultMode: BatchOptimizationModeNoOptimization, + } +} + +func normalizeBatchOptimizationPolicy(in BatchOptimizationPolicy) BatchOptimizationPolicy { + out := defaultBatchOptimizationPolicy() + if normalizedDefault := normalizeBatchOptimizationMode(in.DefaultMode); normalizedDefault != BatchOptimizationModeUnspecified { + out.DefaultMode = normalizedDefault + } + if len(in.Rules) == 0 { + return out + } + + rules := make([]BatchOptimizationRule, 0, len(in.Rules)) + for i := range in.Rules { + rule, ok := normalizeBatchOptimizationRule(in.Rules[i]) + if !ok { + continue + } + rules = append(rules, rule) + } + if len(rules) > 0 { + out.Rules = rules + } + return out +} + +func normalizeBatchOptimizationRule(in BatchOptimizationRule) (BatchOptimizationRule, bool) { + out := BatchOptimizationRule{ + ID: strings.TrimSpace(in.ID), + Enabled: in.Enabled, + Priority: in.Priority, + Mode: normalizeBatchOptimizationMode(in.Mode), + } + if out.Mode == BatchOptimizationModeUnspecified { + return BatchOptimizationRule{}, false + } + + match, ok := normalizeBatchOptimizationMatch(in.Match) + if !ok { + return BatchOptimizationRule{}, false + } + out.Match = match + return out, true +} + +func normalizeBatchOptimizationMatch(in BatchOptimizationMatch) (BatchOptimizationMatch, bool) { + out := BatchOptimizationMatch{ + Rail: model.ParseRail(string(in.Rail)), + Providers: normalizeOptimizationTokens(in.Providers), + Networks: normalizeOptimizationTokens(in.Networks), + Currencies: normalizeOptimizationCurrencies(in.Currencies), + } + if out.Rail == discovery.RailUnspecified { + return BatchOptimizationMatch{}, false + } + if in.Amount != nil { + normalizedAmount, ok := normalizeBatchOptimizationAmountRange(*in.Amount) + if !ok { + return BatchOptimizationMatch{}, false + } + out.Amount = &normalizedAmount + } + return out, true +} + +func normalizeBatchOptimizationAmountRange(in BatchOptimizationAmountRange) (BatchOptimizationAmountRange, bool) { + out := BatchOptimizationAmountRange{ + Min: strings.TrimSpace(in.Min), + Max: strings.TrimSpace(in.Max), + Currency: normalizeOptimizationCurrency(in.Currency), + } + if out.Min == "" && out.Max == "" { + return BatchOptimizationAmountRange{}, false + } + + var ( + min decimal.Decimal + max decimal.Decimal + err error + ) + if out.Min != "" { + min, err = decimal.NewFromString(out.Min) + if err != nil { + return BatchOptimizationAmountRange{}, false + } + } + if out.Max != "" { + max, err = decimal.NewFromString(out.Max) + if err != nil { + return BatchOptimizationAmountRange{}, false + } + } + if out.Min != "" && out.Max != "" && min.GreaterThan(max) { + return BatchOptimizationAmountRange{}, false + } + return out, true +} + +func (p BatchOptimizationPolicy) Select(in BatchOptimizationContext) BatchOptimizationSelection { + policy := normalizeBatchOptimizationPolicy(p) + ctx := normalizeBatchOptimizationContext(in) + + selected := BatchOptimizationSelection{ + Mode: policy.DefaultMode, + } + + bestSpecificity := -1 + bestPriority := 0 + bestOrder := len(policy.Rules) + 1 + for i := range policy.Rules { + rule := policy.Rules[i] + if rule.Enabled != nil && !*rule.Enabled { + continue + } + specificity, ok := rule.matches(ctx) + if !ok { + continue + } + if specificity > bestSpecificity || + (specificity == bestSpecificity && rule.Priority > bestPriority) || + (specificity == bestSpecificity && rule.Priority == bestPriority && i < bestOrder) { + selected = BatchOptimizationSelection{ + Mode: rule.Mode, + RuleID: rule.ID, + Matched: true, + } + bestSpecificity = specificity + bestPriority = rule.Priority + bestOrder = i + } + } + return selected +} + +func (r BatchOptimizationRule) matches(ctx BatchOptimizationContext) (int, bool) { + match := r.Match + if match.Rail == discovery.RailUnspecified || ctx.Rail == discovery.RailUnspecified { + return 0, false + } + if match.Rail != ctx.Rail { + return 0, false + } + + specificity := 0 + if len(match.Providers) > 0 { + specificity++ + if !containsOptimizationToken(match.Providers, ctx.Provider) { + return 0, false + } + } + if len(match.Networks) > 0 { + specificity++ + if !containsOptimizationToken(match.Networks, ctx.Network) { + return 0, false + } + } + if len(match.Currencies) > 0 { + specificity++ + if !containsOptimizationToken(match.Currencies, ctx.Currency) { + return 0, false + } + } + if match.Amount != nil { + specificity++ + if !amountRangeMatches(*match.Amount, ctx.Amount, ctx.Currency) { + return 0, false + } + } + return specificity, true +} + +func amountRangeMatches(rule BatchOptimizationAmountRange, amountRaw string, currencyRaw string) bool { + value, err := decimal.NewFromString(strings.TrimSpace(amountRaw)) + if err != nil { + return false + } + if ruleCurrency := normalizeOptimizationCurrency(rule.Currency); ruleCurrency != "" && ruleCurrency != normalizeOptimizationCurrency(currencyRaw) { + return false + } + if rule.Min != "" { + min, parseErr := decimal.NewFromString(rule.Min) + if parseErr != nil || value.LessThan(min) { + return false + } + } + if rule.Max != "" { + max, parseErr := decimal.NewFromString(rule.Max) + if parseErr != nil || value.GreaterThan(max) { + return false + } + } + return true +} + +func normalizeBatchOptimizationContext(in BatchOptimizationContext) BatchOptimizationContext { + return BatchOptimizationContext{ + Rail: model.ParseRail(string(in.Rail)), + Provider: normalizeOptimizationToken(in.Provider), + Network: normalizeOptimizationToken(in.Network), + Currency: normalizeOptimizationCurrency(in.Currency), + Amount: strings.TrimSpace(in.Amount), + } +} + +func normalizeBatchOptimizationMode(raw BatchOptimizationMode) BatchOptimizationMode { + switch strings.ToLower(strings.TrimSpace(string(raw))) { + case "merge_by_destination", "merge-by-destination", "by_destination", "by-destination": + return BatchOptimizationModeMergeByDestination + case "no_optimization", "no-optimization", "none", "never_merge", "never-merge", "never": + return BatchOptimizationModeNoOptimization + default: + return BatchOptimizationModeUnspecified + } +} + +func normalizeOptimizationTokens(values []string) []string { + if len(values) == 0 { + return nil + } + seen := make(map[string]struct{}, len(values)) + out := make([]string, 0, len(values)) + for i := range values { + token := normalizeOptimizationToken(values[i]) + if token == "" { + continue + } + if _, exists := seen[token]; exists { + continue + } + seen[token] = struct{}{} + out = append(out, token) + } + if len(out) == 0 { + return nil + } + return out +} + +func normalizeOptimizationCurrencies(values []string) []string { + if len(values) == 0 { + return nil + } + seen := make(map[string]struct{}, len(values)) + out := make([]string, 0, len(values)) + for i := range values { + token := normalizeOptimizationCurrency(values[i]) + if token == "" { + continue + } + if _, exists := seen[token]; exists { + continue + } + seen[token] = struct{}{} + out = append(out, token) + } + if len(out) == 0 { + return nil + } + return out +} + +func normalizeOptimizationToken(value string) string { + return strings.ToUpper(strings.TrimSpace(value)) +} + +func normalizeOptimizationCurrency(value string) string { + return strings.ToUpper(strings.TrimSpace(value)) +} + +func containsOptimizationToken(values []string, value string) bool { + if len(values) == 0 { + return false + } + normalized := normalizeOptimizationToken(value) + if normalized == "" { + return false + } + for i := range values { + if values[i] == normalized { + return true + } + } + return false +} diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/optimization_policy_test.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/optimization_policy_test.go new file mode 100644 index 00000000..11bef0d0 --- /dev/null +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/optimization_policy_test.go @@ -0,0 +1,167 @@ +package psvc + +import ( + "testing" + + "github.com/tech/sendico/pkg/discovery" +) + +func TestNormalizeBatchOptimizationPolicy_DefaultMode_NoOptimization(t *testing.T) { + got := normalizeBatchOptimizationPolicy(BatchOptimizationPolicy{}) + if got.DefaultMode != BatchOptimizationModeNoOptimization { + t.Fatalf("default_mode mismatch: got=%q want=%q", got.DefaultMode, BatchOptimizationModeNoOptimization) + } + if got.Rules != nil { + t.Fatalf("expected no rules, got=%d", len(got.Rules)) + } +} + +func TestNormalizeBatchOptimizationPolicy_DropsInvalidRules(t *testing.T) { + got := normalizeBatchOptimizationPolicy(BatchOptimizationPolicy{ + DefaultMode: "merge_by_destination", + Rules: []BatchOptimizationRule{ + { + ID: "missing-rail", + Mode: BatchOptimizationModeMergeByDestination, + Match: BatchOptimizationMatch{ + Rail: "unknown", + }, + }, + { + ID: "invalid-mode", + Mode: "x", + Match: BatchOptimizationMatch{ + Rail: discovery.RailCrypto, + }, + }, + { + ID: "valid", + Mode: BatchOptimizationModeMergeByDestination, + Match: BatchOptimizationMatch{ + Rail: discovery.RailCrypto, + }, + }, + }, + }) + + if got.DefaultMode != BatchOptimizationModeMergeByDestination { + t.Fatalf("default_mode mismatch: got=%q want=%q", got.DefaultMode, BatchOptimizationModeMergeByDestination) + } + if len(got.Rules) != 1 { + t.Fatalf("rules count mismatch: got=%d want=%d", len(got.Rules), 1) + } + if got.Rules[0].ID != "valid" { + t.Fatalf("rule id mismatch: got=%q want=%q", got.Rules[0].ID, "valid") + } +} + +func TestBatchOptimizationPolicy_Select_UsesBestSpecificityThenPriority(t *testing.T) { + policy := BatchOptimizationPolicy{ + DefaultMode: BatchOptimizationModeNoOptimization, + Rules: []BatchOptimizationRule{ + { + ID: "crypto-generic", + Priority: 10, + Mode: BatchOptimizationModeMergeByDestination, + Match: BatchOptimizationMatch{ + Rail: discovery.RailCrypto, + }, + }, + { + ID: "crypto-network", + Priority: 5, + Mode: BatchOptimizationModeNoOptimization, + Match: BatchOptimizationMatch{ + Rail: discovery.RailCrypto, + Networks: []string{"tron_nile"}, + }, + }, + { + ID: "crypto-network-priority", + Priority: 20, + Mode: BatchOptimizationModeMergeByDestination, + Match: BatchOptimizationMatch{ + Rail: discovery.RailCrypto, + Networks: []string{"tron_nile"}, + }, + }, + }, + } + + out := policy.Select(BatchOptimizationContext{ + Rail: discovery.RailCrypto, + Network: "TRON_NILE", + }) + if !out.Matched { + t.Fatal("expected matched rule") + } + if out.RuleID != "crypto-network-priority" { + t.Fatalf("rule id mismatch: got=%q want=%q", out.RuleID, "crypto-network-priority") + } + if out.Mode != BatchOptimizationModeMergeByDestination { + t.Fatalf("mode mismatch: got=%q want=%q", out.Mode, BatchOptimizationModeMergeByDestination) + } +} + +func TestBatchOptimizationPolicy_Select_FallsBackToDefault(t *testing.T) { + policy := BatchOptimizationPolicy{ + DefaultMode: BatchOptimizationModeNoOptimization, + Rules: []BatchOptimizationRule{ + { + ID: "crypto-only", + Mode: BatchOptimizationModeMergeByDestination, + Match: BatchOptimizationMatch{ + Rail: discovery.RailCrypto, + }, + }, + }, + } + + out := policy.Select(BatchOptimizationContext{Rail: discovery.RailCardPayout}) + if out.Matched { + t.Fatalf("unexpected matched rule: %q", out.RuleID) + } + if out.Mode != BatchOptimizationModeNoOptimization { + t.Fatalf("mode mismatch: got=%q want=%q", out.Mode, BatchOptimizationModeNoOptimization) + } +} + +func TestBatchOptimizationPolicy_Select_MatchesAmountRange(t *testing.T) { + policy := BatchOptimizationPolicy{ + DefaultMode: BatchOptimizationModeNoOptimization, + Rules: []BatchOptimizationRule{ + { + ID: "crypto-large-usdt", + Mode: BatchOptimizationModeMergeByDestination, + Match: BatchOptimizationMatch{ + Rail: discovery.RailCrypto, + Amount: &BatchOptimizationAmountRange{ + Min: "100", + Currency: "USDT", + }, + }, + }, + }, + } + + matched := policy.Select(BatchOptimizationContext{ + Rail: discovery.RailCrypto, + Amount: "120", + Currency: "USDT", + }) + if !matched.Matched { + t.Fatal("expected amount rule match") + } + + notMatched := policy.Select(BatchOptimizationContext{ + Rail: discovery.RailCrypto, + Amount: "50", + Currency: "USDT", + }) + if notMatched.Matched { + t.Fatalf("expected no amount rule match, got rule=%q", notMatched.RuleID) + } + if notMatched.Mode != BatchOptimizationModeNoOptimization { + t.Fatalf("default mode mismatch: got=%q want=%q", notMatched.Mode, BatchOptimizationModeNoOptimization) + } +} diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/service.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/service.go index ccd3981e..9bb9a69d 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/service.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/service.go @@ -32,21 +32,21 @@ type svc struct { quoteStore qsnap.Store - validator reqval.Validator - idempotency idem.Service - quote qsnap.Resolver - aggregator opagg.Aggregator - aggregate agg.Factory - planner xplan.Compiler - state ostate.StateMachine - scheduler ssched.Runtime - executors sexec.Registry - reconciler erecon.Reconciler - repository prepo.Repository - query pquery.Service - mapper prmap.Mapper - observer oobs.Observer - statuses paymentStatusPublisher + validator reqval.Validator + idempotency idem.Service + quote qsnap.Resolver + batchOptimizer BatchOptimizer + aggregate agg.Factory + planner xplan.Compiler + state ostate.StateMachine + scheduler ssched.Runtime + executors sexec.Registry + reconciler erecon.Reconciler + repository prepo.Repository + query pquery.Service + mapper prmap.Mapper + observer oobs.Observer + statuses paymentStatusPublisher retryPolicy ssched.RetryPolicy now func() time.Time @@ -93,21 +93,21 @@ func newService(deps Dependencies) (Service, error) { quoteStore: deps.QuoteStore, - validator: firstValidator(deps.Validator, logger), - idempotency: firstIdempotency(deps.Idempotency, logger), - quote: firstQuoteResolver(deps.Quote, logger), - aggregator: firstAggregator(deps.Aggregator, logger), - aggregate: firstAggregateFactory(deps.Aggregate, logger), - planner: firstPlanCompiler(deps.Planner, logger), - state: firstStateMachine(deps.State, logger), - scheduler: firstScheduler(deps.Scheduler, logger), - executors: firstExecutors(deps.Executors, logger), - reconciler: firstReconciler(deps.Reconciler, logger), - repository: deps.Repository, - query: query, - mapper: firstMapper(deps.Mapper, logger), - observer: observer, - statuses: newPaymentStatusPublisher(logger, deps.Producer), + validator: firstValidator(deps.Validator, logger), + idempotency: firstIdempotency(deps.Idempotency, logger), + quote: firstQuoteResolver(deps.Quote, logger), + batchOptimizer: firstBatchOptimizer(deps.BatchOptimizer, deps.BatchOptimizationPolicy, deps.Aggregator, logger), + aggregate: firstAggregateFactory(deps.Aggregate, logger), + planner: firstPlanCompiler(deps.Planner, logger), + state: firstStateMachine(deps.State, logger), + scheduler: firstScheduler(deps.Scheduler, logger), + executors: firstExecutors(deps.Executors, logger), + reconciler: firstReconciler(deps.Reconciler, logger), + repository: deps.Repository, + query: query, + mapper: firstMapper(deps.Mapper, logger), + observer: observer, + statuses: newPaymentStatusPublisher(logger, deps.Producer), retryPolicy: deps.RetryPolicy, now: deps.Now, @@ -153,6 +153,17 @@ func firstAggregateFactory(v agg.Factory, logger mlogger.Logger) agg.Factory { return agg.New(agg.Dependencies{Logger: logger}) } +func firstBatchOptimizer(v BatchOptimizer, policy BatchOptimizationPolicy, aggregator opagg.Aggregator, logger mlogger.Logger) BatchOptimizer { + if v != nil { + return v + } + return NewPolicyBatchOptimizer(PolicyBatchOptimizerDependencies{ + Logger: logger, + Aggregator: firstAggregator(aggregator, logger), + Policy: policy, + }) +} + func firstAggregator(v opagg.Aggregator, logger mlogger.Logger) opagg.Aggregator { if v != nil { return v diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/service_e2e_test.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/service_e2e_test.go index 0d74c6e9..a7abc0d1 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/service_e2e_test.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/service_e2e_test.go @@ -366,6 +366,14 @@ type testEnv struct { } func newTestEnv(t *testing.T, handler func(kind string, req sexec.StepRequest) (*sexec.ExecuteOutput, error)) *testEnv { + return newTestEnvWithPolicy(t, BatchOptimizationPolicy{}, handler) +} + +func newTestEnvWithPolicy( + t *testing.T, + policy BatchOptimizationPolicy, + handler func(kind string, req sexec.StepRequest) (*sexec.ExecuteOutput, error), +) *testEnv { t.Helper() repo := newMemoryRepo(func() time.Time { return time.Now().UTC() @@ -388,13 +396,14 @@ func newTestEnv(t *testing.T, handler func(kind string, req sexec.StepRequest) ( producer := &capturingProducer{} svc, err := New(Dependencies{ - QuoteStore: quotes, - Repository: repo, - Executors: registry, - Observer: observer, - Producer: producer, - RetryPolicy: ssched.RetryPolicy{MaxAttempts: 2}, - MaxTicks: 20, + QuoteStore: quotes, + Repository: repo, + Executors: registry, + Observer: observer, + Producer: producer, + RetryPolicy: ssched.RetryPolicy{MaxAttempts: 2}, + BatchOptimizationPolicy: policy, + MaxTicks: 20, }) if err != nil { t.Fatalf("New returned error: %v", err) diff --git a/api/payments/orchestrator/internal/service/orchestrator/options.go b/api/payments/orchestrator/internal/service/orchestrator/options.go index b9eec828..705fe7d0 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/options.go +++ b/api/payments/orchestrator/internal/service/orchestrator/options.go @@ -7,6 +7,7 @@ import ( chainclient "github.com/tech/sendico/gateway/chain/client" ledgerclient "github.com/tech/sendico/ledger/client" + "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc" "github.com/tech/sendico/payments/storage/model" clockpkg "github.com/tech/sendico/pkg/clock" "github.com/tech/sendico/pkg/discovery" @@ -108,6 +109,16 @@ func WithGatewayRegistry(registry GatewayRegistry) Option { } } +// WithBatchOptimizationPolicy configures policy-based batch optimizer behavior. +func WithBatchOptimizationPolicy(policy psvc.BatchOptimizationPolicy) Option { + return func(s *Service) { + if s == nil { + return + } + s.batchOptimizationPolicy = policy + } +} + type discoveryGatewayRegistry struct { registry *discovery.Registry } diff --git a/api/payments/orchestrator/internal/service/orchestrator/service.go b/api/payments/orchestrator/internal/service/orchestrator/service.go index 10116f2b..17976cd5 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/service.go +++ b/api/payments/orchestrator/internal/service/orchestrator/service.go @@ -24,13 +24,14 @@ type Service struct { paymentRepo prepo.Repository producer msg.Producer - ledgerClient ledgerclient.Client - gatewayInvokeResolver GatewayInvokeResolver - gatewayRegistry GatewayRegistry - cardGatewayRoutes map[string]CardGatewayRoute - paymentGatewayBroker mb.Broker - gatewayConsumers []msg.Consumer - stopExternalWorkers context.CancelFunc + ledgerClient ledgerclient.Client + gatewayInvokeResolver GatewayInvokeResolver + gatewayRegistry GatewayRegistry + cardGatewayRoutes map[string]CardGatewayRoute + batchOptimizationPolicy psvc.BatchOptimizationPolicy + paymentGatewayBroker mb.Broker + gatewayConsumers []msg.Consumer + stopExternalWorkers context.CancelFunc } // NewService constructs the v2 orchestrator service. @@ -53,11 +54,12 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro var err error svc.v2, svc.paymentRepo, err = newOrchestrationV2Service(svc.logger, repo, v2RuntimeDeps{ - LedgerClient: svc.ledgerClient, - GatewayInvokeResolver: svc.gatewayInvokeResolver, - GatewayRegistry: svc.gatewayRegistry, - CardGatewayRoutes: svc.cardGatewayRoutes, - Producer: svc.producer, + LedgerClient: svc.ledgerClient, + GatewayInvokeResolver: svc.gatewayInvokeResolver, + GatewayRegistry: svc.gatewayRegistry, + CardGatewayRoutes: svc.cardGatewayRoutes, + BatchOptimizationPolicy: svc.batchOptimizationPolicy, + Producer: svc.producer, }) svc.startExternalRuntime() if err != nil { diff --git a/api/payments/orchestrator/internal/service/orchestrator/service_v2.go b/api/payments/orchestrator/internal/service/orchestrator/service_v2.go index 95233f1e..766c3334 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/service_v2.go +++ b/api/payments/orchestrator/internal/service/orchestrator/service_v2.go @@ -25,11 +25,12 @@ type v2MongoDBProvider interface { } type v2RuntimeDeps struct { - LedgerClient ledgerclient.Client - GatewayInvokeResolver GatewayInvokeResolver - GatewayRegistry GatewayRegistry - CardGatewayRoutes map[string]CardGatewayRoute - Producer msg.Producer + LedgerClient ledgerclient.Client + GatewayInvokeResolver GatewayInvokeResolver + GatewayRegistry GatewayRegistry + CardGatewayRoutes map[string]CardGatewayRoute + BatchOptimizationPolicy psvc.BatchOptimizationPolicy + Producer msg.Producer } func newOrchestrationV2Service(logger mlogger.Logger, repo storage.Repository, runtimeDeps v2RuntimeDeps) (psvc.Service, prepo.Repository, error) { @@ -70,13 +71,14 @@ func newOrchestrationV2Service(logger mlogger.Logger, repo storage.Repository, r executors := buildOrchestrationV2Executors(logger, runtimeDeps) svc, err := psvc.New(psvc.Dependencies{ - Logger: logger.Named("v2"), - QuoteStore: repo.Quotes(), - Repository: paymentRepo, - Query: query, - Observer: observer, - Executors: executors, - Producer: runtimeDeps.Producer, + Logger: logger.Named("v2"), + QuoteStore: repo.Quotes(), + Repository: paymentRepo, + Query: query, + Observer: observer, + Executors: executors, + BatchOptimizationPolicy: runtimeDeps.BatchOptimizationPolicy, + Producer: runtimeDeps.Producer, }) if err != nil { logger.Error("Orchestration v2 disabled: service init failed", zap.Error(err))