Merge pull request 'payments orchestrator refactoring' (#370) from payments-368 into main
Some checks failed
ci/woodpecker/push/db Pipeline is pending
ci/woodpecker/push/discovery Pipeline is pending
ci/woodpecker/push/frontend Pipeline is pending
ci/woodpecker/push/fx_ingestor Pipeline is pending
ci/woodpecker/push/fx_oracle Pipeline is pending
ci/woodpecker/push/gateway_chain Pipeline is pending
ci/woodpecker/push/gateway_mntx Pipeline is pending
ci/woodpecker/push/gateway_tgsettle Pipeline is pending
ci/woodpecker/push/ledger Pipeline is pending
ci/woodpecker/push/nats Pipeline is pending
ci/woodpecker/push/notification Pipeline is pending
ci/woodpecker/push/payments_orchestrator Pipeline is pending
ci/woodpecker/push/billing_fees Pipeline failed
ci/woodpecker/push/bff Pipeline failed

Reviewed-on: #370
This commit was merged in pull request #370.
This commit is contained in:
2026-01-30 16:02:00 +00:00
36 changed files with 1371 additions and 368 deletions

BIN
api/notification/app Executable file

Binary file not shown.

View File

@@ -0,0 +1,46 @@
root = "."
testdata_dir = "testdata"
tmp_dir = "tmp"
[build]
args_bin = []
entrypoint = "./tmp/main"
cmd = "go build -o ./tmp/main ."
delay = 1000
exclude_dir = ["assets", "tmp", "vendor", "testdata"]
exclude_file = []
exclude_regex = ["_test.go", "_templ.go"]
exclude_unchanged = false
follow_symlink = false
full_bin = ""
include_dir = []
include_ext = ["go", "tpl", "tmpl", "html"]
include_file = []
kill_delay = "0s"
log = "build-errors.log"
poll = false
poll_interval = 0
post_cmd = []
pre_cmd = []
rerun = false
rerun_delay = 500
send_interrupt = false
stop_on_error = false
[color]
app = ""
build = "yellow"
main = "magenta"
runner = "green"
watcher = "cyan"
[log]
main_only = false
time = false
[misc]
clean_on_exit = false
[screen]
clear_on_rebuild = false
keep_scroll = true

View File

@@ -0,0 +1,50 @@
runtime:
shutdown_timeout_seconds: 15
grpc:
network: tcp
address: ":50062"
advertise_host: "dev-payments-orchestrator"
enable_reflection: true
enable_health: true
metrics:
address: ":9403"
database:
driver: mongodb
settings:
host_env: PAYMENTS_MONGO_HOST
port_env: PAYMENTS_MONGO_PORT
database_env: PAYMENTS_MONGO_DATABASE
user_env: PAYMENTS_MONGO_USER
password_env: PAYMENTS_MONGO_PASSWORD
auth_source_env: PAYMENTS_MONGO_AUTH_SOURCE
replica_set_env: PAYMENTS_MONGO_REPLICA_SET
messaging:
driver: NATS
settings:
url_env: NATS_URL
host_env: NATS_HOST
port_env: NATS_PORT
username_env: NATS_USER
password_env: NATS_PASSWORD
broker_name: Payments Orchestrator Service
max_reconnects: 10
reconnect_wait: 5
buffer_size: 1024
# Retain quote records after expiry to allow long-running payments to complete.
quote_retention_hours: 72
# Service endpoints are sourced from discovery; no static overrides.
card_gateways:
monetix:
funding_address: "TUaWaCkiXwYPKm5qjcB27Lhwv976vPvedE"
fee_wallet_ref: "697a062a248dc785125ccb9e"
fee_ledger_accounts:
monetix: "697a15cc72e95c92d4c5db01"
# Gateway instances and capabilities are sourced from service discovery.

View File

@@ -1,6 +1,6 @@
module github.com/tech/sendico/payments/orchestrator
go 1.25.3
go 1.25.6
replace github.com/tech/sendico/pkg => ../../pkg
@@ -107,5 +107,5 @@ require (
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/text v0.33.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260122232226-8e98ce8d340d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
)

View File

@@ -264,8 +264,8 @@ gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda h1:+2XxjfsAu6vqFxwGBRcHiMaDCuZiqXGDUDVWVtrFAnE=
google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda/go.mod h1:fDMmzKV90WSg1NbozdqrE64fkuTv6mlq2zxo9ad+3yo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260122232226-8e98ce8d340d h1:xXzuihhT3gL/ntduUZwHECzAn57E8dA6l8SOtYWdD8Q=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260122232226-8e98ce8d340d/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1:H86B94AW+VfJWDqFeEbBPhEtHzJwJfTbgE2lZa54ZAQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=

View File

@@ -431,17 +431,23 @@ func parseDiscoveryEndpoint(raw string) (discoveryEndpoint, error) {
if raw == "" {
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri is required")
}
parsed, err := url.Parse(raw)
if err != nil || parsed.Scheme == "" {
// No scheme: treat as a gRPC target (default to insecure grpc).
if !strings.Contains(raw, "://") {
if _, _, splitErr := net.SplitHostPort(raw); splitErr != nil {
if err != nil {
return discoveryEndpoint{}, err
}
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
}
return discoveryEndpoint{address: raw, insecure: true, raw: raw}, nil
}
parsed, err := url.Parse(raw)
if err != nil || parsed.Scheme == "" {
if err != nil {
return discoveryEndpoint{}, err
}
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
}
scheme := strings.ToLower(strings.TrimSpace(parsed.Scheme))
switch scheme {
case "grpc":

View File

@@ -166,6 +166,22 @@ func (c *discoveryLedgerClient) GetStatement(ctx context.Context, req *ledgerv1.
return client.GetStatement(ctx, req)
}
func (c *discoveryLedgerClient) BlockAccount(ctx context.Context, req *ledgerv1.BlockAccountRequest) (*ledgerv1.BlockAccountResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.BlockAccount(ctx, req)
}
func (c *discoveryLedgerClient) UnblockAccount(ctx context.Context, req *ledgerv1.UnblockAccountRequest) (*ledgerv1.UnblockAccountResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.UnblockAccount(ctx, req)
}
func (c *discoveryLedgerClient) Close() error {
if c == nil || c.resolver == nil {
return nil

View File

@@ -467,6 +467,12 @@ func protoRailOperationFromModel(action model.RailOperation) gatewayv1.RailOpera
return gatewayv1.RailOperation_RAIL_OPERATION_DEBIT
case string(model.RailOperationCredit):
return gatewayv1.RailOperation_RAIL_OPERATION_CREDIT
case string(model.RailOperationExternalDebit):
return gatewayv1.RailOperation_RAIL_OPERATION_DEBIT
case string(model.RailOperationExternalCredit):
return gatewayv1.RailOperation_RAIL_OPERATION_CREDIT
case string(model.RailOperationMove):
return gatewayv1.RailOperation_RAIL_OPERATION_MOVE
case string(model.RailOperationSend):
return gatewayv1.RailOperation_RAIL_OPERATION_SEND
case string(model.RailOperationFee):

View File

@@ -708,7 +708,7 @@ func (h *initiatePaymentCommand) Execute(ctx context.Context, req *orchestratorv
}
h.logger.Debug(
"Initiate payment request accepted",
zap.String("org_ref", orgID.Hex()),
mzap.ObjRef("org_ref", orgID),
zap.String("idempotency_key", idempotencyKey),
zap.String("quote_ref", quoteRef),
zap.Bool("has_intent", hasIntent),
@@ -723,7 +723,7 @@ func (h *initiatePaymentCommand) Execute(ctx context.Context, req *orchestratorv
h.logger.Debug(
"idempotent payment request reused",
zap.String("payment_ref", existing.PaymentRef),
zap.String("org_ref", orgID.Hex()),
mzap.ObjRef("org_ref", orgID),
zap.String("idempotency_key", idempotencyKey),
zap.String("quote_ref", quoteRef),
)
@@ -763,7 +763,7 @@ func (h *initiatePaymentCommand) Execute(ctx context.Context, req *orchestratorv
}
h.logger.Debug(
"Payment quote resolved",
zap.String("org_ref", orgID.Hex()),
mzap.ObjRef("org_ref", orgID),
zap.String("quote_ref", quoteRef),
zap.Bool("quote_ref_used", quoteRef != ""),
)
@@ -784,7 +784,7 @@ func (h *initiatePaymentCommand) Execute(ctx context.Context, req *orchestratorv
h.logger.Info(
"Payment initiated",
zap.String("payment_ref", entity.PaymentRef),
zap.String("org_ref", orgID.Hex()),
mzap.ObjRef("org_ref", orgID),
zap.String("kind", resolvedIntent.GetKind().String()),
zap.String("quote_ref", quoteSnapshot.GetQuoteRef()),
zap.String("idempotency_key", idempotencyKey),
@@ -869,7 +869,7 @@ func (h *initiateConversionCommand) Execute(ctx context.Context, req *orchestrat
}
if existing, err := getPaymentByIdempotencyKey(ctx, store, orgID, idempotencyKey); err == nil && existing != nil {
h.logger.Debug("Idempotent conversion request reused", zap.String("payment_ref", existing.PaymentRef), zap.String("org_ref", orgID.Hex()))
h.logger.Debug("Idempotent conversion request reused", zap.String("payment_ref", existing.PaymentRef), mzap.ObjRef("org_ref", orgID))
return gsresponse.Success(&orchestratorv1.InitiateConversionResponse{Conversion: toProtoPayment(existing)})
} else if err != nil && !errors.Is(err, storage.ErrPaymentNotFound) {
return gsresponse.Auto[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, err)
@@ -913,7 +913,7 @@ func (h *initiateConversionCommand) Execute(ctx context.Context, req *orchestrat
return gsresponse.Auto[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, err)
}
h.logger.Info("Conversion initiated", zap.String("payment_ref", entity.PaymentRef), zap.String("org_ref", orgID.Hex()))
h.logger.Info("Conversion initiated", zap.String("payment_ref", entity.PaymentRef), mzap.ObjRef("org_ref", orgID))
return gsresponse.Success(&orchestratorv1.InitiateConversionResponse{
Conversion: toProtoPayment(entity),
})

View File

@@ -25,6 +25,13 @@ type moneyGetter interface {
GetCurrency() string
}
const (
feeLineMetaTarget = "fee_target"
feeLineTargetWallet = "wallet"
feeLineMetaWalletRef = "fee_wallet_ref"
feeLineMetaWalletType = "fee_wallet_type"
)
func cloneProtoMoney(input *moneyv1.Money) *moneyv1.Money {
if input == nil {
return nil
@@ -303,13 +310,63 @@ func quoteToProto(src *oracleclient.Quote) *oraclev1.Quote {
}
}
func setFeeLineTarget(lines []*feesv1.DerivedPostingLine, target string) {
target = strings.TrimSpace(target)
if target == "" || len(lines) == 0 {
return
}
for _, line := range lines {
if line == nil {
continue
}
if line.Meta == nil {
line.Meta = map[string]string{}
}
line.Meta[feeLineMetaTarget] = target
if strings.EqualFold(target, feeLineTargetWallet) {
line.LedgerAccountRef = ""
}
}
}
func feeLineTarget(line *feesv1.DerivedPostingLine) string {
if line == nil {
return ""
}
return strings.TrimSpace(line.GetMeta()[feeLineMetaTarget])
}
func isWalletTargetFeeLine(line *feesv1.DerivedPostingLine) bool {
return strings.EqualFold(feeLineTarget(line), feeLineTargetWallet)
}
func setFeeLineWalletRef(lines []*feesv1.DerivedPostingLine, walletRef, walletType string) {
walletRef = strings.TrimSpace(walletRef)
walletType = strings.TrimSpace(walletType)
if walletRef == "" || len(lines) == 0 {
return
}
for _, line := range lines {
if line == nil {
continue
}
if line.Meta == nil {
line.Meta = map[string]string{}
}
line.Meta[feeLineMetaWalletRef] = walletRef
if walletType != "" {
line.Meta[feeLineMetaWalletType] = walletType
}
}
}
func ledgerChargesFromFeeLines(lines []*feesv1.DerivedPostingLine) []*ledgerv1.PostingLine {
if len(lines) == 0 {
return nil
}
charges := make([]*ledgerv1.PostingLine, 0, len(lines))
for _, line := range lines {
if line == nil || strings.TrimSpace(line.GetLedgerAccountRef()) == "" {
if line == nil || isWalletTargetFeeLine(line) || strings.TrimSpace(line.GetLedgerAccountRef()) == "" {
continue
}
money := cloneProtoMoney(line.GetMoney())
@@ -408,7 +465,7 @@ func assignLedgerAccounts(lines []*feesv1.DerivedPostingLine, account string) []
return lines
}
for _, line := range lines {
if line == nil {
if line == nil || isWalletTargetFeeLine(line) {
continue
}
if strings.TrimSpace(line.GetLedgerAccountRef()) != "" {

View File

@@ -3,6 +3,8 @@ package orchestrator
import (
"testing"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
accountingv1 "github.com/tech/sendico/pkg/proto/common/accounting/v1"
fxv1 "github.com/tech/sendico/pkg/proto/common/fx/v1"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
@@ -77,3 +79,29 @@ func TestComputeAggregatesRecipientPaysFee(t *testing.T) {
t.Fatalf("expected settlement 7376.76 RUB, got %s %s", settlement.GetCurrency(), settlement.GetAmount())
}
}
func TestLedgerChargesFromFeeLinesSkipsWalletTarget(t *testing.T) {
lines := []*feesv1.DerivedPostingLine{
{
LedgerAccountRef: "ledger:fees",
Money: &moneyv1.Money{Currency: "USDT", Amount: "0.7"},
LineType: accountingv1.PostingLineType_POSTING_LINE_FEE,
Meta: map[string]string{
feeLineMetaTarget: feeLineTargetWallet,
},
},
{
LedgerAccountRef: "ledger:fees",
Money: &moneyv1.Money{Currency: "USDT", Amount: "1.0"},
LineType: accountingv1.PostingLineType_POSTING_LINE_FEE,
},
}
charges := ledgerChargesFromFeeLines(lines)
if len(charges) != 1 {
t.Fatalf("expected 1 ledger charge, got %d", len(charges))
}
if charges[0].GetMoney().GetAmount() != "1.0" {
t.Fatalf("expected remaining charge amount 1.0, got %s", charges[0].GetMoney().GetAmount())
}
}

View File

@@ -169,16 +169,14 @@ func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.P
},
}
if g.logger != nil {
g.logger.Info("Rail gateway resolved",
zap.String("step_id", strings.TrimSpace(step.StepID)),
zap.String("action", string(step.Action)),
zap.String("gateway_id", entry.ID),
zap.String("instance_id", entry.InstanceID),
zap.String("rail", string(entry.Rail)),
zap.String("network", entry.Network),
zap.String("invoke_uri", invokeURI))
}
g.logger.Info("Rail gateway resolved",
zap.String("step_id", strings.TrimSpace(step.StepID)),
zap.String("action", string(step.Action)),
zap.String("gateway_id", entry.ID),
zap.String("instance_id", entry.InstanceID),
zap.String("rail", string(entry.Rail)),
zap.String("network", entry.Network),
zap.String("invoke_uri", invokeURI))
switch entry.Rail {
case model.RailProviderSettlement:
@@ -398,7 +396,7 @@ func WithGatewayRegistry(registry GatewayRegistry) Option {
s.deps.railGateways.providerResolver = s.deps.gatewayInvokeResolver
s.deps.railGateways.logger = s.logger.Named("rail_gateways")
if s.deps.planBuilder == nil {
s.deps.planBuilder = &defaultPlanBuilder{}
s.deps.planBuilder = newDefaultPlanBuilder(s.logger)
}
}
}

View File

@@ -45,7 +45,7 @@ func (p *paymentExecutor) executePayment(ctx context.Context, store storage.Paym
}
builder := p.svc.deps.planBuilder
if builder == nil {
builder = &defaultPlanBuilder{}
builder = newDefaultPlanBuilder(p.logger)
}
plan, err := builder.Build(ctx, payment, quote, routeStore, planTemplates, p.svc.deps.gatewayRegistry)
if err != nil {

View File

@@ -7,11 +7,12 @@ import (
"github.com/shopspring/decimal"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/merrors"
pmodel "github.com/tech/sendico/pkg/model"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
)
func (p *paymentExecutor) submitCardPayoutPlan(ctx context.Context, payment *model.Payment, amount *moneyv1.Money) (string, error) {
func (p *paymentExecutor) submitCardPayoutPlan(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, fromRole, toRole *pmodel.AccountRole) (string, error) {
if payment == nil {
return "", merrors.InvalidArgument("payment is required")
}
@@ -37,6 +38,18 @@ func (p *paymentExecutor) submitCardPayoutPlan(ctx context.Context, payment *mod
currency := strings.TrimSpace(amount.GetCurrency())
holder := strings.TrimSpace(card.Cardholder)
meta := cloneMetadata(payment.Metadata)
if strings.TrimSpace(string(mergeAccountRole(fromRole))) != "" {
if meta == nil {
meta = map[string]string{}
}
meta[pmodel.MetadataKeyFromRole] = strings.TrimSpace(string(mergeAccountRole(fromRole)))
}
if strings.TrimSpace(string(mergeAccountRole(toRole))) != "" {
if meta == nil {
meta = map[string]string{}
}
meta[pmodel.MetadataKeyToRole] = strings.TrimSpace(string(mergeAccountRole(toRole)))
}
customer := intent.Customer
customerID := ""
customerFirstName := ""
@@ -146,6 +159,13 @@ func (p *paymentExecutor) submitCardPayoutPlan(ctx context.Context, payment *mod
return exec.CardPayoutRef, nil
}
func mergeAccountRole(role *pmodel.AccountRole) pmodel.AccountRole {
if role == nil {
return ""
}
return pmodel.AccountRole(strings.TrimSpace(string(*role)))
}
func (p *paymentExecutor) resolveCardRoute(intent model.PaymentIntent) (CardGatewayRoute, error) {
if p.svc != nil {
return p.svc.cardRoute(p.gatewayKeyFromIntent(intent))

View File

@@ -5,12 +5,13 @@ import (
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/merrors"
pmodel "github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/payments/rail"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
)
func (p *paymentExecutor) buildCryptoTransferRequest(payment *model.Payment, amount *paymenttypes.Money, action model.RailOperation, idempotencyKey string, quote *orchestratorv1.PaymentQuote) (rail.TransferRequest, error) {
func (p *paymentExecutor) buildCryptoTransferRequest(payment *model.Payment, amount *paymenttypes.Money, action model.RailOperation, idempotencyKey string, quote *orchestratorv1.PaymentQuote, fromRole, toRole *pmodel.AccountRole) (rail.TransferRequest, error) {
if payment == nil {
return rail.TransferRequest{}, merrors.InvalidArgument("chain: payment is required")
}
@@ -37,15 +38,18 @@ func (p *paymentExecutor) buildCryptoTransferRequest(payment *model.Payment, amo
ClientReference: payment.PaymentRef,
DestinationMemo: memo,
}
if fromRole != nil {
req.FromRole = *fromRole
}
if toRole != nil {
req.ToRole = *toRole
}
if req.Currency == "" || req.Amount == "" {
return rail.TransferRequest{}, merrors.InvalidArgument("chain: amount is required")
}
if req.IdempotencyKey == "" {
return rail.TransferRequest{}, merrors.InvalidArgument("chain: idempotency_key is required")
}
if action == model.RailOperationSend && quote != nil {
req.Fees = feeBreakdownFromQuote(quote)
}
return req, nil
}

View File

@@ -72,6 +72,10 @@ func (p *paymentExecutor) executePaymentPlan(ctx context.Context, store storage.
return p.failPayment(ctx, store, payment, model.PaymentFailureCodePolicy, strings.TrimSpace(err.Error()), err)
}
if blocked {
if step.CommitPolicy == model.CommitPolicyAfterFailure && commitAfterDependenciesSucceeded(step, execSteps) {
setExecutionStepStatus(execStep, executionStepStatusSkipped)
continue
}
payment.State = model.PaymentStateFailed
payment.FailureCode = failureCodeForStep(step)
return p.persistPayment(ctx, store, payment)

View File

@@ -9,13 +9,16 @@ import (
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/storage/model"
mo "github.com/tech/sendico/pkg/model"
pmodel "github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/payments/rail"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/structpb"
)
func TestExecutePaymentPlan_SourceBeforeDestination(t *testing.T) {
@@ -35,19 +38,32 @@ func TestExecutePaymentPlan_SourceBeforeDestination(t *testing.T) {
},
}
debitCalls := 0
creditCalls := 0
moveCalls := 0
pendingAccountID := "ledger:pending"
operatingAccountID := "ledger:operating"
transitAccountID := "ledger:transit"
ledgerFake := &ledgerclient.Fake{
CreateTransactionFn: func(ctx context.Context, tx rail.LedgerTx) (string, error) {
if strings.EqualFold(tx.FromRail, "LEDGER") {
debitCalls++
return "debit-1", nil
}
if strings.EqualFold(tx.ToRail, "LEDGER") {
creditCalls++
return "credit-1", nil
}
return "", nil
ListConnectorAccountsFn: func(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) {
details, _ := structpb.NewStruct(map[string]interface{}{
"role": "ACCOUNT_ROLE_PENDING",
})
detailsOperating, _ := structpb.NewStruct(map[string]interface{}{
"role": "ACCOUNT_ROLE_OPERATING",
})
detailsTransit, _ := structpb.NewStruct(map[string]interface{}{
"role": "ACCOUNT_ROLE_TRANSIT",
})
return &connectorv1.ListAccountsResponse{
Accounts: []*connectorv1.Account{
{Ref: &connectorv1.AccountRef{ConnectorId: "ledger", AccountId: pendingAccountID}, Kind: connectorv1.AccountKind_LEDGER_ACCOUNT, Asset: "USDT", ProviderDetails: details},
{Ref: &connectorv1.AccountRef{ConnectorId: "ledger", AccountId: operatingAccountID}, Kind: connectorv1.AccountKind_LEDGER_ACCOUNT, Asset: "USDT", ProviderDetails: detailsOperating},
{Ref: &connectorv1.AccountRef{ConnectorId: "ledger", AccountId: transitAccountID}, Kind: connectorv1.AccountKind_LEDGER_ACCOUNT, Asset: "USDT", ProviderDetails: detailsTransit},
},
}, nil
},
TransferInternalFn: func(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) {
moveCalls++
return &ledgerv1.PostResponse{JournalEntryRef: "move-1"}, nil
},
}
@@ -125,9 +141,9 @@ func TestExecutePaymentPlan_SourceBeforeDestination(t *testing.T) {
{StepID: "crypto_send", Rail: model.RailCrypto, Action: model.RailOperationSend, Amount: &paymenttypes.Money{Currency: "USDT", Amount: "100"}},
{StepID: "crypto_fee", Rail: model.RailCrypto, Action: model.RailOperationFee, DependsOn: []string{"crypto_send"}, Amount: &paymenttypes.Money{Currency: "USDT", Amount: "5"}},
{StepID: "crypto_observe", Rail: model.RailProviderSettlement, Action: model.RailOperationObserveConfirm, DependsOn: []string{"crypto_send"}},
{StepID: "ledger_credit", Rail: model.RailLedger, Action: model.RailOperationCredit, DependsOn: []string{"crypto_observe"}, Amount: &paymenttypes.Money{Currency: "USDT", Amount: "95"}},
{StepID: "ledger_credit", Rail: model.RailLedger, Action: model.RailOperationMove, DependsOn: []string{"crypto_observe"}, Amount: &paymenttypes.Money{Currency: "USDT", Amount: "95"}, FromRole: rolePtr(pmodel.AccountRolePending), ToRole: rolePtr(pmodel.AccountRoleOperating)},
{StepID: "card_payout", Rail: model.RailCardPayout, Action: model.RailOperationSend, DependsOn: []string{"ledger_credit"}, Amount: &paymenttypes.Money{Currency: "USDT", Amount: "95"}},
{StepID: "ledger_debit", Rail: model.RailLedger, Action: model.RailOperationDebit, DependsOn: []string{"card_payout"}, CommitPolicy: model.CommitPolicyAfterSuccess, CommitAfter: []string{"card_payout"}, Amount: &paymenttypes.Money{Currency: "USDT", Amount: "95"}},
{StepID: "ledger_debit", Rail: model.RailLedger, Action: model.RailOperationMove, DependsOn: []string{"card_payout"}, CommitPolicy: model.CommitPolicyAfterSuccess, CommitAfter: []string{"card_payout"}, Amount: &paymenttypes.Money{Currency: "USDT", Amount: "95"}, FromRole: rolePtr(pmodel.AccountRoleOperating), ToRole: rolePtr(pmodel.AccountRoleTransit)},
},
},
}
@@ -141,8 +157,8 @@ func TestExecutePaymentPlan_SourceBeforeDestination(t *testing.T) {
if sendCalls != 2 {
t.Fatalf("expected 2 rail sends, got %d", sendCalls)
}
if debitCalls != 0 || creditCalls != 0 {
t.Fatalf("unexpected ledger calls: debit=%d credit=%d", debitCalls, creditCalls)
if moveCalls != 0 {
t.Fatalf("unexpected ledger move calls: %d", moveCalls)
}
if payoutCalls != 0 {
t.Fatalf("expected no payout before source confirmation, got %d", payoutCalls)
@@ -173,8 +189,8 @@ func TestExecutePaymentPlan_SourceBeforeDestination(t *testing.T) {
if err := executor.executePaymentPlan(ctx, store, payment, &orchestratorv1.PaymentQuote{}); err != nil {
t.Fatalf("executePaymentPlan resume error: %v", err)
}
if debitCalls != 0 || creditCalls != 1 {
t.Fatalf("expected ledger credit after source confirmation, debit=%d credit=%d", debitCalls, creditCalls)
if moveCalls != 1 {
t.Fatalf("expected one ledger move after source confirmation, got %d", moveCalls)
}
if payoutCalls != 1 {
t.Fatalf("expected card payout submitted, got %d", payoutCalls)
@@ -193,39 +209,18 @@ func TestExecutePaymentPlan_SourceBeforeDestination(t *testing.T) {
if err := executor.executePaymentPlan(ctx, store, payment, &orchestratorv1.PaymentQuote{}); err != nil {
t.Fatalf("executePaymentPlan finalize error: %v", err)
}
if debitCalls != 1 || creditCalls != 1 {
t.Fatalf("expected ledger debit after payout confirmation, debit=%d credit=%d", debitCalls, creditCalls)
if moveCalls != 2 {
t.Fatalf("expected two ledger moves after payout confirmation, got %d", moveCalls)
}
}
func TestExecutePaymentPlan_BlockThenDebitFromHold(t *testing.T) {
func TestExecutePaymentPlan_RejectsLegacyLedgerOperations(t *testing.T) {
ctx := context.Background()
store := newStubPaymentsStore()
repo := &stubRepository{store: store}
blockCalls := 0
var blockReq *ledgerv1.TransferRequest
debitCalls := 0
var debitTx rail.LedgerTx
ledgerFake := &ledgerclient.Fake{
TransferInternalFn: func(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) {
blockCalls++
blockReq = req
return &ledgerv1.PostResponse{JournalEntryRef: "hold-1"}, nil
},
CreateTransactionFn: func(ctx context.Context, tx rail.LedgerTx) (string, error) {
debitCalls++
debitTx = tx
return "debit-1", nil
},
}
mntxFake := &mntxclient.Fake{
CreateCardPayoutFn: func(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) {
return &mntxv1.CardPayoutResponse{Payout: &mntxv1.CardPayoutState{PayoutId: "payout-1"}}, nil
},
}
ledgerFake := &ledgerclient.Fake{}
svc := &Service{
logger: zap.NewNop(),
@@ -235,21 +230,14 @@ func TestExecutePaymentPlan_BlockThenDebitFromHold(t *testing.T) {
client: ledgerFake,
internal: ledgerFake,
},
mntx: mntxDependency{client: mntxFake},
cardRoutes: map[string]CardGatewayRoute{
defaultCardGateway: {
FundingAddress: "funding-address",
FeeWalletRef: "fee-wallet",
},
},
},
}
executor := newPaymentExecutor(&svc.deps, svc.logger, svc)
payment := &model.Payment{
PaymentRef: "pay-block-1",
IdempotencyKey: "pay-block-1",
PaymentRef: "pay-legacy-1",
IdempotencyKey: "pay-legacy-1",
OrganizationBoundBase: mo.OrganizationBoundBase{
OrganizationRef: primitive.NewObjectID(),
},
@@ -263,80 +251,25 @@ func TestExecutePaymentPlan_BlockThenDebitFromHold(t *testing.T) {
},
Destination: model.PaymentEndpoint{
Type: model.EndpointTypeCard,
Card: &model.CardEndpoint{
Pan: "4111111111111111",
Cardholder: "Ada",
CardholderSurname: "Lovelace",
ExpMonth: 1,
ExpYear: 2030,
MaskedPan: "4111",
},
},
Attributes: map[string]string{
"ledger_debit_account_ref": "ledger:debit",
"ledger_block_account_ref": "ledger:block",
},
Customer: &model.Customer{
ID: "cust-1",
FirstName: "Ada",
LastName: "Lovelace",
IP: "1.2.3.4",
Card: &model.CardEndpoint{MaskedPan: "4111"},
},
},
PaymentPlan: &model.PaymentPlan{
ID: "pay-block-1",
IdempotencyKey: "pay-block-1",
ID: "pay-legacy-1",
IdempotencyKey: "pay-legacy-1",
Steps: []*model.PaymentStep{
{StepID: "ledger_block", Rail: model.RailLedger, Action: model.RailOperationBlock, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
{StepID: "card_payout", Rail: model.RailCardPayout, Action: model.RailOperationSend, DependsOn: []string{"ledger_block"}, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
{StepID: "ledger_debit", Rail: model.RailLedger, Action: model.RailOperationDebit, DependsOn: []string{"card_payout"}, CommitPolicy: model.CommitPolicyAfterSuccess, CommitAfter: []string{"card_payout"}, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
{StepID: "ledger_release", Rail: model.RailLedger, Action: model.RailOperationRelease, DependsOn: []string{"card_payout"}, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
},
},
}
store.payments[payment.PaymentRef] = payment
if err := executor.executePaymentPlan(ctx, store, payment, &orchestratorv1.PaymentQuote{}); err != nil {
t.Fatalf("executePaymentPlan error: %v", err)
err := executor.executePaymentPlan(ctx, store, payment, &orchestratorv1.PaymentQuote{})
if err == nil {
t.Fatal("expected legacy ledger operation error")
}
if blockCalls != 1 || blockReq == nil {
t.Fatalf("expected ledger block transfer, calls=%d", blockCalls)
}
if blockReq.GetFromLedgerAccountRef() != "ledger:debit" {
t.Fatalf("unexpected block from account: %s", blockReq.GetFromLedgerAccountRef())
}
if blockReq.GetToLedgerAccountRef() != "ledger:block" {
t.Fatalf("unexpected block to account: %s", blockReq.GetToLedgerAccountRef())
}
if debitCalls != 0 {
t.Fatalf("expected no debit before payout confirmation, got %d", debitCalls)
}
if payment.State != model.PaymentStateFundsReserved {
t.Fatalf("expected funds reserved state, got %s", payment.State)
}
steps := executionStepsByCode(payment.ExecutionPlan)
cardStep := steps["card_payout"]
if cardStep == nil {
t.Fatalf("expected card payout step in execution plan")
}
setExecutionStepStatus(cardStep, executionStepStatusConfirmed)
if err := executor.executePaymentPlan(ctx, store, payment, &orchestratorv1.PaymentQuote{}); err != nil {
t.Fatalf("executePaymentPlan resume error: %v", err)
}
if debitCalls != 1 {
t.Fatalf("expected ledger debit after payout confirmation, got %d", debitCalls)
}
if debitTx.LedgerAccountRef != "ledger:block" {
t.Fatalf("expected debit from block account, got %s", debitTx.LedgerAccountRef)
}
if debitTx.ContraLedgerAccountRef != "" {
t.Fatalf("expected contra to be cleared after block, got %s", debitTx.ContraLedgerAccountRef)
}
if payment.State != model.PaymentStateSettled {
t.Fatalf("expected settled state, got %s", payment.State)
if !strings.Contains(err.Error(), "unsupported legacy ledger operation") {
t.Fatalf("unexpected error: %v", err)
}
}

View File

@@ -5,6 +5,7 @@ import (
"strings"
"github.com/tech/sendico/payments/orchestrator/storage/model"
pmodel "github.com/tech/sendico/pkg/model"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
)
@@ -100,6 +101,26 @@ func blockStepConfirmed(plan *model.PaymentPlan, execPlan *model.ExecutionPlan)
return false
}
func roleHintsForStep(plan *model.PaymentPlan, idx int) (*pmodel.AccountRole, *pmodel.AccountRole) {
if plan == nil || idx <= 0 {
return nil, nil
}
for i := idx - 1; i >= 0; i-- {
step := plan.Steps[i]
if step == nil {
continue
}
if step.Rail != model.RailLedger || step.Action != model.RailOperationMove {
continue
}
if step.ToRole != nil && strings.TrimSpace(string(*step.ToRole)) != "" {
role := *step.ToRole
return &role, nil
}
}
return nil, nil
}
func linkRailObservation(payment *model.Payment, rail model.Rail, referenceID, dependsOn string) {
if payment == nil || payment.PaymentPlan == nil {
return

View File

@@ -2,10 +2,13 @@ package orchestrator
import (
"context"
"fmt"
"strings"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/ledgerconv"
"github.com/tech/sendico/pkg/merrors"
pmodel "github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/payments/rail"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
@@ -15,7 +18,7 @@ import (
"go.uber.org/zap"
)
func (p *paymentExecutor) postLedgerDebit(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, charges []*ledgerv1.PostingLine, idempotencyKey string, idx int, quote *orchestratorv1.PaymentQuote) (string, error) {
func (p *paymentExecutor) postLedgerDebit(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, charges []*ledgerv1.PostingLine, idempotencyKey string, idx int, action model.RailOperation, quote *orchestratorv1.PaymentQuote) (string, error) {
paymentRef := ""
if payment != nil {
paymentRef = strings.TrimSpace(payment.PaymentRef)
@@ -24,7 +27,7 @@ func (p *paymentExecutor) postLedgerDebit(ctx context.Context, payment *model.Pa
p.logger.Error("Ledger client unavailable", zap.String("action", "debit"), zap.String("payment_ref", paymentRef))
return "", merrors.Internal("ledger_client_unavailable")
}
tx, err := p.ledgerTxForAction(ctx, payment, amount, charges, idempotencyKey, idx, model.RailOperationDebit, quote)
tx, err := p.ledgerTxForAction(ctx, payment, amount, charges, idempotencyKey, idx, action, quote)
if err != nil {
p.logger.Warn("Ledger debit preparation failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err
@@ -34,10 +37,15 @@ func (p *paymentExecutor) postLedgerDebit(ctx context.Context, payment *model.Pa
p.logger.Warn("Ledger debit failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err
}
p.logger.Info("Ledger debit posted",
zap.String("payment_ref", paymentRef),
zap.Int("step_index", idx),
zap.String("action", string(action)),
zap.String("entry_ref", strings.TrimSpace(ref)))
return ref, nil
}
func (p *paymentExecutor) postLedgerCredit(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, idempotencyKey string, idx int, quote *orchestratorv1.PaymentQuote) (string, error) {
func (p *paymentExecutor) postLedgerCredit(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, idempotencyKey string, idx int, action model.RailOperation, quote *orchestratorv1.PaymentQuote) (string, error) {
paymentRef := ""
if payment != nil {
paymentRef = strings.TrimSpace(payment.PaymentRef)
@@ -46,7 +54,7 @@ func (p *paymentExecutor) postLedgerCredit(ctx context.Context, payment *model.P
p.logger.Error("Ledger client unavailable", zap.String("action", "credit"), zap.String("payment_ref", paymentRef))
return "", merrors.Internal("ledger_client_unavailable")
}
tx, err := p.ledgerTxForAction(ctx, payment, amount, nil, idempotencyKey, idx, model.RailOperationCredit, quote)
tx, err := p.ledgerTxForAction(ctx, payment, amount, nil, idempotencyKey, idx, action, quote)
if err != nil {
p.logger.Warn("Ledger credit preparation failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err
@@ -56,9 +64,86 @@ func (p *paymentExecutor) postLedgerCredit(ctx context.Context, payment *model.P
p.logger.Warn("Ledger credit failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err
}
p.logger.Info("Ledger credit posted",
zap.String("payment_ref", paymentRef),
zap.Int("step_index", idx),
zap.String("action", string(action)),
zap.String("entry_ref", strings.TrimSpace(ref)))
return ref, nil
}
func (p *paymentExecutor) postLedgerMove(ctx context.Context, payment *model.Payment, step *model.PaymentStep, amount *moneyv1.Money, idempotencyKey string, idx int) (string, error) {
paymentRef := ""
if payment != nil {
paymentRef = strings.TrimSpace(payment.PaymentRef)
}
if p.deps.ledger.internal == nil {
p.logger.Error("Ledger client unavailable", zap.String("action", "move"), zap.String("payment_ref", paymentRef))
return "", merrors.Internal("ledger_client_unavailable")
}
if payment == nil {
return "", merrors.InvalidArgument("ledger: payment is required")
}
if payment.OrganizationRef == primitive.NilObjectID {
return "", merrors.InvalidArgument("ledger: organization_ref is required")
}
if step == nil {
return "", merrors.InvalidArgument("ledger: step is required")
}
if amount == nil || strings.TrimSpace(amount.GetAmount()) == "" || strings.TrimSpace(amount.GetCurrency()) == "" {
return "", merrors.InvalidArgument("ledger: amount is required")
}
fromRole, toRole, err := ledgerMoveRoles(step)
if err != nil {
return "", err
}
currency := strings.TrimSpace(amount.GetCurrency())
fromAccount, err := p.resolveAccount(ctx, payment.OrganizationRef, currency, model.RailLedger, fromRole)
if err != nil {
return "", err
}
toAccount, err := p.resolveAccount(ctx, payment.OrganizationRef, currency, model.RailLedger, toRole)
if err != nil {
return "", err
}
resp, err := p.deps.ledger.internal.TransferInternal(ctx, &ledgerv1.TransferRequest{
IdempotencyKey: strings.TrimSpace(idempotencyKey),
OrganizationRef: payment.OrganizationRef.Hex(),
FromLedgerAccountRef: strings.TrimSpace(fromAccount),
ToLedgerAccountRef: strings.TrimSpace(toAccount),
Money: cloneProtoMoney(amount),
Description: paymentDescription(payment),
Metadata: cloneMetadata(payment.Metadata),
FromRole: ledgerRoleFromAccountRole(fromRole),
ToRole: ledgerRoleFromAccountRole(toRole),
})
if err != nil {
p.logger.Warn("Ledger move failed",
zap.String("payment_ref", paymentRef),
zap.Int("step_index", idx),
zap.String("from_role", string(fromRole)),
zap.String("to_role", string(toRole)),
zap.String("from_account", strings.TrimSpace(fromAccount)),
zap.String("to_account", strings.TrimSpace(toAccount)),
zap.String("amount", strings.TrimSpace(amount.GetAmount())),
zap.String("currency", currency),
zap.Error(err))
return "", err
}
entryRef := strings.TrimSpace(resp.GetJournalEntryRef())
p.logger.Info("Ledger move posted",
zap.String("payment_ref", paymentRef),
zap.Int("step_index", idx),
zap.String("entry_ref", entryRef),
zap.String("from_role", string(fromRole)),
zap.String("to_role", string(toRole)),
zap.String("from_account", strings.TrimSpace(fromAccount)),
zap.String("to_account", strings.TrimSpace(toAccount)),
zap.String("amount", strings.TrimSpace(amount.GetAmount())),
zap.String("currency", currency))
return entryRef, nil
}
func (p *paymentExecutor) postLedgerBlock(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, idempotencyKey string, idx int) (string, error) {
paymentRef := ""
if payment != nil {
@@ -200,9 +285,10 @@ func (p *paymentExecutor) ledgerTxForAction(ctx context.Context, payment *model.
accountRef := ""
contraRef := ""
externalRef := ""
operation := ""
switch action {
case model.RailOperationDebit:
case model.RailOperationDebit, model.RailOperationExternalDebit:
fromRail = model.RailLedger
toRail = ledgerStepToRail(payment.PaymentPlan, idx, destRail)
accountRef, contraRef, err = ledgerDebitAccount(payment)
@@ -215,7 +301,10 @@ func (p *paymentExecutor) ledgerTxForAction(ctx context.Context, payment *model.
contraRef = ""
}
}
case model.RailOperationCredit:
if action == model.RailOperationExternalDebit {
operation = "external.debit"
}
case model.RailOperationCredit, model.RailOperationExternalCredit:
fromRail = ledgerStepFromRail(payment.PaymentPlan, idx, sourceRail)
toRail = model.RailLedger
accountRef, contraRef, err = ledgerCreditAccount(payment)
@@ -223,19 +312,24 @@ func (p *paymentExecutor) ledgerTxForAction(ctx context.Context, payment *model.
accountRef, contraRef, err = p.resolveLedgerAccountRef(ctx, payment, amount, action)
}
externalRef = ledgerExternalReference(payment.ExecutionPlan, idx)
if action == model.RailOperationExternalCredit {
operation = "external.credit"
}
default:
return rail.LedgerTx{}, merrors.InvalidArgument("ledger: unsupported action")
}
if err != nil {
return rail.LedgerTx{}, err
}
if action == model.RailOperationCredit && strings.TrimSpace(accountRef) != "" {
isDebit := action == model.RailOperationDebit || action == model.RailOperationExternalDebit
isCredit := action == model.RailOperationCredit || action == model.RailOperationExternalCredit
if isCredit && strings.TrimSpace(accountRef) != "" {
setLedgerAccountAttributes(payment, accountRef)
}
if action == model.RailOperationDebit && toRail == model.RailLedger {
if isDebit && toRail == model.RailLedger {
toRail = model.RailUnspecified
}
if action == model.RailOperationCredit && fromRail == model.RailLedger {
if isCredit && fromRail == model.RailLedger {
fromRail = model.RailUnspecified
}
@@ -245,7 +339,7 @@ func (p *paymentExecutor) ledgerTxForAction(ctx context.Context, payment *model.
}
feeAmount := ""
if action == model.RailOperationDebit {
if isDebit {
if feeMoney := resolveFeeAmount(payment, quote); feeMoney != nil {
feeAmount = strings.TrimSpace(feeMoney.GetAmount())
}
@@ -264,6 +358,7 @@ func (p *paymentExecutor) ledgerTxForAction(ctx context.Context, payment *model.
FromRail: ledgerRailValue(fromRail),
ToRail: ledgerRailValue(toRail),
ExternalReferenceID: externalRef,
Operation: operation,
FXRateUsed: fxRate,
IdempotencyKey: strings.TrimSpace(idempotencyKey),
CreatedAt: planTimestamp(payment),
@@ -331,6 +426,93 @@ func ledgerExternalReference(plan *model.ExecutionPlan, idx int) string {
return ""
}
func ledgerMoveRoles(step *model.PaymentStep) (pmodel.AccountRole, pmodel.AccountRole, error) {
if step == nil {
return "", "", merrors.InvalidArgument("ledger: step is required")
}
if step.FromRole == nil || strings.TrimSpace(string(*step.FromRole)) == "" {
return "", "", merrors.InvalidArgument("ledger: from_role is required")
}
if step.ToRole == nil || strings.TrimSpace(string(*step.ToRole)) == "" {
return "", "", merrors.InvalidArgument("ledger: to_role is required")
}
from := strings.ToLower(strings.TrimSpace(string(*step.FromRole)))
to := strings.ToLower(strings.TrimSpace(string(*step.ToRole)))
if from == "" || to == "" || strings.EqualFold(from, to) {
return "", "", merrors.InvalidArgument("ledger: from_role and to_role must differ")
}
return pmodel.AccountRole(from), pmodel.AccountRole(to), nil
}
func ledgerRoleFromAccountRole(role pmodel.AccountRole) ledgerv1.AccountRole {
if strings.TrimSpace(string(role)) == "" {
return ledgerv1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED
}
if parsed, ok := ledgerconv.ParseAccountRole(string(role)); ok {
return parsed
}
return ledgerv1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED
}
func (p *paymentExecutor) resolveAccount(ctx context.Context, orgRef primitive.ObjectID, asset string, rail model.Rail, role pmodel.AccountRole) (string, error) {
switch rail {
case model.RailLedger:
return p.resolveLedgerAccountByRole(ctx, orgRef, asset, role)
default:
return "", nil
}
}
func (p *paymentExecutor) resolveLedgerAccountByRole(ctx context.Context, orgRef primitive.ObjectID, asset string, role pmodel.AccountRole) (string, error) {
if p == nil || p.deps == nil || p.deps.ledger.client == nil {
return "", merrors.Internal("ledger_client_unavailable")
}
if orgRef == primitive.NilObjectID {
return "", merrors.InvalidArgument("ledger: organization_ref is required")
}
currency := strings.TrimSpace(asset)
if currency == "" {
return "", merrors.InvalidArgument("ledger: asset is required")
}
if strings.TrimSpace(string(role)) == "" {
return "", merrors.InvalidArgument("ledger: role is required")
}
resp, err := p.deps.ledger.client.ListConnectorAccounts(ctx, &connectorv1.ListAccountsRequest{
OrganizationRef: orgRef.Hex(),
Kind: connectorv1.AccountKind_LEDGER_ACCOUNT,
Asset: currency,
})
if err != nil {
return "", err
}
expectedRole := strings.ToLower(strings.TrimSpace(string(role)))
for _, account := range resp.GetAccounts() {
if account == nil {
continue
}
if account.GetKind() != connectorv1.AccountKind_LEDGER_ACCOUNT {
continue
}
if asset := strings.TrimSpace(account.GetAsset()); asset == "" || !strings.EqualFold(asset, currency) {
continue
}
if strings.TrimSpace(account.GetOwnerRef()) != "" {
continue
}
accRole := strings.ToLower(strings.TrimSpace(string(connectorAccountRole(account))))
if accRole == "" || !strings.EqualFold(accRole, expectedRole) {
continue
}
if ref := account.GetRef(); ref != nil {
if accountID := strings.TrimSpace(ref.GetAccountId()); accountID != "" {
return accountID, nil
}
}
}
return "", merrors.InvalidArgument("ledger: account role not found")
}
func (p *paymentExecutor) resolveLedgerAccountRef(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, action model.RailOperation) (string, string, error) {
if payment == nil {
return "", "", merrors.InvalidArgument("ledger: payment is required")
@@ -339,12 +521,12 @@ func (p *paymentExecutor) resolveLedgerAccountRef(ctx context.Context, payment *
return "", "", merrors.InvalidArgument("ledger: amount is required")
}
switch action {
case model.RailOperationCredit:
case model.RailOperationCredit, model.RailOperationExternalCredit:
if account, _, err := ledgerDebitAccount(payment); err == nil && strings.TrimSpace(account) != "" {
setLedgerAccountAttributes(payment, account)
return account, "", nil
}
case model.RailOperationDebit:
case model.RailOperationDebit, model.RailOperationExternalDebit:
if account, _, err := ledgerCreditAccount(payment); err == nil && strings.TrimSpace(account) != "" {
setLedgerAccountAttributes(payment, account)
return account, "", nil
@@ -408,22 +590,30 @@ func (p *paymentExecutor) resolveOrgOwnedLedgerAccount(ctx context.Context, paym
}
func connectorAccountIsSettlement(account *connectorv1.Account) bool {
return connectorAccountRole(account) == pmodel.AccountRoleSettlement
}
func connectorAccountRole(account *connectorv1.Account) pmodel.AccountRole {
if account == nil || account.GetProviderDetails() == nil {
return false
return ""
}
details := account.GetProviderDetails().AsMap()
val, ok := details["is_settlement"]
if !ok {
return false
if value := strings.TrimSpace(fmt.Sprint(details["role"])); value != "" {
if role, ok := pmodel.Parse(value); ok {
return role
}
}
switch v := val.(type) {
switch v := details["is_settlement"].(type) {
case bool:
return v
if v {
return pmodel.AccountRoleSettlement
}
case string:
return strings.EqualFold(strings.TrimSpace(v), "true")
default:
return false
if strings.EqualFold(strings.TrimSpace(v), "true") {
return pmodel.AccountRoleSettlement
}
}
return ""
}
func setLedgerAccountAttributes(payment *model.Payment, accountRef string) {

View File

@@ -6,46 +6,61 @@ import (
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/payments/rail"
pmodel "github.com/tech/sendico/pkg/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/structpb"
)
func TestLedgerAccountResolution_UsesOrgOwnedAccount(t *testing.T) {
func TestLedgerAccountResolution_UsesRoleAccounts(t *testing.T) {
ctx := context.Background()
accountID := "ledger:org:usd"
fromAccountID := "ledger:operating:usd"
toAccountID := "ledger:transit:usd"
providerDetails, err := structpb.NewStruct(map[string]interface{}{
"is_settlement": false,
operatingDetails, err := structpb.NewStruct(map[string]interface{}{
"role": "ACCOUNT_ROLE_OPERATING",
})
if err != nil {
t.Fatalf("provider details build error: %v", err)
}
transitDetails, err := structpb.NewStruct(map[string]interface{}{
"role": "ACCOUNT_ROLE_TRANSIT",
})
if err != nil {
t.Fatalf("provider details build error: %v", err)
}
listCalls := 0
ledgerAccountRefs := make([]string, 0, 2)
var transferReq *ledgerv1.TransferRequest
ledgerFake := &ledgerclient.Fake{
ListConnectorAccountsFn: func(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) {
listCalls++
return &connectorv1.ListAccountsResponse{
Accounts: []*connectorv1.Account{
{
Ref: &connectorv1.AccountRef{ConnectorId: "ledger", AccountId: accountID},
Kind: connectorv1.AccountKind_LEDGER_ACCOUNT,
Asset: "USD",
OwnerRef: "",
ProviderDetails: providerDetails,
Ref: &connectorv1.AccountRef{ConnectorId: "ledger", AccountId: fromAccountID},
Kind: connectorv1.AccountKind_LEDGER_ACCOUNT,
Asset: "USD",
OwnerRef: "",
ProviderDetails: operatingDetails,
},
{
Ref: &connectorv1.AccountRef{ConnectorId: "ledger", AccountId: toAccountID},
Kind: connectorv1.AccountKind_LEDGER_ACCOUNT,
Asset: "USD",
OwnerRef: "",
ProviderDetails: transitDetails,
},
},
}, nil
},
CreateTransactionFn: func(ctx context.Context, tx rail.LedgerTx) (string, error) {
ledgerAccountRefs = append(ledgerAccountRefs, tx.LedgerAccountRef)
return "entry-1", nil
TransferInternalFn: func(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) {
transferReq = req
return &ledgerv1.PostResponse{JournalEntryRef: "entry-1"}, nil
},
}
@@ -71,8 +86,7 @@ func TestLedgerAccountResolution_UsesOrgOwnedAccount(t *testing.T) {
ID: "pay-1",
IdempotencyKey: "pay-1",
Steps: []*model.PaymentStep{
{StepID: "ledger_debit", Rail: model.RailLedger, Action: model.RailOperationDebit, Amount: cloneMoney(amount)},
{StepID: "ledger_credit", Rail: model.RailLedger, Action: model.RailOperationCredit, DependsOn: []string{"ledger_debit"}, Amount: cloneMoney(amount)},
{StepID: "ledger_move", Rail: model.RailLedger, Action: model.RailOperationMove, Amount: cloneMoney(amount), FromRole: rolePtr(pmodel.AccountRoleOperating), ToRole: rolePtr(pmodel.AccountRoleTransit)},
},
},
}
@@ -87,10 +101,13 @@ func TestLedgerAccountResolution_UsesOrgOwnedAccount(t *testing.T) {
if listCalls == 0 {
t.Fatalf("expected ledger accounts lookup")
}
if len(ledgerAccountRefs) != 2 {
t.Fatalf("expected two ledger transactions, got %d", len(ledgerAccountRefs))
if transferReq == nil {
t.Fatalf("expected ledger transfer")
}
if ledgerAccountRefs[0] != accountID || ledgerAccountRefs[1] != accountID {
t.Fatalf("unexpected ledger account refs: %+v", ledgerAccountRefs)
if transferReq.GetFromLedgerAccountRef() != fromAccountID {
t.Fatalf("expected from account %s, got %s", fromAccountID, transferReq.GetFromLedgerAccountRef())
}
if transferReq.GetToLedgerAccountRef() != toAccountID {
t.Fatalf("expected to account %s, got %s", toAccountID, transferReq.GetToLedgerAccountRef())
}
}

View File

@@ -137,14 +137,116 @@ func stepDependenciesReady(step *model.PaymentStep, execSteps map[string]*model.
}
}
if step.CommitPolicy != model.CommitPolicyAfterSuccess {
// Handle commit policies
switch step.CommitPolicy {
case model.CommitPolicyImmediate, model.CommitPolicyUnspecified:
// Execute immediately once dependencies are satisfied
return true, false, nil
case model.CommitPolicyAfterSuccess:
// Wait for commitAfter dependencies to succeed (confirmed/skipped)
commitAfter := step.CommitAfter
if len(commitAfter) == 0 {
commitAfter = step.DependsOn
}
for _, dep := range commitAfter {
key := strings.TrimSpace(dep)
if key == "" {
continue
}
execStep := execSteps[key]
if execStep == nil {
return false, false, merrors.InvalidArgument("payment plan: commit dependency missing")
}
status := executionStepStatus(execStep)
switch status {
case executionStepStatusFailed, executionStepStatusCancelled:
return false, true, nil
case executionStepStatusConfirmed, executionStepStatusSkipped:
continue
default:
return false, false, nil
}
}
return true, false, nil
case model.CommitPolicyAfterFailure:
// Wait for commitAfter dependencies to fail
commitAfter := step.CommitAfter
if len(commitAfter) == 0 {
commitAfter = step.DependsOn
}
for _, dep := range commitAfter {
key := strings.TrimSpace(dep)
if key == "" {
continue
}
execStep := execSteps[key]
if execStep == nil {
return false, false, merrors.InvalidArgument("payment plan: commit dependency missing")
}
status := executionStepStatus(execStep)
switch status {
case executionStepStatusFailed:
// Dependency failed - this is what we're waiting for
continue
case executionStepStatusCancelled:
// If cancelled, also block this step
return false, true, nil
case executionStepStatusConfirmed, executionStepStatusSkipped:
// Dependency succeeded - can't proceed with AFTER_FAILURE
return false, true, nil
default:
// Still waiting for failure
return false, false, nil
}
}
return true, false, nil
case model.CommitPolicyAfterCanceled:
// Wait for commitAfter dependencies to reach any terminal state (confirmed, failed, cancelled, skipped)
commitAfter := step.CommitAfter
if len(commitAfter) == 0 {
commitAfter = step.DependsOn
}
for _, dep := range commitAfter {
key := strings.TrimSpace(dep)
if key == "" {
continue
}
execStep := execSteps[key]
if execStep == nil {
return false, false, merrors.InvalidArgument("payment plan: commit dependency missing")
}
status := executionStepStatus(execStep)
switch status {
case executionStepStatusConfirmed, executionStepStatusFailed, executionStepStatusCancelled, executionStepStatusSkipped:
// Dependency reached terminal state
continue
default:
// Still waiting for terminal state
return false, false, nil
}
}
return true, false, nil
default:
// Unknown policy - treat as immediate
return true, false, nil
}
}
func commitAfterDependenciesSucceeded(step *model.PaymentStep, execSteps map[string]*model.ExecutionStep) bool {
if step == nil {
return false
}
commitAfter := step.CommitAfter
if len(commitAfter) == 0 {
commitAfter = step.DependsOn
}
if len(commitAfter) == 0 {
return false
}
for _, dep := range commitAfter {
key := strings.TrimSpace(dep)
if key == "" {
@@ -152,19 +254,17 @@ func stepDependenciesReady(step *model.PaymentStep, execSteps map[string]*model.
}
execStep := execSteps[key]
if execStep == nil {
return false, false, merrors.InvalidArgument("payment plan: commit dependency missing")
return false
}
status := executionStepStatus(execStep)
switch status {
case executionStepStatusFailed, executionStepStatusCancelled:
return false, true, nil
case executionStepStatusConfirmed, executionStepStatusSkipped:
continue
default:
return false, false, nil
return false
}
}
return true, false, nil
return true
}
func cardPayoutDependenciesConfirmed(plan *model.PaymentPlan, execPlan *model.ExecutionPlan) bool {

View File

@@ -2,30 +2,24 @@ package orchestrator
import (
"context"
"strings"
"testing"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/storage/model"
mo "github.com/tech/sendico/pkg/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
)
func TestReleasePaymentHold_TransfersFromHoldAccount(t *testing.T) {
func TestReleasePaymentHold_RejectsLegacyLedgerRelease(t *testing.T) {
ctx := context.Background()
store := newStubPaymentsStore()
repo := &stubRepository{store: store}
var releaseReq *ledgerv1.TransferRequest
ledgerFake := &ledgerclient.Fake{
TransferInternalFn: func(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) {
releaseReq = req
return &ledgerv1.PostResponse{JournalEntryRef: "release-1"}, nil
},
}
ledgerFake := &ledgerclient.Fake{}
svc := &Service{
logger: zap.NewNop(),
@@ -79,29 +73,11 @@ func TestReleasePaymentHold_TransfersFromHoldAccount(t *testing.T) {
}
setExecutionStepStatus(blockStep, executionStepStatusConfirmed)
if err := executor.releasePaymentHold(ctx, store, payment); err != nil {
t.Fatalf("releasePaymentHold error: %v", err)
err := executor.releasePaymentHold(ctx, store, payment)
if err == nil {
t.Fatal("expected legacy ledger operation error")
}
if releaseReq == nil {
t.Fatalf("expected ledger release transfer")
}
if releaseReq.GetFromLedgerAccountRef() != "ledger:block" {
t.Fatalf("unexpected release from account: %s", releaseReq.GetFromLedgerAccountRef())
}
if releaseReq.GetToLedgerAccountRef() != "ledger:debit" {
t.Fatalf("unexpected release to account: %s", releaseReq.GetToLedgerAccountRef())
}
steps = executionStepsByCode(payment.ExecutionPlan)
releaseStep := steps["ledger_release"]
if releaseStep == nil {
t.Fatalf("expected release step in execution plan")
}
if executionStepStatus(releaseStep) != executionStepStatusConfirmed {
t.Fatalf("expected release step confirmed, got %s", executionStepStatus(releaseStep))
}
if releaseStep.TransferRef != "release-1" {
t.Fatalf("expected release transfer ref set, got %s", releaseStep.TransferRef)
if !strings.Contains(err.Error(), "unsupported legacy ledger operation") {
t.Fatalf("unexpected error: %v", err)
}
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/tech/sendico/pkg/merrors"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.uber.org/zap"
)
func (p *paymentExecutor) executePlanStep(ctx context.Context, payment *model.Payment, step *model.PaymentStep, execStep *model.ExecutionStep, quote *orchestratorv1.PaymentQuote, charges []*ledgerv1.PostingLine, idx int) (bool, error) {
@@ -15,25 +16,45 @@ func (p *paymentExecutor) executePlanStep(ctx context.Context, payment *model.Pa
return false, merrors.InvalidArgument("payment plan: step is required")
}
if step.Rail == model.RailLedger {
switch step.Action {
case model.RailOperationBlock, model.RailOperationRelease:
p.logger.Warn("Legacy operation detected", zap.String("action", string(step.Action)))
return false, merrors.InvalidArgument("unsupported legacy ledger operation, use ledger.move with roles")
}
}
switch step.Action {
case model.RailOperationDebit:
case model.RailOperationMove:
amount, err := requireMoney(cloneMoney(step.Amount), "ledger move amount")
if err != nil {
return false, err
}
ref, err := p.postLedgerMove(ctx, payment, step, protoMoney(amount), planStepIdempotencyKey(payment, idx, step), idx)
if err != nil {
return false, err
}
execStep.TransferRef = strings.TrimSpace(ref)
setExecutionStepStatus(execStep, executionStepStatusConfirmed)
return false, nil
case model.RailOperationDebit, model.RailOperationExternalDebit:
amount, err := requireMoney(cloneMoney(step.Amount), "ledger debit amount")
if err != nil {
return false, err
}
ref, err := p.postLedgerDebit(ctx, payment, protoMoney(amount), charges, planStepIdempotencyKey(payment, idx, step), idx, quote)
ref, err := p.postLedgerDebit(ctx, payment, protoMoney(amount), charges, planStepIdempotencyKey(payment, idx, step), idx, step.Action, quote)
if err != nil {
return false, err
}
ensureExecutionRefs(payment).DebitEntryRef = ref
setExecutionStepStatus(execStep, executionStepStatusConfirmed)
return false, nil
case model.RailOperationCredit:
case model.RailOperationCredit, model.RailOperationExternalCredit:
amount, err := requireMoney(cloneMoney(step.Amount), "ledger credit amount")
if err != nil {
return false, err
}
ref, err := p.postLedgerCredit(ctx, payment, protoMoney(amount), planStepIdempotencyKey(payment, idx, step), idx, quote)
ref, err := p.postLedgerCredit(ctx, payment, protoMoney(amount), planStepIdempotencyKey(payment, idx, step), idx, step.Action, quote)
if err != nil {
return false, err
}
@@ -98,7 +119,8 @@ func (p *paymentExecutor) executeSendStep(ctx context.Context, payment *model.Pa
if !p.deps.railGateways.available() {
return false, merrors.Internal("rail gateway unavailable")
}
req, err := p.buildCryptoTransferRequest(payment, amount, model.RailOperationSend, planStepIdempotencyKey(payment, idx, step), quote)
fromRole, toRole := roleHintsForStep(payment.PaymentPlan, idx)
req, err := p.buildCryptoTransferRequest(payment, amount, model.RailOperationSend, planStepIdempotencyKey(payment, idx, step), quote, fromRole, toRole)
if err != nil {
return false, err
}
@@ -126,7 +148,8 @@ func (p *paymentExecutor) executeSendStep(ctx context.Context, payment *model.Pa
if err != nil {
return false, err
}
ref, err := p.submitCardPayoutPlan(ctx, payment, protoMoney(amount))
fromRole, toRole := roleHintsForStep(payment.PaymentPlan, idx)
ref, err := p.submitCardPayoutPlan(ctx, payment, protoMoney(amount), fromRole, toRole)
if err != nil {
return false, err
}
@@ -142,7 +165,8 @@ func (p *paymentExecutor) executeSendStep(ctx context.Context, payment *model.Pa
if !p.deps.railGateways.available() {
return false, merrors.Internal("rail gateway unavailable")
}
req, err := p.buildProviderSettlementTransferRequest(payment, step, amount, quote, idx)
fromRole, toRole := roleHintsForStep(payment.PaymentPlan, idx)
req, err := p.buildProviderSettlementTransferRequest(payment, step, amount, quote, idx, fromRole, toRole)
if err != nil {
return false, err
}
@@ -178,7 +202,8 @@ func (p *paymentExecutor) executeFeeStep(ctx context.Context, payment *model.Pay
if !p.deps.railGateways.available() {
return false, merrors.Internal("rail gateway unavailable")
}
req, err := p.buildCryptoTransferRequest(payment, amount, model.RailOperationFee, planStepIdempotencyKey(payment, idx, step), nil)
fromRole, toRole := roleHintsForStep(payment.PaymentPlan, idx)
req, err := p.buildCryptoTransferRequest(payment, amount, model.RailOperationFee, planStepIdempotencyKey(payment, idx, step), nil, fromRole, toRole)
if err != nil {
return false, err
}

View File

@@ -5,10 +5,21 @@ import (
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mutil/mzap"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.uber.org/zap"
)
type defaultPlanBuilder struct{}
type defaultPlanBuilder struct {
logger mlogger.Logger
}
func newDefaultPlanBuilder(logger mlogger.Logger) *defaultPlanBuilder {
return &defaultPlanBuilder{
logger: logger.Named("plan_builder"),
}
}
func (b *defaultPlanBuilder) Build(ctx context.Context, payment *model.Payment, quote *orchestratorv1.PaymentQuote, routes RouteStore, templates PlanTemplateStore, gateways GatewayRegistry) (*model.PaymentPlan, error) {
if payment == nil {
@@ -21,43 +32,71 @@ func (b *defaultPlanBuilder) Build(ctx context.Context, payment *model.Payment,
return nil, merrors.InvalidArgument("plan builder: plan templates store is required")
}
logger := b.logger.With(
zap.String("payment_ref", payment.PaymentRef),
zap.String("payment_kind", string(payment.Intent.Kind)),
)
logger.Debug("Building payment plan")
intent := payment.Intent
if intent.Kind == model.PaymentKindFXConversion {
return buildFXConversionPlan(payment)
logger.Debug("Building fx conversion plan")
plan, err := buildFXConversionPlan(payment)
if err != nil {
logger.Warn("Failed to build fx conversion plan", zap.Error(err))
return nil, err
}
logger.Info("fx conversion plan built", zap.Int("steps", len(plan.Steps)))
return plan, nil
}
sourceRail, sourceNetwork, err := railFromEndpoint(intent.Source, intent.Attributes, true)
if err != nil {
logger.Warn("Failed to resolve source rail", zap.Error(err))
return nil, err
}
destRail, destNetwork, err := railFromEndpoint(intent.Destination, intent.Attributes, false)
if err != nil {
logger.Warn("Failed to resolve destination rail", zap.Error(err))
return nil, err
}
logger = logger.With(
zap.String("source_rail", string(sourceRail)),
zap.String("dest_rail", string(destRail)),
zap.String("source_network", sourceNetwork),
zap.String("dest_network", destNetwork),
)
if sourceRail == model.RailUnspecified || destRail == model.RailUnspecified {
logger.Warn("Source and destination rails are required")
return nil, merrors.InvalidArgument("plan builder: source and destination rails are required")
}
if sourceRail == destRail {
if sourceRail == model.RailLedger {
return buildLedgerTransferPlan(payment)
}
if sourceRail == destRail && sourceRail != model.RailLedger {
logger.Warn("Unsupported same-rail payment")
return nil, merrors.InvalidArgument("plan builder: unsupported same-rail payment")
}
network, err := resolveRouteNetwork(intent.Attributes, sourceNetwork, destNetwork)
if err != nil {
logger.Warn("Failed to resolve route network", zap.Error(err))
return nil, err
}
logger = logger.With(zap.String("network", network))
if _, err := selectRoute(ctx, routes, sourceRail, destRail, network); err != nil {
return nil, err
}
template, err := selectPlanTemplate(ctx, templates, sourceRail, destRail, network)
route, err := selectRoute(ctx, routes, sourceRail, destRail, network)
if err != nil {
logger.Warn("Failed to select route", zap.Error(err))
return nil, err
}
logger.Debug("Route selected", mzap.StorableRef(route))
template, err := selectPlanTemplate(ctx, logger, templates, sourceRail, destRail, network)
if err != nil {
logger.Warn("Failed to select plan template", zap.Error(err))
return nil, err
}
logger.Debug("Plan template selected", mzap.StorableRef(template))
return b.buildPlanFromTemplate(ctx, payment, quote, template, sourceRail, destRail, sourceNetwork, destNetwork, gateways)
}

View File

@@ -6,6 +6,8 @@ import (
"testing"
"github.com/tech/sendico/payments/orchestrator/storage/model"
pmodel "github.com/tech/sendico/pkg/model"
mloggerfactory "github.com/tech/sendico/pkg/mlogger/factory"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
@@ -13,7 +15,7 @@ import (
func TestDefaultPlanBuilder_BuildsPlanFromRoutes_CryptoToCard(t *testing.T) {
ctx := context.Background()
builder := &defaultPlanBuilder{}
builder := newDefaultPlanBuilder(mloggerfactory.NewLogger(false))
payment := &model.Payment{
PaymentRef: "pay-1",
@@ -65,9 +67,9 @@ func TestDefaultPlanBuilder_BuildsPlanFromRoutes_CryptoToCard(t *testing.T) {
{StepID: "crypto_send", Rail: model.RailCrypto, Operation: "payout.crypto"},
{StepID: "crypto_fee", Rail: model.RailCrypto, Operation: "fee.send", DependsOn: []string{"crypto_send"}},
{StepID: "crypto_observe", Rail: model.RailCrypto, Operation: "observe.confirm", DependsOn: []string{"crypto_send"}},
{StepID: "ledger_credit", Rail: model.RailLedger, Operation: "ledger.credit", DependsOn: []string{"crypto_observe"}},
{StepID: "ledger_credit", Rail: model.RailLedger, Operation: "ledger.move", DependsOn: []string{"crypto_observe"}, FromRole: rolePtr(pmodel.AccountRolePending), ToRole: rolePtr(pmodel.AccountRoleOperating)},
{StepID: "card_payout", Rail: model.RailCardPayout, Operation: "payout.card", DependsOn: []string{"ledger_credit"}},
{StepID: "ledger_debit", Rail: model.RailLedger, Operation: "ledger.debit", DependsOn: []string{"card_payout"}, CommitPolicy: model.CommitPolicyAfterSuccess, CommitAfter: []string{"card_payout"}},
{StepID: "ledger_debit", Rail: model.RailLedger, Operation: "ledger.move", DependsOn: []string{"card_payout"}, CommitPolicy: model.CommitPolicyAfterSuccess, CommitAfter: []string{"card_payout"}, FromRole: rolePtr(pmodel.AccountRoleOperating), ToRole: rolePtr(pmodel.AccountRoleTransit)},
},
},
},
@@ -128,14 +130,14 @@ func TestDefaultPlanBuilder_BuildsPlanFromRoutes_CryptoToCard(t *testing.T) {
assertPlanStep(t, plan.Steps[0], "crypto_send", model.RailCrypto, model.RailOperationSend, "crypto-tron", "crypto-tron-1", "USDT", "95")
assertPlanStep(t, plan.Steps[1], "crypto_fee", model.RailCrypto, model.RailOperationFee, "crypto-tron", "crypto-tron-1", "USDT", "5")
assertPlanStep(t, plan.Steps[2], "crypto_observe", model.RailCrypto, model.RailOperationObserveConfirm, "crypto-tron", "crypto-tron-1", "", "")
assertPlanStep(t, plan.Steps[3], "ledger_credit", model.RailLedger, model.RailOperationCredit, "", "", "USDT", "95")
assertPlanStep(t, plan.Steps[3], "ledger_credit", model.RailLedger, model.RailOperationMove, "", "", "USDT", "95")
assertPlanStep(t, plan.Steps[4], "card_payout", model.RailCardPayout, model.RailOperationSend, "card", "card-1", "USDT", "95")
assertPlanStep(t, plan.Steps[5], "ledger_debit", model.RailLedger, model.RailOperationDebit, "", "", "USDT", "95")
assertPlanStep(t, plan.Steps[5], "ledger_debit", model.RailLedger, model.RailOperationMove, "", "", "USDT", "95")
}
func TestDefaultPlanBuilder_ErrorsWhenRouteMissing(t *testing.T) {
ctx := context.Background()
builder := &defaultPlanBuilder{}
builder := newDefaultPlanBuilder(mloggerfactory.NewLogger(false))
payment := &model.Payment{
PaymentRef: "pay-1",
@@ -169,7 +171,7 @@ func TestDefaultPlanBuilder_ErrorsWhenRouteMissing(t *testing.T) {
func TestBuildPlanFromTemplate_ProviderSettlementUsesNetAmountWhenFixReceived(t *testing.T) {
ctx := context.Background()
builder := &defaultPlanBuilder{}
builder := newDefaultPlanBuilder(mloggerfactory.NewLogger(false))
payment := &model.Payment{
PaymentRef: "pay-settle-1",
@@ -232,7 +234,7 @@ func TestBuildPlanFromTemplate_ProviderSettlementUsesNetAmountWhenFixReceived(t
func TestDefaultPlanBuilder_UsesSourceCurrencyForCryptoSendWithFX(t *testing.T) {
ctx := context.Background()
builder := &defaultPlanBuilder{}
builder := newDefaultPlanBuilder(mloggerfactory.NewLogger(false))
payment := &model.Payment{
PaymentRef: "pay-2",
@@ -288,9 +290,9 @@ func TestDefaultPlanBuilder_UsesSourceCurrencyForCryptoSendWithFX(t *testing.T)
{StepID: "crypto_send", Rail: model.RailCrypto, Operation: "payout.crypto"},
{StepID: "crypto_fee", Rail: model.RailCrypto, Operation: "fee.send", DependsOn: []string{"crypto_send"}},
{StepID: "crypto_observe", Rail: model.RailCrypto, Operation: "observe.confirm", DependsOn: []string{"crypto_send"}},
{StepID: "ledger_credit", Rail: model.RailLedger, Operation: "ledger.credit", DependsOn: []string{"crypto_observe"}},
{StepID: "ledger_credit", Rail: model.RailLedger, Operation: "ledger.move", DependsOn: []string{"crypto_observe"}, FromRole: rolePtr(pmodel.AccountRolePending), ToRole: rolePtr(pmodel.AccountRoleOperating)},
{StepID: "card_payout", Rail: model.RailCardPayout, Operation: "payout.card", DependsOn: []string{"ledger_credit"}},
{StepID: "ledger_debit", Rail: model.RailLedger, Operation: "ledger.debit", DependsOn: []string{"card_payout"}, CommitPolicy: model.CommitPolicyAfterSuccess, CommitAfter: []string{"card_payout"}},
{StepID: "ledger_debit", Rail: model.RailLedger, Operation: "ledger.move", DependsOn: []string{"card_payout"}, CommitPolicy: model.CommitPolicyAfterSuccess, CommitAfter: []string{"card_payout"}, FromRole: rolePtr(pmodel.AccountRoleOperating), ToRole: rolePtr(pmodel.AccountRoleTransit)},
},
},
},
@@ -340,9 +342,9 @@ func TestDefaultPlanBuilder_UsesSourceCurrencyForCryptoSendWithFX(t *testing.T)
assertPlanStep(t, plan.Steps[0], "crypto_send", model.RailCrypto, model.RailOperationSend, "crypto-tron", "crypto-tron-2", "USDT", "1.4")
assertPlanStep(t, plan.Steps[1], "crypto_fee", model.RailCrypto, model.RailOperationFee, "crypto-tron", "crypto-tron-2", "USDT", "0.098")
assertPlanStep(t, plan.Steps[2], "crypto_observe", model.RailCrypto, model.RailOperationObserveConfirm, "crypto-tron", "crypto-tron-2", "", "")
assertPlanStep(t, plan.Steps[3], "ledger_credit", model.RailLedger, model.RailOperationCredit, "", "", "RUB", "108.99")
assertPlanStep(t, plan.Steps[3], "ledger_credit", model.RailLedger, model.RailOperationMove, "", "", "RUB", "108.99")
assertPlanStep(t, plan.Steps[4], "card_payout", model.RailCardPayout, model.RailOperationSend, "card", "card-2", "RUB", "108.99")
assertPlanStep(t, plan.Steps[5], "ledger_debit", model.RailLedger, model.RailOperationDebit, "", "", "RUB", "108.99")
assertPlanStep(t, plan.Steps[5], "ledger_debit", model.RailLedger, model.RailOperationMove, "", "", "RUB", "108.99")
}
// --- test doubles ---

View File

@@ -28,37 +28,6 @@ func buildFXConversionPlan(payment *model.Payment) (*model.PaymentPlan, error) {
}, nil
}
func buildLedgerTransferPlan(payment *model.Payment) (*model.PaymentPlan, error) {
if payment == nil {
return nil, merrors.InvalidArgument("plan builder: payment is required")
}
amount := cloneMoney(payment.Intent.Amount)
steps := []*model.PaymentStep{
{
StepID: "ledger_debit",
Rail: model.RailLedger,
Action: model.RailOperationDebit,
CommitPolicy: model.CommitPolicyImmediate,
Amount: cloneMoney(amount),
},
{
StepID: "ledger_credit",
Rail: model.RailLedger,
Action: model.RailOperationCredit,
DependsOn: []string{"ledger_debit"},
CommitPolicy: model.CommitPolicyAfterSuccess,
CommitAfter: []string{"ledger_debit"},
Amount: cloneMoney(amount),
},
}
return &model.PaymentPlan{
ID: payment.PaymentRef,
Steps: steps,
IdempotencyKey: payment.IdempotencyKey,
CreatedAt: planTimestamp(payment),
}, nil
}
func resolveSettlementAmount(payment *model.Payment, quote *orchestratorv1.PaymentQuote, fallback *paymenttypes.Money) *paymenttypes.Money {
if quote != nil && quote.GetExpectedSettlementAmount() != nil {
return moneyFromProto(quote.GetExpectedSettlementAmount())

View File

@@ -6,9 +6,12 @@ import (
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/merrors"
pmodel "github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/mutil/mzap"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
oraclev1 "github.com/tech/sendico/pkg/proto/oracle/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.uber.org/zap"
)
func (b *defaultPlanBuilder) buildPlanFromTemplate(ctx context.Context, payment *model.Payment, quote *orchestratorv1.PaymentQuote, template *model.PaymentPlanTemplate, sourceRail, destRail model.Rail, sourceNetwork, destNetwork string, gateways GatewayRegistry) (*model.PaymentPlan, error) {
@@ -16,36 +19,58 @@ func (b *defaultPlanBuilder) buildPlanFromTemplate(ctx context.Context, payment
return nil, merrors.InvalidArgument("plan builder: plan template is required")
}
logger := b.logger.With(
zap.String("payment_ref", payment.PaymentRef),
mzap.ObjRef("template_id", template.ID),
zap.String("source_rail", string(sourceRail)),
zap.String("dest_rail", string(destRail)),
)
logger.Debug("Building plan from template", zap.Int("template_steps", len(template.Steps)))
intentAmount, err := requireMoney(cloneMoney(payment.Intent.Amount), "amount")
if err != nil {
logger.Warn("Invalid intent amount", zap.Error(err))
return nil, err
}
sourceAmount, err := requireMoney(resolveDebitAmount(payment, quote, intentAmount), "debit amount")
if err != nil {
logger.Warn("Failed to resolve debit amount", zap.Error(err))
return nil, err
}
settlementAmount, err := requireMoney(resolveSettlementAmount(payment, quote, sourceAmount), "settlement amount")
if err != nil {
logger.Warn("Failed to resolve settlement amount", zap.Error(err))
return nil, err
}
feeAmount := resolveFeeAmount(payment, quote)
feeRequired := isPositiveMoney(feeAmount)
sourceSendAmount, err := netSourceAmount(sourceAmount, feeAmount, quote)
if err != nil {
logger.Warn("Failed to calculate net source amount", zap.Error(err))
return nil, err
}
providerSettlementAmount := settlementAmount
if payment.Intent.SettlementMode == model.SettlementModeFixReceived && feeRequired {
providerSettlementAmount, err = netSettlementAmount(settlementAmount, feeAmount, quote)
if err != nil {
logger.Warn("Failed to calculate net settlement amount", zap.Error(err))
return nil, err
}
}
logger.Debug("Amounts calculated",
zap.String("intent_amount", moneyString(intentAmount)),
zap.String("source_amount", moneyString(sourceAmount)),
zap.String("settlement_amount", moneyString(settlementAmount)),
zap.String("fee_amount", moneyString(feeAmount)),
zap.Bool("fee_required", feeRequired),
)
payoutAmount := settlementAmount
if destRail == model.RailCardPayout {
payoutAmount, err = cardPayoutAmount(payment)
if err != nil {
logger.Warn("Failed to calculate card payout amount", zap.Error(err))
return nil, err
}
}
@@ -59,6 +84,14 @@ func (b *defaultPlanBuilder) buildPlanFromTemplate(ctx context.Context, payment
steps := make([]*model.PaymentStep, 0, len(template.Steps))
gatewaysByRail := map[model.Rail]*model.GatewayInstanceDescriptor{}
stepIDs := map[string]bool{}
sourceManagedWalletNetwork := ""
destManagedWalletNetwork := ""
if payment.Intent.Source.Type == model.EndpointTypeManagedWallet {
sourceManagedWalletNetwork = networkFromEndpoint(payment.Intent.Source)
}
if payment.Intent.Destination.Type == model.EndpointTypeManagedWallet {
destManagedWalletNetwork = networkFromEndpoint(payment.Intent.Destination)
}
for _, tpl := range template.Steps {
stepID := strings.TrimSpace(tpl.StepID)
@@ -72,6 +105,7 @@ func (b *defaultPlanBuilder) buildPlanFromTemplate(ctx context.Context, payment
action, err := actionForOperation(tpl.Operation)
if err != nil {
b.logger.Warn("plan builder: unsupported operation in plan template step", zap.String("step_id", stepID), zap.String("operation", tpl.Operation), zap.Error(err))
return nil, err
}
@@ -98,6 +132,8 @@ func (b *defaultPlanBuilder) buildPlanFromTemplate(ctx context.Context, payment
CommitPolicy: policy,
CommitAfter: cloneStringList(tpl.CommitAfter),
Amount: cloneMoney(amount),
FromRole: cloneAccountRole(tpl.FromRole),
ToRole: cloneAccountRole(tpl.ToRole),
}
needsGateway := action == model.RailOperationSend || action == model.RailOperationFee || action == model.RailOperationObserveConfirm
@@ -106,6 +142,20 @@ func (b *defaultPlanBuilder) buildPlanFromTemplate(ctx context.Context, payment
}
if needsGateway {
network := gatewayNetworkForRail(tpl.Rail, sourceRail, destRail, sourceNetwork, destNetwork)
managedWalletNetwork := ""
if tpl.Rail == sourceRail && sourceManagedWalletNetwork != "" {
managedWalletNetwork = sourceManagedWalletNetwork
} else if tpl.Rail == destRail && destManagedWalletNetwork != "" {
managedWalletNetwork = destManagedWalletNetwork
}
if managedWalletNetwork != "" {
logger.Debug("Managed wallet network resolved for gateway selection",
zap.String("step_id", stepID),
zap.String("rail", string(tpl.Rail)),
zap.String("managed_wallet_network", managedWalletNetwork),
zap.String("gateway_network", network),
)
}
instanceID := stepInstanceIDForRail(payment.Intent, tpl.Rail, sourceRail, destRail)
checkAmount := amount
if action == model.RailOperationObserveConfirm {
@@ -122,31 +172,61 @@ func (b *defaultPlanBuilder) buildPlanFromTemplate(ctx context.Context, payment
step.InstanceID = strings.TrimSpace(gw.InstanceID)
}
logger.Debug("Plan step added",
zap.String("step_id", step.StepID),
zap.String("rail", string(step.Rail)),
zap.String("action", string(step.Action)),
zap.String("commit_policy", string(step.CommitPolicy)),
zap.String("amount", moneyString(step.Amount)),
zap.Strings("depends_on", step.DependsOn),
)
steps = append(steps, step)
}
if len(steps) == 0 {
logger.Warn("Empty payment plan after processing template")
return nil, merrors.InvalidArgument("plan builder: empty payment plan")
}
execQuote := executionQuote(payment, quote)
return &model.PaymentPlan{
plan := &model.PaymentPlan{
ID: payment.PaymentRef,
FXQuote: fxQuoteFromProto(execQuote.GetFxQuote()),
Fees: feeLinesFromProto(execQuote.GetFeeLines()),
Steps: steps,
IdempotencyKey: payment.IdempotencyKey,
CreatedAt: planTimestamp(payment),
}, nil
}
logger.Info("Payment plan built", zap.Int("steps", len(plan.Steps)),
zap.Int("fees", len(plan.Fees)), zap.Bool("has_fx_quote", plan.FXQuote != nil),
)
return plan, nil
}
func moneyString(m *paymenttypes.Money) string {
if m == nil {
return "nil"
}
return m.Amount + " " + m.Currency
}
func actionForOperation(operation string) (model.RailOperation, error) {
op := strings.ToLower(strings.TrimSpace(operation))
if op == "ledger.block" || op == "ledger.release" {
return model.RailOperationUnspecified, merrors.InvalidArgument("unsupported legacy ledger operation, use ledger.move with roles")
}
switch op {
case "debit", "ledger.debit", "wallet.debit":
return model.RailOperationDebit, nil
case "credit", "ledger.credit", "wallet.credit":
return model.RailOperationCredit, nil
case "ledger.move":
return model.RailOperationMove, nil
case "external.debit":
return model.RailOperationExternalDebit, nil
case "external.credit":
return model.RailOperationExternalCredit, nil
case "debit", "wallet.debit":
return model.RailOperationExternalDebit, nil
case "credit", "wallet.credit":
return model.RailOperationExternalCredit, nil
case "fx.convert", "fx_conversion", "fx.converted":
return model.RailOperationFXConvert, nil
case "observe", "observe.confirm", "observe.confirmation", "observe.crypto", "observe.card":
@@ -155,17 +235,19 @@ func actionForOperation(operation string) (model.RailOperation, error) {
return model.RailOperationFee, nil
case "send", "payout.card", "payout.crypto", "payout.fiat", "payin.crypto", "payin.fiat", "fund.crypto", "fund.card":
return model.RailOperationSend, nil
case "block", "hold", "reserve", "ledger.block", "ledger.hold", "ledger.reserve":
case "block", "hold", "reserve":
return model.RailOperationBlock, nil
case "release", "unblock", "ledger.release":
case "release", "unblock":
return model.RailOperationRelease, nil
}
switch strings.ToUpper(strings.TrimSpace(operation)) {
case string(model.RailOperationDebit):
return model.RailOperationDebit, nil
case string(model.RailOperationCredit):
return model.RailOperationCredit, nil
case string(model.RailOperationExternalDebit), string(model.RailOperationDebit):
return model.RailOperationExternalDebit, nil
case string(model.RailOperationExternalCredit), string(model.RailOperationCredit):
return model.RailOperationExternalCredit, nil
case string(model.RailOperationMove):
return model.RailOperationMove, nil
case string(model.RailOperationSend):
return model.RailOperationSend, nil
case string(model.RailOperationFee):
@@ -185,16 +267,21 @@ func actionForOperation(operation string) (model.RailOperation, error) {
func stepAmountForAction(action model.RailOperation, rail, sourceRail, destRail model.Rail, sourceSendAmount, settlementAmount, payoutAmount, feeAmount, ledgerDebitAmount, ledgerCreditAmount *paymenttypes.Money, feeRequired bool) (*paymenttypes.Money, error) {
switch action {
case model.RailOperationDebit:
case model.RailOperationDebit, model.RailOperationExternalDebit:
if rail == model.RailLedger {
return cloneMoney(ledgerDebitAmount), nil
}
return cloneMoney(settlementAmount), nil
case model.RailOperationCredit:
case model.RailOperationCredit, model.RailOperationExternalCredit:
if rail == model.RailLedger {
return cloneMoney(ledgerCreditAmount), nil
}
return cloneMoney(settlementAmount), nil
case model.RailOperationMove:
if rail == model.RailLedger {
return cloneMoney(ledgerDebitAmount), nil
}
return cloneMoney(settlementAmount), nil
case model.RailOperationSend:
switch rail {
case sourceRail:
@@ -257,6 +344,14 @@ func observeAmountForRail(rail model.Rail, source, settlement, payout *paymentty
return source
}
func cloneAccountRole(role *pmodel.AccountRole) *pmodel.AccountRole {
if role == nil {
return nil
}
cloned := *role
return &cloned
}
func netSourceAmount(sourceAmount, feeAmount *paymenttypes.Money, quote *orchestratorv1.PaymentQuote) (*paymenttypes.Money, error) {
if sourceAmount == nil {
return nil, merrors.InvalidArgument("plan builder: source amount is required")

View File

@@ -7,12 +7,23 @@ import (
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mutil/mzap"
"go.uber.org/zap"
)
func selectPlanTemplate(ctx context.Context, templates PlanTemplateStore, sourceRail, destRail model.Rail, network string) (*model.PaymentPlanTemplate, error) {
func selectPlanTemplate(ctx context.Context, logger mlogger.Logger, templates PlanTemplateStore, sourceRail, destRail model.Rail, network string) (*model.PaymentPlanTemplate, error) {
if templates == nil {
return nil, merrors.InvalidArgument("plan builder: plan templates store is required")
}
logger = logger.With(
zap.String("source_rail", string(sourceRail)),
zap.String("dest_rail", string(destRail)),
zap.String("network", network),
)
logger.Debug("Selecting plan template")
enabled := true
result, err := templates.List(ctx, &model.PaymentPlanTemplateFilter{
FromRail: sourceRail,
@@ -20,12 +31,16 @@ func selectPlanTemplate(ctx context.Context, templates PlanTemplateStore, source
IsEnabled: &enabled,
})
if err != nil {
logger.Warn("Failed to list plan templates", zap.Error(err))
return nil, err
}
if result == nil || len(result.Items) == 0 {
logger.Warn("No plan templates found for route")
return nil, merrors.InvalidArgument("plan builder: plan template missing")
}
logger.Debug("Fetched plan templates", zap.Int("total", len(result.Items)))
candidates := make([]*model.PaymentPlanTemplate, 0, len(result.Items))
for _, tpl := range result.Items {
if tpl == nil || !tpl.IsEnabled {
@@ -35,17 +50,23 @@ func selectPlanTemplate(ctx context.Context, templates PlanTemplateStore, source
continue
}
if !templateMatchesNetwork(tpl, network) {
logger.Debug("Template network mismatch, skipping",
mzap.StorableRef(tpl),
zap.String("template_network", tpl.Network))
continue
}
if err := validatePlanTemplate(tpl); err != nil {
if err := validatePlanTemplate(logger, tpl); err != nil {
return nil, err
}
candidates = append(candidates, tpl)
}
if len(candidates) == 0 {
logger.Warn("No valid plan template candidates after filtering")
return nil, merrors.InvalidArgument("plan builder: plan template missing")
}
logger.Debug("Plan template candidates filtered", zap.Int("candidates", len(candidates)))
sort.Slice(candidates, func(i, j int) bool {
pi := templatePriority(candidates[i], network)
pj := templatePriority(candidates[j], network)
@@ -58,7 +79,14 @@ func selectPlanTemplate(ctx context.Context, templates PlanTemplateStore, source
return candidates[i].ID.Hex() < candidates[j].ID.Hex()
})
return candidates[0], nil
selected := candidates[0]
logger.Debug("Plan template selected",
mzap.StorableRef(selected),
zap.String("template_network", selected.Network),
zap.Int("steps", len(selected.Steps)),
zap.Int("priority", templatePriority(selected, network)))
return selected, nil
}
func templateMatchesNetwork(template *model.PaymentPlanTemplate, network string) bool {
@@ -91,38 +119,87 @@ func templatePriority(template *model.PaymentPlanTemplate, network string) int {
return 2
}
func validatePlanTemplate(template *model.PaymentPlanTemplate) error {
func validatePlanTemplate(logger mlogger.Logger, template *model.PaymentPlanTemplate) error {
if template == nil {
return merrors.InvalidArgument("plan builder: plan template is required")
}
logger = logger.With(
mzap.StorableRef(template),
zap.String("from_rail", string(template.FromRail)),
zap.String("to_rail", string(template.ToRail)),
zap.String("network", template.Network),
)
logger.Debug("Validating plan template")
if len(template.Steps) == 0 {
logger.Warn("Plan template has no steps")
return merrors.InvalidArgument("plan builder: plan template steps are required")
}
seen := map[string]struct{}{}
for _, step := range template.Steps {
for idx, step := range template.Steps {
id := strings.TrimSpace(step.StepID)
if id == "" {
logger.Warn("Plan template step missing ID", zap.Int("step_index", idx))
return merrors.InvalidArgument("plan builder: plan template step id is required")
}
if _, exists := seen[id]; exists {
logger.Warn("Duplicate plan template step ID", zap.String("step_id", id))
return merrors.InvalidArgument("plan builder: plan template step id must be unique")
}
seen[id] = struct{}{}
if strings.TrimSpace(step.Operation) == "" {
logger.Warn("Plan template step missing operation", zap.String("step_id", id),
zap.Int("step_index", idx))
return merrors.InvalidArgument("plan builder: plan template operation is required")
}
action, err := actionForOperation(step.Operation)
if err != nil {
logger.Warn("Plan template step has invalid operation", zap.String("step_id", id),
zap.String("operation", step.Operation), zap.Error(err))
return err
}
if step.Rail == model.RailLedger && action == model.RailOperationMove {
if step.FromRole == nil || strings.TrimSpace(string(*step.FromRole)) == "" {
logger.Warn("Ledger move step missing fromRole", zap.String("step_id", id),
zap.String("operation", step.Operation))
return merrors.InvalidArgument("plan builder: ledger.move fromRole is required")
}
if step.ToRole == nil || strings.TrimSpace(string(*step.ToRole)) == "" {
logger.Warn("Ledger move step missing toRole", zap.String("step_id", id),
zap.String("operation", step.Operation))
return merrors.InvalidArgument("plan builder: ledger.move toRole is required")
}
from := strings.ToLower(strings.TrimSpace(string(*step.FromRole)))
to := strings.ToLower(strings.TrimSpace(string(*step.ToRole)))
if from == "" || to == "" || strings.EqualFold(from, to) {
logger.Warn("Ledger move step has invalid roles", zap.String("step_id", id),
zap.String("from_role", from), zap.String("to_role", to))
return merrors.InvalidArgument("plan builder: ledger.move fromRole and toRole must differ")
}
}
}
for _, step := range template.Steps {
for _, dep := range step.DependsOn {
if _, ok := seen[strings.TrimSpace(dep)]; !ok {
depID := strings.TrimSpace(dep)
if _, ok := seen[depID]; !ok {
logger.Warn("Plan template step has missing dependency", zap.String("step_id", step.StepID),
zap.String("missing_dependency", depID))
return merrors.InvalidArgument("plan builder: plan template dependency missing")
}
}
for _, dep := range step.CommitAfter {
if _, ok := seen[strings.TrimSpace(dep)]; !ok {
depID := strings.TrimSpace(dep)
if _, ok := seen[depID]; !ok {
logger.Warn("Plan template step has missing commit dependency", zap.String("step_id", step.StepID),
zap.String("missing_commit_dependency", depID))
return merrors.InvalidArgument("plan builder: plan template commit dependency missing")
}
}
}
logger.Debug("Plan template validation successful", zap.Int("steps", len(template.Steps)))
return nil
}

View File

@@ -5,6 +5,7 @@ import (
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/merrors"
pmodel "github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/payments/rail"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
@@ -19,7 +20,7 @@ const (
providerSettlementMetaSourceCurrency = "source_currency"
)
func (p *paymentExecutor) buildProviderSettlementTransferRequest(payment *model.Payment, step *model.PaymentStep, amount *paymenttypes.Money, quote *orchestratorv1.PaymentQuote, idx int) (rail.TransferRequest, error) {
func (p *paymentExecutor) buildProviderSettlementTransferRequest(payment *model.Payment, step *model.PaymentStep, amount *paymenttypes.Money, quote *orchestratorv1.PaymentQuote, idx int, fromRole, toRole *pmodel.AccountRole) (rail.TransferRequest, error) {
if payment == nil || step == nil {
return rail.TransferRequest{}, merrors.InvalidArgument("provider settlement: payment and step are required")
}
@@ -76,7 +77,7 @@ func (p *paymentExecutor) buildProviderSettlementTransferRequest(payment *model.
destRef = paymentRef
}
return rail.TransferRequest{
req := rail.TransferRequest{
OrganizationRef: payment.OrganizationRef.Hex(),
FromAccountID: sourceWalletRef,
ToAccountID: destRef,
@@ -86,7 +87,14 @@ func (p *paymentExecutor) buildProviderSettlementTransferRequest(payment *model.
DestinationMemo: paymentRef,
Metadata: metadata,
ClientReference: paymentRef,
}, nil
}
if fromRole != nil {
req.FromRole = *fromRole
}
if toRole != nil {
req.ToRole = *toRole
}
return req, nil
}
func paymentGatewayQuoteRef(payment *model.Payment, quote *orchestratorv1.PaymentQuote) string {

View File

@@ -3,6 +3,7 @@ package orchestrator
import (
"context"
"errors"
"fmt"
"strings"
"time"
@@ -62,6 +63,13 @@ func (s *Service) buildPaymentQuote(ctx context.Context, orgRef string, req *orc
return nil, time.Time{}, err
}
}
conversionFeeQuote := &feesv1.PrecomputeFeesResponse{}
if s.shouldQuoteConversionFee(ctx, req.GetIntent()) {
conversionFeeQuote, err = s.quoteConversionFees(ctx, orgRef, req, feeBaseAmount)
if err != nil {
return nil, time.Time{}, err
}
}
feeCurrency := ""
if feeBaseAmount != nil {
feeCurrency = feeBaseAmount.GetCurrency()
@@ -69,6 +77,9 @@ func (s *Service) buildPaymentQuote(ctx context.Context, orgRef string, req *orc
feeCurrency = amount.GetCurrency()
}
feeLines := cloneFeeLines(feeQuote.GetLines())
if conversionFeeQuote != nil {
feeLines = append(feeLines, cloneFeeLines(conversionFeeQuote.GetLines())...)
}
s.assignFeeLedgerAccounts(intent, feeLines)
feeTotal := extractFeeTotal(feeLines, feeCurrency)
@@ -78,7 +89,7 @@ func (s *Service) buildPaymentQuote(ctx context.Context, orgRef string, req *orc
if err != nil {
return nil, time.Time{}, err
}
s.logger.Debug("network fee estimated", zap.String("org_ref", orgRef))
s.logger.Debug("Network fee estimated", zap.String("org_ref", orgRef))
}
debitAmount, settlementAmount := computeAggregates(payAmount, settlementAmountBeforeFees, feeTotal, networkFee, fxQuote, intent.GetSettlementMode())
@@ -88,12 +99,18 @@ func (s *Service) buildPaymentQuote(ctx context.Context, orgRef string, req *orc
ExpectedSettlementAmount: settlementAmount,
ExpectedFeeTotal: feeTotal,
FeeLines: feeLines,
FeeRules: cloneFeeRules(feeQuote.GetApplied()),
FeeRules: mergeFeeRules(feeQuote, conversionFeeQuote),
FxQuote: fxQuote,
NetworkFee: networkFee,
}
expiresAt := quoteExpiry(s.clock.Now(), feeQuote, fxQuote)
if conversionFeeQuote != nil {
convExpiry := quoteExpiry(s.clock.Now(), conversionFeeQuote, fxQuote)
if convExpiry.Before(expiresAt) {
expiresAt = convExpiry
}
}
return quote, expiresAt, nil
}
@@ -107,13 +124,14 @@ func (s *Service) quoteFees(ctx context.Context, orgRef string, req *orchestrato
if amount == nil {
amount = cloneProtoMoney(intent.GetAmount())
}
attrs := ensureFeeAttributes(intent, amount, cloneMetadata(intent.GetAttributes()))
feeIntent := &feesv1.Intent{
Trigger: triggerFromKind(intent.GetKind(), intent.GetRequiresFx()),
Trigger: feeTriggerForIntent(intent),
BaseAmount: amount,
BookedAt: timestamppb.New(s.clock.Now()),
OriginType: "payments.orchestrator.quote",
OriginRef: strings.TrimSpace(req.GetIdempotencyKey()),
Attributes: cloneMetadata(intent.GetAttributes()),
Attributes: attrs,
}
timeout := req.GetMeta().GetTrace()
ctxTimeout, cancel := s.withTimeout(ctx, s.deps.fees.timeout)
@@ -127,12 +145,259 @@ func (s *Service) quoteFees(ctx context.Context, orgRef string, req *orchestrato
TtlMs: defaultFeeQuoteTTLMillis,
})
if err != nil {
s.logger.Warn("fees precompute failed", zap.Error(err))
s.logger.Warn("Fees precompute failed", zap.Error(err))
return nil, merrors.Internal("fees_precompute_failed")
}
return resp, nil
}
func (s *Service) quoteConversionFees(ctx context.Context, orgRef string, req *orchestratorv1.QuotePaymentRequest, baseAmount *moneyv1.Money) (*feesv1.PrecomputeFeesResponse, error) {
if !s.deps.fees.available() {
return &feesv1.PrecomputeFeesResponse{}, nil
}
intent := req.GetIntent()
amount := cloneProtoMoney(baseAmount)
if amount == nil {
amount = cloneProtoMoney(intent.GetAmount())
}
attrs := ensureFeeAttributes(intent, amount, cloneMetadata(intent.GetAttributes()))
attrs["product"] = "wallet"
attrs["source_type"] = "managed_wallet"
attrs["destination_type"] = "ledger"
feeIntent := &feesv1.Intent{
Trigger: feesv1.Trigger_TRIGGER_CAPTURE,
BaseAmount: amount,
BookedAt: timestamppb.New(s.clock.Now()),
OriginType: "payments.orchestrator.conversion_quote",
OriginRef: strings.TrimSpace(req.GetIdempotencyKey()),
Attributes: attrs,
}
timeout := req.GetMeta().GetTrace()
ctxTimeout, cancel := s.withTimeout(ctx, s.deps.fees.timeout)
defer cancel()
resp, err := s.deps.fees.client.PrecomputeFees(ctxTimeout, &feesv1.PrecomputeFeesRequest{
Meta: &feesv1.RequestMeta{
OrganizationRef: orgRef,
Trace: timeout,
},
Intent: feeIntent,
TtlMs: defaultFeeQuoteTTLMillis,
})
if err != nil {
s.logger.Warn("Conversion fee precompute failed", zap.Error(err))
return nil, merrors.Internal("fees_precompute_failed")
}
setFeeLineTarget(resp.GetLines(), feeLineTargetWallet)
if src := intent.GetSource().GetManagedWallet(); src != nil {
setFeeLineWalletRef(resp.GetLines(), src.GetManagedWalletRef(), "managed_wallet")
}
return resp, nil
}
func (s *Service) shouldQuoteConversionFee(ctx context.Context, intent *orchestratorv1.PaymentIntent) bool {
if intent == nil {
return false
}
if !isManagedWalletEndpoint(intent.GetSource()) {
return false
}
if isLedgerEndpoint(intent.GetDestination()) {
return false
}
if s.storage == nil {
return false
}
templates := s.storage.PlanTemplates()
if templates == nil {
return false
}
intentModel := intentFromProto(intent)
sourceRail, sourceNetwork, err := railFromEndpoint(intentModel.Source, intentModel.Attributes, true)
if err != nil {
return false
}
destRail, destNetwork, err := railFromEndpoint(intentModel.Destination, intentModel.Attributes, false)
if err != nil {
return false
}
network, err := resolveRouteNetwork(intentModel.Attributes, sourceNetwork, destNetwork)
if err != nil {
return false
}
template, err := selectPlanTemplate(ctx, s.logger.Named("quote_payment"), templates, sourceRail, destRail, network)
if err != nil {
return false
}
return templateHasLedgerMove(template)
}
func templateHasLedgerMove(template *model.PaymentPlanTemplate) bool {
if template == nil {
return false
}
for _, step := range template.Steps {
if step.Rail != model.RailLedger {
continue
}
if strings.EqualFold(strings.TrimSpace(step.Operation), "ledger.move") {
return true
}
}
return false
}
func mergeFeeRules(primary, secondary *feesv1.PrecomputeFeesResponse) []*feesv1.AppliedRule {
rules := cloneFeeRules(nil)
if primary != nil {
rules = append(rules, cloneFeeRules(primary.GetApplied())...)
}
if secondary != nil {
rules = append(rules, cloneFeeRules(secondary.GetApplied())...)
}
if len(rules) == 0 {
return nil
}
return rules
}
func ensureFeeAttributes(intent *orchestratorv1.PaymentIntent, baseAmount *moneyv1.Money, attrs map[string]string) map[string]string {
if attrs == nil {
attrs = map[string]string{}
}
if intent == nil {
return attrs
}
setFeeAttributeIfMissing(attrs, "product", "wallet")
if op := feeOperationFromKind(intent.GetKind()); op != "" {
setFeeAttributeIfMissing(attrs, "operation", op)
}
if currency := feeCurrencyFromAmount(baseAmount, intent.GetAmount()); currency != "" {
setFeeAttributeIfMissing(attrs, "currency", currency)
}
if srcType := endpointTypeFromProto(intent.GetSource()); srcType != "" {
setFeeAttributeIfMissing(attrs, "source_type", srcType)
}
if dstType := endpointTypeFromProto(intent.GetDestination()); dstType != "" {
setFeeAttributeIfMissing(attrs, "destination_type", dstType)
}
if asset := assetFromIntent(intent); asset != nil {
if token := strings.TrimSpace(asset.GetTokenSymbol()); token != "" {
setFeeAttributeIfMissing(attrs, "asset", token)
}
if chain := asset.GetChain(); chain != chainv1.ChainNetwork_CHAIN_NETWORK_UNSPECIFIED {
if network := strings.TrimSpace(chainpkg.NetworkAlias(chain)); network != "" {
setFeeAttributeIfMissing(attrs, "network", network)
}
}
}
return attrs
}
func feeTriggerForIntent(intent *orchestratorv1.PaymentIntent) feesv1.Trigger {
if intent == nil {
return feesv1.Trigger_TRIGGER_UNSPECIFIED
}
trigger := triggerFromKind(intent.GetKind(), intent.GetRequiresFx())
if trigger != feesv1.Trigger_TRIGGER_FX_CONVERSION && isManagedWalletEndpoint(intent.GetSource()) && isLedgerEndpoint(intent.GetDestination()) {
return feesv1.Trigger_TRIGGER_CAPTURE
}
return trigger
}
func isManagedWalletEndpoint(endpoint *orchestratorv1.PaymentEndpoint) bool {
return endpoint != nil && endpoint.GetManagedWallet() != nil
}
func isLedgerEndpoint(endpoint *orchestratorv1.PaymentEndpoint) bool {
return endpoint != nil && endpoint.GetLedger() != nil
}
func setFeeAttributeIfMissing(attrs map[string]string, key, value string) {
if attrs == nil {
return
}
if strings.TrimSpace(key) == "" {
return
}
value = strings.TrimSpace(value)
if value == "" {
return
}
if _, exists := attrs[key]; exists {
return
}
attrs[key] = value
}
func feeOperationFromKind(kind orchestratorv1.PaymentKind) string {
switch kind {
case orchestratorv1.PaymentKind_PAYMENT_KIND_PAYOUT:
return "payout"
case orchestratorv1.PaymentKind_PAYMENT_KIND_INTERNAL_TRANSFER:
return "internal_transfer"
case orchestratorv1.PaymentKind_PAYMENT_KIND_FX_CONVERSION:
return "fx_conversion"
default:
return ""
}
}
func feeCurrencyFromAmount(baseAmount, intentAmount *moneyv1.Money) string {
if baseAmount != nil {
if currency := strings.TrimSpace(baseAmount.GetCurrency()); currency != "" {
return currency
}
}
if intentAmount != nil {
return strings.TrimSpace(intentAmount.GetCurrency())
}
return ""
}
func endpointTypeFromProto(endpoint *orchestratorv1.PaymentEndpoint) string {
if endpoint == nil {
return ""
}
switch {
case endpoint.GetLedger() != nil:
return "ledger"
case endpoint.GetManagedWallet() != nil:
return "managed_wallet"
case endpoint.GetExternalChain() != nil:
return "external_chain"
case endpoint.GetCard() != nil:
return "card"
default:
return ""
}
}
func assetFromIntent(intent *orchestratorv1.PaymentIntent) *chainv1.Asset {
if intent == nil {
return nil
}
if asset := assetFromEndpoint(intent.GetDestination()); asset != nil {
return asset
}
return assetFromEndpoint(intent.GetSource())
}
func assetFromEndpoint(endpoint *orchestratorv1.PaymentEndpoint) *chainv1.Asset {
if endpoint == nil {
return nil
}
if wallet := endpoint.GetManagedWallet(); wallet != nil {
return wallet.GetAsset()
}
if external := endpoint.GetExternalChain(); external != nil {
return external.GetAsset()
}
return nil
}
func (s *Service) estimateNetworkFee(ctx context.Context, intent *orchestratorv1.PaymentIntent) (*chainv1.EstimateTransferFeeResponse, error) {
req := &chainv1.EstimateTransferFeeRequest{
Amount: cloneProtoMoney(intent.GetAmount()),
@@ -169,10 +434,10 @@ func (s *Service) estimateNetworkFee(ctx context.Context, intent *orchestratorv1
client, _, err := s.resolveChainGatewayClient(ctx, network, moneyFromProto(req.Amount), []model.RailOperation{model.RailOperationSend}, instanceID, "")
if err != nil {
if errors.Is(err, merrors.ErrNoData) {
s.logger.Debug("network fee estimation skipped: gateway unavailable", zap.Error(err))
s.logger.Debug("Network fee estimation skipped: gateway unavailable", zap.Error(err))
return nil, nil
}
s.logger.Warn("chain gateway resolution failed", zap.Error(err))
s.logger.Warn("Chain gateway resolution failed", zap.Error(err))
return nil, err
}
if client == nil {
@@ -244,11 +509,11 @@ func (s *Service) requestFXQuote(ctx context.Context, orgRef string, req *orches
quote, err := s.deps.oracle.client.GetQuote(ctx, params)
if err != nil {
s.logger.Warn("fx oracle quote failed", zap.Error(err))
return nil, merrors.Internal("fx_quote_failed")
return nil, merrors.Internal(fmt.Sprintf("orchestrator: fx quote failed, %s", err.Error()))
}
if quote == nil {
if intent.GetRequiresFx() {
return nil, merrors.Internal("fx_quote_missing")
return nil, merrors.Internal("orchestrator: fx quote missing")
}
return nil, nil
}
@@ -292,11 +557,11 @@ func (s *Service) assignFeeLedgerAccounts(intent *orchestratorv1.PaymentIntent,
}
if account == "" {
s.logger.Debug("no fee ledger account mapping found", zap.String("gateway", key), zap.Int("missing_lines", missing))
s.logger.Debug("No fee ledger account mapping found", zap.String("gateway", key), zap.Int("missing_lines", missing))
return
}
assignLedgerAccounts(lines, account)
s.logger.Debug("applied fee ledger account mapping", zap.String("gateway", key), zap.String("ledger_account", account), zap.Int("lines", missing))
s.logger.Debug("Applied fee ledger account mapping", zap.String("gateway", key), zap.String("ledger_account", account), zap.Int("lines", missing))
}
func (s *Service) gatewayKeyFromIntent(intent *orchestratorv1.PaymentIntent) string {

View File

@@ -141,8 +141,8 @@ func TestRequestFXQuoteFailsWhenRequiredAndQuoteMissing(t *testing.T) {
if !errors.Is(err, merrors.ErrInternal) {
t.Fatalf("expected internal error, got %v", err)
}
if !strings.Contains(err.Error(), "fx_quote_missing") {
t.Fatalf("expected fx_quote_missing error, got %v", err)
if !strings.Contains(err.Error(), "orchestrator: fx quote missing") {
t.Fatalf("expected 'orchestrator: fx quote missing' error, got %v", err)
}
}
}

View File

@@ -11,10 +11,13 @@ import (
"github.com/tech/sendico/payments/orchestrator/storage/model"
clockpkg "github.com/tech/sendico/pkg/clock"
mloggerfactory "github.com/tech/sendico/pkg/mlogger/factory"
pmodel "github.com/tech/sendico/pkg/model"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"google.golang.org/protobuf/types/known/structpb"
)
func TestValidateMetaAndOrgRef(t *testing.T) {
@@ -193,11 +196,18 @@ func TestInitiatePaymentIdempotency(t *testing.T) {
org := primitive.NewObjectID()
store := newHelperPaymentStore()
ledgerFake := &ledgerclient.Fake{
PostDebitWithChargesFn: func(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error) {
return &ledgerv1.PostResponse{JournalEntryRef: "debit-1"}, nil
ListConnectorAccountsFn: func(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) {
operatingDetails, _ := structpb.NewStruct(map[string]interface{}{"role": "ACCOUNT_ROLE_OPERATING"})
transitDetails, _ := structpb.NewStruct(map[string]interface{}{"role": "ACCOUNT_ROLE_TRANSIT"})
return &connectorv1.ListAccountsResponse{
Accounts: []*connectorv1.Account{
{Ref: &connectorv1.AccountRef{ConnectorId: "ledger", AccountId: "ledger:operating"}, Kind: connectorv1.AccountKind_LEDGER_ACCOUNT, Asset: "USD", ProviderDetails: operatingDetails},
{Ref: &connectorv1.AccountRef{ConnectorId: "ledger", AccountId: "ledger:transit"}, Kind: connectorv1.AccountKind_LEDGER_ACCOUNT, Asset: "USD", ProviderDetails: transitDetails},
},
}, nil
},
PostCreditWithChargesFn: func(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
return &ledgerv1.PostResponse{JournalEntryRef: "credit-1"}, nil
TransferInternalFn: func(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) {
return &ledgerv1.PostResponse{JournalEntryRef: "move-1"}, nil
},
}
routes := &stubRoutesStore{
@@ -212,8 +222,7 @@ func TestInitiatePaymentIdempotency(t *testing.T) {
ToRail: model.RailLedger,
IsEnabled: true,
Steps: []model.OrchestrationStep{
{StepID: "ledger_debit", Rail: model.RailLedger, Operation: "ledger.debit"},
{StepID: "ledger_credit", Rail: model.RailLedger, Operation: "ledger.credit", DependsOn: []string{"ledger_debit"}, CommitPolicy: model.CommitPolicyAfterSuccess, CommitAfter: []string{"ledger_debit"}},
{StepID: "ledger_move", Rail: model.RailLedger, Operation: "ledger.move", FromRole: rolePtr(pmodel.AccountRoleOperating), ToRole: rolePtr(pmodel.AccountRoleTransit)},
},
},
},
@@ -273,11 +282,18 @@ func TestInitiatePaymentByQuoteRef(t *testing.T) {
Quote: &model.PaymentQuoteSnapshot{},
}
ledgerFake := &ledgerclient.Fake{
PostDebitWithChargesFn: func(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error) {
return &ledgerv1.PostResponse{JournalEntryRef: "debit-1"}, nil
ListConnectorAccountsFn: func(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) {
operatingDetails, _ := structpb.NewStruct(map[string]interface{}{"role": "ACCOUNT_ROLE_OPERATING"})
transitDetails, _ := structpb.NewStruct(map[string]interface{}{"role": "ACCOUNT_ROLE_TRANSIT"})
return &connectorv1.ListAccountsResponse{
Accounts: []*connectorv1.Account{
{Ref: &connectorv1.AccountRef{ConnectorId: "ledger", AccountId: "ledger:operating"}, Kind: connectorv1.AccountKind_LEDGER_ACCOUNT, Asset: "USD", ProviderDetails: operatingDetails},
{Ref: &connectorv1.AccountRef{ConnectorId: "ledger", AccountId: "ledger:transit"}, Kind: connectorv1.AccountKind_LEDGER_ACCOUNT, Asset: "USD", ProviderDetails: transitDetails},
},
}, nil
},
PostCreditWithChargesFn: func(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
return &ledgerv1.PostResponse{JournalEntryRef: "credit-1"}, nil
TransferInternalFn: func(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) {
return &ledgerv1.PostResponse{JournalEntryRef: "move-1"}, nil
},
}
routes := &stubRoutesStore{
@@ -292,8 +308,7 @@ func TestInitiatePaymentByQuoteRef(t *testing.T) {
ToRail: model.RailLedger,
IsEnabled: true,
Steps: []model.OrchestrationStep{
{StepID: "ledger_debit", Rail: model.RailLedger, Operation: "ledger.debit"},
{StepID: "ledger_credit", Rail: model.RailLedger, Operation: "ledger.credit", DependsOn: []string{"ledger_debit"}, CommitPolicy: model.CommitPolicyAfterSuccess, CommitAfter: []string{"ledger_debit"}},
{StepID: "ledger_move", Rail: model.RailLedger, Operation: "ledger.move", FromRole: rolePtr(pmodel.AccountRoleOperating), ToRole: rolePtr(pmodel.AccountRoleTransit)},
},
},
},
@@ -444,3 +459,7 @@ func (s *helperQuotesStore) GetByIdempotencyKey(_ context.Context, orgRef primit
}
return nil, storage.ErrQuoteNotFound
}
func rolePtr(role pmodel.AccountRole) *pmodel.AccountRole {
return &role
}

View File

@@ -107,7 +107,7 @@ func TestExecutePayment_ChainFailure(t *testing.T) {
Steps: []model.OrchestrationStep{
{StepID: "crypto_send", Rail: model.RailCrypto, Operation: "payout.crypto"},
{StepID: "crypto_observe", Rail: model.RailCrypto, Operation: "observe.confirm", DependsOn: []string{"crypto_send"}},
{StepID: "ledger_credit", Rail: model.RailLedger, Operation: "ledger.credit", DependsOn: []string{"crypto_observe"}},
{StepID: "ledger_credit", Rail: model.RailLedger, Operation: "ledger.move", DependsOn: []string{"crypto_observe"}, FromRole: rolePtr(mo.AccountRolePending), ToRole: rolePtr(mo.AccountRoleOperating)},
},
},
},

View File

@@ -6,6 +6,7 @@ import (
"github.com/tech/sendico/pkg/db/storable"
"github.com/tech/sendico/pkg/model"
pmodel "github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/mservice"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
)
@@ -33,9 +34,11 @@ const (
type CommitPolicy string
const (
CommitPolicyUnspecified CommitPolicy = "UNSPECIFIED"
CommitPolicyImmediate CommitPolicy = "IMMEDIATE"
CommitPolicyAfterSuccess CommitPolicy = "AFTER_SUCCESS"
CommitPolicyUnspecified CommitPolicy = "UNSPECIFIED"
CommitPolicyImmediate CommitPolicy = "IMMEDIATE"
CommitPolicyAfterSuccess CommitPolicy = "AFTER_SUCCESS"
CommitPolicyAfterFailure CommitPolicy = "AFTER_FAILURE"
CommitPolicyAfterCanceled CommitPolicy = "AFTER_TERMINAL"
)
// PaymentState enumerates lifecycle phases.
@@ -83,6 +86,9 @@ const (
RailOperationUnspecified RailOperation = "UNSPECIFIED"
RailOperationDebit RailOperation = "DEBIT"
RailOperationCredit RailOperation = "CREDIT"
RailOperationExternalDebit RailOperation = "EXTERNAL_DEBIT"
RailOperationExternalCredit RailOperation = "EXTERNAL_CREDIT"
RailOperationMove RailOperation = "MOVE"
RailOperationSend RailOperation = "SEND"
RailOperationFee RailOperation = "FEE"
RailOperationObserveConfirm RailOperation = "OBSERVE_CONFIRM"
@@ -265,26 +271,28 @@ type ExecutionRefs struct {
// PaymentStep is an explicit action within a payment plan.
type PaymentStep struct {
StepID string `bson:"stepId,omitempty" json:"stepId,omitempty"`
Rail Rail `bson:"rail" json:"rail"`
GatewayID string `bson:"gatewayId,omitempty" json:"gatewayId,omitempty"`
InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"`
Action RailOperation `bson:"action" json:"action"`
DependsOn []string `bson:"dependsOn,omitempty" json:"dependsOn,omitempty"`
CommitPolicy CommitPolicy `bson:"commitPolicy,omitempty" json:"commitPolicy,omitempty"`
CommitAfter []string `bson:"commitAfter,omitempty" json:"commitAfter,omitempty"`
StepID string `bson:"stepId,omitempty" json:"stepId,omitempty"`
Rail Rail `bson:"rail" json:"rail"`
GatewayID string `bson:"gatewayId,omitempty" json:"gatewayId,omitempty"`
InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"`
Action RailOperation `bson:"action" json:"action"`
DependsOn []string `bson:"dependsOn,omitempty" json:"dependsOn,omitempty"`
CommitPolicy CommitPolicy `bson:"commitPolicy,omitempty" json:"commitPolicy,omitempty"`
CommitAfter []string `bson:"commitAfter,omitempty" json:"commitAfter,omitempty"`
Amount *paymenttypes.Money `bson:"amount,omitempty" json:"amount,omitempty"`
Ref string `bson:"ref,omitempty" json:"ref,omitempty"`
Ref string `bson:"ref,omitempty" json:"ref,omitempty"`
FromRole *pmodel.AccountRole `bson:"fromRole,omitempty" json:"fromRole,omitempty"`
ToRole *pmodel.AccountRole `bson:"toRole,omitempty" json:"toRole,omitempty"`
}
// PaymentPlan captures the ordered list of steps to execute a payment.
type PaymentPlan struct {
ID string `bson:"id,omitempty" json:"id,omitempty"`
FXQuote *paymenttypes.FXQuote `bson:"fxQuote,omitempty" json:"fxQuote,omitempty"`
ID string `bson:"id,omitempty" json:"id,omitempty"`
FXQuote *paymenttypes.FXQuote `bson:"fxQuote,omitempty" json:"fxQuote,omitempty"`
Fees []*paymenttypes.FeeLine `bson:"fees,omitempty" json:"fees,omitempty"`
Steps []*PaymentStep `bson:"steps,omitempty" json:"steps,omitempty"`
IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotencyKey,omitempty"`
CreatedAt time.Time `bson:"createdAt,omitempty" json:"createdAt,omitempty"`
Steps []*PaymentStep `bson:"steps,omitempty" json:"steps,omitempty"`
IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotencyKey,omitempty"`
CreatedAt time.Time `bson:"createdAt,omitempty" json:"createdAt,omitempty"`
}
// ExecutionStep describes a planned or executed payment step for reporting.
@@ -467,7 +475,7 @@ func normalizeEndpoint(ep *PaymentEndpoint) {
func normalizeCommitPolicy(policy CommitPolicy) CommitPolicy {
val := strings.ToUpper(strings.TrimSpace(string(policy)))
switch CommitPolicy(val) {
case CommitPolicyImmediate, CommitPolicyAfterSuccess:
case CommitPolicyImmediate, CommitPolicyAfterSuccess, CommitPolicyAfterFailure, CommitPolicyAfterCanceled:
return CommitPolicy(val)
default:
if val == "" {

View File

@@ -4,28 +4,31 @@ import (
"strings"
"github.com/tech/sendico/pkg/db/storable"
pmodel "github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/mservice"
)
// OrchestrationStep defines a template step for execution planning.
type OrchestrationStep struct {
StepID string `bson:"stepId" json:"stepId"`
Rail Rail `bson:"rail" json:"rail"`
Operation string `bson:"operation" json:"operation"`
DependsOn []string `bson:"dependsOn,omitempty" json:"dependsOn,omitempty"`
CommitPolicy CommitPolicy `bson:"commitPolicy,omitempty" json:"commitPolicy,omitempty"`
CommitAfter []string `bson:"commitAfter,omitempty" json:"commitAfter,omitempty"`
StepID string `bson:"stepId" json:"stepId"`
Rail Rail `bson:"rail" json:"rail"`
Operation string `bson:"operation" json:"operation"`
DependsOn []string `bson:"dependsOn,omitempty" json:"dependsOn,omitempty"`
CommitPolicy CommitPolicy `bson:"commitPolicy,omitempty" json:"commitPolicy,omitempty"`
CommitAfter []string `bson:"commitAfter,omitempty" json:"commitAfter,omitempty"`
FromRole *pmodel.AccountRole `bson:"fromRole,omitempty" json:"fromRole,omitempty"`
ToRole *pmodel.AccountRole `bson:"toRole,omitempty" json:"toRole,omitempty"`
}
// PaymentPlanTemplate stores reusable orchestration templates.
type PaymentPlanTemplate struct {
storable.Base `bson:",inline" json:",inline"`
FromRail Rail `bson:"fromRail" json:"fromRail"`
ToRail Rail `bson:"toRail" json:"toRail"`
Network string `bson:"network,omitempty" json:"network,omitempty"`
FromRail Rail `bson:"fromRail" json:"fromRail"`
ToRail Rail `bson:"toRail" json:"toRail"`
Network string `bson:"network,omitempty" json:"network,omitempty"`
Steps []OrchestrationStep `bson:"steps,omitempty" json:"steps,omitempty"`
IsEnabled bool `bson:"isEnabled" json:"isEnabled"`
IsEnabled bool `bson:"isEnabled" json:"isEnabled"`
}
// Collection implements storable.Storable.
@@ -52,9 +55,30 @@ func (t *PaymentPlanTemplate) Normalize() {
step.CommitPolicy = normalizeCommitPolicy(step.CommitPolicy)
step.DependsOn = normalizeStringList(step.DependsOn)
step.CommitAfter = normalizeStringList(step.CommitAfter)
step.FromRole = normalizeAccountRole(step.FromRole)
step.ToRole = normalizeAccountRole(step.ToRole)
}
}
func normalizeAccountRole(role *pmodel.AccountRole) *pmodel.AccountRole {
if role == nil {
return nil
}
trimmed := strings.TrimSpace(string(*role))
if trimmed == "" {
return nil
}
if parsed, ok := pmodel.Parse(trimmed); ok {
if parsed == "" {
return nil
}
normalized := parsed
return &normalized
}
normalized := pmodel.AccountRole(strings.ToLower(trimmed))
return &normalized
}
// PaymentPlanTemplateFilter selects templates for lookup.
type PaymentPlanTemplateFilter struct {
FromRail Rail

Binary file not shown.