extended aurora scenarios + payment operation amounts

This commit is contained in:
Stephan D
2026-03-11 01:09:11 +01:00
parent e446486b77
commit 9ad2104d7d
46 changed files with 1057 additions and 193 deletions

View File

@@ -7,6 +7,7 @@ import (
"github.com/tech/sendico/pkg/db/storable"
"github.com/tech/sendico/pkg/mlogger"
pm "github.com/tech/sendico/pkg/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.mongodb.org/mongo-driver/v2/bson"
"go.uber.org/zap"
)
@@ -68,6 +69,8 @@ type StepExecution struct {
FailureCode string `bson:"failureCode,omitempty" json:"failureCode,omitempty"`
FailureMsg string `bson:"failureMsg,omitempty" json:"failureMsg,omitempty"`
ExternalRefs []ExternalRef `bson:"externalRefs,omitempty" json:"externalRefs,omitempty"`
ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executedMoney,omitempty"`
ConvertedMoney *paymenttypes.Money `bson:"convertedMoney,omitempty" json:"convertedMoney,omitempty"`
}
// ExternalRef links step execution to an external operation.

View File

@@ -6,6 +6,7 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/pkg/merrors"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
)
type normalizedEvent struct {
@@ -15,6 +16,7 @@ type normalizedEvent struct {
targetState agg.StepState `bson:"targetState"`
failureInfo *failureInfo `bson:"failure,omitempty"`
forceAggregate *forceAggregate `bson:"forceAggregate,omitempty"`
executedMoney *paymenttypes.Money
}
type failureInfo struct {
@@ -123,6 +125,7 @@ func normalizeGatewayEvent(src GatewayEvent) (*normalizedEvent, error) {
targetState: target,
failureInfo: buildFailureInfo(failureCode, failureMsg, normalizeTimePtr(src.OccurredAt)),
forceAggregate: buildForceAggregate(src.TerminalFailure, needsAttention),
executedMoney: normalizeEventMoney(src.ExecutedMoney),
}
ev.matchRefs = normalizeRefList([]agg.ExternalRef{
{
@@ -264,6 +267,21 @@ func normalizeCardStatus(status CardStatus) (CardStatus, bool) {
}
}
func normalizeEventMoney(money *paymenttypes.Money) *paymenttypes.Money {
if money == nil {
return nil
}
amount := strings.TrimSpace(money.GetAmount())
currency := strings.TrimSpace(money.GetCurrency())
if amount == "" || currency == "" {
return nil
}
return &paymenttypes.Money{
Amount: amount,
Currency: currency,
}
}
func mapFailureTarget(status any, retryable *bool) (agg.StepState, bool) {
switch status {
case GatewayStatusCreated, GatewayStatusProcessing, GatewayStatusWaiting:

View File

@@ -6,6 +6,7 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/pkg/mlogger"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.uber.org/zap"
)
@@ -81,6 +82,7 @@ type GatewayEvent struct {
TransferRef string
GatewayInstanceID string
Status GatewayStatus
ExecutedMoney *paymenttypes.Money
FailureCode string
FailureMsg string
Retryable *bool

View File

@@ -10,6 +10,7 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xerr"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.mongodb.org/mongo-driver/v2/bson"
"go.uber.org/zap"
)
@@ -124,6 +125,7 @@ func (s *svc) applyStepEvent(step *agg.StepExecution, event *normalizedEvent, sm
if out.State == target {
changed = s.applyStepDiagnostics(&out, event) || changed
changed = applyExecutedMoney(&out, event) || changed
*step = out
return changed, nil
}
@@ -140,11 +142,43 @@ func (s *svc) applyStepEvent(step *agg.StepExecution, event *normalizedEvent, sm
out = next
changed = changed || transitionChanged
changed = s.applyStepDiagnostics(&out, event) || changed
changed = applyExecutedMoney(&out, event) || changed
*step = out
return changed, nil
}
func applyExecutedMoney(step *agg.StepExecution, event *normalizedEvent) bool {
if step == nil || event == nil {
return false
}
money := normalizeEventMoney(event.executedMoney)
if money == nil {
return false
}
if normalizeEventMoney(step.ExecutedMoney) != nil {
return false
}
if stepMoneyEqual(step.ExecutedMoney, money) {
return false
}
step.ExecutedMoney = money
return true
}
func stepMoneyEqual(left, right *paymenttypes.Money) bool {
left = normalizeEventMoney(left)
right = normalizeEventMoney(right)
switch {
case left == nil && right == nil:
return true
case left == nil || right == nil:
return false
default:
return left.Amount == right.Amount && left.Currency == right.Currency
}
}
func transitionStepState(step agg.StepExecution, target agg.StepState, sm ostate.StateMachine) (agg.StepExecution, bool, error) {
if step.State == target {
return step, false, nil

View File

@@ -7,6 +7,7 @@ import (
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg"
"github.com/tech/sendico/pkg/merrors"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.uber.org/zap"
)
@@ -32,6 +33,7 @@ func TestReconcile_GatewayWaiting_UpdatesRunningAndRefs(t *testing.T) {
TransferRef: "tx-1",
GatewayInstanceID: "gw-1",
Status: GatewayStatusWaiting,
ExecutedMoney: &paymenttypes.Money{Amount: "5.84", Currency: "USDT"},
},
},
})
@@ -61,6 +63,15 @@ func TestReconcile_GatewayWaiting_UpdatesRunningAndRefs(t *testing.T) {
if !hasRef(got.ExternalRefs, agg.ExternalRef{GatewayInstanceID: "gw-1", Kind: ExternalRefKindTransfer, Ref: "tx-1"}) {
t.Fatalf("expected transfer_ref external reference")
}
if got.ExecutedMoney == nil {
t.Fatal("expected executed money to be mapped")
}
if gotAmt, want := got.ExecutedMoney.Amount, "5.84"; gotAmt != want {
t.Fatalf("executed money amount mismatch: got=%q want=%q", gotAmt, want)
}
if gotCur, want := got.ExecutedMoney.Currency, "USDT"; gotCur != want {
t.Fatalf("executed money currency mismatch: got=%q want=%q", gotCur, want)
}
if out.Payment.State != agg.StateExecuting {
t.Fatalf("aggregate state mismatch: got=%q want=%q", out.Payment.State, agg.StateExecuting)
}
@@ -109,6 +120,54 @@ func TestReconcile_GatewaySuccess_SettlesPayment(t *testing.T) {
}
}
func TestReconcile_GatewayExecutedMoney_DoesNotOverrideExisting(t *testing.T) {
now := time.Date(2026, time.January, 10, 11, 12, 13, 0, time.UTC)
reconciler := New(Dependencies{Logger: zap.NewNop(), Now: func() time.Time { return now }})
out, err := reconciler.Reconcile(Input{
Payment: &agg.Payment{
PaymentRef: "p1",
State: agg.StateCreated,
Version: 1,
StepExecutions: []agg.StepExecution{
{
StepRef: "s1",
StepCode: "observe",
State: agg.StepStatePending,
Attempt: 1,
ExecutedMoney: &paymenttypes.Money{
Amount: "76.50",
Currency: "RUB",
},
},
},
},
Event: Event{
Gateway: &GatewayEvent{
StepRef: "s1",
Status: GatewayStatusSuccess,
ExecutedMoney: &paymenttypes.Money{
Amount: "7650",
Currency: "RUB",
},
},
},
})
if err != nil {
t.Fatalf("Reconcile returned error: %v", err)
}
step := out.Payment.StepExecutions[0]
if step.ExecutedMoney == nil {
t.Fatal("expected executed money")
}
if got, want := step.ExecutedMoney.Amount, "76.50"; got != want {
t.Fatalf("executed money amount mismatch: got=%q want=%q", got, want)
}
if got, want := step.ExecutedMoney.Currency, "RUB"; got != want {
t.Fatalf("executed money currency mismatch: got=%q want=%q", got, want)
}
}
func TestReconcile_GatewayFailureMapping(t *testing.T) {
now := time.Date(2026, time.January, 10, 11, 12, 13, 0, time.UTC)
reconciler := New(Dependencies{Logger: zap.NewNop(), Now: func() time.Time { return now }})

View File

@@ -9,6 +9,7 @@ import (
"github.com/tech/sendico/pkg/db/storable"
pm "github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/mservice"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.mongodb.org/mongo-driver/v2/bson"
)
@@ -108,6 +109,8 @@ func cloneStepExecutions(src []agg.StepExecution) []agg.StepExecution {
step.Attempt = 1
}
step.ExternalRefs = cloneExternalRefs(step.ExternalRefs)
step.ExecutedMoney = cloneStepMoney(step.ExecutedMoney)
step.ConvertedMoney = cloneStepMoney(step.ConvertedMoney)
step.StartedAt = cloneTime(step.StartedAt)
step.CompletedAt = cloneTime(step.CompletedAt)
out = append(out, step)
@@ -115,6 +118,21 @@ func cloneStepExecutions(src []agg.StepExecution) []agg.StepExecution {
return out
}
func cloneStepMoney(money *paymenttypes.Money) *paymenttypes.Money {
if money == nil {
return nil
}
amount := strings.TrimSpace(money.GetAmount())
currency := strings.TrimSpace(money.GetCurrency())
if amount == "" || currency == "" {
return nil
}
return &paymenttypes.Money{
Amount: amount,
Currency: currency,
}
}
func cloneExternalRefs(refs []agg.ExternalRef) []agg.ExternalRef {
if len(refs) == 0 {
return nil

View File

@@ -118,6 +118,24 @@ func TestMap_Success(t *testing.T) {
if got, want := steps[0].GetUserLabel(), "Card payout"; got != want {
t.Fatalf("user_label mismatch: got=%q want=%q", got, want)
}
if steps[0].GetExecutedMoney() == nil {
t.Fatal("expected executed_money to be mapped")
}
if got, want := steps[0].GetExecutedMoney().GetAmount(), "95"; got != want {
t.Fatalf("executed_money.amount mismatch: got=%q want=%q", got, want)
}
if got, want := steps[0].GetExecutedMoney().GetCurrency(), "USD"; got != want {
t.Fatalf("executed_money.currency mismatch: got=%q want=%q", got, want)
}
if steps[0].GetConvertedMoney() == nil {
t.Fatal("expected converted_money to be mapped")
}
if got, want := steps[0].GetConvertedMoney().GetAmount(), "90"; got != want {
t.Fatalf("converted_money.amount mismatch: got=%q want=%q", got, want)
}
if got, want := steps[0].GetConvertedMoney().GetCurrency(), "EUR"; got != want {
t.Fatalf("converted_money.currency mismatch: got=%q want=%q", got, want)
}
if got, want := steps[1].GetReportVisibility(), orchestrationv2.ReportVisibility_REPORT_VISIBILITY_HIDDEN; got != want {
t.Fatalf("report_visibility mismatch: got=%s want=%s", got.String(), want.String())
}
@@ -363,7 +381,15 @@ func newPaymentFixture() *agg.Payment {
UserLabel: " Card payout ",
State: agg.StepStateRunning,
Attempt: 0,
StartedAt: &startedAt,
ExecutedMoney: &paymenttypes.Money{
Amount: "95",
Currency: "USD",
},
ConvertedMoney: &paymenttypes.Money{
Amount: "90",
Currency: "EUR",
},
StartedAt: &startedAt,
},
{
StepRef: "s2",

View File

@@ -46,6 +46,8 @@ func mapStepExecution(step agg.StepExecution, index int) (*orchestrationv2.StepE
Refs: mapExternalRefs(step.StepCode, step.ExternalRefs),
ReportVisibility: mapReportVisibility(step.ReportVisibility),
UserLabel: strings.TrimSpace(step.UserLabel),
ExecutedMoney: moneyToProto(step.ExecutedMoney),
ConvertedMoney: moneyToProto(step.ConvertedMoney),
}, nil
}

View File

@@ -85,6 +85,8 @@ func (defaultObserveConfirmExecutor) ExecuteObserveConfirm(_ context.Context, re
step := req.StepExecution
step.State = agg.StepStateRunning
step.ExternalRefs = refs
step.ExecutedMoney = inheritedExecutedMoney(req.Payment, req.Step, req.StepExecution)
step.ConvertedMoney = inheritedConvertedMoney(req.Payment, req.Step, req.StepExecution)
step.FailureCode = ""
step.FailureMsg = ""
return &sexec.ExecuteOutput{
@@ -198,6 +200,46 @@ func inheritedExternalRefs(payment *agg.Payment, step xplan.Step, current agg.St
return refs
}
func inheritedExecutedMoney(payment *agg.Payment, step xplan.Step, current agg.StepExecution) *paymenttypes.Money {
if money := cloneStepMoney(current.ExecutedMoney); money != nil {
return money
}
if payment == nil || len(step.DependsOn) == 0 {
return nil
}
index := stepIndexByRef(payment.StepExecutions)
for i := range step.DependsOn {
idx, ok := index[strings.TrimSpace(step.DependsOn[i])]
if !ok || idx < 0 || idx >= len(payment.StepExecutions) {
continue
}
if money := cloneStepMoney(payment.StepExecutions[idx].ExecutedMoney); money != nil {
return money
}
}
return nil
}
func inheritedConvertedMoney(payment *agg.Payment, step xplan.Step, current agg.StepExecution) *paymenttypes.Money {
if money := cloneStepMoney(current.ConvertedMoney); money != nil {
return money
}
if payment == nil || len(step.DependsOn) == 0 {
return nil
}
index := stepIndexByRef(payment.StepExecutions)
for i := range step.DependsOn {
idx, ok := index[strings.TrimSpace(step.DependsOn[i])]
if !ok || idx < 0 || idx >= len(payment.StepExecutions) {
continue
}
if money := cloneStepMoney(payment.StepExecutions[idx].ConvertedMoney); money != nil {
return money
}
}
return nil
}
func appendExternalRefs(existing []agg.ExternalRef, additions ...agg.ExternalRef) []agg.ExternalRef {
out := append([]agg.ExternalRef{}, existing...)
seen := map[string]struct{}{}

View File

@@ -122,6 +122,14 @@ func TestDefaultObserveConfirmExecutor_InheritsDependencyRefs(t *testing.T) {
StepExecutions: []agg.StepExecution{
{
StepRef: "hop_1_crypto_send",
ExecutedMoney: &paymenttypes.Money{
Amount: "1.000000",
Currency: "USDT",
},
ConvertedMoney: &paymenttypes.Money{
Amount: "95.00",
Currency: "EUR",
},
ExternalRefs: []agg.ExternalRef{
{GatewayInstanceID: "crypto-gw", Kind: "operation_ref", Ref: "op-1"},
{GatewayInstanceID: "crypto-gw", Kind: "transfer_ref", Ref: "trf-1"},
@@ -161,4 +169,22 @@ func TestDefaultObserveConfirmExecutor_InheritsDependencyRefs(t *testing.T) {
if got, want := out.StepExecution.ExternalRefs[0].Ref, "op-1"; got != want {
t.Fatalf("first external ref value mismatch: got=%q want=%q", got, want)
}
if out.StepExecution.ExecutedMoney == nil {
t.Fatal("expected executed money to be inherited from dependency")
}
if got, want := out.StepExecution.ExecutedMoney.Amount, "1.000000"; got != want {
t.Fatalf("executed money amount mismatch: got=%q want=%q", got, want)
}
if got, want := out.StepExecution.ExecutedMoney.Currency, "USDT"; got != want {
t.Fatalf("executed money currency mismatch: got=%q want=%q", got, want)
}
if out.StepExecution.ConvertedMoney == nil {
t.Fatal("expected converted money to be inherited from dependency")
}
if got, want := out.StepExecution.ConvertedMoney.Amount, "95.00"; got != want {
t.Fatalf("converted money amount mismatch: got=%q want=%q", got, want)
}
if got, want := out.StepExecution.ConvertedMoney.Currency, "EUR"; got != want {
t.Fatalf("converted money currency mismatch: got=%q want=%q", got, want)
}
}

View File

@@ -325,6 +325,8 @@ func normalizeExecutorOutput(current agg.StepExecution, out *sexec.ExecuteOutput
next.Attempt = out.StepExecution.Attempt
}
next.ExternalRefs = out.StepExecution.ExternalRefs
next.ExecutedMoney = cloneStepMoney(out.StepExecution.ExecutedMoney)
next.ConvertedMoney = cloneStepMoney(out.StepExecution.ConvertedMoney)
next.FailureCode = strings.TrimSpace(out.StepExecution.FailureCode)
next.FailureMsg = strings.TrimSpace(out.StepExecution.FailureMsg)
@@ -437,6 +439,12 @@ func stepExecutionEqual(left, right agg.StepExecution) bool {
return false
}
}
if !stepMoneyEqual(left.ExecutedMoney, right.ExecutedMoney) {
return false
}
if !stepMoneyEqual(left.ConvertedMoney, right.ConvertedMoney) {
return false
}
return true
}

View File

@@ -0,0 +1,35 @@
package psvc
import (
"strings"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
)
func cloneStepMoney(src *paymenttypes.Money) *paymenttypes.Money {
if src == nil {
return nil
}
amount := strings.TrimSpace(src.GetAmount())
currency := strings.TrimSpace(src.GetCurrency())
if amount == "" || currency == "" {
return nil
}
return &paymenttypes.Money{
Amount: amount,
Currency: currency,
}
}
func stepMoneyEqual(left, right *paymenttypes.Money) bool {
left = cloneStepMoney(left)
right = cloneStepMoney(right)
switch {
case left == nil && right == nil:
return true
case left == nil || right == nil:
return false
default:
return left.Amount == right.Amount && left.Currency == right.Currency
}
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/tech/sendico/payments/storage/model"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
)
func (s *svc) prepareInput(in Input) (*preparedInput, error) {
@@ -174,6 +175,8 @@ func (s *svc) normalizeStepExecution(exec agg.StepExecution, index int) (agg.Ste
exec.Rail = model.ParseRail(string(exec.Rail))
exec.ReportVisibility = model.NormalizeReportVisibility(exec.ReportVisibility)
exec.ExternalRefs = cloneExternalRefs(exec.ExternalRefs)
exec.ExecutedMoney = cloneStepMoney(exec.ExecutedMoney)
exec.ConvertedMoney = cloneStepMoney(exec.ConvertedMoney)
if exec.StepRef == "" {
return agg.StepExecution{}, merrors.InvalidArgument("stepExecutions[" + itoa(index) + "].step_ref is required")
}
@@ -257,9 +260,26 @@ func normalizeStepState(state agg.StepState) (agg.StepState, bool) {
func cloneStepExecution(exec agg.StepExecution) agg.StepExecution {
out := exec
out.ExternalRefs = cloneExternalRefs(exec.ExternalRefs)
out.ExecutedMoney = cloneStepMoney(exec.ExecutedMoney)
out.ConvertedMoney = cloneStepMoney(exec.ConvertedMoney)
return out
}
func cloneStepMoney(money *paymenttypes.Money) *paymenttypes.Money {
if money == nil {
return nil
}
amount := strings.TrimSpace(money.GetAmount())
currency := strings.TrimSpace(money.GetCurrency())
if amount == "" || currency == "" {
return nil
}
return &paymenttypes.Money{
Amount: amount,
Currency: currency,
}
}
func cloneExternalRefs(refs []agg.ExternalRef) []agg.ExternalRef {
if len(refs) == 0 {
return nil