added payment orchestrator optimizer tweaks

This commit is contained in:
Stephan D
2026-03-11 15:58:18 +01:00
parent 27b4ece6c6
commit 9f998b8134
19 changed files with 1164 additions and 157 deletions

View File

@@ -44,4 +44,17 @@ card_gateways:
funding_address: "TUaWaCkiXwYPKm5qjcB27Lhwv976vPvedE"
fee_wallet_ref: "697a062a248dc785125ccb9e"
# Batch optimizer settings:
# - default_mode disables aggregation for unmatched traffic
# - crypto rule enables destination aggregation for crypto rail
optimizer:
aggregation:
default_mode: "no_optimization"
rules:
- id: "crypto_by_destination"
priority: 100
mode: "merge_by_destination"
match:
rail: "CRYPTO"
# Gateway instances and capabilities are sourced from service discovery.

View File

@@ -44,4 +44,17 @@ card_gateways:
funding_address: "TGBDXEg9rxSqGFJDcb889zqTjDwx1bmLRF"
fee_wallet_ref: "694c124ed76f9f811ac57133"
# Batch optimizer settings:
# - default_mode disables aggregation for unmatched traffic
# - crypto rule enables destination aggregation for crypto rail
optimizer:
aggregation:
default_mode: "no_optimization"
rules:
- id: "crypto_by_destination"
priority: 100
mode: "merge_by_destination"
match:
rail: "CRYPTO"
# Gateway instances and capabilities are sourced from service discovery.

View File

@@ -3,7 +3,9 @@ package serverimp
import (
"strings"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/mlogger"
)
@@ -42,3 +44,52 @@ func buildGatewayRegistry(logger mlogger.Logger, src []gatewayInstanceConfig, re
}
return orchestrator.NewDiscoveryGatewayRegistry(logger, registry)
}
func buildBatchOptimizationPolicy(cfg optimizerConfig) psvc.BatchOptimizationPolicy {
rules := make([]psvc.BatchOptimizationRule, 0, len(cfg.Aggregation.Rules))
for i := range cfg.Aggregation.Rules {
ruleCfg := cfg.Aggregation.Rules[i]
rule := psvc.BatchOptimizationRule{
ID: strings.TrimSpace(ruleCfg.ID),
Enabled: ruleCfg.Enabled,
Priority: ruleCfg.Priority,
Mode: psvc.BatchOptimizationMode(strings.TrimSpace(ruleCfg.Mode)),
Match: psvc.BatchOptimizationMatch{
Rail: model.ParseRail(ruleCfg.Match.Rail),
Providers: cloneTrimmedSlice(ruleCfg.Match.Providers),
Networks: cloneTrimmedSlice(ruleCfg.Match.Networks),
Currencies: cloneTrimmedSlice(ruleCfg.Match.Currencies),
},
}
if ruleCfg.Match.Amount != nil {
rule.Match.Amount = &psvc.BatchOptimizationAmountRange{
Min: strings.TrimSpace(ruleCfg.Match.Amount.Min),
Max: strings.TrimSpace(ruleCfg.Match.Amount.Max),
Currency: strings.TrimSpace(ruleCfg.Match.Amount.Currency),
}
}
rules = append(rules, rule)
}
return psvc.BatchOptimizationPolicy{
DefaultMode: psvc.BatchOptimizationMode(strings.TrimSpace(cfg.Aggregation.DefaultMode)),
Rules: rules,
}
}
func cloneTrimmedSlice(values []string) []string {
if len(values) == 0 {
return nil
}
out := make([]string, 0, len(values))
for i := range values {
token := strings.TrimSpace(values[i])
if token == "" {
continue
}
out = append(out, token)
}
if len(out) == 0 {
return nil
}
return out
}

View File

@@ -15,9 +15,41 @@ type config struct {
*grpcapp.Config `yaml:",inline"`
CardGateways map[string]cardGatewayRouteConfig `yaml:"card_gateways"`
GatewayInstances []gatewayInstanceConfig `yaml:"gateway_instances"`
Optimizer optimizerConfig `yaml:"optimizer"`
QuoteRetentionHrs int `yaml:"quote_retention_hours"`
}
type optimizerConfig struct {
Aggregation aggregationConfig `yaml:"aggregation"`
}
type aggregationConfig struct {
DefaultMode string `yaml:"default_mode"`
Rules []aggregationRuleConfig `yaml:"rules"`
}
type aggregationRuleConfig struct {
ID string `yaml:"id"`
Enabled *bool `yaml:"enabled"`
Priority int `yaml:"priority"`
Mode string `yaml:"mode"`
Match aggregationMatchConfig `yaml:"match"`
}
type aggregationMatchConfig struct {
Rail string `yaml:"rail"`
Providers []string `yaml:"providers"`
Networks []string `yaml:"networks"`
Currencies []string `yaml:"currencies"`
Amount *aggregationAmountConfig `yaml:"amount"`
}
type aggregationAmountConfig struct {
Min string `yaml:"min"`
Max string `yaml:"max"`
Currency string `yaml:"currency"`
}
type cardGatewayRouteConfig struct {
FundingAddress string `yaml:"funding_address"`
FeeAddress string `yaml:"fee_address"`

View File

@@ -40,6 +40,7 @@ func (i *Imp) buildServiceOptions(cfg *config, deps *orchestratorDeps) []orchest
if routes := buildCardGatewayRoutes(cfg.CardGateways); len(routes) > 0 {
opts = append(opts, orchestrator.WithCardGatewayRoutes(routes))
}
opts = append(opts, orchestrator.WithBatchOptimizationPolicy(buildBatchOptimizationPolicy(cfg.Optimizer)))
if registry := buildGatewayRegistry(i.logger, cfg.GatewayInstances, i.discoveryReg); registry != nil {
opts = append(opts, orchestrator.WithGatewayRegistry(registry))
}

View File

@@ -16,11 +16,22 @@ type Input struct {
Items []Item
}
// MergeMode controls whether an item can be compacted with compatible peers.
type MergeMode string
const (
MergeModeUnspecified MergeMode = ""
MergeModeByRecipient MergeMode = "merge_by_recipient"
MergeModeNever MergeMode = "never_merge"
)
// Item is one quote-intent pair candidate for aggregation.
type Item struct {
IntentRef string
IntentSnapshot model.PaymentIntent
QuoteSnapshot *model.PaymentQuoteSnapshot
MergeMode MergeMode
PolicyTag string
}
// Group is one aggregated recipient operation group.

View File

@@ -64,6 +64,12 @@ func (s *svc) Aggregate(in Input) (out *Output, err error) {
if !isBatchingEligible(item.QuoteSnapshot) {
key = key + keySep + "non_batching=" + itoa(i)
}
if normalizeMergeMode(item.MergeMode) == MergeModeNever {
key = key + keySep + "merge_mode=never:" + itoa(i)
}
if policyTag := strings.TrimSpace(item.PolicyTag); policyTag != "" {
key = key + keySep + "policy_tag=" + policyTag
}
intentRef := firstNonEmpty(
strings.TrimSpace(item.IntentRef),
@@ -190,3 +196,12 @@ func isBatchingEligible(quote *model.PaymentQuoteSnapshot) bool {
}
return quote.ExecutionConditions.BatchingEligible
}
func normalizeMergeMode(mode MergeMode) MergeMode {
switch strings.ToLower(strings.TrimSpace(string(mode))) {
case strings.ToLower(string(MergeModeNever)):
return MergeModeNever
default:
return MergeModeByRecipient
}
}

View File

@@ -0,0 +1,184 @@
package psvc
import (
"strconv"
"strings"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/opagg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/qsnap"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.uber.org/zap"
)
// BatchOptimizeInput is optimizer payload for one ExecuteBatchPayment call.
type BatchOptimizeInput struct {
Items []qsnap.ResolvedItem
}
// BatchOptimizeOutput is optimizer result payload.
type BatchOptimizeOutput struct {
Groups []opagg.Group
}
// BatchOptimizer encapsulates batch item compaction policy and aggregation.
type BatchOptimizer interface {
Optimize(in BatchOptimizeInput) (*BatchOptimizeOutput, error)
}
// PolicyBatchOptimizerDependencies configures policy-based optimizer implementation.
type PolicyBatchOptimizerDependencies struct {
Logger mlogger.Logger
Aggregator opagg.Aggregator
Policy BatchOptimizationPolicy
}
type policyBatchOptimizer struct {
logger mlogger.Logger
aggregator opagg.Aggregator
policy BatchOptimizationPolicy
}
// NewPolicyBatchOptimizer creates default rule-based optimizer implementation.
func NewPolicyBatchOptimizer(deps PolicyBatchOptimizerDependencies) BatchOptimizer {
logger := deps.Logger
if logger == nil {
logger = zap.NewNop()
}
aggregator := deps.Aggregator
if aggregator == nil {
aggregator = opagg.New(opagg.Dependencies{Logger: logger})
}
return &policyBatchOptimizer{
logger: logger.Named("batch_optimizer"),
aggregator: aggregator,
policy: normalizeBatchOptimizationPolicy(deps.Policy),
}
}
func (o *policyBatchOptimizer) Optimize(in BatchOptimizeInput) (*BatchOptimizeOutput, error) {
if len(in.Items) == 0 {
return nil, merrors.InvalidArgument("items are required")
}
aggItems := make([]opagg.Item, 0, len(in.Items))
for i := range in.Items {
item := in.Items[i]
selection := o.policy.Select(batchOptimizationContextFromItem(item))
intentSnapshot := applyBatchOptimizationSelection(item.IntentSnapshot, selection)
aggItems = append(aggItems, opagg.Item{
IntentRef: item.IntentRef,
IntentSnapshot: intentSnapshot,
QuoteSnapshot: item.QuoteSnapshot,
MergeMode: mergeModeFromBatchOptimization(selection.Mode),
PolicyTag: batchOptimizationSelectionTag(selection),
})
}
aggOutput, err := o.aggregator.Aggregate(opagg.Input{Items: aggItems})
if err != nil {
return nil, err
}
if aggOutput == nil || len(aggOutput.Groups) == 0 {
return nil, merrors.InvalidArgument("aggregation produced no groups")
}
return &BatchOptimizeOutput{Groups: aggOutput.Groups}, nil
}
func batchOptimizationContextFromItem(item qsnap.ResolvedItem) BatchOptimizationContext {
rail, provider, network := destinationHopSignature(item.QuoteSnapshot)
money := selectSettlementMoney(item)
ctx := BatchOptimizationContext{
Rail: rail,
Provider: provider,
Network: network,
}
if money != nil {
ctx.Amount = strings.TrimSpace(money.Amount)
ctx.Currency = strings.TrimSpace(money.Currency)
}
return ctx
}
func selectSettlementMoney(item qsnap.ResolvedItem) *paymenttypes.Money {
if item.QuoteSnapshot != nil && item.QuoteSnapshot.ExpectedSettlementAmount != nil {
return item.QuoteSnapshot.ExpectedSettlementAmount
}
return item.IntentSnapshot.Amount
}
func destinationHopSignature(snapshot *model.PaymentQuoteSnapshot) (model.Rail, string, string) {
if snapshot == nil || snapshot.Route == nil {
return discovery.RailUnspecified, "", ""
}
route := snapshot.Route
rail := model.ParseRail(route.Rail)
provider := strings.TrimSpace(route.Provider)
network := strings.TrimSpace(route.Network)
var destinationHop *paymenttypes.QuoteRouteHop
for i := range route.Hops {
hop := route.Hops[i]
if hop == nil {
continue
}
destinationHop = hop
if hop.Role == paymenttypes.QuoteRouteHopRoleDestination {
break
}
}
if destinationHop != nil {
if parsed := model.ParseRail(destinationHop.Rail); parsed != discovery.RailUnspecified {
rail = parsed
}
if provider == "" {
provider = strings.TrimSpace(destinationHop.Gateway)
}
if network == "" {
network = strings.TrimSpace(destinationHop.Network)
}
}
return rail, provider, network
}
func mergeModeFromBatchOptimization(mode BatchOptimizationMode) opagg.MergeMode {
switch normalizeBatchOptimizationMode(mode) {
case BatchOptimizationModeMergeByDestination:
return opagg.MergeModeByRecipient
default:
return opagg.MergeModeNever
}
}
func batchOptimizationSelectionTag(sel BatchOptimizationSelection) string {
mode := normalizeBatchOptimizationMode(sel.Mode)
matched := strconv.FormatBool(sel.Matched)
ruleID := strings.TrimSpace(sel.RuleID)
if ruleID == "" {
ruleID = "default"
}
return string(mode) + "|" + matched + "|" + ruleID
}
func applyBatchOptimizationSelection(intent model.PaymentIntent, sel BatchOptimizationSelection) model.PaymentIntent {
if intent.Attributes == nil {
intent.Attributes = map[string]string{}
}
mode := normalizeBatchOptimizationMode(sel.Mode)
if mode == BatchOptimizationModeUnspecified {
mode = BatchOptimizationModeNoOptimization
}
intent.Attributes[attrBatchOptimizerMode] = string(mode)
intent.Attributes[attrBatchOptimizerRuleMatch] = strconv.FormatBool(sel.Matched)
ruleID := strings.TrimSpace(sel.RuleID)
if ruleID == "" {
delete(intent.Attributes, attrBatchOptimizerRuleID)
} else {
intent.Attributes[attrBatchOptimizerRuleID] = ruleID
}
return intent
}

View File

@@ -28,8 +28,11 @@ import (
)
const (
attrAggregatedByRecipient = "orchestrator.v2.aggregated_by_recipient"
attrAggregatedItems = "orchestrator.v2.aggregated_items"
attrAggregatedByRecipient = "orchestrator.v2.aggregated_by_recipient"
attrAggregatedItems = "orchestrator.v2.aggregated_items"
attrBatchOptimizerMode = "orchestrator.v2.batch_optimizer_mode"
attrBatchOptimizerRuleID = "orchestrator.v2.batch_optimizer_rule_id"
attrBatchOptimizerRuleMatch = "orchestrator.v2.batch_optimizer_rule_matched"
)
func (s *svc) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (resp *orchestrationv2.ExecuteBatchPaymentResponse, err error) {
@@ -71,31 +74,27 @@ func (s *svc) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.Exec
return nil, merrors.InvalidArgument("quotation has no executable items")
}
aggItems := make([]opagg.Item, 0, len(resolved.Items))
for _, item := range resolved.Items {
aggItems = append(aggItems, opagg.Item{
IntentRef: item.IntentRef,
IntentSnapshot: item.IntentSnapshot,
QuoteSnapshot: item.QuoteSnapshot,
})
}
aggOutput, err := s.aggregator.Aggregate(opagg.Input{Items: aggItems})
optimized, err := s.batchOptimizer.Optimize(BatchOptimizeInput{Items: resolved.Items})
if err != nil {
return nil, err
}
group, err := s.buildBatchOperationGroup(aggOutput.Groups)
groups, err := s.buildBatchOperationGroups(optimized.Groups)
if err != nil {
return nil, err
}
payment, err := s.executeGroup(ctx, requestCtx, resolved.QuotationRef, group)
if err != nil {
return nil, err
payments := make([]*orchestrationv2.Payment, 0, len(groups))
for i := range groups {
payment, groupErr := s.executeGroup(ctx, requestCtx, resolved.QuotationRef, groups[i])
if groupErr != nil {
return nil, groupErr
}
protoPayment, mapErr := s.mapPayment(payment)
if mapErr != nil {
return nil, mapErr
}
payments = append(payments, protoPayment)
}
protoPayment, err := s.mapPayment(payment)
if err != nil {
return nil, err
}
return &orchestrationv2.ExecuteBatchPaymentResponse{Payments: []*orchestrationv2.Payment{protoPayment}}, nil
return &orchestrationv2.ExecuteBatchPaymentResponse{Payments: payments}, nil
}
func (s *svc) prepareBatchExecute(req *orchestrationv2.ExecuteBatchPaymentRequest) (*reqval.Ctx, error) {
@@ -270,56 +269,44 @@ func normalizeIntentRefs(values []string) []string {
return out
}
func (s *svc) buildBatchOperationGroup(groups []opagg.Group) (opagg.Group, error) {
func (s *svc) buildBatchOperationGroups(groups []opagg.Group) ([]opagg.Group, error) {
if len(groups) == 0 {
return opagg.Group{}, merrors.InvalidArgument("aggregation produced no groups")
return nil, merrors.InvalidArgument("aggregation produced no groups")
}
anchorDestination := groups[0].IntentSnapshot.Destination
synthetic := make([]opagg.Item, 0, len(groups))
allIntentRefs := make([]string, 0, len(groups))
out := make([]opagg.Group, 0, len(groups))
for i := range groups {
group := groups[i]
intent := group.IntentSnapshot
intent.Destination = anchorDestination
synthetic = append(synthetic, opagg.Item{
IntentRef: strings.Join(normalizeIntentRefs(group.IntentRefs), ","),
IntentSnapshot: intent,
QuoteSnapshot: group.QuoteSnapshot,
})
allIntentRefs = append(allIntentRefs, group.IntentRefs...)
}
collapsed, err := s.aggregator.Aggregate(opagg.Input{Items: synthetic})
if err != nil {
return opagg.Group{}, err
}
if collapsed == nil || len(collapsed.Groups) != 1 {
return opagg.Group{}, merrors.InvalidArgument("batch quotation contains incompatible operation groups")
}
out := collapsed.Groups[0]
out.IntentRefs = normalizeIntentRefs(allIntentRefs)
if len(out.IntentRefs) == 0 {
return opagg.Group{}, merrors.InvalidArgument("aggregated group has no intent refs")
}
if out.IntentSnapshot.Attributes == nil {
out.IntentSnapshot.Attributes = map[string]string{}
}
if len(groups) > 1 {
out.IntentSnapshot.Attributes[attrAggregatedByRecipient] = "true"
}
out.IntentSnapshot.Attributes[attrAggregatedItems] = strconv.Itoa(len(out.IntentRefs))
targets := buildBatchPayoutTargets(groups)
if routeContainsCardPayout(out.QuoteSnapshot) && len(targets) > 0 {
raw, err := batchmeta.EncodePayoutTargets(targets)
if err != nil {
return opagg.Group{}, err
group.IntentRefs = normalizeIntentRefs(group.IntentRefs)
if len(group.IntentRefs) == 0 {
return nil, merrors.InvalidArgument("aggregated group has no intent refs")
}
if strings.TrimSpace(raw) != "" {
out.IntentSnapshot.Attributes[batchmeta.AttrPayoutTargets] = raw
if group.IntentSnapshot.Attributes == nil {
group.IntentSnapshot.Attributes = map[string]string{}
}
if len(group.IntentRefs) > 1 {
group.IntentSnapshot.Attributes[attrAggregatedByRecipient] = "true"
} else {
delete(group.IntentSnapshot.Attributes, attrAggregatedByRecipient)
}
group.IntentSnapshot.Attributes[attrAggregatedItems] = strconv.Itoa(len(group.IntentRefs))
targets := buildBatchPayoutTargets([]opagg.Group{group})
if routeContainsCardPayout(group.QuoteSnapshot) && len(targets) > 0 {
raw, err := batchmeta.EncodePayoutTargets(targets)
if err != nil {
return nil, err
}
if strings.TrimSpace(raw) != "" {
group.IntentSnapshot.Attributes[batchmeta.AttrPayoutTargets] = raw
}
} else {
delete(group.IntentSnapshot.Attributes, batchmeta.AttrPayoutTargets)
}
out = append(out, group)
}
if len(out) == 0 {
return nil, merrors.InvalidArgument("aggregation produced no groups")
}
return out, nil
}

View File

@@ -10,13 +10,14 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/batchmeta"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/discovery"
pm "github.com/tech/sendico/pkg/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2"
"go.mongodb.org/mongo-driver/v2/bson"
)
func TestExecuteBatchPayment_SameDestinationMerges(t *testing.T) {
func TestExecuteBatchPayment_DefaultNoOptimization_DoesNotMergeSameDestination(t *testing.T) {
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution
step.State = agg.StepStateCompleted
@@ -37,15 +38,17 @@ func TestExecuteBatchPayment_SameDestinationMerges(t *testing.T) {
if resp == nil {
t.Fatal("expected response")
}
if got, want := len(resp.GetPayments()), 1; got != want {
t.Fatalf("expected %d payment(s) for same-destination merge, got=%d", want, got)
if got, want := len(resp.GetPayments()), 2; got != want {
t.Fatalf("expected %d payment(s) with default no optimization, got=%d", want, got)
}
if got, want := resp.GetPayments()[0].GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
t.Fatalf("state mismatch: got=%s want=%s", got, want)
for i, p := range resp.GetPayments() {
if got, want := p.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
t.Fatalf("payments[%d] state mismatch: got=%s want=%s", i, got, want)
}
}
}
func TestExecuteBatchPayment_DifferentDestinationsCompactsIntoSinglePayment(t *testing.T) {
func TestExecuteBatchPayment_DefaultNoOptimization_DoesNotMergeDifferentDestinations(t *testing.T) {
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution
step.State = agg.StepStateCompleted
@@ -63,8 +66,8 @@ func TestExecuteBatchPayment_DifferentDestinationsCompactsIntoSinglePayment(t *t
if err != nil {
t.Fatalf("ExecuteBatchPayment returned error: %v", err)
}
if got, want := len(resp.GetPayments()), 1; got != want {
t.Fatalf("expected %d payment for batched execution, got=%d", want, got)
if got, want := len(resp.GetPayments()), 2; got != want {
t.Fatalf("expected %d payments for batched execution without optimization, got=%d", want, got)
}
for i, p := range resp.GetPayments() {
if got, want := p.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
@@ -73,7 +76,7 @@ func TestExecuteBatchPayment_DifferentDestinationsCompactsIntoSinglePayment(t *t
}
}
func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsInSinglePayment(t *testing.T) {
func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsWithoutMerging(t *testing.T) {
var (
targetRefs []string
amounts []string
@@ -99,11 +102,13 @@ func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsInSinglePayment
if err != nil {
t.Fatalf("ExecuteBatchPayment returned error: %v", err)
}
if got, want := len(resp.GetPayments()), 1; got != want {
t.Fatalf("expected %d payment for card batch, got=%d", want, got)
if got, want := len(resp.GetPayments()), 2; got != want {
t.Fatalf("expected %d payments for card batch without optimization, got=%d", want, got)
}
if got, want := resp.GetPayments()[0].GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
t.Fatalf("state mismatch: got=%s want=%s", got, want)
for i, payment := range resp.GetPayments() {
if got, want := payment.GetState(), orchestrationv2.OrchestrationState_ORCHESTRATION_STATE_SETTLED; got != want {
t.Fatalf("payments[%d] state mismatch: got=%s want=%s", i, got, want)
}
}
if got, want := len(targetRefs), 2; got != want {
t.Fatalf("expected %d card payout send calls, got=%d", want, got)
@@ -122,6 +127,41 @@ func TestExecuteBatchPayment_CardBatchCreatesPerTargetPayoutStepsInSinglePayment
}
}
func TestExecuteBatchPayment_CryptoPolicyMergesByDestination(t *testing.T) {
env := newTestEnvWithPolicy(t, BatchOptimizationPolicy{
DefaultMode: BatchOptimizationModeNoOptimization,
Rules: []BatchOptimizationRule{
{
ID: "crypto-merge",
Priority: 100,
Mode: BatchOptimizationModeMergeByDestination,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
},
},
},
}, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution
step.State = agg.StepStateCompleted
return &sexec.ExecuteOutput{StepExecution: step}, nil
})
quote := newExecutableBatchCryptoQuoteSameDestination(env.orgID, "quote-batch-crypto-merge")
env.quotes.Put(quote)
resp, err := env.svc.ExecuteBatchPayment(context.Background(), &orchestrationv2.ExecuteBatchPaymentRequest{
Meta: testMeta(env.orgID, "idem-batch-crypto-merge"),
QuotationRef: "quote-batch-crypto-merge",
ClientPaymentRef: "client-batch-crypto-merge",
})
if err != nil {
t.Fatalf("ExecuteBatchPayment returned error: %v", err)
}
if got, want := len(resp.GetPayments()), 1; got != want {
t.Fatalf("expected %d merged payment for crypto policy, got=%d", want, got)
}
}
func TestExecuteBatchPayment_IdempotentRetry(t *testing.T) {
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution
@@ -150,8 +190,23 @@ func TestExecuteBatchPayment_IdempotentRetry(t *testing.T) {
if got, want := len(resp2.GetPayments()), len(resp1.GetPayments()); got != want {
t.Fatalf("expected same number of payments on retry: got=%d want=%d", got, want)
}
if got, want := resp2.GetPayments()[0].GetPaymentRef(), resp1.GetPayments()[0].GetPaymentRef(); got != want {
t.Fatalf("expected same payment_ref on retry: got=%q want=%q", got, want)
refs1 := make([]string, 0, len(resp1.GetPayments()))
refs2 := make([]string, 0, len(resp2.GetPayments()))
for i := range resp1.GetPayments() {
refs1 = append(refs1, resp1.GetPayments()[i].GetPaymentRef())
}
for i := range resp2.GetPayments() {
refs2 = append(refs2, resp2.GetPayments()[i].GetPaymentRef())
}
sort.Strings(refs1)
sort.Strings(refs2)
if len(refs1) != len(refs2) {
t.Fatalf("payment refs count mismatch: got=%d want=%d", len(refs2), len(refs1))
}
for i := range refs1 {
if refs1[i] != refs2[i] {
t.Fatalf("payment_ref mismatch on retry at index %d: got=%q want=%q", i, refs2[i], refs1[i])
}
}
}
@@ -307,3 +362,81 @@ func newExecutableBatchCardQuoteDiffDest(orgRef bson.ObjectID, quoteRef string)
ExpiresAt: now.Add(1 * time.Hour),
}
}
func newExecutableBatchCryptoQuoteSameDestination(orgRef bson.ObjectID, quoteRef string) *model.PaymentQuoteRecord {
now := time.Now().UTC()
return &model.PaymentQuoteRecord{
Base: modelBase(now),
OrganizationBoundBase: pm.OrganizationBoundBase{
OrganizationRef: orgRef,
},
QuoteRef: quoteRef,
RequestShape: model.QuoteRequestShapeBatch,
Items: []*model.PaymentQuoteItemV2{
{
Intent: &model.PaymentIntent{
Ref: "intent-a",
Kind: model.PaymentKindPayout,
Source: testLedgerEndpoint("ledger-src"),
Destination: testExternalChainEndpoint("TQfCWxU4ERWstZZcMzEVB9MnnCSzE4agj8"),
Amount: &paymenttypes.Money{Amount: "12", Currency: "USDT"},
SettlementCurrency: "USDT",
},
Quote: &model.PaymentQuoteSnapshot{
QuoteRef: quoteRef,
DebitAmount: &paymenttypes.Money{Amount: "12", Currency: "USDT"},
ExpectedSettlementAmount: &paymenttypes.Money{Amount: "12", Currency: "USDT"},
Route: &paymenttypes.QuoteRouteSpecification{
Rail: "CRYPTO",
Provider: "gw-crypto",
Network: "TRON_NILE",
Hops: []*paymenttypes.QuoteRouteHop{
{Index: 10, Rail: "LEDGER", Role: paymenttypes.QuoteRouteHopRoleSource},
{Index: 20, Rail: "CRYPTO", Gateway: "gw-crypto", Network: "TRON_NILE", Role: paymenttypes.QuoteRouteHopRoleDestination},
},
},
},
Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable},
},
{
Intent: &model.PaymentIntent{
Ref: "intent-b",
Kind: model.PaymentKindPayout,
Source: testLedgerEndpoint("ledger-src"),
Destination: testExternalChainEndpoint("TQfCWxU4ERWstZZcMzEVB9MnnCSzE4agj8"),
Amount: &paymenttypes.Money{Amount: "7", Currency: "USDT"},
SettlementCurrency: "USDT",
},
Quote: &model.PaymentQuoteSnapshot{
QuoteRef: quoteRef,
DebitAmount: &paymenttypes.Money{Amount: "7", Currency: "USDT"},
ExpectedSettlementAmount: &paymenttypes.Money{Amount: "7", Currency: "USDT"},
Route: &paymenttypes.QuoteRouteSpecification{
Rail: "CRYPTO",
Provider: "gw-crypto",
Network: "TRON_NILE",
Hops: []*paymenttypes.QuoteRouteHop{
{Index: 10, Rail: "LEDGER", Role: paymenttypes.QuoteRouteHopRoleSource},
{Index: 20, Rail: "CRYPTO", Gateway: "gw-crypto", Network: "TRON_NILE", Role: paymenttypes.QuoteRouteHopRoleDestination},
},
},
},
Status: &model.QuoteStatusV2{State: model.QuoteStateExecutable},
},
},
ExpiresAt: now.Add(1 * time.Hour),
}
}
func testExternalChainEndpoint(address string) model.PaymentEndpoint {
return model.PaymentEndpoint{
Type: model.EndpointTypeExternalChain,
ExternalChain: &model.ExternalChainEndpoint{
Address: address,
Asset: &paymenttypes.Asset{
Chain: "TRON",
TokenSymbol: "USDT",
},
},
}
}

View File

@@ -51,25 +51,27 @@ type Dependencies struct {
QuoteStore qsnap.Store
Validator reqval.Validator
Idempotency idem.Service
Quote qsnap.Resolver
Aggregator opagg.Aggregator
Aggregate agg.Factory
Planner xplan.Compiler
State ostate.StateMachine
Scheduler ssched.Runtime
Executors sexec.Registry
Reconciler erecon.Reconciler
Repository prepo.Repository
Query pquery.Service
Mapper prmap.Mapper
Observer oobs.Observer
Producer msg.Producer
Validator reqval.Validator
Idempotency idem.Service
Quote qsnap.Resolver
BatchOptimizer BatchOptimizer
Aggregator opagg.Aggregator
Aggregate agg.Factory
Planner xplan.Compiler
State ostate.StateMachine
Scheduler ssched.Runtime
Executors sexec.Registry
Reconciler erecon.Reconciler
Repository prepo.Repository
Query pquery.Service
Mapper prmap.Mapper
Observer oobs.Observer
Producer msg.Producer
RetryPolicy ssched.RetryPolicy
Now func() time.Time
MaxTicks int
RetryPolicy ssched.RetryPolicy
BatchOptimizationPolicy BatchOptimizationPolicy
Now func() time.Time
MaxTicks int
}
func New(deps Dependencies) (Service, error) {

View File

@@ -0,0 +1,361 @@
package psvc
import (
"strings"
"github.com/shopspring/decimal"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/discovery"
)
// BatchOptimizationMode defines how ExecuteBatchPayment compacts batch operations.
type BatchOptimizationMode string
const (
BatchOptimizationModeUnspecified BatchOptimizationMode = ""
BatchOptimizationModeMergeByDestination BatchOptimizationMode = "merge_by_destination"
BatchOptimizationModeNoOptimization BatchOptimizationMode = "no_optimization"
)
// BatchOptimizationPolicy configures batch operation compaction behavior.
//
// Rules are evaluated by "best match":
// 1. all specified match fields must match the item context;
// 2. winner is highest specificity (number of optional selectors present);
// 3. ties are resolved by larger Priority;
// 4. remaining ties keep the first rule order from config.
//
// Rail is mandatory per rule. Other match fields are optional.
type BatchOptimizationPolicy struct {
DefaultMode BatchOptimizationMode
Rules []BatchOptimizationRule
}
// BatchOptimizationRule defines one matching rule.
type BatchOptimizationRule struct {
ID string
Enabled *bool
Priority int
Mode BatchOptimizationMode
Match BatchOptimizationMatch
}
// BatchOptimizationMatch defines selectors for one rule.
type BatchOptimizationMatch struct {
Rail model.Rail
Providers []string
Networks []string
Currencies []string
Amount *BatchOptimizationAmountRange
}
// BatchOptimizationAmountRange defines optional amount matching boundaries.
type BatchOptimizationAmountRange struct {
Min string
Max string
Currency string
}
// BatchOptimizationContext is runtime data used for matching one batch item.
type BatchOptimizationContext struct {
Rail model.Rail
Provider string
Network string
Currency string
Amount string
}
// BatchOptimizationSelection is the selected optimization decision.
type BatchOptimizationSelection struct {
Mode BatchOptimizationMode
RuleID string
Matched bool
}
func defaultBatchOptimizationPolicy() BatchOptimizationPolicy {
return BatchOptimizationPolicy{
DefaultMode: BatchOptimizationModeNoOptimization,
}
}
func normalizeBatchOptimizationPolicy(in BatchOptimizationPolicy) BatchOptimizationPolicy {
out := defaultBatchOptimizationPolicy()
if normalizedDefault := normalizeBatchOptimizationMode(in.DefaultMode); normalizedDefault != BatchOptimizationModeUnspecified {
out.DefaultMode = normalizedDefault
}
if len(in.Rules) == 0 {
return out
}
rules := make([]BatchOptimizationRule, 0, len(in.Rules))
for i := range in.Rules {
rule, ok := normalizeBatchOptimizationRule(in.Rules[i])
if !ok {
continue
}
rules = append(rules, rule)
}
if len(rules) > 0 {
out.Rules = rules
}
return out
}
func normalizeBatchOptimizationRule(in BatchOptimizationRule) (BatchOptimizationRule, bool) {
out := BatchOptimizationRule{
ID: strings.TrimSpace(in.ID),
Enabled: in.Enabled,
Priority: in.Priority,
Mode: normalizeBatchOptimizationMode(in.Mode),
}
if out.Mode == BatchOptimizationModeUnspecified {
return BatchOptimizationRule{}, false
}
match, ok := normalizeBatchOptimizationMatch(in.Match)
if !ok {
return BatchOptimizationRule{}, false
}
out.Match = match
return out, true
}
func normalizeBatchOptimizationMatch(in BatchOptimizationMatch) (BatchOptimizationMatch, bool) {
out := BatchOptimizationMatch{
Rail: model.ParseRail(string(in.Rail)),
Providers: normalizeOptimizationTokens(in.Providers),
Networks: normalizeOptimizationTokens(in.Networks),
Currencies: normalizeOptimizationCurrencies(in.Currencies),
}
if out.Rail == discovery.RailUnspecified {
return BatchOptimizationMatch{}, false
}
if in.Amount != nil {
normalizedAmount, ok := normalizeBatchOptimizationAmountRange(*in.Amount)
if !ok {
return BatchOptimizationMatch{}, false
}
out.Amount = &normalizedAmount
}
return out, true
}
func normalizeBatchOptimizationAmountRange(in BatchOptimizationAmountRange) (BatchOptimizationAmountRange, bool) {
out := BatchOptimizationAmountRange{
Min: strings.TrimSpace(in.Min),
Max: strings.TrimSpace(in.Max),
Currency: normalizeOptimizationCurrency(in.Currency),
}
if out.Min == "" && out.Max == "" {
return BatchOptimizationAmountRange{}, false
}
var (
min decimal.Decimal
max decimal.Decimal
err error
)
if out.Min != "" {
min, err = decimal.NewFromString(out.Min)
if err != nil {
return BatchOptimizationAmountRange{}, false
}
}
if out.Max != "" {
max, err = decimal.NewFromString(out.Max)
if err != nil {
return BatchOptimizationAmountRange{}, false
}
}
if out.Min != "" && out.Max != "" && min.GreaterThan(max) {
return BatchOptimizationAmountRange{}, false
}
return out, true
}
func (p BatchOptimizationPolicy) Select(in BatchOptimizationContext) BatchOptimizationSelection {
policy := normalizeBatchOptimizationPolicy(p)
ctx := normalizeBatchOptimizationContext(in)
selected := BatchOptimizationSelection{
Mode: policy.DefaultMode,
}
bestSpecificity := -1
bestPriority := 0
bestOrder := len(policy.Rules) + 1
for i := range policy.Rules {
rule := policy.Rules[i]
if rule.Enabled != nil && !*rule.Enabled {
continue
}
specificity, ok := rule.matches(ctx)
if !ok {
continue
}
if specificity > bestSpecificity ||
(specificity == bestSpecificity && rule.Priority > bestPriority) ||
(specificity == bestSpecificity && rule.Priority == bestPriority && i < bestOrder) {
selected = BatchOptimizationSelection{
Mode: rule.Mode,
RuleID: rule.ID,
Matched: true,
}
bestSpecificity = specificity
bestPriority = rule.Priority
bestOrder = i
}
}
return selected
}
func (r BatchOptimizationRule) matches(ctx BatchOptimizationContext) (int, bool) {
match := r.Match
if match.Rail == discovery.RailUnspecified || ctx.Rail == discovery.RailUnspecified {
return 0, false
}
if match.Rail != ctx.Rail {
return 0, false
}
specificity := 0
if len(match.Providers) > 0 {
specificity++
if !containsOptimizationToken(match.Providers, ctx.Provider) {
return 0, false
}
}
if len(match.Networks) > 0 {
specificity++
if !containsOptimizationToken(match.Networks, ctx.Network) {
return 0, false
}
}
if len(match.Currencies) > 0 {
specificity++
if !containsOptimizationToken(match.Currencies, ctx.Currency) {
return 0, false
}
}
if match.Amount != nil {
specificity++
if !amountRangeMatches(*match.Amount, ctx.Amount, ctx.Currency) {
return 0, false
}
}
return specificity, true
}
func amountRangeMatches(rule BatchOptimizationAmountRange, amountRaw string, currencyRaw string) bool {
value, err := decimal.NewFromString(strings.TrimSpace(amountRaw))
if err != nil {
return false
}
if ruleCurrency := normalizeOptimizationCurrency(rule.Currency); ruleCurrency != "" && ruleCurrency != normalizeOptimizationCurrency(currencyRaw) {
return false
}
if rule.Min != "" {
min, parseErr := decimal.NewFromString(rule.Min)
if parseErr != nil || value.LessThan(min) {
return false
}
}
if rule.Max != "" {
max, parseErr := decimal.NewFromString(rule.Max)
if parseErr != nil || value.GreaterThan(max) {
return false
}
}
return true
}
func normalizeBatchOptimizationContext(in BatchOptimizationContext) BatchOptimizationContext {
return BatchOptimizationContext{
Rail: model.ParseRail(string(in.Rail)),
Provider: normalizeOptimizationToken(in.Provider),
Network: normalizeOptimizationToken(in.Network),
Currency: normalizeOptimizationCurrency(in.Currency),
Amount: strings.TrimSpace(in.Amount),
}
}
func normalizeBatchOptimizationMode(raw BatchOptimizationMode) BatchOptimizationMode {
switch strings.ToLower(strings.TrimSpace(string(raw))) {
case "merge_by_destination", "merge-by-destination", "by_destination", "by-destination":
return BatchOptimizationModeMergeByDestination
case "no_optimization", "no-optimization", "none", "never_merge", "never-merge", "never":
return BatchOptimizationModeNoOptimization
default:
return BatchOptimizationModeUnspecified
}
}
func normalizeOptimizationTokens(values []string) []string {
if len(values) == 0 {
return nil
}
seen := make(map[string]struct{}, len(values))
out := make([]string, 0, len(values))
for i := range values {
token := normalizeOptimizationToken(values[i])
if token == "" {
continue
}
if _, exists := seen[token]; exists {
continue
}
seen[token] = struct{}{}
out = append(out, token)
}
if len(out) == 0 {
return nil
}
return out
}
func normalizeOptimizationCurrencies(values []string) []string {
if len(values) == 0 {
return nil
}
seen := make(map[string]struct{}, len(values))
out := make([]string, 0, len(values))
for i := range values {
token := normalizeOptimizationCurrency(values[i])
if token == "" {
continue
}
if _, exists := seen[token]; exists {
continue
}
seen[token] = struct{}{}
out = append(out, token)
}
if len(out) == 0 {
return nil
}
return out
}
func normalizeOptimizationToken(value string) string {
return strings.ToUpper(strings.TrimSpace(value))
}
func normalizeOptimizationCurrency(value string) string {
return strings.ToUpper(strings.TrimSpace(value))
}
func containsOptimizationToken(values []string, value string) bool {
if len(values) == 0 {
return false
}
normalized := normalizeOptimizationToken(value)
if normalized == "" {
return false
}
for i := range values {
if values[i] == normalized {
return true
}
}
return false
}

View File

@@ -0,0 +1,167 @@
package psvc
import (
"testing"
"github.com/tech/sendico/pkg/discovery"
)
func TestNormalizeBatchOptimizationPolicy_DefaultMode_NoOptimization(t *testing.T) {
got := normalizeBatchOptimizationPolicy(BatchOptimizationPolicy{})
if got.DefaultMode != BatchOptimizationModeNoOptimization {
t.Fatalf("default_mode mismatch: got=%q want=%q", got.DefaultMode, BatchOptimizationModeNoOptimization)
}
if got.Rules != nil {
t.Fatalf("expected no rules, got=%d", len(got.Rules))
}
}
func TestNormalizeBatchOptimizationPolicy_DropsInvalidRules(t *testing.T) {
got := normalizeBatchOptimizationPolicy(BatchOptimizationPolicy{
DefaultMode: "merge_by_destination",
Rules: []BatchOptimizationRule{
{
ID: "missing-rail",
Mode: BatchOptimizationModeMergeByDestination,
Match: BatchOptimizationMatch{
Rail: "unknown",
},
},
{
ID: "invalid-mode",
Mode: "x",
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
},
},
{
ID: "valid",
Mode: BatchOptimizationModeMergeByDestination,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
},
},
},
})
if got.DefaultMode != BatchOptimizationModeMergeByDestination {
t.Fatalf("default_mode mismatch: got=%q want=%q", got.DefaultMode, BatchOptimizationModeMergeByDestination)
}
if len(got.Rules) != 1 {
t.Fatalf("rules count mismatch: got=%d want=%d", len(got.Rules), 1)
}
if got.Rules[0].ID != "valid" {
t.Fatalf("rule id mismatch: got=%q want=%q", got.Rules[0].ID, "valid")
}
}
func TestBatchOptimizationPolicy_Select_UsesBestSpecificityThenPriority(t *testing.T) {
policy := BatchOptimizationPolicy{
DefaultMode: BatchOptimizationModeNoOptimization,
Rules: []BatchOptimizationRule{
{
ID: "crypto-generic",
Priority: 10,
Mode: BatchOptimizationModeMergeByDestination,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
},
},
{
ID: "crypto-network",
Priority: 5,
Mode: BatchOptimizationModeNoOptimization,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
Networks: []string{"tron_nile"},
},
},
{
ID: "crypto-network-priority",
Priority: 20,
Mode: BatchOptimizationModeMergeByDestination,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
Networks: []string{"tron_nile"},
},
},
},
}
out := policy.Select(BatchOptimizationContext{
Rail: discovery.RailCrypto,
Network: "TRON_NILE",
})
if !out.Matched {
t.Fatal("expected matched rule")
}
if out.RuleID != "crypto-network-priority" {
t.Fatalf("rule id mismatch: got=%q want=%q", out.RuleID, "crypto-network-priority")
}
if out.Mode != BatchOptimizationModeMergeByDestination {
t.Fatalf("mode mismatch: got=%q want=%q", out.Mode, BatchOptimizationModeMergeByDestination)
}
}
func TestBatchOptimizationPolicy_Select_FallsBackToDefault(t *testing.T) {
policy := BatchOptimizationPolicy{
DefaultMode: BatchOptimizationModeNoOptimization,
Rules: []BatchOptimizationRule{
{
ID: "crypto-only",
Mode: BatchOptimizationModeMergeByDestination,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
},
},
},
}
out := policy.Select(BatchOptimizationContext{Rail: discovery.RailCardPayout})
if out.Matched {
t.Fatalf("unexpected matched rule: %q", out.RuleID)
}
if out.Mode != BatchOptimizationModeNoOptimization {
t.Fatalf("mode mismatch: got=%q want=%q", out.Mode, BatchOptimizationModeNoOptimization)
}
}
func TestBatchOptimizationPolicy_Select_MatchesAmountRange(t *testing.T) {
policy := BatchOptimizationPolicy{
DefaultMode: BatchOptimizationModeNoOptimization,
Rules: []BatchOptimizationRule{
{
ID: "crypto-large-usdt",
Mode: BatchOptimizationModeMergeByDestination,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
Amount: &BatchOptimizationAmountRange{
Min: "100",
Currency: "USDT",
},
},
},
},
}
matched := policy.Select(BatchOptimizationContext{
Rail: discovery.RailCrypto,
Amount: "120",
Currency: "USDT",
})
if !matched.Matched {
t.Fatal("expected amount rule match")
}
notMatched := policy.Select(BatchOptimizationContext{
Rail: discovery.RailCrypto,
Amount: "50",
Currency: "USDT",
})
if notMatched.Matched {
t.Fatalf("expected no amount rule match, got rule=%q", notMatched.RuleID)
}
if notMatched.Mode != BatchOptimizationModeNoOptimization {
t.Fatalf("default mode mismatch: got=%q want=%q", notMatched.Mode, BatchOptimizationModeNoOptimization)
}
}

View File

@@ -32,21 +32,21 @@ type svc struct {
quoteStore qsnap.Store
validator reqval.Validator
idempotency idem.Service
quote qsnap.Resolver
aggregator opagg.Aggregator
aggregate agg.Factory
planner xplan.Compiler
state ostate.StateMachine
scheduler ssched.Runtime
executors sexec.Registry
reconciler erecon.Reconciler
repository prepo.Repository
query pquery.Service
mapper prmap.Mapper
observer oobs.Observer
statuses paymentStatusPublisher
validator reqval.Validator
idempotency idem.Service
quote qsnap.Resolver
batchOptimizer BatchOptimizer
aggregate agg.Factory
planner xplan.Compiler
state ostate.StateMachine
scheduler ssched.Runtime
executors sexec.Registry
reconciler erecon.Reconciler
repository prepo.Repository
query pquery.Service
mapper prmap.Mapper
observer oobs.Observer
statuses paymentStatusPublisher
retryPolicy ssched.RetryPolicy
now func() time.Time
@@ -93,21 +93,21 @@ func newService(deps Dependencies) (Service, error) {
quoteStore: deps.QuoteStore,
validator: firstValidator(deps.Validator, logger),
idempotency: firstIdempotency(deps.Idempotency, logger),
quote: firstQuoteResolver(deps.Quote, logger),
aggregator: firstAggregator(deps.Aggregator, logger),
aggregate: firstAggregateFactory(deps.Aggregate, logger),
planner: firstPlanCompiler(deps.Planner, logger),
state: firstStateMachine(deps.State, logger),
scheduler: firstScheduler(deps.Scheduler, logger),
executors: firstExecutors(deps.Executors, logger),
reconciler: firstReconciler(deps.Reconciler, logger),
repository: deps.Repository,
query: query,
mapper: firstMapper(deps.Mapper, logger),
observer: observer,
statuses: newPaymentStatusPublisher(logger, deps.Producer),
validator: firstValidator(deps.Validator, logger),
idempotency: firstIdempotency(deps.Idempotency, logger),
quote: firstQuoteResolver(deps.Quote, logger),
batchOptimizer: firstBatchOptimizer(deps.BatchOptimizer, deps.BatchOptimizationPolicy, deps.Aggregator, logger),
aggregate: firstAggregateFactory(deps.Aggregate, logger),
planner: firstPlanCompiler(deps.Planner, logger),
state: firstStateMachine(deps.State, logger),
scheduler: firstScheduler(deps.Scheduler, logger),
executors: firstExecutors(deps.Executors, logger),
reconciler: firstReconciler(deps.Reconciler, logger),
repository: deps.Repository,
query: query,
mapper: firstMapper(deps.Mapper, logger),
observer: observer,
statuses: newPaymentStatusPublisher(logger, deps.Producer),
retryPolicy: deps.RetryPolicy,
now: deps.Now,
@@ -153,6 +153,17 @@ func firstAggregateFactory(v agg.Factory, logger mlogger.Logger) agg.Factory {
return agg.New(agg.Dependencies{Logger: logger})
}
func firstBatchOptimizer(v BatchOptimizer, policy BatchOptimizationPolicy, aggregator opagg.Aggregator, logger mlogger.Logger) BatchOptimizer {
if v != nil {
return v
}
return NewPolicyBatchOptimizer(PolicyBatchOptimizerDependencies{
Logger: logger,
Aggregator: firstAggregator(aggregator, logger),
Policy: policy,
})
}
func firstAggregator(v opagg.Aggregator, logger mlogger.Logger) opagg.Aggregator {
if v != nil {
return v

View File

@@ -366,6 +366,14 @@ type testEnv struct {
}
func newTestEnv(t *testing.T, handler func(kind string, req sexec.StepRequest) (*sexec.ExecuteOutput, error)) *testEnv {
return newTestEnvWithPolicy(t, BatchOptimizationPolicy{}, handler)
}
func newTestEnvWithPolicy(
t *testing.T,
policy BatchOptimizationPolicy,
handler func(kind string, req sexec.StepRequest) (*sexec.ExecuteOutput, error),
) *testEnv {
t.Helper()
repo := newMemoryRepo(func() time.Time {
return time.Now().UTC()
@@ -388,13 +396,14 @@ func newTestEnv(t *testing.T, handler func(kind string, req sexec.StepRequest) (
producer := &capturingProducer{}
svc, err := New(Dependencies{
QuoteStore: quotes,
Repository: repo,
Executors: registry,
Observer: observer,
Producer: producer,
RetryPolicy: ssched.RetryPolicy{MaxAttempts: 2},
MaxTicks: 20,
QuoteStore: quotes,
Repository: repo,
Executors: registry,
Observer: observer,
Producer: producer,
RetryPolicy: ssched.RetryPolicy{MaxAttempts: 2},
BatchOptimizationPolicy: policy,
MaxTicks: 20,
})
if err != nil {
t.Fatalf("New returned error: %v", err)

View File

@@ -7,6 +7,7 @@ import (
chainclient "github.com/tech/sendico/gateway/chain/client"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc"
"github.com/tech/sendico/payments/storage/model"
clockpkg "github.com/tech/sendico/pkg/clock"
"github.com/tech/sendico/pkg/discovery"
@@ -108,6 +109,16 @@ func WithGatewayRegistry(registry GatewayRegistry) Option {
}
}
// WithBatchOptimizationPolicy configures policy-based batch optimizer behavior.
func WithBatchOptimizationPolicy(policy psvc.BatchOptimizationPolicy) Option {
return func(s *Service) {
if s == nil {
return
}
s.batchOptimizationPolicy = policy
}
}
type discoveryGatewayRegistry struct {
registry *discovery.Registry
}

View File

@@ -24,13 +24,14 @@ type Service struct {
paymentRepo prepo.Repository
producer msg.Producer
ledgerClient ledgerclient.Client
gatewayInvokeResolver GatewayInvokeResolver
gatewayRegistry GatewayRegistry
cardGatewayRoutes map[string]CardGatewayRoute
paymentGatewayBroker mb.Broker
gatewayConsumers []msg.Consumer
stopExternalWorkers context.CancelFunc
ledgerClient ledgerclient.Client
gatewayInvokeResolver GatewayInvokeResolver
gatewayRegistry GatewayRegistry
cardGatewayRoutes map[string]CardGatewayRoute
batchOptimizationPolicy psvc.BatchOptimizationPolicy
paymentGatewayBroker mb.Broker
gatewayConsumers []msg.Consumer
stopExternalWorkers context.CancelFunc
}
// NewService constructs the v2 orchestrator service.
@@ -53,11 +54,12 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro
var err error
svc.v2, svc.paymentRepo, err = newOrchestrationV2Service(svc.logger, repo, v2RuntimeDeps{
LedgerClient: svc.ledgerClient,
GatewayInvokeResolver: svc.gatewayInvokeResolver,
GatewayRegistry: svc.gatewayRegistry,
CardGatewayRoutes: svc.cardGatewayRoutes,
Producer: svc.producer,
LedgerClient: svc.ledgerClient,
GatewayInvokeResolver: svc.gatewayInvokeResolver,
GatewayRegistry: svc.gatewayRegistry,
CardGatewayRoutes: svc.cardGatewayRoutes,
BatchOptimizationPolicy: svc.batchOptimizationPolicy,
Producer: svc.producer,
})
svc.startExternalRuntime()
if err != nil {

View File

@@ -25,11 +25,12 @@ type v2MongoDBProvider interface {
}
type v2RuntimeDeps struct {
LedgerClient ledgerclient.Client
GatewayInvokeResolver GatewayInvokeResolver
GatewayRegistry GatewayRegistry
CardGatewayRoutes map[string]CardGatewayRoute
Producer msg.Producer
LedgerClient ledgerclient.Client
GatewayInvokeResolver GatewayInvokeResolver
GatewayRegistry GatewayRegistry
CardGatewayRoutes map[string]CardGatewayRoute
BatchOptimizationPolicy psvc.BatchOptimizationPolicy
Producer msg.Producer
}
func newOrchestrationV2Service(logger mlogger.Logger, repo storage.Repository, runtimeDeps v2RuntimeDeps) (psvc.Service, prepo.Repository, error) {
@@ -70,13 +71,14 @@ func newOrchestrationV2Service(logger mlogger.Logger, repo storage.Repository, r
executors := buildOrchestrationV2Executors(logger, runtimeDeps)
svc, err := psvc.New(psvc.Dependencies{
Logger: logger.Named("v2"),
QuoteStore: repo.Quotes(),
Repository: paymentRepo,
Query: query,
Observer: observer,
Executors: executors,
Producer: runtimeDeps.Producer,
Logger: logger.Named("v2"),
QuoteStore: repo.Quotes(),
Repository: paymentRepo,
Query: query,
Observer: observer,
Executors: executors,
BatchOptimizationPolicy: runtimeDeps.BatchOptimizationPolicy,
Producer: runtimeDeps.Producer,
})
if err != nil {
logger.Error("Orchestration v2 disabled: service init failed", zap.Error(err))