Orchestrator refactoring + planned amounts

This commit is contained in:
Stephan D
2026-03-11 20:04:10 +01:00
parent 208b4283d0
commit f578278205
111 changed files with 2485 additions and 1517 deletions

View File

@@ -44,33 +44,37 @@ const (
// StepShell defines one initial step telemetry item.
type StepShell struct {
StepRef string `bson:"stepRef" json:"stepRef"`
StepCode string `bson:"stepCode" json:"stepCode"`
Rail model.Rail `bson:"rail,omitempty" json:"rail,omitempty"`
Gateway string `bson:"gateway,omitempty" json:"gateway,omitempty"`
InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"`
ReportVisibility model.ReportVisibility `bson:"reportVisibility,omitempty" json:"reportVisibility,omitempty"`
UserLabel string `bson:"userLabel,omitempty" json:"userLabel,omitempty"`
StepRef string `bson:"stepRef" json:"stepRef"`
StepCode string `bson:"stepCode" json:"stepCode"`
Rail model.Rail `bson:"rail,omitempty" json:"rail,omitempty"`
Gateway string `bson:"gateway,omitempty" json:"gateway,omitempty"`
InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"`
ReportVisibility model.ReportVisibility `bson:"reportVisibility,omitempty" json:"reportVisibility,omitempty"`
UserLabel string `bson:"userLabel,omitempty" json:"userLabel,omitempty"`
PlannedMoney *paymenttypes.Money `bson:"plannedMoney,omitempty" json:"plannedMoney,omitempty"`
PlannedConvertedMoney *paymenttypes.Money `bson:"plannedConvertedMoney,omitempty" json:"plannedConvertedMoney,omitempty"`
}
// StepExecution is runtime telemetry for one step.
type StepExecution struct {
StepRef string `bson:"stepRef" json:"stepRef"`
StepCode string `bson:"stepCode" json:"stepCode"`
Rail model.Rail `bson:"rail,omitempty" json:"rail,omitempty"`
Gateway string `bson:"gateway,omitempty" json:"gateway,omitempty"`
InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"`
ReportVisibility model.ReportVisibility `bson:"reportVisibility,omitempty" json:"reportVisibility,omitempty"`
UserLabel string `bson:"userLabel,omitempty" json:"userLabel,omitempty"`
State StepState `bson:"state" json:"state"`
Attempt uint32 `bson:"attempt" json:"attempt"`
StartedAt *time.Time `bson:"startedAt,omitempty" json:"startedAt,omitempty"`
CompletedAt *time.Time `bson:"completedAt,omitempty" json:"completedAt,omitempty"`
FailureCode string `bson:"failureCode,omitempty" json:"failureCode,omitempty"`
FailureMsg string `bson:"failureMsg,omitempty" json:"failureMsg,omitempty"`
ExternalRefs []ExternalRef `bson:"externalRefs,omitempty" json:"externalRefs,omitempty"`
ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executedMoney,omitempty"`
ConvertedMoney *paymenttypes.Money `bson:"convertedMoney,omitempty" json:"convertedMoney,omitempty"`
StepRef string `bson:"stepRef" json:"stepRef"`
StepCode string `bson:"stepCode" json:"stepCode"`
Rail model.Rail `bson:"rail,omitempty" json:"rail,omitempty"`
Gateway string `bson:"gateway,omitempty" json:"gateway,omitempty"`
InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"`
ReportVisibility model.ReportVisibility `bson:"reportVisibility,omitempty" json:"reportVisibility,omitempty"`
UserLabel string `bson:"userLabel,omitempty" json:"userLabel,omitempty"`
State StepState `bson:"state" json:"state"`
Attempt uint32 `bson:"attempt" json:"attempt"`
StartedAt *time.Time `bson:"startedAt,omitempty" json:"startedAt,omitempty"`
CompletedAt *time.Time `bson:"completedAt,omitempty" json:"completedAt,omitempty"`
FailureCode string `bson:"failureCode,omitempty" json:"failureCode,omitempty"`
FailureMsg string `bson:"failureMsg,omitempty" json:"failureMsg,omitempty"`
ExternalRefs []ExternalRef `bson:"externalRefs,omitempty" json:"externalRefs,omitempty"`
ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executedMoney,omitempty"`
ConvertedMoney *paymenttypes.Money `bson:"convertedMoney,omitempty" json:"convertedMoney,omitempty"`
PlannedMoney *paymenttypes.Money `bson:"plannedMoney,omitempty" json:"plannedMoney,omitempty"`
PlannedConvertedMoney *paymenttypes.Money `bson:"plannedConvertedMoney,omitempty" json:"plannedConvertedMoney,omitempty"`
}
// ExternalRef links step execution to an external operation.

View File

@@ -4,6 +4,7 @@ import (
"strings"
"time"
svcshared "github.com/tech/sendico/payments/orchestrator/internal/service/shared"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/db/storable"
"github.com/tech/sendico/pkg/merrors"
@@ -148,15 +149,17 @@ func buildInitialStepTelemetry(shell []StepShell) ([]StepExecution, error) {
instanceID := strings.TrimSpace(shell[i].InstanceID)
out = append(out, StepExecution{
StepRef: stepRef,
StepCode: stepCode,
Rail: railValue,
Gateway: gatewayID,
InstanceID: instanceID,
ReportVisibility: visibility,
UserLabel: userLabel,
State: StepStatePending,
Attempt: 1,
StepRef: stepRef,
StepCode: stepCode,
Rail: railValue,
Gateway: gatewayID,
InstanceID: instanceID,
ReportVisibility: visibility,
UserLabel: userLabel,
State: StepStatePending,
Attempt: 1,
PlannedMoney: svcshared.CloneMoneyTrimNonEmpty(shell[i].PlannedMoney),
PlannedConvertedMoney: svcshared.CloneMoneyTrimNonEmpty(shell[i].PlannedConvertedMoney),
})
}
return out, nil

View File

@@ -5,6 +5,8 @@ import (
"time"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/oshared"
svcshared "github.com/tech/sendico/payments/orchestrator/internal/service/shared"
"github.com/tech/sendico/pkg/merrors"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
)
@@ -123,9 +125,9 @@ func normalizeGatewayEvent(src GatewayEvent) (*normalizedEvent, error) {
ev := &normalizedEvent{
stepRef: strings.TrimSpace(src.StepRef),
targetState: target,
failureInfo: buildFailureInfo(failureCode, failureMsg, normalizeTimePtr(src.OccurredAt)),
failureInfo: buildFailureInfo(failureCode, failureMsg, oshared.CloneTimeUTC(src.OccurredAt)),
forceAggregate: buildForceAggregate(src.TerminalFailure, needsAttention),
executedMoney: normalizeEventMoney(src.ExecutedMoney),
executedMoney: svcshared.CloneMoneyTrimNonEmpty(src.ExecutedMoney),
}
ev.matchRefs = normalizeRefList([]agg.ExternalRef{
{
@@ -162,7 +164,7 @@ func normalizeLedgerEvent(src LedgerEvent) (*normalizedEvent, error) {
ev := &normalizedEvent{
stepRef: strings.TrimSpace(src.StepRef),
targetState: target,
failureInfo: buildFailureInfo(failureCode, failureMsg, normalizeTimePtr(src.OccurredAt)),
failureInfo: buildFailureInfo(failureCode, failureMsg, oshared.CloneTimeUTC(src.OccurredAt)),
forceAggregate: buildForceAggregate(src.TerminalFailure, needsAttention),
}
ev.matchRefs = normalizeRefList([]agg.ExternalRef{
@@ -194,7 +196,7 @@ func normalizeCardEvent(src CardEvent) (*normalizedEvent, error) {
ev := &normalizedEvent{
stepRef: strings.TrimSpace(src.StepRef),
targetState: target,
failureInfo: buildFailureInfo(failureCode, failureMsg, normalizeTimePtr(src.OccurredAt)),
failureInfo: buildFailureInfo(failureCode, failureMsg, oshared.CloneTimeUTC(src.OccurredAt)),
forceAggregate: buildForceAggregate(src.TerminalFailure, needsAttention),
}
ev.matchRefs = normalizeRefList([]agg.ExternalRef{
@@ -268,18 +270,7 @@ func normalizeCardStatus(status CardStatus) (CardStatus, bool) {
}
func normalizeEventMoney(money *paymenttypes.Money) *paymenttypes.Money {
if money == nil {
return nil
}
amount := strings.TrimSpace(money.GetAmount())
currency := strings.TrimSpace(money.GetCurrency())
if amount == "" || currency == "" {
return nil
}
return &paymenttypes.Money{
Amount: amount,
Currency: currency,
}
return svcshared.CloneMoneyTrimNonEmpty(money)
}
func mapFailureTarget(status any, retryable *bool) (agg.StepState, bool) {
@@ -302,14 +293,6 @@ func mapFailureTarget(status any, retryable *bool) (agg.StepState, bool) {
}
}
func normalizeTimePtr(ts *time.Time) *time.Time {
if ts == nil {
return nil
}
val := ts.UTC()
return &val
}
func normalizeRefList(refs []agg.ExternalRef) []agg.ExternalRef {
if len(refs) == 0 {
return nil

View File

@@ -197,3 +197,57 @@ func TestAggregate_UserBatchQuoteSampleCompactsToSingleRecipientOperation(t *tes
t.Fatalf("aggregated items mismatch: got=%q want=%q", got, want)
}
}
func TestAggregate_MergeByRecipient_IgnoresSourceAndQuoteRefs(t *testing.T) {
aggregator := New()
firstIntent := sampleIntent("intent-a", "card-1", "100")
secondIntent := sampleIntent("intent-b", "card-1", "125")
secondIntent.Source.ManagedWallet.ManagedWalletRef = "src-wallet-2"
firstQuote := sampleQuote("quote-a", "100", "9150", "1.8")
secondQuote := sampleQuote("quote-b", "125", "11437.5", "1.8")
secondQuote.Route.RouteRef = "route-b"
secondQuote.FXQuote.QuoteRef = "fx-b"
secondQuote.FXQuote.RateRef = "rate-b"
secondQuote.FXQuote.Price.Value = "92.7"
in := Input{
Items: []Item{
{
IntentSnapshot: firstIntent,
QuoteSnapshot: firstQuote,
MergeMode: MergeModeByRecipient,
},
{
IntentSnapshot: secondIntent,
QuoteSnapshot: secondQuote,
MergeMode: MergeModeByRecipient,
},
},
}
out, err := aggregator.Aggregate(in)
if err != nil {
t.Fatalf("Aggregate returned error: %v", err)
}
if out == nil {
t.Fatal("expected output")
}
if got, want := len(out.Groups), 1; got != want {
t.Fatalf("groups count mismatch: got=%d want=%d", got, want)
}
group := out.Groups[0]
if got, want := group.IntentSnapshot.Amount.Amount, "225"; got != want {
t.Fatalf("intent amount mismatch: got=%q want=%q", got, want)
}
if group.QuoteSnapshot == nil {
t.Fatal("expected quote snapshot")
}
if got, want := group.QuoteSnapshot.DebitAmount.Amount, "225"; got != want {
t.Fatalf("debit amount mismatch: got=%q want=%q", got, want)
}
if got, want := group.QuoteSnapshot.ExpectedSettlementAmount.Amount, "20587.5"; got != want {
t.Fatalf("settlement amount mismatch: got=%q want=%q", got, want)
}
}

View File

@@ -219,6 +219,33 @@ func cloneStringSlice(src []string) []string {
return out
}
func cloneGroupMembers(src []GroupMember) []GroupMember {
if len(src) == 0 {
return nil
}
out := make([]GroupMember, 0, len(src))
for i := range src {
member := src[i]
intentSnapshot, err := cloneIntentSnapshot(member.IntentSnapshot)
if err != nil {
continue
}
quoteSnapshot, err := cloneQuoteSnapshot(member.QuoteSnapshot)
if err != nil {
continue
}
out = append(out, GroupMember{
IntentRef: strings.TrimSpace(member.IntentRef),
IntentSnapshot: intentSnapshot,
QuoteSnapshot: quoteSnapshot,
})
}
if len(out) == 0 {
return nil
}
return out
}
func cloneIntentSnapshot(src model.PaymentIntent) (model.PaymentIntent, error) {
var dst model.PaymentIntent
if err := bsonClone(src, &dst); err != nil {

View File

@@ -95,8 +95,6 @@ func mergeQuoteSnapshot(dst *model.PaymentQuoteSnapshot, src *model.PaymentQuote
if strings.TrimSpace(dst.QuoteRef) == "" {
dst.QuoteRef = strings.TrimSpace(src.QuoteRef)
} else if srcRef := strings.TrimSpace(src.QuoteRef); srcRef != "" && dst.QuoteRef != srcRef {
return merrors.InvalidArgument("quote_snapshot.quote_ref mismatch")
}
return nil

View File

@@ -3,7 +3,6 @@ package opagg
import (
"strings"
"github.com/tech/sendico/pkg/merrors"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
)
@@ -14,9 +13,7 @@ func mergeRoute(dst, src *paymenttypes.QuoteRouteSpecification) (*paymenttypes.Q
if src == nil {
return dst, nil
}
if routeSignature(dst) != routeSignature(src) {
return nil, merrors.InvalidArgument("quote_snapshot.route mismatch")
}
// Aggregation is destination-driven; keep the first route as execution template.
return dst, nil
}
@@ -27,9 +24,6 @@ func mergeFXQuote(dst, src *paymenttypes.FXQuote) (*paymenttypes.FXQuote, error)
if src == nil {
return dst, nil
}
if fxQuoteSignature(dst) != fxQuoteSignature(src) {
return nil, merrors.InvalidArgument("quote_snapshot.fx_quote mismatch")
}
var err error
dst.BaseAmount, err = mergeMoney(dst.BaseAmount, src.BaseAmount, "quote_snapshot.fx_quote.base_amount")
if err != nil {

View File

@@ -31,15 +31,24 @@ type Item struct {
IntentSnapshot model.PaymentIntent
QuoteSnapshot *model.PaymentQuoteSnapshot
MergeMode MergeMode
MergeKey string
PolicyTag string
}
// GroupMember preserves one original batch member inside an aggregated group.
type GroupMember struct {
IntentRef string
IntentSnapshot model.PaymentIntent
QuoteSnapshot *model.PaymentQuoteSnapshot
}
// Group is one aggregated recipient operation group.
type Group struct {
RecipientKey string
IntentRefs []string
IntentSnapshot model.PaymentIntent
QuoteSnapshot *model.PaymentQuoteSnapshot
Members []GroupMember
}
// Output is the aggregation result.

View File

@@ -27,6 +27,7 @@ type groupAccumulator struct {
intentRefs []string
intent model.PaymentIntent
quote *model.PaymentQuoteSnapshot
members []GroupMember
}
func (s *svc) Aggregate(in Input) (out *Output, err error) {
@@ -90,12 +91,23 @@ func (s *svc) Aggregate(in Input) (out *Output, err error) {
if quoteSnapshot == nil {
return nil, merrors.InvalidArgument("items[" + itoa(i) + "].quote_snapshot is required")
}
member, memberErr := makeGroupMember(intentRef, item.IntentSnapshot, item.QuoteSnapshot)
if memberErr != nil {
return nil, memberErr
}
groupRecipientKey := recipientKey
if normalizeMergeMode(item.MergeMode) == MergeModeByRecipient {
if override := strings.TrimSpace(item.MergeKey); override != "" {
groupRecipientKey = override
}
}
groups[key] = &groupAccumulator{
recipientKey: recipientKey,
recipientKey: groupRecipientKey,
intentRefs: []string{intentRef},
intent: intentSnapshot,
quote: quoteSnapshot,
members: []GroupMember{member},
}
order = append(order, key)
continue
@@ -108,6 +120,11 @@ func (s *svc) Aggregate(in Input) (out *Output, err error) {
return nil, merrors.InvalidArgument("items[" + itoa(i) + "]: " + err.Error())
}
acc.intentRefs = append(acc.intentRefs, intentRef)
member, memberErr := makeGroupMember(intentRef, item.IntentSnapshot, item.QuoteSnapshot)
if memberErr != nil {
return nil, memberErr
}
acc.members = append(acc.members, member)
}
out = &Output{
@@ -139,6 +156,7 @@ func (s *svc) Aggregate(in Input) (out *Output, err error) {
IntentRefs: cloneStringSlice(acc.intentRefs),
IntentSnapshot: finalIntent,
QuoteSnapshot: finalQuote,
Members: cloneGroupMembers(acc.members),
})
}
@@ -165,15 +183,28 @@ func validateItem(item Item) error {
}
func groupingKey(item Item) (string, string, error) {
sourceKey, err := endpointKey(item.IntentSnapshot.Source)
if err != nil {
return "", "", merrors.InvalidArgument("intent_snapshot.source: " + err.Error())
}
recipientKey, err := endpointKey(item.IntentSnapshot.Destination)
if err != nil {
return "", "", merrors.InvalidArgument("intent_snapshot.destination: " + err.Error())
}
if normalizeMergeMode(item.MergeMode) == MergeModeByRecipient {
mergeKey := strings.TrimSpace(item.MergeKey)
if mergeKey == "" {
mergeKey = recipientKey
}
key := strings.Join([]string{
"kind=" + strings.ToLower(strings.TrimSpace(string(item.IntentSnapshot.Kind))),
"merge_key=" + mergeKey,
}, keySep)
return key, recipientKey, nil
}
sourceKey, err := endpointKey(item.IntentSnapshot.Source)
if err != nil {
return "", "", merrors.InvalidArgument("intent_snapshot.source: " + err.Error())
}
quote := item.QuoteSnapshot
key := strings.Join([]string{
"kind=" + strings.ToLower(strings.TrimSpace(string(item.IntentSnapshot.Kind))),
@@ -190,6 +221,22 @@ func groupingKey(item Item) (string, string, error) {
return key, recipientKey, nil
}
func makeGroupMember(intentRef string, intent model.PaymentIntent, quote *model.PaymentQuoteSnapshot) (GroupMember, error) {
intentSnapshot, err := cloneIntentSnapshot(intent)
if err != nil {
return GroupMember{}, err
}
quoteSnapshot, err := cloneQuoteSnapshot(quote)
if err != nil {
return GroupMember{}, err
}
return GroupMember{
IntentRef: strings.TrimSpace(intentRef),
IntentSnapshot: intentSnapshot,
QuoteSnapshot: quoteSnapshot,
}, nil
}
func isBatchingEligible(quote *model.PaymentQuoteSnapshot) bool {
if quote == nil || quote.ExecutionConditions == nil {
return true

View File

@@ -0,0 +1,31 @@
package oshared
import (
"strings"
"time"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
)
func CloneExternalRefs(refs []agg.ExternalRef) []agg.ExternalRef {
if len(refs) == 0 {
return nil
}
out := make([]agg.ExternalRef, 0, len(refs))
for i := range refs {
ref := refs[i]
ref.GatewayInstanceID = strings.TrimSpace(ref.GatewayInstanceID)
ref.Kind = strings.TrimSpace(ref.Kind)
ref.Ref = strings.TrimSpace(ref.Ref)
out = append(out, ref)
}
return out
}
func CloneTimeUTC(ts *time.Time) *time.Time {
if ts == nil {
return nil
}
val := ts.UTC()
return &val
}

View File

@@ -2,14 +2,14 @@ package prepo
import (
"strings"
"time"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/oshared"
svcshared "github.com/tech/sendico/payments/orchestrator/internal/service/shared"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/db/storable"
pm "github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/mservice"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.mongodb.org/mongo-driver/v2/bson"
)
@@ -108,50 +108,14 @@ func cloneStepExecutions(src []agg.StepExecution) []agg.StepExecution {
if step.Attempt == 0 {
step.Attempt = 1
}
step.ExternalRefs = cloneExternalRefs(step.ExternalRefs)
step.ExecutedMoney = cloneStepMoney(step.ExecutedMoney)
step.ConvertedMoney = cloneStepMoney(step.ConvertedMoney)
step.StartedAt = cloneTime(step.StartedAt)
step.CompletedAt = cloneTime(step.CompletedAt)
step.ExternalRefs = oshared.CloneExternalRefs(step.ExternalRefs)
step.ExecutedMoney = svcshared.CloneMoneyTrimNonEmpty(step.ExecutedMoney)
step.ConvertedMoney = svcshared.CloneMoneyTrimNonEmpty(step.ConvertedMoney)
step.PlannedMoney = svcshared.CloneMoneyTrimNonEmpty(step.PlannedMoney)
step.PlannedConvertedMoney = svcshared.CloneMoneyTrimNonEmpty(step.PlannedConvertedMoney)
step.StartedAt = oshared.CloneTimeUTC(step.StartedAt)
step.CompletedAt = oshared.CloneTimeUTC(step.CompletedAt)
out = append(out, step)
}
return out
}
func cloneStepMoney(money *paymenttypes.Money) *paymenttypes.Money {
if money == nil {
return nil
}
amount := strings.TrimSpace(money.GetAmount())
currency := strings.TrimSpace(money.GetCurrency())
if amount == "" || currency == "" {
return nil
}
return &paymenttypes.Money{
Amount: amount,
Currency: currency,
}
}
func cloneExternalRefs(refs []agg.ExternalRef) []agg.ExternalRef {
if len(refs) == 0 {
return nil
}
out := make([]agg.ExternalRef, 0, len(refs))
for i := range refs {
ref := refs[i]
ref.GatewayInstanceID = strings.TrimSpace(ref.GatewayInstanceID)
ref.Kind = strings.TrimSpace(ref.Kind)
ref.Ref = strings.TrimSpace(ref.Ref)
out = append(out, ref)
}
return out
}
func cloneTime(ts *time.Time) *time.Time {
if ts == nil {
return nil
}
val := ts.UTC()
return &val
}

View File

@@ -118,23 +118,26 @@ func TestMap_Success(t *testing.T) {
if got, want := steps[0].GetUserLabel(), "Card payout"; got != want {
t.Fatalf("user_label mismatch: got=%q want=%q", got, want)
}
if steps[0].GetExecutedMoney() == nil {
t.Fatal("expected executed_money to be mapped")
if steps[0].GetMoney() == nil {
t.Fatal("expected structured money envelope to be mapped")
}
if got, want := steps[0].GetExecutedMoney().GetAmount(), "95"; got != want {
t.Fatalf("executed_money.amount mismatch: got=%q want=%q", got, want)
if steps[0].GetMoney().GetPlanned() == nil {
t.Fatal("expected structured planned money snapshot")
}
if got, want := steps[0].GetExecutedMoney().GetCurrency(), "USD"; got != want {
t.Fatalf("executed_money.currency mismatch: got=%q want=%q", got, want)
if got, want := steps[0].GetMoney().GetPlanned().GetAmount().GetAmount(), "96"; got != want {
t.Fatalf("money.planned.amount.amount mismatch: got=%q want=%q", got, want)
}
if steps[0].GetConvertedMoney() == nil {
t.Fatal("expected converted_money to be mapped")
if got, want := steps[0].GetMoney().GetPlanned().GetConvertedAmount().GetAmount(), "91"; got != want {
t.Fatalf("money.planned.converted_amount.amount mismatch: got=%q want=%q", got, want)
}
if got, want := steps[0].GetConvertedMoney().GetAmount(), "90"; got != want {
t.Fatalf("converted_money.amount mismatch: got=%q want=%q", got, want)
if steps[0].GetMoney().GetExecuted() == nil {
t.Fatal("expected structured executed money snapshot")
}
if got, want := steps[0].GetConvertedMoney().GetCurrency(), "EUR"; got != want {
t.Fatalf("converted_money.currency mismatch: got=%q want=%q", got, want)
if got, want := steps[0].GetMoney().GetExecuted().GetAmount().GetAmount(), "95"; got != want {
t.Fatalf("money.executed.amount.amount mismatch: got=%q want=%q", got, want)
}
if got, want := steps[0].GetMoney().GetExecuted().GetConvertedAmount().GetAmount(), "90"; got != want {
t.Fatalf("money.executed.converted_amount.amount mismatch: got=%q want=%q", got, want)
}
if got, want := steps[1].GetReportVisibility(), orchestrationv2.ReportVisibility_REPORT_VISIBILITY_HIDDEN; got != want {
t.Fatalf("report_visibility mismatch: got=%s want=%s", got.String(), want.String())
@@ -389,6 +392,14 @@ func newPaymentFixture() *agg.Payment {
Amount: "90",
Currency: "EUR",
},
PlannedMoney: &paymenttypes.Money{
Amount: "96",
Currency: "USD",
},
PlannedConvertedMoney: &paymenttypes.Money{
Amount: "91",
Currency: "EUR",
},
StartedAt: &startedAt,
},
{

View File

@@ -6,6 +6,7 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/pkg/merrors"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2"
)
@@ -35,6 +36,12 @@ func mapStepExecution(step agg.StepExecution, index int) (*orchestrationv2.StepE
attempt = 1
}
plannedAmount := moneyToProto(step.PlannedMoney)
plannedConverted := moneyToProto(step.PlannedConvertedMoney)
executedAmount := moneyToProto(step.ExecutedMoney)
executedConverted := moneyToProto(step.ConvertedMoney)
moneyEnvelope := mapStepMoneyEnvelope(plannedAmount, plannedConverted, executedAmount, executedConverted)
return &orchestrationv2.StepExecution{
StepRef: strings.TrimSpace(step.StepRef),
StepCode: strings.TrimSpace(step.StepCode),
@@ -46,11 +53,37 @@ func mapStepExecution(step agg.StepExecution, index int) (*orchestrationv2.StepE
Refs: mapExternalRefs(step.StepCode, step.ExternalRefs),
ReportVisibility: mapReportVisibility(step.ReportVisibility),
UserLabel: strings.TrimSpace(step.UserLabel),
ExecutedMoney: moneyToProto(step.ExecutedMoney),
ConvertedMoney: moneyToProto(step.ConvertedMoney),
Money: moneyEnvelope,
}, nil
}
func mapStepMoneyEnvelope(
plannedAmount *moneyv1.Money,
plannedConverted *moneyv1.Money,
executedAmount *moneyv1.Money,
executedConverted *moneyv1.Money,
) *orchestrationv2.StepExecutionMoney {
planned := mapStepMoneySnapshot(plannedAmount, plannedConverted)
executed := mapStepMoneySnapshot(executedAmount, executedConverted)
if planned == nil && executed == nil {
return nil
}
return &orchestrationv2.StepExecutionMoney{
Planned: planned,
Executed: executed,
}
}
func mapStepMoneySnapshot(amount *moneyv1.Money, converted *moneyv1.Money) *orchestrationv2.StepExecutionMoneySnapshot {
if amount == nil && converted == nil {
return nil
}
return &orchestrationv2.StepExecutionMoneySnapshot{
Amount: amount,
ConvertedAmount: converted,
}
}
func mapStepFailure(step agg.StepExecution, state agg.StepState) *orchestrationv2.Failure {
if state != agg.StepStateFailed && state != agg.StepStateNeedsAttention {
return nil

View File

@@ -30,17 +30,24 @@ type BatchOptimizer interface {
Optimize(in BatchOptimizeInput) (*BatchOptimizeOutput, error)
}
// BatchOptimizationMergeKeyResolver allows external packages to resolve custom merge keys.
type BatchOptimizationMergeKeyResolver interface {
ResolveMergeKey(item qsnap.ResolvedItem, selection BatchOptimizationSelection) (string, bool)
}
// PolicyBatchOptimizerDependencies configures policy-based optimizer implementation.
type PolicyBatchOptimizerDependencies struct {
Logger mlogger.Logger
Aggregator opagg.Aggregator
Policy BatchOptimizationPolicy
Logger mlogger.Logger
Aggregator opagg.Aggregator
Policy BatchOptimizationPolicy
MergeKeyResolver BatchOptimizationMergeKeyResolver
}
type policyBatchOptimizer struct {
logger mlogger.Logger
aggregator opagg.Aggregator
policy BatchOptimizationPolicy
logger mlogger.Logger
aggregator opagg.Aggregator
policy BatchOptimizationPolicy
mergeKeyResolver BatchOptimizationMergeKeyResolver
}
// NewPolicyBatchOptimizer creates default rule-based optimizer implementation.
@@ -54,9 +61,10 @@ func NewPolicyBatchOptimizer(deps PolicyBatchOptimizerDependencies) BatchOptimiz
aggregator = opagg.New(opagg.Dependencies{Logger: logger})
}
return &policyBatchOptimizer{
logger: logger.Named("batch_optimizer"),
aggregator: aggregator,
policy: normalizeBatchOptimizationPolicy(deps.Policy),
logger: logger.Named("batch_optimizer"),
aggregator: aggregator,
policy: normalizeBatchOptimizationPolicy(deps.Policy),
mergeKeyResolver: deps.MergeKeyResolver,
}
}
@@ -68,13 +76,15 @@ func (o *policyBatchOptimizer) Optimize(in BatchOptimizeInput) (*BatchOptimizeOu
aggItems := make([]opagg.Item, 0, len(in.Items))
for i := range in.Items {
item := in.Items[i]
selection := o.policy.Select(batchOptimizationContextFromItem(item))
selection := o.policy.SelectMany(batchOptimizationContextsFromItem(item))
intentSnapshot := applyBatchOptimizationSelection(item.IntentSnapshot, selection)
mergeKey := batchOptimizationMergeKey(item, selection, o.mergeKeyResolver)
aggItems = append(aggItems, opagg.Item{
IntentRef: item.IntentRef,
IntentSnapshot: intentSnapshot,
QuoteSnapshot: item.QuoteSnapshot,
MergeMode: mergeModeFromBatchOptimization(selection.Mode),
MergeKey: mergeKey,
PolicyTag: batchOptimizationSelectionTag(selection),
})
}
@@ -89,13 +99,211 @@ func (o *policyBatchOptimizer) Optimize(in BatchOptimizeInput) (*BatchOptimizeOu
return &BatchOptimizeOutput{Groups: aggOutput.Groups}, nil
}
func batchOptimizationContextsFromItem(item qsnap.ResolvedItem) []BatchOptimizationContext {
out := make([]BatchOptimizationContext, 0, 6)
add := func(base BatchOptimizationContext) {
if base.Rail == discovery.RailUnspecified {
return
}
money := selectSettlementMoney(item)
if money != nil {
base.Amount = strings.TrimSpace(money.Amount)
base.Currency = strings.TrimSpace(money.Currency)
}
out = append(out, base)
}
add(batchOptimizationContextFromItem(item))
add(mergeBatchOptimizationContext(
endpointOptimizationContext(item.IntentSnapshot.Source),
sourceHopContext(item.QuoteSnapshot),
))
route := item.QuoteSnapshot
if route != nil && route.Route != nil {
if parsed := model.ParseRail(route.Route.Rail); parsed != discovery.RailUnspecified {
add(BatchOptimizationContext{
Rail: parsed,
Provider: strings.TrimSpace(route.Route.Provider),
Network: strings.TrimSpace(route.Route.Network),
})
}
for i := range route.Route.Hops {
hop := route.Route.Hops[i]
if hop == nil {
continue
}
parsed := model.ParseRail(hop.Rail)
if parsed == discovery.RailUnspecified {
continue
}
add(BatchOptimizationContext{
Rail: parsed,
Provider: strings.TrimSpace(hop.Gateway),
Network: strings.TrimSpace(hop.Network),
})
}
}
if len(out) == 0 {
add(batchOptimizationContextFromItem(item))
}
return out
}
func batchOptimizationMergeKey(
item qsnap.ResolvedItem,
sel BatchOptimizationSelection,
resolver BatchOptimizationMergeKeyResolver,
) string {
if mergeModeFromBatchOptimization(sel.Mode) != opagg.MergeModeByRecipient {
return ""
}
groupBy := normalizeBatchOptimizationGrouping(sel.GroupBy)
if groupBy == BatchOptimizationGroupingUnspecified {
groupBy = BatchOptimizationGroupingOperationDestination
}
switch groupBy {
case BatchOptimizationGroupingOperationSource:
return mergeEndpointKey(item.IntentSnapshot.Source)
case BatchOptimizationGroupingRailTarget:
if resolver != nil {
if key, ok := resolver.ResolveMergeKey(item, sel); ok {
return strings.TrimSpace(key)
}
}
return railTargetMergeKey(item, sel.MatchRail)
default:
return mergeEndpointKey(item.IntentSnapshot.Destination)
}
}
func railTargetMergeKey(item qsnap.ResolvedItem, rail model.Rail) string {
matchRail := model.ParseRail(string(rail))
if matchRail == discovery.RailUnspecified {
ctx := batchOptimizationContextFromItem(item)
matchRail = ctx.Rail
}
switch matchRail {
case discovery.RailCardPayout:
return mergeEndpointKey(item.IntentSnapshot.Destination)
case discovery.RailCrypto:
if endpointRail(item.IntentSnapshot.Destination) == discovery.RailCrypto {
return mergeEndpointKey(item.IntentSnapshot.Destination)
}
if endpointRail(item.IntentSnapshot.Source) == discovery.RailCrypto {
return mergeEndpointKey(item.IntentSnapshot.Source)
}
return routeRailTargetKey(item.QuoteSnapshot, matchRail)
default:
return routeRailTargetKey(item.QuoteSnapshot, matchRail)
}
}
func routeRailTargetKey(snapshot *model.PaymentQuoteSnapshot, rail model.Rail) string {
if snapshot == nil || snapshot.Route == nil {
return ""
}
route := snapshot.Route
target := ""
fallback := ""
for i := range route.Hops {
hop := route.Hops[i]
if hop == nil || model.ParseRail(hop.Rail) != rail {
continue
}
key := strings.Join([]string{
"rail=" + strings.ToUpper(strings.TrimSpace(hop.Rail)),
"gateway=" + strings.TrimSpace(hop.Gateway),
"instance=" + strings.TrimSpace(hop.InstanceID),
"network=" + strings.TrimSpace(hop.Network),
}, "|")
if fallback == "" {
fallback = key
}
if hop.Role == paymenttypes.QuoteRouteHopRoleDestination {
target = key
break
}
}
if target != "" {
return target
}
if fallback != "" {
return fallback
}
return ""
}
func mergeEndpointKey(ep model.PaymentEndpoint) string {
switch normalizeEndpointType(ep) {
case model.EndpointTypeLedger:
if ep.Ledger == nil {
return ""
}
return strings.Join([]string{
"type=ledger",
"account=" + strings.TrimSpace(ep.Ledger.LedgerAccountRef),
"contra=" + strings.TrimSpace(ep.Ledger.ContraLedgerAccountRef),
}, "|")
case model.EndpointTypeManagedWallet:
if ep.ManagedWallet == nil {
return ""
}
return strings.Join([]string{
"type=managed_wallet",
"wallet=" + strings.TrimSpace(ep.ManagedWallet.ManagedWalletRef),
"asset=" + mergeAssetKey(ep.ManagedWallet.Asset),
}, "|")
case model.EndpointTypeExternalChain:
if ep.ExternalChain == nil {
return ""
}
return strings.Join([]string{
"type=external_chain",
"address=" + strings.TrimSpace(ep.ExternalChain.Address),
"memo=" + strings.TrimSpace(ep.ExternalChain.Memo),
"asset=" + mergeAssetKey(ep.ExternalChain.Asset),
}, "|")
case model.EndpointTypeCard:
if ep.Card == nil {
return ""
}
return strings.Join([]string{
"type=card",
"token=" + strings.TrimSpace(ep.Card.Token),
"pan=" + strings.TrimSpace(ep.Card.Pan),
"masked=" + strings.TrimSpace(ep.Card.MaskedPan),
"country=" + strings.TrimSpace(ep.Card.Country),
"exp=" + strconv.FormatUint(uint64(ep.Card.ExpMonth), 10) + "-" + strconv.FormatUint(uint64(ep.Card.ExpYear), 10),
}, "|")
default:
return ""
}
}
func mergeAssetKey(asset *paymenttypes.Asset) string {
if asset == nil {
return ""
}
return strings.Join([]string{
strings.ToUpper(strings.TrimSpace(asset.Chain)),
strings.ToUpper(strings.TrimSpace(asset.TokenSymbol)),
strings.TrimSpace(asset.ContractAddress),
}, ":")
}
func batchOptimizationContextFromItem(item qsnap.ResolvedItem) BatchOptimizationContext {
rail, provider, network := destinationHopSignature(item.QuoteSnapshot)
// Match policy against operation target (intent destination), not route leg semantics.
target := mergeBatchOptimizationContext(
endpointOptimizationContext(item.IntentSnapshot.Destination),
destinationHopContext(item.QuoteSnapshot),
)
money := selectSettlementMoney(item)
ctx := BatchOptimizationContext{
Rail: rail,
Provider: provider,
Network: network,
Rail: target.Rail,
Provider: target.Provider,
Network: target.Network,
}
if money != nil {
ctx.Amount = strings.TrimSpace(money.Amount)
@@ -104,6 +312,70 @@ func batchOptimizationContextFromItem(item qsnap.ResolvedItem) BatchOptimization
return ctx
}
func mergeBatchOptimizationContext(primary, fallback BatchOptimizationContext) BatchOptimizationContext {
out := primary
if out.Rail == "" || out.Rail == discovery.RailUnspecified {
out.Rail = fallback.Rail
}
if strings.TrimSpace(out.Provider) == "" {
out.Provider = fallback.Provider
}
if strings.TrimSpace(out.Network) == "" {
out.Network = fallback.Network
}
return out
}
func endpointOptimizationContext(endpoint model.PaymentEndpoint) BatchOptimizationContext {
out := BatchOptimizationContext{
Rail: endpointRail(endpoint),
}
switch normalizeEndpointType(endpoint) {
case model.EndpointTypeManagedWallet:
if endpoint.ManagedWallet != nil && endpoint.ManagedWallet.Asset != nil {
out.Network = strings.TrimSpace(endpoint.ManagedWallet.Asset.Chain)
}
case model.EndpointTypeExternalChain:
if endpoint.ExternalChain != nil {
if endpoint.ExternalChain.Asset != nil {
out.Network = strings.TrimSpace(endpoint.ExternalChain.Asset.Chain)
}
}
}
return out
}
func endpointRail(endpoint model.PaymentEndpoint) model.Rail {
switch normalizeEndpointType(endpoint) {
case model.EndpointTypeManagedWallet, model.EndpointTypeExternalChain:
return discovery.RailCrypto
case model.EndpointTypeCard:
return discovery.RailCardPayout
case model.EndpointTypeLedger:
return discovery.RailLedger
default:
return discovery.RailUnspecified
}
}
func normalizeEndpointType(endpoint model.PaymentEndpoint) model.PaymentEndpointType {
if endpoint.Type != model.EndpointTypeUnspecified {
return endpoint.Type
}
switch {
case endpoint.Ledger != nil:
return model.EndpointTypeLedger
case endpoint.ManagedWallet != nil:
return model.EndpointTypeManagedWallet
case endpoint.ExternalChain != nil:
return model.EndpointTypeExternalChain
case endpoint.Card != nil:
return model.EndpointTypeCard
default:
return model.EndpointTypeUnspecified
}
}
func selectSettlementMoney(item qsnap.ResolvedItem) *paymenttypes.Money {
if item.QuoteSnapshot != nil && item.QuoteSnapshot.ExpectedSettlementAmount != nil {
return item.QuoteSnapshot.ExpectedSettlementAmount
@@ -111,14 +383,16 @@ func selectSettlementMoney(item qsnap.ResolvedItem) *paymenttypes.Money {
return item.IntentSnapshot.Amount
}
func destinationHopSignature(snapshot *model.PaymentQuoteSnapshot) (model.Rail, string, string) {
func destinationHopContext(snapshot *model.PaymentQuoteSnapshot) BatchOptimizationContext {
if snapshot == nil || snapshot.Route == nil {
return discovery.RailUnspecified, "", ""
return BatchOptimizationContext{}
}
route := snapshot.Route
rail := model.ParseRail(route.Rail)
provider := strings.TrimSpace(route.Provider)
network := strings.TrimSpace(route.Network)
ctx := BatchOptimizationContext{
Rail: model.ParseRail(route.Rail),
Provider: strings.TrimSpace(route.Provider),
Network: strings.TrimSpace(route.Network),
}
var destinationHop *paymenttypes.QuoteRouteHop
for i := range route.Hops {
@@ -133,16 +407,54 @@ func destinationHopSignature(snapshot *model.PaymentQuoteSnapshot) (model.Rail,
}
if destinationHop != nil {
if parsed := model.ParseRail(destinationHop.Rail); parsed != discovery.RailUnspecified {
rail = parsed
ctx.Rail = parsed
}
if provider == "" {
provider = strings.TrimSpace(destinationHop.Gateway)
if ctx.Provider == "" {
ctx.Provider = strings.TrimSpace(destinationHop.Gateway)
}
if network == "" {
network = strings.TrimSpace(destinationHop.Network)
if ctx.Network == "" {
ctx.Network = strings.TrimSpace(destinationHop.Network)
}
}
return rail, provider, network
return ctx
}
func sourceHopContext(snapshot *model.PaymentQuoteSnapshot) BatchOptimizationContext {
if snapshot == nil || snapshot.Route == nil {
return BatchOptimizationContext{}
}
route := snapshot.Route
ctx := BatchOptimizationContext{
Rail: model.ParseRail(route.Rail),
Provider: strings.TrimSpace(route.Provider),
Network: strings.TrimSpace(route.Network),
}
var sourceHop *paymenttypes.QuoteRouteHop
for i := range route.Hops {
hop := route.Hops[i]
if hop == nil {
continue
}
if sourceHop == nil {
sourceHop = hop
}
if hop.Role == paymenttypes.QuoteRouteHopRoleSource {
sourceHop = hop
break
}
}
if sourceHop != nil {
if parsed := model.ParseRail(sourceHop.Rail); parsed != discovery.RailUnspecified {
ctx.Rail = parsed
}
if ctx.Provider == "" {
ctx.Provider = strings.TrimSpace(sourceHop.Gateway)
}
if ctx.Network == "" {
ctx.Network = strings.TrimSpace(sourceHop.Network)
}
}
return ctx
}
func mergeModeFromBatchOptimization(mode BatchOptimizationMode) opagg.MergeMode {
@@ -174,6 +486,12 @@ func applyBatchOptimizationSelection(intent model.PaymentIntent, sel BatchOptimi
}
intent.Attributes[attrBatchOptimizerMode] = string(mode)
intent.Attributes[attrBatchOptimizerRuleMatch] = strconv.FormatBool(sel.Matched)
groupBy := normalizeBatchOptimizationGrouping(sel.GroupBy)
if groupBy == BatchOptimizationGroupingUnspecified {
delete(intent.Attributes, attrBatchOptimizerGrouping)
} else {
intent.Attributes[attrBatchOptimizerGrouping] = string(groupBy)
}
ruleID := strings.TrimSpace(sel.RuleID)
if ruleID == "" {
delete(intent.Attributes, attrBatchOptimizerRuleID)

View File

@@ -0,0 +1,48 @@
package psvc
import (
"testing"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/qsnap"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/discovery"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
)
func TestBatchOptimizationContextFromItem_UsesOperationTargetContext(t *testing.T) {
item := qsnap.ResolvedItem{
IntentRef: "intent-1",
IntentSnapshot: model.PaymentIntent{
Ref: "intent-1",
Destination: model.PaymentEndpoint{
Type: model.EndpointTypeCard,
Card: &model.CardEndpoint{
Pan: "2200001100000001",
ExpMonth: 1,
ExpYear: 2030,
},
},
Amount: &paymenttypes.Money{Amount: "100", Currency: "RUB"},
SettlementCurrency: "RUB",
},
QuoteSnapshot: &model.PaymentQuoteSnapshot{
Route: &paymenttypes.QuoteRouteSpecification{
Hops: []*paymenttypes.QuoteRouteHop{
{Index: 1, Rail: "CRYPTO", Gateway: "gw-crypto", Network: "TRON_NILE", Role: paymenttypes.QuoteRouteHopRoleSource},
{Index: 2, Rail: "CARD", Gateway: "mcards", Network: "TRON_NILE", Role: paymenttypes.QuoteRouteHopRoleDestination},
},
},
},
}
ctx := batchOptimizationContextFromItem(item)
if got, want := ctx.Rail, model.ParseRail(string(discovery.RailCardPayout)); got != want {
t.Fatalf("operation rail mismatch: got=%q want=%q", got, want)
}
if got, want := ctx.Provider, "mcards"; got != want {
t.Fatalf("provider mismatch: got=%q want=%q", got, want)
}
if got, want := ctx.Network, "TRON_NILE"; got != want {
t.Fatalf("network mismatch: got=%q want=%q", got, want)
}
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan"
svcshared "github.com/tech/sendico/payments/orchestrator/internal/service/shared"
"github.com/tech/sendico/pkg/mlogger"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
)
@@ -201,7 +202,7 @@ func inheritedExternalRefs(payment *agg.Payment, step xplan.Step, current agg.St
}
func inheritedExecutedMoney(payment *agg.Payment, step xplan.Step, current agg.StepExecution) *paymenttypes.Money {
if money := cloneStepMoney(current.ExecutedMoney); money != nil {
if money := svcshared.CloneMoneyTrimNonEmpty(current.ExecutedMoney); money != nil {
return money
}
if payment == nil || len(step.DependsOn) == 0 {
@@ -213,7 +214,7 @@ func inheritedExecutedMoney(payment *agg.Payment, step xplan.Step, current agg.S
if !ok || idx < 0 || idx >= len(payment.StepExecutions) {
continue
}
if money := cloneStepMoney(payment.StepExecutions[idx].ExecutedMoney); money != nil {
if money := svcshared.CloneMoneyTrimNonEmpty(payment.StepExecutions[idx].ExecutedMoney); money != nil {
return money
}
}
@@ -221,7 +222,7 @@ func inheritedExecutedMoney(payment *agg.Payment, step xplan.Step, current agg.S
}
func inheritedConvertedMoney(payment *agg.Payment, step xplan.Step, current agg.StepExecution) *paymenttypes.Money {
if money := cloneStepMoney(current.ConvertedMoney); money != nil {
if money := svcshared.CloneMoneyTrimNonEmpty(current.ConvertedMoney); money != nil {
return money
}
if payment == nil || len(step.DependsOn) == 0 {
@@ -233,7 +234,7 @@ func inheritedConvertedMoney(payment *agg.Payment, step xplan.Step, current agg.
if !ok || idx < 0 || idx >= len(payment.StepExecutions) {
continue
}
if money := cloneStepMoney(payment.StepExecutions[idx].ConvertedMoney); money != nil {
if money := svcshared.CloneMoneyTrimNonEmpty(payment.StepExecutions[idx].ConvertedMoney); money != nil {
return money
}
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/qsnap"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/reqval"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/merrors"
orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2"
"go.uber.org/zap"
@@ -168,7 +169,7 @@ func (s *svc) createNewPayment(ctx context.Context, requestCtx *reqval.Ctx) (*ag
ClientPaymentRef: requestCtx.ClientPaymentRef,
IntentSnapshot: resolved.IntentSnapshot,
QuoteSnapshot: resolved.QuoteSnapshot,
Steps: toStepShells(graph),
Steps: toStepShells(graph, resolved.IntentSnapshot, resolved.QuoteSnapshot),
})
if err != nil {
return nil, err
@@ -212,20 +213,23 @@ func (s *svc) resolveAndPlan(ctx context.Context, requestCtx *reqval.Ctx) (*qsna
return resolved, graph, nil
}
func toStepShells(graph *xplan.Graph) []agg.StepShell {
func toStepShells(graph *xplan.Graph, intent model.PaymentIntent, quote *model.PaymentQuoteSnapshot) []agg.StepShell {
if graph == nil || len(graph.Steps) == 0 {
return nil
}
out := make([]agg.StepShell, 0, len(graph.Steps))
for i := range graph.Steps {
plannedMoney, plannedConverted := plannedMoneyForStep(graph.Steps[i], intent, quote)
out = append(out, agg.StepShell{
StepRef: graph.Steps[i].StepRef,
StepCode: graph.Steps[i].StepCode,
Rail: graph.Steps[i].Rail,
Gateway: graph.Steps[i].Gateway,
InstanceID: graph.Steps[i].InstanceID,
ReportVisibility: graph.Steps[i].Visibility,
UserLabel: graph.Steps[i].UserLabel,
StepRef: graph.Steps[i].StepRef,
StepCode: graph.Steps[i].StepCode,
Rail: graph.Steps[i].Rail,
Gateway: graph.Steps[i].Gateway,
InstanceID: graph.Steps[i].InstanceID,
ReportVisibility: graph.Steps[i].Visibility,
UserLabel: graph.Steps[i].UserLabel,
PlannedMoney: plannedMoney,
PlannedConvertedMoney: plannedConverted,
})
}
return out

View File

@@ -31,6 +31,7 @@ const (
attrAggregatedByRecipient = "orchestrator.v2.aggregated_by_recipient"
attrAggregatedItems = "orchestrator.v2.aggregated_items"
attrBatchOptimizerMode = "orchestrator.v2.batch_optimizer_mode"
attrBatchOptimizerGrouping = "orchestrator.v2.batch_optimizer_group_by"
attrBatchOptimizerRuleID = "orchestrator.v2.batch_optimizer_rule_id"
attrBatchOptimizerRuleMatch = "orchestrator.v2.batch_optimizer_rule_matched"
)
@@ -198,7 +199,7 @@ func (s *svc) createGroupPayment(
ClientPaymentRef: requestCtx.ClientPaymentRef,
IntentSnapshot: intentSnapshot,
QuoteSnapshot: group.QuoteSnapshot,
Steps: toStepShells(graph),
Steps: toStepShells(graph, intentSnapshot, group.QuoteSnapshot),
})
if err != nil {
return nil, err
@@ -291,7 +292,7 @@ func (s *svc) buildBatchOperationGroups(groups []opagg.Group) ([]opagg.Group, er
}
group.IntentSnapshot.Attributes[attrAggregatedItems] = strconv.Itoa(len(group.IntentRefs))
targets := buildBatchPayoutTargets([]opagg.Group{group})
targets := buildBatchPayoutTargets(group)
if routeContainsCardPayout(group.QuoteSnapshot) && len(targets) > 0 {
raw, err := batchmeta.EncodePayoutTargets(targets)
if err != nil {
@@ -311,32 +312,49 @@ func (s *svc) buildBatchOperationGroups(groups []opagg.Group) ([]opagg.Group, er
return out, nil
}
func buildBatchPayoutTargets(groups []opagg.Group) []batchmeta.PayoutTarget {
if len(groups) == 0 {
func buildBatchPayoutTargets(group opagg.Group) []batchmeta.PayoutTarget {
if len(group.Members) > 0 {
targets := make([]batchmeta.PayoutTarget, 0, len(group.Members))
for i := range group.Members {
member := group.Members[i]
target := batchmeta.PayoutTarget{
TargetRef: firstNonEmpty(strings.TrimSpace(member.IntentRef), "recipient-"+strconv.Itoa(i+1)),
IntentRef: normalizeIntentRefs([]string{member.IntentRef}),
Amount: batchPayoutAmountFromMember(member),
}
if member.IntentSnapshot.Destination.Type == model.EndpointTypeCard && member.IntentSnapshot.Destination.Card != nil {
card := *member.IntentSnapshot.Destination.Card
target.Card = &card
}
if member.IntentSnapshot.Customer != nil {
customer := *member.IntentSnapshot.Customer
target.Customer = &customer
}
targets = append(targets, target)
}
if len(targets) == 0 {
return nil
}
return targets
}
if isEmptyIntentSnapshot(group.IntentSnapshot) {
return nil
}
out := make([]batchmeta.PayoutTarget, 0, len(groups))
for i := range groups {
group := groups[i]
target := batchmeta.PayoutTarget{
TargetRef: firstNonEmpty(strings.TrimSpace(group.RecipientKey), "recipient-"+strconv.Itoa(i+1)),
IntentRef: normalizeIntentRefs(group.IntentRefs),
Amount: batchPayoutAmount(group),
}
if group.IntentSnapshot.Destination.Type == model.EndpointTypeCard && group.IntentSnapshot.Destination.Card != nil {
card := *group.IntentSnapshot.Destination.Card
target.Card = &card
}
if group.IntentSnapshot.Customer != nil {
customer := *group.IntentSnapshot.Customer
target.Customer = &customer
}
out = append(out, target)
target := batchmeta.PayoutTarget{
TargetRef: firstNonEmpty(strings.TrimSpace(group.RecipientKey), "recipient-1"),
IntentRef: normalizeIntentRefs(group.IntentRefs),
Amount: batchPayoutAmount(group),
}
if len(out) == 0 {
return nil
if group.IntentSnapshot.Destination.Type == model.EndpointTypeCard && group.IntentSnapshot.Destination.Card != nil {
card := *group.IntentSnapshot.Destination.Card
target.Card = &card
}
return out
if group.IntentSnapshot.Customer != nil {
customer := *group.IntentSnapshot.Customer
target.Customer = &customer
}
return []batchmeta.PayoutTarget{target}
}
func batchPayoutAmount(group opagg.Group) *paymenttypes.Money {
@@ -355,6 +373,26 @@ func batchPayoutAmount(group opagg.Group) *paymenttypes.Money {
}
}
func batchPayoutAmountFromMember(member opagg.GroupMember) *paymenttypes.Money {
if member.QuoteSnapshot != nil && member.QuoteSnapshot.ExpectedSettlementAmount != nil {
return &paymenttypes.Money{
Amount: strings.TrimSpace(member.QuoteSnapshot.ExpectedSettlementAmount.Amount),
Currency: strings.TrimSpace(member.QuoteSnapshot.ExpectedSettlementAmount.Currency),
}
}
if member.IntentSnapshot.Amount == nil {
return nil
}
return &paymenttypes.Money{
Amount: strings.TrimSpace(member.IntentSnapshot.Amount.Amount),
Currency: strings.TrimSpace(member.IntentSnapshot.Amount.Currency),
}
}
func isEmptyIntentSnapshot(intent model.PaymentIntent) bool {
return intent.Amount == nil && (intent.Kind == "" || intent.Kind == model.PaymentKindUnspecified)
}
func routeContainsCardPayout(snapshot *model.PaymentQuoteSnapshot) bool {
if snapshot == nil || snapshot.Route == nil {
return false

View File

@@ -162,6 +162,47 @@ func TestExecuteBatchPayment_CryptoPolicyMergesByDestination(t *testing.T) {
}
}
func TestExecuteBatchPayment_CryptoPolicyMergesSameRecipientEvenWithDifferentSourceAndQuoteRefs(t *testing.T) {
env := newTestEnvWithPolicy(t, BatchOptimizationPolicy{
DefaultMode: BatchOptimizationModeNoOptimization,
Rules: []BatchOptimizationRule{
{
ID: "crypto-merge",
Priority: 100,
Mode: BatchOptimizationModeMergeByDestination,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
},
},
},
}, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution
step.State = agg.StepStateCompleted
return &sexec.ExecuteOutput{StepExecution: step}, nil
})
quote := newExecutableBatchCryptoQuoteSameDestination(env.orgID, "quote-batch-crypto-merge-mixed")
if len(quote.Items) != 2 {
t.Fatalf("expected 2 quote items, got=%d", len(quote.Items))
}
quote.Items[0].Quote.Route.RouteRef = "route-a"
quote.Items[1].Intent.Source = testLedgerEndpoint("ledger-src-2")
quote.Items[1].Quote.Route.RouteRef = "route-b"
env.quotes.Put(quote)
resp, err := env.svc.ExecuteBatchPayment(context.Background(), &orchestrationv2.ExecuteBatchPaymentRequest{
Meta: testMeta(env.orgID, "idem-batch-crypto-merge-mixed"),
QuotationRef: "quote-batch-crypto-merge-mixed",
ClientPaymentRef: "client-batch-crypto-merge-mixed",
})
if err != nil {
t.Fatalf("ExecuteBatchPayment returned error: %v", err)
}
if got, want := len(resp.GetPayments()), 1; got != want {
t.Fatalf("expected %d merged payment for crypto policy, got=%d", want, got)
}
}
func TestExecuteBatchPayment_IdempotentRetry(t *testing.T) {
env := newTestEnv(t, func(_ string, req sexec.StepRequest) (*sexec.ExecuteOutput, error) {
step := req.StepExecution

View File

@@ -17,11 +17,21 @@ const (
BatchOptimizationModeNoOptimization BatchOptimizationMode = "no_optimization"
)
// BatchOptimizationGrouping defines which target key is used for merge grouping.
type BatchOptimizationGrouping string
const (
BatchOptimizationGroupingUnspecified BatchOptimizationGrouping = ""
BatchOptimizationGroupingOperationDestination BatchOptimizationGrouping = "operation_destination"
BatchOptimizationGroupingOperationSource BatchOptimizationGrouping = "operation_source"
BatchOptimizationGroupingRailTarget BatchOptimizationGrouping = "rail_target"
)
// BatchOptimizationPolicy configures batch operation compaction behavior.
//
// Rules are evaluated by "best match":
// 1. all specified match fields must match the item context;
// 2. winner is highest specificity (number of optional selectors present);
// 2. winner is highest specificity (number of optional match fields present);
// 3. ties are resolved by larger Priority;
// 4. remaining ties keep the first rule order from config.
//
@@ -37,6 +47,7 @@ type BatchOptimizationRule struct {
Enabled *bool
Priority int
Mode BatchOptimizationMode
GroupBy BatchOptimizationGrouping
Match BatchOptimizationMatch
}
@@ -67,9 +78,11 @@ type BatchOptimizationContext struct {
// BatchOptimizationSelection is the selected optimization decision.
type BatchOptimizationSelection struct {
Mode BatchOptimizationMode
RuleID string
Matched bool
Mode BatchOptimizationMode
RuleID string
Matched bool
GroupBy BatchOptimizationGrouping
MatchRail model.Rail
}
func defaultBatchOptimizationPolicy() BatchOptimizationPolicy {
@@ -107,10 +120,14 @@ func normalizeBatchOptimizationRule(in BatchOptimizationRule) (BatchOptimization
Enabled: in.Enabled,
Priority: in.Priority,
Mode: normalizeBatchOptimizationMode(in.Mode),
GroupBy: normalizeBatchOptimizationGrouping(in.GroupBy),
}
if out.Mode == BatchOptimizationModeUnspecified {
return BatchOptimizationRule{}, false
}
if out.GroupBy == BatchOptimizationGroupingUnspecified {
out.GroupBy = BatchOptimizationGroupingOperationDestination
}
match, ok := normalizeBatchOptimizationMatch(in.Match)
if !ok {
@@ -174,41 +191,78 @@ func normalizeBatchOptimizationAmountRange(in BatchOptimizationAmountRange) (Bat
}
func (p BatchOptimizationPolicy) Select(in BatchOptimizationContext) BatchOptimizationSelection {
return p.SelectMany([]BatchOptimizationContext{in})
}
func (p BatchOptimizationPolicy) SelectMany(inputs []BatchOptimizationContext) BatchOptimizationSelection {
policy := normalizeBatchOptimizationPolicy(p)
ctx := normalizeBatchOptimizationContext(in)
contexts := make([]BatchOptimizationContext, 0, len(inputs))
for i := range inputs {
ctx := normalizeBatchOptimizationContext(inputs[i])
if ctx.Rail == discovery.RailUnspecified {
continue
}
contexts = append(contexts, ctx)
}
selected := BatchOptimizationSelection{
Mode: policy.DefaultMode,
Mode: policy.DefaultMode,
GroupBy: BatchOptimizationGroupingOperationDestination,
MatchRail: discovery.RailUnspecified,
}
if len(contexts) == 0 {
return selected
}
bestSpecificity := -1
bestPriority := 0
bestOrder := len(policy.Rules) + 1
for i := range policy.Rules {
rule := policy.Rules[i]
if rule.Enabled != nil && !*rule.Enabled {
continue
}
specificity, ok := rule.matches(ctx)
if !ok {
continue
}
if specificity > bestSpecificity ||
(specificity == bestSpecificity && rule.Priority > bestPriority) ||
(specificity == bestSpecificity && rule.Priority == bestPriority && i < bestOrder) {
selected = BatchOptimizationSelection{
Mode: rule.Mode,
RuleID: rule.ID,
Matched: true,
bestContext := len(contexts) + 1
for cIdx := range contexts {
ctx := contexts[cIdx]
for rIdx := range policy.Rules {
rule := policy.Rules[rIdx]
if rule.Enabled != nil && !*rule.Enabled {
continue
}
specificity, ok := rule.matches(ctx)
if !ok {
continue
}
if specificity > bestSpecificity ||
(specificity == bestSpecificity && rule.Priority > bestPriority) ||
(specificity == bestSpecificity && rule.Priority == bestPriority && rIdx < bestOrder) ||
(specificity == bestSpecificity && rule.Priority == bestPriority && rIdx == bestOrder && cIdx < bestContext) {
selected = BatchOptimizationSelection{
Mode: rule.Mode,
RuleID: rule.ID,
Matched: true,
GroupBy: rule.GroupBy,
MatchRail: rule.Match.Rail,
}
bestSpecificity = specificity
bestPriority = rule.Priority
bestOrder = rIdx
bestContext = cIdx
}
bestSpecificity = specificity
bestPriority = rule.Priority
bestOrder = i
}
}
return selected
}
func normalizeBatchOptimizationGrouping(raw BatchOptimizationGrouping) BatchOptimizationGrouping {
switch strings.ToLower(strings.TrimSpace(string(raw))) {
case "operation_destination", "operation-destination", "destination", "intent_destination", "intent-destination":
return BatchOptimizationGroupingOperationDestination
case "operation_source", "operation-source", "source", "intent_source", "intent-source":
return BatchOptimizationGroupingOperationSource
case "rail_target", "rail-target", "by_rail_target", "by-rail-target":
return BatchOptimizationGroupingRailTarget
default:
return BatchOptimizationGroupingUnspecified
}
}
func (r BatchOptimizationRule) matches(ctx BatchOptimizationContext) (int, bool) {
match := r.Match
if match.Rail == discovery.RailUnspecified || ctx.Rail == discovery.RailUnspecified {

View File

@@ -53,6 +53,9 @@ func TestNormalizeBatchOptimizationPolicy_DropsInvalidRules(t *testing.T) {
if got.Rules[0].ID != "valid" {
t.Fatalf("rule id mismatch: got=%q want=%q", got.Rules[0].ID, "valid")
}
if got, want := got.Rules[0].GroupBy, BatchOptimizationGroupingOperationDestination; got != want {
t.Fatalf("group_by mismatch: got=%q want=%q", got, want)
}
}
func TestBatchOptimizationPolicy_Select_UsesBestSpecificityThenPriority(t *testing.T) {
@@ -101,6 +104,12 @@ func TestBatchOptimizationPolicy_Select_UsesBestSpecificityThenPriority(t *testi
if out.Mode != BatchOptimizationModeMergeByDestination {
t.Fatalf("mode mismatch: got=%q want=%q", out.Mode, BatchOptimizationModeMergeByDestination)
}
if got, want := out.GroupBy, BatchOptimizationGroupingOperationDestination; got != want {
t.Fatalf("group_by mismatch: got=%q want=%q", got, want)
}
if got, want := string(out.MatchRail), string(discovery.RailCrypto); got != want {
t.Fatalf("match rail mismatch: got=%q want=%q", got, want)
}
}
func TestBatchOptimizationPolicy_Select_FallsBackToDefault(t *testing.T) {
@@ -124,6 +133,9 @@ func TestBatchOptimizationPolicy_Select_FallsBackToDefault(t *testing.T) {
if out.Mode != BatchOptimizationModeNoOptimization {
t.Fatalf("mode mismatch: got=%q want=%q", out.Mode, BatchOptimizationModeNoOptimization)
}
if got, want := out.GroupBy, BatchOptimizationGroupingOperationDestination; got != want {
t.Fatalf("default group_by mismatch: got=%q want=%q", got, want)
}
}
func TestBatchOptimizationPolicy_Select_MatchesAmountRange(t *testing.T) {
@@ -165,3 +177,26 @@ func TestBatchOptimizationPolicy_Select_MatchesAmountRange(t *testing.T) {
t.Fatalf("default mode mismatch: got=%q want=%q", notMatched.Mode, BatchOptimizationModeNoOptimization)
}
}
func TestBatchOptimizationPolicy_Select_ReturnsConfiguredGroupBy(t *testing.T) {
policy := BatchOptimizationPolicy{
DefaultMode: BatchOptimizationModeNoOptimization,
Rules: []BatchOptimizationRule{
{
ID: "crypto-by-rail-target",
Mode: BatchOptimizationModeMergeByDestination,
GroupBy: BatchOptimizationGroupingRailTarget,
Match: BatchOptimizationMatch{
Rail: discovery.RailCrypto,
},
},
},
}
out := policy.Select(BatchOptimizationContext{Rail: discovery.RailCrypto})
if !out.Matched {
t.Fatal("expected matched rule")
}
if got, want := out.GroupBy, BatchOptimizationGroupingRailTarget; got != want {
t.Fatalf("group_by mismatch: got=%q want=%q", got, want)
}
}

View File

@@ -0,0 +1,234 @@
package psvc
import (
"fmt"
"strings"
"github.com/shopspring/decimal"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/batchmeta"
svcshared "github.com/tech/sendico/payments/orchestrator/internal/service/shared"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/discovery"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan"
)
func plannedMoneyForStep(
step xplan.Step,
intent model.PaymentIntent,
quote *model.PaymentQuoteSnapshot,
) (*paymenttypes.Money, *paymenttypes.Money) {
action := model.ParseRailOperation(string(step.Action))
switch step.Rail {
case discovery.RailLedger:
return plannedLedgerMoney(step, intent, quote, action), nil
case discovery.RailCrypto:
switch action {
case discovery.RailOperationSend:
return plannedSourceMoney(intent, quote), nil
case discovery.RailOperationFee:
return plannedFeeMoney(intent, quote), nil
}
case discovery.RailProviderSettlement:
if action == discovery.RailOperationFXConvert {
return plannedSourceMoney(intent, quote), plannedSettlementMoney(intent, quote)
}
case discovery.RailCardPayout:
if action == discovery.RailOperationSend {
return plannedCardPayoutMoney(step, intent, quote), nil
}
}
if override, ok := batchmeta.AmountFromMetadata(step.Metadata); ok {
return svcshared.CloneMoneyTrimNonEmpty(override), nil
}
return nil, nil
}
func plannedLedgerMoney(
step xplan.Step,
intent model.PaymentIntent,
quote *model.PaymentQuoteSnapshot,
action model.RailOperation,
) *paymenttypes.Money {
if override, ok := batchmeta.AmountFromMetadata(step.Metadata); ok {
return svcshared.CloneMoneyTrimNonEmpty(override)
}
sourceMoney := plannedSourceMoney(intent, quote)
settlementMoney := plannedSettlementMoney(intent, quote)
payoutMoney := settlementMoney
if fromRail, toRail, ok := plannedLedgerBoundaryRails(step.StepCode, quote); ok {
switch {
case isLedgerExternalRail(fromRail) && isLedgerExternalRail(toRail):
return sourceMoney
case isLedgerExternalRail(fromRail) && isLedgerInternalRail(toRail):
return settlementMoney
case isLedgerInternalRail(fromRail) && isLedgerExternalRail(toRail):
if toRail == discovery.RailCardPayout {
return payoutMoney
}
return settlementMoney
case isLedgerInternalRail(fromRail) && isLedgerInternalRail(toRail):
return settlementMoney
}
}
switch action {
case discovery.RailOperationCredit, discovery.RailOperationExternalCredit:
return settlementMoney
case discovery.RailOperationDebit, discovery.RailOperationExternalDebit:
if sourceMoney != nil {
return sourceMoney
}
return settlementMoney
case discovery.RailOperationMove, discovery.RailOperationBlock, discovery.RailOperationRelease:
if settlementMoney != nil {
return settlementMoney
}
return sourceMoney
default:
return nil
}
}
func plannedSourceMoney(intent model.PaymentIntent, quote *model.PaymentQuoteSnapshot) *paymenttypes.Money {
if quote != nil && quote.DebitAmount != nil {
return svcshared.CloneMoneyTrimNonEmpty(quote.DebitAmount)
}
return svcshared.CloneMoneyTrimNonEmpty(intent.Amount)
}
func plannedSettlementMoney(intent model.PaymentIntent, quote *model.PaymentQuoteSnapshot) *paymenttypes.Money {
if quote != nil && quote.ExpectedSettlementAmount != nil {
return svcshared.CloneMoneyTrimNonEmpty(quote.ExpectedSettlementAmount)
}
return plannedSourceMoney(intent, quote)
}
func plannedCardPayoutMoney(step xplan.Step, intent model.PaymentIntent, quote *model.PaymentQuoteSnapshot) *paymenttypes.Money {
if override, ok := batchmeta.AmountFromMetadata(step.Metadata); ok {
return svcshared.CloneMoneyTrimNonEmpty(override)
}
if quote != nil && quote.ExpectedSettlementAmount != nil {
return svcshared.CloneMoneyTrimNonEmpty(quote.ExpectedSettlementAmount)
}
return svcshared.CloneMoneyTrimNonEmpty(intent.Amount)
}
func plannedFeeMoney(intent model.PaymentIntent, quote *model.PaymentQuoteSnapshot) *paymenttypes.Money {
if quote == nil || len(quote.FeeLines) == 0 {
return nil
}
sourceCurrency := ""
if source := plannedSourceMoney(intent, quote); source != nil {
sourceCurrency = strings.TrimSpace(source.Currency)
}
total := decimal.Zero
currency := ""
for i := range quote.FeeLines {
line := quote.FeeLines[i]
if line == nil || line.GetMoney() == nil {
continue
}
lineCurrency := strings.TrimSpace(line.GetMoney().GetCurrency())
if lineCurrency == "" {
continue
}
if sourceCurrency != "" && !strings.EqualFold(sourceCurrency, lineCurrency) {
continue
}
if currency == "" {
currency = strings.ToUpper(lineCurrency)
} else if !strings.EqualFold(currency, lineCurrency) {
return nil
}
amountRaw := strings.TrimSpace(line.GetMoney().GetAmount())
amount, err := decimal.NewFromString(amountRaw)
if err != nil {
continue
}
amount = amount.Abs()
if amount.IsZero() {
continue
}
if line.GetSide() == paymenttypes.EntrySideCredit {
total = total.Sub(amount)
} else {
total = total.Add(amount)
}
}
if total.Sign() <= 0 || strings.TrimSpace(currency) == "" {
return nil
}
return &paymenttypes.Money{
Amount: total.String(),
Currency: strings.ToUpper(strings.TrimSpace(currency)),
}
}
func plannedLedgerBoundaryRails(stepCode string, quote *model.PaymentQuoteSnapshot) (model.Rail, model.Rail, bool) {
fromIndex, toIndex, ok := parseLedgerEdgeStepCode(stepCode)
if !ok || quote == nil || quote.Route == nil {
return discovery.RailUnspecified, discovery.RailUnspecified, false
}
var fromRail model.Rail = discovery.RailUnspecified
var toRail model.Rail = discovery.RailUnspecified
for i := range quote.Route.Hops {
hop := quote.Route.Hops[i]
if hop == nil {
continue
}
if hop.Index == fromIndex {
fromRail = model.ParseRail(hop.Rail)
}
if hop.Index == toIndex {
toRail = model.ParseRail(hop.Rail)
}
}
if fromRail == discovery.RailUnspecified || toRail == discovery.RailUnspecified {
return discovery.RailUnspecified, discovery.RailUnspecified, false
}
return fromRail, toRail, true
}
func parseLedgerEdgeStepCode(stepCode string) (uint32, uint32, bool) {
code := strings.ToLower(strings.TrimSpace(stepCode))
if !strings.HasPrefix(code, "edge.") || !strings.Contains(code, ".ledger.") {
return 0, 0, false
}
var (
from uint32
to uint32
op string
)
if _, err := fmt.Sscanf(code, "edge.%d_%d.ledger.%s", &from, &to, &op); err != nil {
return 0, 0, false
}
if strings.TrimSpace(op) == "" {
return 0, 0, false
}
return from, to, true
}
func isLedgerInternalRail(rail model.Rail) bool {
return rail == discovery.RailLedger
}
func isLedgerExternalRail(rail model.Rail) bool {
switch rail {
case discovery.RailCrypto, discovery.RailProviderSettlement, discovery.RailCardPayout, discovery.RailFiatOnRamp:
return true
default:
return false
}
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/sexec"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/ssched"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan"
svcshared "github.com/tech/sendico/payments/orchestrator/internal/service/shared"
"github.com/tech/sendico/pkg/merrors"
"go.uber.org/zap"
)
@@ -325,8 +326,10 @@ func normalizeExecutorOutput(current agg.StepExecution, out *sexec.ExecuteOutput
next.Attempt = out.StepExecution.Attempt
}
next.ExternalRefs = out.StepExecution.ExternalRefs
next.ExecutedMoney = cloneStepMoney(out.StepExecution.ExecutedMoney)
next.ConvertedMoney = cloneStepMoney(out.StepExecution.ConvertedMoney)
next.ExecutedMoney = svcshared.CloneMoneyTrimNonEmpty(out.StepExecution.ExecutedMoney)
next.ConvertedMoney = svcshared.CloneMoneyTrimNonEmpty(out.StepExecution.ConvertedMoney)
next.PlannedMoney = svcshared.CloneMoneyTrimNonEmpty(out.StepExecution.PlannedMoney)
next.PlannedConvertedMoney = svcshared.CloneMoneyTrimNonEmpty(out.StepExecution.PlannedConvertedMoney)
next.FailureCode = strings.TrimSpace(out.StepExecution.FailureCode)
next.FailureMsg = strings.TrimSpace(out.StepExecution.FailureMsg)
@@ -445,6 +448,12 @@ func stepExecutionEqual(left, right agg.StepExecution) bool {
if !stepMoneyEqual(left.ConvertedMoney, right.ConvertedMoney) {
return false
}
if !stepMoneyEqual(left.PlannedMoney, right.PlannedMoney) {
return false
}
if !stepMoneyEqual(left.PlannedConvertedMoney, right.PlannedConvertedMoney) {
return false
}
return true
}

View File

@@ -1,29 +1,13 @@
package psvc
import (
"strings"
svcshared "github.com/tech/sendico/payments/orchestrator/internal/service/shared"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
)
func cloneStepMoney(src *paymenttypes.Money) *paymenttypes.Money {
if src == nil {
return nil
}
amount := strings.TrimSpace(src.GetAmount())
currency := strings.TrimSpace(src.GetCurrency())
if amount == "" || currency == "" {
return nil
}
return &paymenttypes.Money{
Amount: amount,
Currency: currency,
}
}
func stepMoneyEqual(left, right *paymenttypes.Money) bool {
left = cloneStepMoney(left)
right = cloneStepMoney(right)
left = svcshared.CloneMoneyTrimNonEmpty(left)
right = svcshared.CloneMoneyTrimNonEmpty(right)
switch {
case left == nil && right == nil:
return true

View File

@@ -207,6 +207,19 @@ func TestExecute_UnsupportedProviderSettlementSend(t *testing.T) {
}
}
func TestExecute_UnsupportedLedgerFee(t *testing.T) {
registry := New(Dependencies{})
_, err := registry.Execute(context.Background(), ExecuteInput{
Payment: &agg.Payment{PaymentRef: "p1"},
Step: xplan.Step{StepRef: "s1", StepCode: "ledger.fee", Action: discovery.RailOperationFee, Rail: discovery.RailLedger},
StepExecution: agg.StepExecution{StepRef: "s1", StepCode: "ledger.fee", Attempt: 1},
})
if !errors.Is(err, ErrUnsupportedStep) {
t.Fatalf("expected ErrUnsupportedStep, got %v", err)
}
}
func TestExecute_MissingExecutor(t *testing.T) {
registry := New(Dependencies{})

View File

@@ -4,11 +4,12 @@ import (
"strings"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/oshared"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan"
svcshared "github.com/tech/sendico/payments/orchestrator/internal/service/shared"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
)
func (s *svc) prepareInput(in Input) (*preparedInput, error) {
@@ -174,9 +175,11 @@ func (s *svc) normalizeStepExecution(exec agg.StepExecution, index int) (agg.Ste
exec.InstanceID = strings.TrimSpace(exec.InstanceID)
exec.Rail = model.ParseRail(string(exec.Rail))
exec.ReportVisibility = model.NormalizeReportVisibility(exec.ReportVisibility)
exec.ExternalRefs = cloneExternalRefs(exec.ExternalRefs)
exec.ExecutedMoney = cloneStepMoney(exec.ExecutedMoney)
exec.ConvertedMoney = cloneStepMoney(exec.ConvertedMoney)
exec.ExternalRefs = oshared.CloneExternalRefs(exec.ExternalRefs)
exec.ExecutedMoney = svcshared.CloneMoneyTrimNonEmpty(exec.ExecutedMoney)
exec.ConvertedMoney = svcshared.CloneMoneyTrimNonEmpty(exec.ConvertedMoney)
exec.PlannedMoney = svcshared.CloneMoneyTrimNonEmpty(exec.PlannedMoney)
exec.PlannedConvertedMoney = svcshared.CloneMoneyTrimNonEmpty(exec.PlannedConvertedMoney)
if exec.StepRef == "" {
return agg.StepExecution{}, merrors.InvalidArgument("stepExecutions[" + itoa(index) + "].step_ref is required")
}
@@ -212,15 +215,17 @@ func seedMissingExecutions(
maxAttemptsByRef[stepRef] = 1
}
executionsByRef[stepRef] = &agg.StepExecution{
StepRef: step.StepRef,
StepCode: step.StepCode,
Rail: step.Rail,
Gateway: strings.TrimSpace(step.Gateway),
InstanceID: strings.TrimSpace(step.InstanceID),
ReportVisibility: effectiveStepVisibility(model.ReportVisibilityUnspecified, step.Visibility),
UserLabel: strings.TrimSpace(step.UserLabel),
State: agg.StepStatePending,
Attempt: attempt,
StepRef: step.StepRef,
StepCode: step.StepCode,
Rail: step.Rail,
Gateway: strings.TrimSpace(step.Gateway),
InstanceID: strings.TrimSpace(step.InstanceID),
ReportVisibility: effectiveStepVisibility(model.ReportVisibilityUnspecified, step.Visibility),
UserLabel: strings.TrimSpace(step.UserLabel),
State: agg.StepStatePending,
Attempt: attempt,
PlannedMoney: nil,
PlannedConvertedMoney: nil,
}
}
}
@@ -259,39 +264,11 @@ func normalizeStepState(state agg.StepState) (agg.StepState, bool) {
func cloneStepExecution(exec agg.StepExecution) agg.StepExecution {
out := exec
out.ExternalRefs = cloneExternalRefs(exec.ExternalRefs)
out.ExecutedMoney = cloneStepMoney(exec.ExecutedMoney)
out.ConvertedMoney = cloneStepMoney(exec.ConvertedMoney)
return out
}
func cloneStepMoney(money *paymenttypes.Money) *paymenttypes.Money {
if money == nil {
return nil
}
amount := strings.TrimSpace(money.GetAmount())
currency := strings.TrimSpace(money.GetCurrency())
if amount == "" || currency == "" {
return nil
}
return &paymenttypes.Money{
Amount: amount,
Currency: currency,
}
}
func cloneExternalRefs(refs []agg.ExternalRef) []agg.ExternalRef {
if len(refs) == 0 {
return nil
}
out := make([]agg.ExternalRef, 0, len(refs))
for i := range refs {
ref := refs[i]
ref.GatewayInstanceID = strings.TrimSpace(ref.GatewayInstanceID)
ref.Kind = strings.TrimSpace(ref.Kind)
ref.Ref = strings.TrimSpace(ref.Ref)
out = append(out, ref)
}
out.ExternalRefs = oshared.CloneExternalRefs(exec.ExternalRefs)
out.ExecutedMoney = svcshared.CloneMoneyTrimNonEmpty(exec.ExecutedMoney)
out.ConvertedMoney = svcshared.CloneMoneyTrimNonEmpty(exec.ConvertedMoney)
out.PlannedMoney = svcshared.CloneMoneyTrimNonEmpty(exec.PlannedMoney)
out.PlannedConvertedMoney = svcshared.CloneMoneyTrimNonEmpty(exec.PlannedConvertedMoney)
return out
}

View File

@@ -170,7 +170,7 @@ func TestCompile_ExternalToExternal_WithWalletFee_InsertsFeeStep(t *testing.T) {
}
}
func TestCompile_ExternalToExternal_IgnoresNonWalletFeeLines(t *testing.T) {
func TestCompile_ExternalToExternal_IncludesNonWalletFeeLines(t *testing.T) {
compiler := New()
graph, err := compiler.Compile(Input{
@@ -195,14 +195,18 @@ func TestCompile_ExternalToExternal_IgnoresNonWalletFeeLines(t *testing.T) {
if err != nil {
t.Fatalf("Compile returned error: %v", err)
}
if got, want := len(graph.Steps), 9; got != want {
t.Fatalf("expected 9 steps, got %d", got)
if got, want := len(graph.Steps), 10; got != want {
t.Fatalf("expected 10 steps, got %d", got)
}
feeCount := 0
for i := range graph.Steps {
if got := graph.Steps[i].Action; got == discovery.RailOperationFee {
t.Fatalf("unexpected fee step at index %d: %+v", i, graph.Steps[i])
if graph.Steps[i].Action == discovery.RailOperationFee {
feeCount++
}
}
if got, want := feeCount, 1; got != want {
t.Fatalf("fee steps mismatch: got=%d want=%d", got, want)
}
}
func TestCompile_InternalToExternal_UsesHoldAndSettlementBranches(t *testing.T) {
@@ -239,6 +243,51 @@ func TestCompile_InternalToExternal_UsesHoldAndSettlementBranches(t *testing.T)
}
}
func TestCompile_InternalToExternal_WithWalletFee_InsertsFeeStep(t *testing.T) {
compiler := New()
graph, err := compiler.Compile(Input{
IntentSnapshot: testIntent(model.PaymentKindPayout),
QuoteSnapshot: &model.PaymentQuoteSnapshot{
DebitAmount: &paymenttypes.Money{Amount: "100", Currency: "USDT"},
FeeLines: []*paymenttypes.FeeLine{
{
Money: &paymenttypes.Money{Amount: "1", Currency: "USDT"},
Side: paymenttypes.EntrySideDebit,
Meta: map[string]string{"fee_target": "wallet"},
},
},
Route: &paymenttypes.QuoteRouteSpecification{
Hops: []*paymenttypes.QuoteRouteHop{
{Index: 10, Rail: "LEDGER", Role: paymenttypes.QuoteRouteHopRoleSource},
{Index: 20, Rail: "CARD", Gateway: "gw-card", Role: paymenttypes.QuoteRouteHopRoleDestination},
},
},
},
})
if err != nil {
t.Fatalf("Compile returned error: %v", err)
}
if len(graph.Steps) != 7 {
t.Fatalf("expected 7 steps, got %d", len(graph.Steps))
}
assertStep(t, graph.Steps[0], "edge.10_20.ledger.block", discovery.RailOperationBlock, discovery.RailLedger, model.ReportVisibilityHidden)
assertStep(t, graph.Steps[1], "hop.10.ledger.fee", discovery.RailOperationFee, discovery.RailLedger, model.ReportVisibilityHidden)
assertStep(t, graph.Steps[2], "hop.20.card_payout.send", discovery.RailOperationSend, discovery.RailCardPayout, model.ReportVisibilityUser)
assertStep(t, graph.Steps[3], "hop.20.card_payout.observe", discovery.RailOperationObserveConfirm, discovery.RailCardPayout, model.ReportVisibilityUser)
assertStep(t, graph.Steps[4], "edge.10_20.ledger.debit", discovery.RailOperationExternalDebit, discovery.RailLedger, model.ReportVisibilityHidden)
assertStep(t, graph.Steps[5], "edge.10_20.ledger.release", discovery.RailOperationRelease, discovery.RailLedger, model.ReportVisibilityHidden)
assertStep(t, graph.Steps[6], "edge.10_20.ledger.release", discovery.RailOperationRelease, discovery.RailLedger, model.ReportVisibilityHidden)
if got, want := graph.Steps[1].DependsOn, []string{graph.Steps[0].StepRef}; !equalStringSlice(got, want) {
t.Fatalf("fee deps mismatch: got=%v want=%v", got, want)
}
if got, want := graph.Steps[2].DependsOn, []string{graph.Steps[1].StepRef}; !equalStringSlice(got, want) {
t.Fatalf("send deps mismatch: got=%v want=%v", got, want)
}
}
func TestCompile_ExternalToInternal_UsesCreditAfterObserve(t *testing.T) {
compiler := New()

View File

@@ -51,28 +51,28 @@ func (e *expansion) nextRef(base string) string {
return token + "_" + itoa(count+1)
}
func (e *expansion) needsWalletFeeStep(hop normalizedHop) bool {
if e == nil || len(e.walletFeeHops) == 0 {
func (e *expansion) needsFeeStep(hop normalizedHop) bool {
if e == nil || len(e.feeHops) == 0 {
return false
}
key := observedKey(hop)
if _, ok := e.walletFeeHops[key]; !ok {
if _, ok := e.feeHops[key]; !ok {
return false
}
if _, emitted := e.walletFeeEmitted[key]; emitted {
if _, emitted := e.feeEmitted[key]; emitted {
return false
}
return true
}
func (e *expansion) markWalletFeeEmitted(hop normalizedHop) {
func (e *expansion) markFeeEmitted(hop normalizedHop) {
if e == nil {
return
}
if e.walletFeeEmitted == nil {
e.walletFeeEmitted = map[string]struct{}{}
if e.feeEmitted == nil {
e.feeEmitted = map[string]struct{}{}
}
e.walletFeeEmitted[observedKey(hop)] = struct{}{}
e.feeEmitted[observedKey(hop)] = struct{}{}
}
func normalizeStep(step Step) Step {

View File

@@ -5,27 +5,26 @@ import (
"strings"
"github.com/shopspring/decimal"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/merrors"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
)
func planWalletFeeHops(
func planFeeHops(
hops []normalizedHop,
intent model.PaymentIntent,
quote *model.PaymentQuoteSnapshot,
) (map[string]struct{}, error) {
_, hasWalletFee, err := walletFeeAmountFromSnapshots(intent, quote)
_, hasFee, err := feeAmountFromSnapshots(intent, quote)
if err != nil {
return nil, err
}
if !hasWalletFee {
if !hasFee {
return nil, nil
}
sourceHop, ok := sourceCryptoHop(hops)
sourceHop, ok := sourceFeeHop(hops)
if !ok {
return nil, nil
}
@@ -34,19 +33,19 @@ func planWalletFeeHops(
}, nil
}
func sourceCryptoHop(hops []normalizedHop) (normalizedHop, bool) {
func sourceFeeHop(hops []normalizedHop) (normalizedHop, bool) {
for i := range hops {
if hops[i].rail == discovery.RailCrypto && hops[i].role == paymenttypes.QuoteRouteHopRoleSource {
if hops[i].role == paymenttypes.QuoteRouteHopRoleSource {
return hops[i], true
}
}
if len(hops) > 0 && hops[0].rail == discovery.RailCrypto {
if len(hops) > 0 {
return hops[0], true
}
return normalizedHop{}, false
}
func walletFeeAmountFromSnapshots(
func feeAmountFromSnapshots(
intent model.PaymentIntent,
quote *model.PaymentQuoteSnapshot,
) (*paymenttypes.Money, bool, error) {
@@ -62,10 +61,7 @@ func walletFeeAmountFromSnapshots(
total := decimal.Zero
currency := ""
for i, line := range quote.FeeLines {
if !isWalletDebitFeeLine(line) {
continue
}
money := line.GetMoney()
money := feeLineMoney(line)
if money == nil {
continue
}
@@ -80,7 +76,7 @@ func walletFeeAmountFromSnapshots(
if currency == "" {
currency = lineCurrency
} else if !strings.EqualFold(currency, lineCurrency) {
return nil, false, merrors.InvalidArgument("quote_snapshot.fee_lines wallet fee currency mismatch")
return nil, false, merrors.InvalidArgument("quote_snapshot.fee_lines fee currency mismatch")
}
amountRaw := strings.TrimSpace(money.GetAmount())
@@ -88,10 +84,12 @@ func walletFeeAmountFromSnapshots(
if err != nil {
return nil, false, merrors.InvalidArgument(fmt.Sprintf("quote_snapshot.fee_lines[%d].money.amount is invalid", i))
}
if amount.Sign() < 0 {
amount = amount.Neg()
amount = amount.Abs()
if amount.IsZero() {
continue
}
if amount.Sign() == 0 {
if line.GetSide() == paymenttypes.EntrySideCredit {
total = total.Sub(amount)
continue
}
total = total.Add(amount)
@@ -113,16 +111,9 @@ func feePlanningSourceAmount(intent model.PaymentIntent, quote *model.PaymentQuo
return intent.Amount
}
func isWalletDebitFeeLine(line *paymenttypes.FeeLine) bool {
func feeLineMoney(line *paymenttypes.FeeLine) *paymenttypes.Money {
if line == nil {
return false
return nil
}
if line.GetSide() != paymenttypes.EntrySideDebit {
return false
}
meta := line.Meta
if len(meta) == 0 {
return false
}
return strings.EqualFold(strings.TrimSpace(meta["fee_target"]), "wallet")
return line.GetMoney()
}

View File

@@ -30,16 +30,16 @@ type expansion struct {
lastMainRef string
refSeq map[string]int
externalObserved map[string]string
walletFeeHops map[string]struct{}
walletFeeEmitted map[string]struct{}
feeHops map[string]struct{}
feeEmitted map[string]struct{}
}
func newExpansion(walletFeeHops map[string]struct{}) *expansion {
func newExpansion(feeHops map[string]struct{}) *expansion {
return &expansion{
refSeq: map[string]int{},
externalObserved: map[string]string{},
walletFeeHops: walletFeeHops,
walletFeeEmitted: map[string]struct{}{},
feeHops: feeHops,
feeEmitted: map[string]struct{}{},
}
}
@@ -90,12 +90,12 @@ func (s *svc) Compile(in Input) (graph *Graph, err error) {
return nil, err
}
walletFeeHops, err := planWalletFeeHops(hops, in.IntentSnapshot, in.QuoteSnapshot)
feeHops, err := planFeeHops(hops, in.IntentSnapshot, in.QuoteSnapshot)
if err != nil {
return nil, err
}
ex := newExpansion(walletFeeHops)
ex := newExpansion(feeHops)
appendGuards(ex, conditions)
if len(hops) == 1 {

View File

@@ -18,7 +18,7 @@ func (s *svc) expandSingleHop(ex *expansion, hop normalizedHop, intent model.Pay
switch hop.role {
case paymenttypes.QuoteRouteHopRoleSource:
ex.appendMain(Step{
appendMainWithOptionalFee(ex, Step{
StepCode: singleHopCode(hop, "debit"),
Kind: StepKindFundsDebit,
Action: discovery.RailOperationDebit,
@@ -26,7 +26,7 @@ func (s *svc) expandSingleHop(ex *expansion, hop normalizedHop, intent model.Pay
HopIndex: hop.index,
HopRole: hop.role,
Visibility: model.ReportVisibilityHidden,
})
}, hop)
case paymenttypes.QuoteRouteHopRoleDestination:
ex.appendMain(Step{
StepCode: singleHopCode(hop, "credit"),
@@ -74,7 +74,7 @@ func (s *svc) applyDefaultBoundary(
if to.rail == discovery.RailCardPayout && len(targets) > 0 {
return s.applyBatchCardPayoutBoundary(ex, from, to, internalRail, intent, targets)
}
ex.appendMain(makeFundsBlockStep(from, to, internalRail))
appendMainWithOptionalFee(ex, makeFundsBlockStep(from, to, internalRail), from)
observeRef, err := s.ensureExternalObserved(ex, to, intent)
if err != nil {
return err
@@ -97,7 +97,7 @@ func (s *svc) applyDefaultBoundary(
return nil
case isInternalRail(from.rail) && isInternalRail(to.rail):
ex.appendMain(makeFundsMoveStep(from, to, internalRailForBoundary(from, to)))
appendMainWithOptionalFee(ex, makeFundsMoveStep(from, to, internalRailForBoundary(from, to)), from)
return nil
default:
@@ -113,7 +113,7 @@ func (s *svc) applyBatchCardPayoutBoundary(
intent model.PaymentIntent,
targets []batchmeta.PayoutTarget,
) error {
blockRef := ex.appendMain(makeFundsBlockStep(from, to, internalRail))
blockRef := appendMainWithOptionalFee(ex, makeFundsBlockStep(from, to, internalRail), from)
for i := range targets {
target := targets[i]
if target.Amount == nil {
@@ -148,13 +148,13 @@ func (s *svc) ensureExternalObserved(ex *expansion, hop normalizedHop, intent mo
sendStep := makeRailSendStep(hop, intent)
sendRef := ex.appendMain(sendStep)
observeDependency := sendRef
if ex.needsWalletFeeStep(hop) {
if ex.needsFeeStep(hop) {
feeStep := makeRailFeeStep(hop)
if observeDependency != "" {
feeStep.DependsOn = []string{observeDependency}
}
observeDependency = ex.appendMain(feeStep)
ex.markWalletFeeEmitted(hop)
ex.markFeeEmitted(hop)
}
observeStep := makeRailObserveStep(hop, intent)
@@ -167,6 +167,20 @@ func (s *svc) ensureExternalObserved(ex *expansion, hop normalizedHop, intent mo
return observeRef, nil
}
func appendMainWithOptionalFee(ex *expansion, step Step, hop normalizedHop) string {
mainRef := ex.appendMain(step)
if !ex.needsFeeStep(hop) {
return mainRef
}
feeStep := makeRailFeeStep(hop)
if strings.TrimSpace(mainRef) != "" {
feeStep.DependsOn = []string{mainRef}
}
feeRef := ex.appendMain(feeStep)
ex.markFeeEmitted(hop)
return feeRef
}
func makeRailFeeStep(hop normalizedHop) Step {
return Step{
StepCode: singleHopCode(hop, "fee"),