hold/release + discovery based routing

This commit is contained in:
Stephan D
2026-01-16 14:33:05 +01:00
parent fe0caaa611
commit a4333a5385
44 changed files with 2086 additions and 348 deletions

View File

@@ -147,6 +147,14 @@ func (g *chainRailGateway) Observe(ctx context.Context, referenceID string) (rai
}, nil }, nil
} }
func (g *chainRailGateway) Block(ctx context.Context, req rail.BlockRequest) (rail.RailResult, error) {
return rail.RailResult{}, merrors.NotImplemented("chain gateway: block not supported")
}
func (g *chainRailGateway) Release(ctx context.Context, req rail.ReleaseRequest) (rail.RailResult, error) {
return rail.RailResult{}, merrors.NotImplemented("chain gateway: release not supported")
}
func (g *chainRailGateway) resolveDestination(ctx context.Context, destRef, memo string) (*chainv1.TransferDestination, error) { func (g *chainRailGateway) resolveDestination(ctx context.Context, destRef, memo string) (*chainv1.TransferDestination, error) {
managed, err := g.isManagedWallet(ctx, destRef) managed, err := g.isManagedWallet(ctx, destRef)
if err != nil { if err != nil {

View File

@@ -0,0 +1,90 @@
package client
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
"google.golang.org/grpc"
)
type stubConnector struct {
submitFn func(ctx context.Context, req *connectorv1.SubmitOperationRequest) (*connectorv1.SubmitOperationResponse, error)
}
func (s *stubConnector) OpenAccount(context.Context, *connectorv1.OpenAccountRequest, ...grpc.CallOption) (*connectorv1.OpenAccountResponse, error) {
return nil, nil
}
func (s *stubConnector) GetAccount(context.Context, *connectorv1.GetAccountRequest, ...grpc.CallOption) (*connectorv1.GetAccountResponse, error) {
return nil, nil
}
func (s *stubConnector) ListAccounts(context.Context, *connectorv1.ListAccountsRequest, ...grpc.CallOption) (*connectorv1.ListAccountsResponse, error) {
return nil, nil
}
func (s *stubConnector) GetBalance(context.Context, *connectorv1.GetBalanceRequest, ...grpc.CallOption) (*connectorv1.GetBalanceResponse, error) {
return nil, nil
}
func (s *stubConnector) SubmitOperation(ctx context.Context, req *connectorv1.SubmitOperationRequest, _ ...grpc.CallOption) (*connectorv1.SubmitOperationResponse, error) {
if s.submitFn != nil {
return s.submitFn(ctx, req)
}
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{OperationId: "op-1"}}, nil
}
func (s *stubConnector) GetOperation(context.Context, *connectorv1.GetOperationRequest, ...grpc.CallOption) (*connectorv1.GetOperationResponse, error) {
return nil, nil
}
func (s *stubConnector) ListOperations(context.Context, *connectorv1.ListOperationsRequest, ...grpc.CallOption) (*connectorv1.ListOperationsResponse, error) {
return nil, nil
}
func TestTransferInternal_SubmitsTransferOperation(t *testing.T) {
ctx := context.Background()
var captured *connectorv1.Operation
stub := &stubConnector{
submitFn: func(ctx context.Context, req *connectorv1.SubmitOperationRequest) (*connectorv1.SubmitOperationResponse, error) {
captured = req.GetOperation()
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{OperationId: "op-1"}}, nil
},
}
client := NewWithClient(Config{}, stub)
resp, err := client.TransferInternal(ctx, &ledgerv1.TransferRequest{
IdempotencyKey: "id-1",
OrganizationRef: "org-1",
FromLedgerAccountRef: "acct-from",
ToLedgerAccountRef: "acct-to",
Money: &moneyv1.Money{Currency: "USD", Amount: "10"},
Description: "hold",
})
require.NoError(t, err)
require.NotNil(t, resp)
require.NotNil(t, captured)
assert.Equal(t, connectorv1.OperationType_TRANSFER, captured.GetType())
assert.Equal(t, "id-1", captured.GetIdempotencyKey())
assert.Equal(t, "acct-from", captured.GetFrom().GetAccount().GetAccountId())
assert.Equal(t, "acct-to", captured.GetTo().GetAccount().GetAccountId())
assert.Equal(t, ledgerConnectorID, captured.GetFrom().GetAccount().GetConnectorId())
assert.Equal(t, ledgerConnectorID, captured.GetTo().GetAccount().GetConnectorId())
assert.Equal(t, "USD", captured.GetMoney().GetCurrency())
assert.Equal(t, "10", captured.GetMoney().GetAmount())
params := captured.GetParams().AsMap()
assert.Equal(t, "org-1", params["organization_ref"])
assert.Equal(t, "hold", params["description"])
assert.Equal(t, "op-1", resp.GetJournalEntryRef())
assert.Equal(t, ledgerv1.EntryType_ENTRY_TRANSFER, resp.GetEntryType())
}

View File

@@ -34,42 +34,7 @@ messaging:
reconnect_wait: 5 reconnect_wait: 5
buffer_size: 1024 buffer_size: 1024
fees: # Service endpoints are sourced from discovery; no static overrides.
address: "sendico_billing_fees:50060"
dial_timeout_seconds: 5
call_timeout_seconds: 3
insecure: true
ledger:
address: "sendico_ledger:50052"
dial_timeout_seconds: 5
call_timeout_seconds: 3
insecure: true
gateway:
address: "sendico_chain_gateway:50070"
dial_timeout_seconds: 5
call_timeout_seconds: 3
insecure: true
payment_gateway:
address: "sendico_tgsettle_gateway:50080"
dial_timeout_seconds: 5
call_timeout_seconds: 3
insecure: true
mntx:
address: "sendico_mntx_gateway:50075"
dial_timeout_seconds: 5
call_timeout_seconds: 15
insecure: true
oracle:
address: "sendico_fx_oracle:50051"
dial_timeout_seconds: 5
call_timeout_seconds: 3
insecure: true
card_gateways: card_gateways:
monetix: monetix:
funding_address: "TGBDXEg9rxSqGFJDcb889zqTjDwx1bmLRF" funding_address: "TGBDXEg9rxSqGFJDcb889zqTjDwx1bmLRF"
@@ -78,14 +43,4 @@ card_gateways:
fee_ledger_accounts: fee_ledger_accounts:
monetix: "ledger:fees:monetix" monetix: "ledger:fees:monetix"
# gateway_instances: # Gateway instances and capabilities are sourced from service discovery.
# - id: "crypto-tron"
# rail: "CRYPTO"
# network: "TRON"
# currencies: ["USDT"]
# capabilities:
# can_pay_out: true
# can_send_fee: true
# limits:
# min_amount: "0"
# max_amount: "100000"

View File

@@ -48,10 +48,19 @@ func buildFeeLedgerAccounts(src map[string]string) map[string]string {
} }
func buildGatewayRegistry(logger mlogger.Logger, src []gatewayInstanceConfig, registry *discovery.Registry) orchestrator.GatewayRegistry { func buildGatewayRegistry(logger mlogger.Logger, src []gatewayInstanceConfig, registry *discovery.Registry) orchestrator.GatewayRegistry {
static := buildGatewayInstances(logger, src) if logger != nil {
staticRegistry := orchestrator.NewGatewayRegistry(logger, static) logger = logger.Named("gateway_registry")
discoveryRegistry := orchestrator.NewDiscoveryGatewayRegistry(logger, registry) }
return orchestrator.NewCompositeGatewayRegistry(logger, staticRegistry, discoveryRegistry) if len(src) > 0 && logger != nil {
logger.Warn("Static gateway configuration ignored; using discovery registry only")
}
if registry == nil {
if logger != nil {
logger.Warn("Discovery registry unavailable; gateway routing disabled")
}
return nil
}
return orchestrator.NewDiscoveryGatewayRegistry(logger, registry)
} }
func buildRailGateways(chainClient chainclient.Client, paymentGatewayClient chainclient.Client, src []gatewayInstanceConfig) map[string]rail.RailGateway { func buildRailGateways(chainClient chainclient.Client, paymentGatewayClient chainclient.Client, src []gatewayInstanceConfig) map[string]rail.RailGateway {
@@ -76,6 +85,8 @@ func buildRailGateways(chainClient chainclient.Client, paymentGatewayClient chai
CanReadBalance: inst.Capabilities.CanReadBalance, CanReadBalance: inst.Capabilities.CanReadBalance,
CanSendFee: inst.Capabilities.CanSendFee, CanSendFee: inst.Capabilities.CanSendFee,
RequiresObserveConfirm: inst.Capabilities.RequiresObserveConfirm, RequiresObserveConfirm: inst.Capabilities.RequiresObserveConfirm,
CanBlock: inst.Capabilities.CanBlock,
CanRelease: inst.Capabilities.CanRelease,
}, },
} }
switch inst.Rail { switch inst.Rail {
@@ -135,6 +146,8 @@ func buildGatewayInstances(logger mlogger.Logger, src []gatewayInstanceConfig) [
CanReadBalance: cfg.Capabilities.CanReadBalance, CanReadBalance: cfg.Capabilities.CanReadBalance,
CanSendFee: cfg.Capabilities.CanSendFee, CanSendFee: cfg.Capabilities.CanSendFee,
RequiresObserveConfirm: cfg.Capabilities.RequiresObserveConfirm, RequiresObserveConfirm: cfg.Capabilities.RequiresObserveConfirm,
CanBlock: cfg.Capabilities.CanBlock,
CanRelease: cfg.Capabilities.CanRelease,
}, },
Limits: buildGatewayLimits(cfg.Limits), Limits: buildGatewayLimits(cfg.Limits),
Version: strings.TrimSpace(cfg.Version), Version: strings.TrimSpace(cfg.Version),

View File

@@ -155,22 +155,8 @@ func (i *Imp) initOracleClient(cfg clientConfig) oracleclient.Client {
} }
func (i *Imp) closeClients() { func (i *Imp) closeClients() {
if i.ledgerClient != nil { if i.discoveryClients != nil {
_ = i.ledgerClient.Close() i.discoveryClients.Close()
} i.discoveryClients = nil
if i.gatewayClient != nil {
_ = i.gatewayClient.Close()
}
if i.paymentGatewayClient != nil {
_ = i.paymentGatewayClient.Close()
}
if i.mntxClient != nil {
_ = i.mntxClient.Close()
}
if i.oracleClient != nil {
_ = i.oracleClient.Close()
}
if i.feesConn != nil {
_ = i.feesConn.Close()
} }
} }

View File

@@ -54,6 +54,8 @@ type gatewayCapabilitiesConfig struct {
CanReadBalance bool `yaml:"can_read_balance"` CanReadBalance bool `yaml:"can_read_balance"`
CanSendFee bool `yaml:"can_send_fee"` CanSendFee bool `yaml:"can_send_fee"`
RequiresObserveConfirm bool `yaml:"requires_observe_confirm"` RequiresObserveConfirm bool `yaml:"requires_observe_confirm"`
CanBlock bool `yaml:"can_block"`
CanRelease bool `yaml:"can_release"`
} }
type limitsConfig struct { type limitsConfig struct {

View File

@@ -2,7 +2,6 @@ package serverimp
import ( import (
oracleclient "github.com/tech/sendico/fx/oracle/client" oracleclient "github.com/tech/sendico/fx/oracle/client"
chainclient "github.com/tech/sendico/gateway/chain/client"
mntxclient "github.com/tech/sendico/gateway/mntx/client" mntxclient "github.com/tech/sendico/gateway/mntx/client"
ledgerclient "github.com/tech/sendico/ledger/client" ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator"
@@ -10,47 +9,28 @@ import (
) )
type orchestratorDeps struct { type orchestratorDeps struct {
feesClient feesv1.FeeEngineClient feesClient feesv1.FeeEngineClient
ledgerClient ledgerclient.Client ledgerClient ledgerclient.Client
gatewayClient chainclient.Client mntxClient mntxclient.Client
paymentGatewayClient chainclient.Client oracleClient oracleclient.Client
mntxClient mntxclient.Client gatewayInvokeResolver orchestrator.GatewayInvokeResolver
oracleClient oracleclient.Client
} }
func (i *Imp) initDependencies(cfg *config) *orchestratorDeps { func (i *Imp) initDependencies(cfg *config) *orchestratorDeps {
deps := &orchestratorDeps{} deps := &orchestratorDeps{}
if cfg == nil { if i.discoveryReg == nil {
if i.logger != nil {
i.logger.Warn("Discovery registry unavailable; downstream clients disabled")
}
return deps return deps
} }
deps.feesClient, i.feesConn = i.initFeesClient(cfg.Fees) i.discoveryClients = newDiscoveryClientResolver(i.logger, i.discoveryReg)
deps.feesClient = &discoveryFeeClient{resolver: i.discoveryClients}
deps.ledgerClient = i.initLedgerClient(cfg.Ledger) deps.ledgerClient = &discoveryLedgerClient{resolver: i.discoveryClients}
if deps.ledgerClient != nil { deps.oracleClient = &discoveryOracleClient{resolver: i.discoveryClients}
i.ledgerClient = deps.ledgerClient deps.mntxClient = &discoveryMntxClient{resolver: i.discoveryClients}
} deps.gatewayInvokeResolver = discoveryGatewayInvokeResolver{resolver: i.discoveryClients}
deps.gatewayClient = i.initGatewayClient(cfg.Gateway)
if deps.gatewayClient != nil {
i.gatewayClient = deps.gatewayClient
}
deps.paymentGatewayClient = i.initPaymentGatewayClient(cfg.PaymentGateway)
if deps.paymentGatewayClient != nil {
i.paymentGatewayClient = deps.paymentGatewayClient
}
deps.mntxClient = i.initMntxClient(cfg.Mntx)
if deps.mntxClient != nil {
i.mntxClient = deps.mntxClient
}
deps.oracleClient = i.initOracleClient(cfg.Oracle)
if deps.oracleClient != nil {
i.oracleClient = deps.oracleClient
}
return deps return deps
} }
@@ -65,21 +45,15 @@ func (i *Imp) buildServiceOptions(cfg *config, deps *orchestratorDeps) []orchest
if deps.ledgerClient != nil { if deps.ledgerClient != nil {
opts = append(opts, orchestrator.WithLedgerClient(deps.ledgerClient)) opts = append(opts, orchestrator.WithLedgerClient(deps.ledgerClient))
} }
if deps.gatewayClient != nil {
opts = append(opts, orchestrator.WithChainGatewayClient(deps.gatewayClient))
}
if deps.paymentGatewayClient != nil {
opts = append(opts, orchestrator.WithProviderSettlementGatewayClient(deps.paymentGatewayClient))
}
if railGateways := buildRailGateways(deps.gatewayClient, deps.paymentGatewayClient, cfg.GatewayInstances); len(railGateways) > 0 {
opts = append(opts, orchestrator.WithRailGateways(railGateways))
}
if deps.mntxClient != nil { if deps.mntxClient != nil {
opts = append(opts, orchestrator.WithMntxGateway(deps.mntxClient)) opts = append(opts, orchestrator.WithMntxGateway(deps.mntxClient))
} }
if deps.oracleClient != nil { if deps.oracleClient != nil {
opts = append(opts, orchestrator.WithOracleClient(deps.oracleClient)) opts = append(opts, orchestrator.WithOracleClient(deps.oracleClient))
} }
if deps.gatewayInvokeResolver != nil {
opts = append(opts, orchestrator.WithGatewayInvokeResolver(deps.gatewayInvokeResolver))
}
if routes := buildCardGatewayRoutes(cfg.CardGateways); len(routes) > 0 { if routes := buildCardGatewayRoutes(cfg.CardGateways); len(routes) > 0 {
opts = append(opts, orchestrator.WithCardGatewayRoutes(routes)) opts = append(opts, orchestrator.WithCardGatewayRoutes(routes))
} }

View File

@@ -0,0 +1,477 @@
package serverimp
import (
"context"
"fmt"
"net"
"net/url"
"sort"
"strings"
"sync"
"time"
oracleclient "github.com/tech/sendico/fx/oracle/client"
chainclient "github.com/tech/sendico/gateway/chain/client"
mntxclient "github.com/tech/sendico/gateway/mntx/client"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)
const discoveryLogThrottle = 30 * time.Second
var (
feesServiceNames = []string{"BILLING_FEES", string(mservice.FeePlans)}
ledgerServiceNames = []string{"LEDGER", string(mservice.Ledger)}
oracleServiceNames = []string{"FX_ORACLE", string(mservice.FXOracle)}
mntxServiceNames = []string{"CARD_PAYOUT_RAIL_GATEWAY", string(mservice.MntxGateway)}
)
type discoveryEndpoint struct {
address string
insecure bool
raw string
}
func (e discoveryEndpoint) key() string {
return fmt.Sprintf("%s|%t", e.address, e.insecure)
}
type discoveryClientResolver struct {
logger mlogger.Logger
registry *discovery.Registry
mu sync.Mutex
feesConn *grpc.ClientConn
feesEndpoint discoveryEndpoint
ledgerClient ledgerclient.Client
ledgerEndpoint discoveryEndpoint
oracleClient oracleclient.Client
oracleEndpoint discoveryEndpoint
mntxClient mntxclient.Client
mntxEndpoint discoveryEndpoint
chainClients map[string]chainclient.Client
lastSelection map[string]string
lastMissing map[string]time.Time
}
func newDiscoveryClientResolver(logger mlogger.Logger, registry *discovery.Registry) *discoveryClientResolver {
if logger != nil {
logger = logger.Named("discovery_clients")
}
return &discoveryClientResolver{
logger: logger,
registry: registry,
chainClients: map[string]chainclient.Client{},
lastSelection: map[string]string{},
lastMissing: map[string]time.Time{},
}
}
func (r *discoveryClientResolver) Close() {
r.mu.Lock()
defer r.mu.Unlock()
if r.feesConn != nil {
_ = r.feesConn.Close()
r.feesConn = nil
}
if r.ledgerClient != nil {
_ = r.ledgerClient.Close()
r.ledgerClient = nil
}
if r.oracleClient != nil {
_ = r.oracleClient.Close()
r.oracleClient = nil
}
if r.mntxClient != nil {
_ = r.mntxClient.Close()
r.mntxClient = nil
}
for key, client := range r.chainClients {
if client != nil {
_ = client.Close()
}
delete(r.chainClients, key)
}
}
func (r *discoveryClientResolver) FeesAvailable() bool {
_, ok := r.findEntry("fees", feesServiceNames, "", "")
return ok
}
func (r *discoveryClientResolver) LedgerAvailable() bool {
_, ok := r.findEntry("ledger", ledgerServiceNames, "", "")
return ok
}
func (r *discoveryClientResolver) OracleAvailable() bool {
_, ok := r.findEntry("oracle", oracleServiceNames, "", "")
return ok
}
func (r *discoveryClientResolver) MntxAvailable() bool {
_, ok := r.findEntry("mntx", mntxServiceNames, "", "")
return ok
}
func (r *discoveryClientResolver) FeesClient(ctx context.Context) (feesv1.FeeEngineClient, error) {
entry, ok := r.findEntry("fees", feesServiceNames, "", "")
if !ok {
return nil, merrors.NoData("discovery: fees service unavailable")
}
endpoint, err := parseDiscoveryEndpoint(entry.InvokeURI)
if err != nil {
r.logMissing("fees", "invalid fees invoke uri", entry.InvokeURI, err)
return nil, err
}
r.mu.Lock()
defer r.mu.Unlock()
if r.feesConn == nil || r.feesEndpoint.key() != endpoint.key() || r.feesEndpoint.address != endpoint.address {
if r.feesConn != nil {
_ = r.feesConn.Close()
r.feesConn = nil
}
conn, dialErr := dialGrpc(ctx, endpoint)
if dialErr != nil {
r.logMissing("fees", "failed to dial fees service", endpoint.raw, dialErr)
return nil, dialErr
}
r.feesConn = conn
r.feesEndpoint = endpoint
}
return feesv1.NewFeeEngineClient(r.feesConn), nil
}
func (r *discoveryClientResolver) LedgerClient(ctx context.Context) (ledgerclient.Client, error) {
entry, ok := r.findEntry("ledger", ledgerServiceNames, "", "")
if !ok {
return nil, merrors.NoData("discovery: ledger service unavailable")
}
endpoint, err := parseDiscoveryEndpoint(entry.InvokeURI)
if err != nil {
r.logMissing("ledger", "invalid ledger invoke uri", entry.InvokeURI, err)
return nil, err
}
r.mu.Lock()
defer r.mu.Unlock()
if r.ledgerClient == nil || r.ledgerEndpoint.key() != endpoint.key() || r.ledgerEndpoint.address != endpoint.address {
if r.ledgerClient != nil {
_ = r.ledgerClient.Close()
r.ledgerClient = nil
}
client, dialErr := ledgerclient.New(ctx, ledgerclient.Config{
Address: endpoint.address,
Insecure: endpoint.insecure,
})
if dialErr != nil {
r.logMissing("ledger", "failed to dial ledger service", endpoint.raw, dialErr)
return nil, dialErr
}
r.ledgerClient = client
r.ledgerEndpoint = endpoint
}
return r.ledgerClient, nil
}
func (r *discoveryClientResolver) OracleClient(ctx context.Context) (oracleclient.Client, error) {
entry, ok := r.findEntry("oracle", oracleServiceNames, "", "")
if !ok {
return nil, merrors.NoData("discovery: oracle service unavailable")
}
endpoint, err := parseDiscoveryEndpoint(entry.InvokeURI)
if err != nil {
r.logMissing("oracle", "invalid oracle invoke uri", entry.InvokeURI, err)
return nil, err
}
r.mu.Lock()
defer r.mu.Unlock()
if r.oracleClient == nil || r.oracleEndpoint.key() != endpoint.key() || r.oracleEndpoint.address != endpoint.address {
if r.oracleClient != nil {
_ = r.oracleClient.Close()
r.oracleClient = nil
}
client, dialErr := oracleclient.New(ctx, oracleclient.Config{
Address: endpoint.address,
Insecure: endpoint.insecure,
})
if dialErr != nil {
r.logMissing("oracle", "failed to dial oracle service", endpoint.raw, dialErr)
return nil, dialErr
}
r.oracleClient = client
r.oracleEndpoint = endpoint
}
return r.oracleClient, nil
}
func (r *discoveryClientResolver) MntxClient(ctx context.Context) (mntxclient.Client, error) {
entry, ok := r.findEntry("mntx", mntxServiceNames, "", "")
if !ok {
return nil, merrors.NoData("discovery: mntx service unavailable")
}
endpoint, err := parseDiscoveryEndpoint(entry.InvokeURI)
if err != nil {
r.logMissing("mntx", "invalid mntx invoke uri", entry.InvokeURI, err)
return nil, err
}
r.mu.Lock()
defer r.mu.Unlock()
if r.mntxClient == nil || r.mntxEndpoint.key() != endpoint.key() || r.mntxEndpoint.address != endpoint.address {
if r.mntxClient != nil {
_ = r.mntxClient.Close()
r.mntxClient = nil
}
if !endpoint.insecure && r.logger != nil {
r.logger.Warn("Mntx gateway does not support TLS, falling back to insecure transport",
zap.String("invoke_uri", endpoint.raw))
}
client, dialErr := mntxclient.New(ctx, mntxclient.Config{
Address: endpoint.address,
})
if dialErr != nil {
r.logMissing("mntx", "failed to dial mntx service", endpoint.raw, dialErr)
return nil, dialErr
}
r.mntxClient = client
r.mntxEndpoint = endpoint
}
return r.mntxClient, nil
}
func (r *discoveryClientResolver) ChainClient(ctx context.Context, invokeURI string) (chainclient.Client, error) {
endpoint, err := parseDiscoveryEndpoint(invokeURI)
if err != nil {
r.logMissing("chain", "invalid chain gateway invoke uri", invokeURI, err)
return nil, err
}
r.mu.Lock()
defer r.mu.Unlock()
if client, ok := r.chainClients[endpoint.key()]; ok && client != nil {
return client, nil
}
client, dialErr := chainclient.New(ctx, chainclient.Config{
Address: endpoint.address,
Insecure: endpoint.insecure,
})
if dialErr != nil {
r.logMissing("chain", "failed to dial chain gateway", endpoint.raw, dialErr)
return nil, dialErr
}
r.chainClients[endpoint.key()] = client
return client, nil
}
func (r *discoveryClientResolver) PaymentGatewayClient(ctx context.Context, invokeURI string) (chainclient.Client, error) {
endpoint, err := parseDiscoveryEndpoint(invokeURI)
if err != nil {
r.logMissing("payment_gateway", "invalid payment gateway invoke uri", invokeURI, err)
return nil, err
}
r.mu.Lock()
defer r.mu.Unlock()
if client, ok := r.chainClients[endpoint.key()]; ok && client != nil {
return client, nil
}
client, dialErr := chainclient.New(ctx, chainclient.Config{
Address: endpoint.address,
Insecure: endpoint.insecure,
})
if dialErr != nil {
r.logMissing("payment_gateway", "failed to dial payment gateway", endpoint.raw, dialErr)
return nil, dialErr
}
r.chainClients[endpoint.key()] = client
return client, nil
}
func (r *discoveryClientResolver) findEntry(key string, services []string, rail string, network string) (*discovery.RegistryEntry, bool) {
if r == nil || r.registry == nil {
r.logMissing(key, "discovery registry unavailable", "", nil)
return nil, false
}
entries := r.registry.List(time.Now(), true)
matches := make([]discovery.RegistryEntry, 0)
for _, entry := range entries {
if !matchesService(entry.Service, services) {
continue
}
if rail != "" && !strings.EqualFold(strings.TrimSpace(entry.Rail), rail) {
continue
}
if network != "" && !strings.EqualFold(strings.TrimSpace(entry.Network), network) {
continue
}
matches = append(matches, entry)
}
if len(matches) == 0 {
r.logMissing(key, "discovery entry missing", "", nil)
return nil, false
}
sort.Slice(matches, func(i, j int) bool {
if matches[i].RoutingPriority != matches[j].RoutingPriority {
return matches[i].RoutingPriority > matches[j].RoutingPriority
}
if matches[i].ID != matches[j].ID {
return matches[i].ID < matches[j].ID
}
return matches[i].InstanceID < matches[j].InstanceID
})
entry := matches[0]
entryKey := discoveryEntryKey(entry)
r.logSelection(key, entryKey, entry)
return &entry, true
}
func (r *discoveryClientResolver) logSelection(key, entryKey string, entry discovery.RegistryEntry) {
if r.logger == nil {
return
}
r.mu.Lock()
last := r.lastSelection[key]
if last == entryKey {
r.mu.Unlock()
return
}
r.lastSelection[key] = entryKey
r.mu.Unlock()
r.logger.Info("Discovery endpoint selected",
zap.String("service_key", key),
zap.String("service", entry.Service),
zap.String("rail", entry.Rail),
zap.String("network", entry.Network),
zap.String("entry_id", entry.ID),
zap.String("instance_id", entry.InstanceID),
zap.String("invoke_uri", entry.InvokeURI))
}
func (r *discoveryClientResolver) logMissing(key, message, invokeURI string, err error) {
if r.logger == nil {
return
}
now := time.Now()
r.mu.Lock()
last := r.lastMissing[key]
if !last.IsZero() && now.Sub(last) < discoveryLogThrottle {
r.mu.Unlock()
return
}
r.lastMissing[key] = now
r.mu.Unlock()
fields := []zap.Field{zap.String("service_key", key)}
if invokeURI != "" {
fields = append(fields, zap.String("invoke_uri", strings.TrimSpace(invokeURI)))
}
if err != nil {
fields = append(fields, zap.Error(err))
}
r.logger.Warn(message, fields...)
}
func discoveryEntryKey(entry discovery.RegistryEntry) string {
return fmt.Sprintf("%s|%s|%s|%s|%s|%s",
strings.TrimSpace(entry.Service),
strings.TrimSpace(entry.ID),
strings.TrimSpace(entry.InstanceID),
strings.TrimSpace(entry.Rail),
strings.TrimSpace(entry.Network),
strings.TrimSpace(entry.InvokeURI))
}
func matchesService(service string, candidates []string) bool {
service = strings.TrimSpace(service)
if service == "" || len(candidates) == 0 {
return false
}
for _, candidate := range candidates {
if strings.EqualFold(service, strings.TrimSpace(candidate)) {
return true
}
}
return false
}
func parseDiscoveryEndpoint(raw string) (discoveryEndpoint, error) {
raw = strings.TrimSpace(raw)
if raw == "" {
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri is required")
}
parsed, err := url.Parse(raw)
if err != nil || parsed.Scheme == "" {
if _, _, splitErr := net.SplitHostPort(raw); splitErr != nil {
if err != nil {
return discoveryEndpoint{}, err
}
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
}
return discoveryEndpoint{address: raw, insecure: true, raw: raw}, nil
}
scheme := strings.ToLower(strings.TrimSpace(parsed.Scheme))
switch scheme {
case "grpc":
address := strings.TrimSpace(parsed.Host)
if _, _, splitErr := net.SplitHostPort(address); splitErr != nil {
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
}
return discoveryEndpoint{address: address, insecure: true, raw: raw}, nil
case "grpcs":
address := strings.TrimSpace(parsed.Host)
if _, _, splitErr := net.SplitHostPort(address); splitErr != nil {
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
}
return discoveryEndpoint{address: address, insecure: false, raw: raw}, nil
case "dns", "passthrough":
return discoveryEndpoint{address: raw, insecure: true, raw: raw}, nil
default:
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: unsupported invoke uri scheme")
}
}
func dialGrpc(ctx context.Context, endpoint discoveryEndpoint) (*grpc.ClientConn, error) {
dialOpts := []grpc.DialOption{}
if endpoint.insecure {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(nil)))
}
if ctx == nil {
ctx = context.Background()
}
return grpc.DialContext(ctx, endpoint.address, dialOpts...)
}

View File

@@ -0,0 +1,19 @@
package serverimp
import (
"context"
chainclient "github.com/tech/sendico/gateway/chain/client"
"github.com/tech/sendico/pkg/merrors"
)
type discoveryGatewayInvokeResolver struct {
resolver *discoveryClientResolver
}
func (r discoveryGatewayInvokeResolver) Resolve(ctx context.Context, invokeURI string) (chainclient.Client, error) {
if r.resolver == nil {
return nil, merrors.Internal("discovery gateway resolver unavailable")
}
return r.resolver.ChainClient(ctx, invokeURI)
}

View File

@@ -0,0 +1,370 @@
package serverimp
import (
"context"
oracleclient "github.com/tech/sendico/fx/oracle/client"
chainclient "github.com/tech/sendico/gateway/chain/client"
mntxclient "github.com/tech/sendico/gateway/mntx/client"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/payments/rail"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
"google.golang.org/grpc"
)
type discoveryFeeClient struct {
resolver *discoveryClientResolver
}
func (c *discoveryFeeClient) Available() bool {
if c == nil || c.resolver == nil {
return false
}
return c.resolver.FeesAvailable()
}
func (c *discoveryFeeClient) QuoteFees(ctx context.Context, req *feesv1.QuoteFeesRequest, opts ...grpc.CallOption) (*feesv1.QuoteFeesResponse, error) {
client, err := c.resolver.FeesClient(ctx)
if err != nil {
return nil, err
}
return client.QuoteFees(ctx, req, opts...)
}
func (c *discoveryFeeClient) PrecomputeFees(ctx context.Context, req *feesv1.PrecomputeFeesRequest, opts ...grpc.CallOption) (*feesv1.PrecomputeFeesResponse, error) {
client, err := c.resolver.FeesClient(ctx)
if err != nil {
return nil, err
}
return client.PrecomputeFees(ctx, req, opts...)
}
func (c *discoveryFeeClient) ValidateFeeToken(ctx context.Context, req *feesv1.ValidateFeeTokenRequest, opts ...grpc.CallOption) (*feesv1.ValidateFeeTokenResponse, error) {
client, err := c.resolver.FeesClient(ctx)
if err != nil {
return nil, err
}
return client.ValidateFeeToken(ctx, req, opts...)
}
type discoveryLedgerClient struct {
resolver *discoveryClientResolver
}
func (c *discoveryLedgerClient) Available() bool {
if c == nil || c.resolver == nil {
return false
}
return c.resolver.LedgerAvailable()
}
func (c *discoveryLedgerClient) ReadBalance(ctx context.Context, accountID string) (*moneyv1.Money, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.ReadBalance(ctx, accountID)
}
func (c *discoveryLedgerClient) CreateTransaction(ctx context.Context, tx rail.LedgerTx) (string, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return "", err
}
return client.CreateTransaction(ctx, tx)
}
func (c *discoveryLedgerClient) TransferInternal(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.TransferInternal(ctx, req)
}
func (c *discoveryLedgerClient) HoldBalance(ctx context.Context, accountID string, amount string) error {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return err
}
return client.HoldBalance(ctx, accountID, amount)
}
func (c *discoveryLedgerClient) CreateAccount(ctx context.Context, req *ledgerv1.CreateAccountRequest) (*ledgerv1.CreateAccountResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.CreateAccount(ctx, req)
}
func (c *discoveryLedgerClient) ListAccounts(ctx context.Context, req *ledgerv1.ListAccountsRequest) (*ledgerv1.ListAccountsResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.ListAccounts(ctx, req)
}
func (c *discoveryLedgerClient) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.PostCreditWithCharges(ctx, req)
}
func (c *discoveryLedgerClient) PostDebitWithCharges(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.PostDebitWithCharges(ctx, req)
}
func (c *discoveryLedgerClient) ApplyFXWithCharges(ctx context.Context, req *ledgerv1.FXRequest) (*ledgerv1.PostResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.ApplyFXWithCharges(ctx, req)
}
func (c *discoveryLedgerClient) GetBalance(ctx context.Context, req *ledgerv1.GetBalanceRequest) (*ledgerv1.BalanceResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.GetBalance(ctx, req)
}
func (c *discoveryLedgerClient) GetJournalEntry(ctx context.Context, req *ledgerv1.GetEntryRequest) (*ledgerv1.JournalEntryResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.GetJournalEntry(ctx, req)
}
func (c *discoveryLedgerClient) GetStatement(ctx context.Context, req *ledgerv1.GetStatementRequest) (*ledgerv1.StatementResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.GetStatement(ctx, req)
}
func (c *discoveryLedgerClient) Close() error {
if c == nil || c.resolver == nil {
return nil
}
client, err := c.resolver.LedgerClient(context.Background())
if err != nil {
return nil
}
return client.Close()
}
type discoveryOracleClient struct {
resolver *discoveryClientResolver
}
func (c *discoveryOracleClient) Available() bool {
if c == nil || c.resolver == nil {
return false
}
return c.resolver.OracleAvailable()
}
func (c *discoveryOracleClient) LatestRate(ctx context.Context, req oracleclient.LatestRateParams) (*oracleclient.RateSnapshot, error) {
client, err := c.resolver.OracleClient(ctx)
if err != nil {
return nil, err
}
return client.LatestRate(ctx, req)
}
func (c *discoveryOracleClient) GetQuote(ctx context.Context, req oracleclient.GetQuoteParams) (*oracleclient.Quote, error) {
client, err := c.resolver.OracleClient(ctx)
if err != nil {
return nil, err
}
return client.GetQuote(ctx, req)
}
func (c *discoveryOracleClient) Close() error {
if c == nil || c.resolver == nil {
return nil
}
client, err := c.resolver.OracleClient(context.Background())
if err != nil {
return nil
}
return client.Close()
}
type discoveryMntxClient struct {
resolver *discoveryClientResolver
}
func (c *discoveryMntxClient) Available() bool {
if c == nil || c.resolver == nil {
return false
}
return c.resolver.MntxAvailable()
}
func (c *discoveryMntxClient) CreateCardPayout(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) {
client, err := c.resolver.MntxClient(ctx)
if err != nil {
return nil, err
}
return client.CreateCardPayout(ctx, req)
}
func (c *discoveryMntxClient) CreateCardTokenPayout(ctx context.Context, req *mntxv1.CardTokenPayoutRequest) (*mntxv1.CardTokenPayoutResponse, error) {
client, err := c.resolver.MntxClient(ctx)
if err != nil {
return nil, err
}
return client.CreateCardTokenPayout(ctx, req)
}
func (c *discoveryMntxClient) GetCardPayoutStatus(ctx context.Context, req *mntxv1.GetCardPayoutStatusRequest) (*mntxv1.GetCardPayoutStatusResponse, error) {
client, err := c.resolver.MntxClient(ctx)
if err != nil {
return nil, err
}
return client.GetCardPayoutStatus(ctx, req)
}
func (c *discoveryMntxClient) ListGatewayInstances(ctx context.Context, req *mntxv1.ListGatewayInstancesRequest) (*mntxv1.ListGatewayInstancesResponse, error) {
client, err := c.resolver.MntxClient(ctx)
if err != nil {
return nil, err
}
return client.ListGatewayInstances(ctx, req)
}
func (c *discoveryMntxClient) Close() error {
if c == nil || c.resolver == nil {
return nil
}
client, err := c.resolver.MntxClient(context.Background())
if err != nil {
return nil
}
return client.Close()
}
type discoveryChainClient struct {
resolver *discoveryClientResolver
invokeURI string
}
func (c *discoveryChainClient) Available() bool {
return c != nil && c.resolver != nil && c.invokeURI != ""
}
func (c *discoveryChainClient) client(ctx context.Context) (chainclient.Client, error) {
if c == nil || c.resolver == nil {
return nil, merrors.Internal("discovery chain client unavailable")
}
return c.resolver.ChainClient(ctx, c.invokeURI)
}
func (c *discoveryChainClient) CreateManagedWallet(ctx context.Context, req *chainv1.CreateManagedWalletRequest) (*chainv1.CreateManagedWalletResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.CreateManagedWallet(ctx, req)
}
func (c *discoveryChainClient) GetManagedWallet(ctx context.Context, req *chainv1.GetManagedWalletRequest) (*chainv1.GetManagedWalletResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.GetManagedWallet(ctx, req)
}
func (c *discoveryChainClient) ListManagedWallets(ctx context.Context, req *chainv1.ListManagedWalletsRequest) (*chainv1.ListManagedWalletsResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.ListManagedWallets(ctx, req)
}
func (c *discoveryChainClient) GetWalletBalance(ctx context.Context, req *chainv1.GetWalletBalanceRequest) (*chainv1.GetWalletBalanceResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.GetWalletBalance(ctx, req)
}
func (c *discoveryChainClient) SubmitTransfer(ctx context.Context, req *chainv1.SubmitTransferRequest) (*chainv1.SubmitTransferResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.SubmitTransfer(ctx, req)
}
func (c *discoveryChainClient) GetTransfer(ctx context.Context, req *chainv1.GetTransferRequest) (*chainv1.GetTransferResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.GetTransfer(ctx, req)
}
func (c *discoveryChainClient) ListTransfers(ctx context.Context, req *chainv1.ListTransfersRequest) (*chainv1.ListTransfersResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.ListTransfers(ctx, req)
}
func (c *discoveryChainClient) EstimateTransferFee(ctx context.Context, req *chainv1.EstimateTransferFeeRequest) (*chainv1.EstimateTransferFeeResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.EstimateTransferFee(ctx, req)
}
func (c *discoveryChainClient) ComputeGasTopUp(ctx context.Context, req *chainv1.ComputeGasTopUpRequest) (*chainv1.ComputeGasTopUpResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.ComputeGasTopUp(ctx, req)
}
func (c *discoveryChainClient) EnsureGasTopUp(ctx context.Context, req *chainv1.EnsureGasTopUpRequest) (*chainv1.EnsureGasTopUpResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.EnsureGasTopUp(ctx, req)
}
func (c *discoveryChainClient) Close() error {
if c == nil || c.resolver == nil {
return nil
}
client, err := c.resolver.ChainClient(context.Background(), c.invokeURI)
if err != nil {
return nil
}
return client.Close()
}

View File

@@ -1,16 +1,11 @@
package serverimp package serverimp
import ( import (
oracleclient "github.com/tech/sendico/fx/oracle/client"
chainclient "github.com/tech/sendico/gateway/chain/client"
mntxclient "github.com/tech/sendico/gateway/mntx/client"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator"
"github.com/tech/sendico/payments/orchestrator/storage" "github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/pkg/discovery" "github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/server/grpcapp" "github.com/tech/sendico/pkg/server/grpcapp"
"google.golang.org/grpc"
) )
type Imp struct { type Imp struct {
@@ -23,11 +18,6 @@ type Imp struct {
discoveryWatcher *discovery.RegistryWatcher discoveryWatcher *discovery.RegistryWatcher
discoveryReg *discovery.Registry discoveryReg *discovery.Registry
discoveryAnnouncer *discovery.Announcer discoveryAnnouncer *discovery.Announcer
discoveryClients *discoveryClientResolver
service *orchestrator.Service service *orchestrator.Service
feesConn *grpc.ClientConn
ledgerClient ledgerclient.Client
gatewayClient chainclient.Client
paymentGatewayClient chainclient.Client
mntxClient mntxclient.Client
oracleClient oracleclient.Client
} }

View File

@@ -5,6 +5,7 @@ import (
"strings" "strings"
"github.com/shopspring/decimal" "github.com/shopspring/decimal"
chainclient "github.com/tech/sendico/gateway/chain/client"
"github.com/tech/sendico/payments/orchestrator/storage/model" "github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/merrors"
paymenttypes "github.com/tech/sendico/pkg/payments/types" paymenttypes "github.com/tech/sendico/pkg/payments/types"
@@ -23,11 +24,6 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
if source == nil || strings.TrimSpace(source.ManagedWalletRef) == "" { if source == nil || strings.TrimSpace(source.ManagedWalletRef) == "" {
return merrors.InvalidArgument("card funding: source managed wallet is required") return merrors.InvalidArgument("card funding: source managed wallet is required")
} }
if !s.deps.gateway.available() {
s.logger.Warn("card funding aborted: chain gateway unavailable")
return merrors.InvalidArgument("card funding: chain gateway unavailable")
}
route, err := s.cardRoute(defaultCardGateway) route, err := s.cardRoute(defaultCardGateway)
if err != nil { if err != nil {
return err return err
@@ -67,10 +63,22 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
feeRequired := feeDecimal.IsPositive() feeRequired := feeDecimal.IsPositive()
feeAmountProto := protoMoney(feeAmount) feeAmountProto := protoMoney(feeAmount)
network := networkFromEndpoint(intent.Source)
instanceID := strings.TrimSpace(intent.Source.InstanceID)
actions := []model.RailOperation{model.RailOperationSend}
if feeRequired {
actions = append(actions, model.RailOperationFee)
}
chainClient, _, err := s.resolveChainGatewayClient(ctx, network, intentAmount, actions, instanceID, payment.PaymentRef)
if err != nil {
s.logger.Warn("card funding gateway resolution failed", zap.Error(err), zap.String("payment_ref", payment.PaymentRef))
return err
}
fundingDest := &chainv1.TransferDestination{ fundingDest := &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ExternalAddress{ExternalAddress: fundingAddress}, Destination: &chainv1.TransferDestination_ExternalAddress{ExternalAddress: fundingAddress},
} }
fundingFee, err := s.estimateTransferNetworkFee(ctx, sourceWalletRef, fundingDest, intentAmountProto) fundingFee, err := s.estimateTransferNetworkFee(ctx, chainClient, sourceWalletRef, fundingDest, intentAmountProto)
if err != nil { if err != nil {
return err return err
} }
@@ -83,7 +91,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
feeDest := &chainv1.TransferDestination{ feeDest := &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: feeWalletRef}, Destination: &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: feeWalletRef},
} }
feeTransferFee, err = s.estimateTransferNetworkFee(ctx, sourceWalletRef, feeDest, feeAmountProto) feeTransferFee, err = s.estimateTransferNetworkFee(ctx, chainClient, sourceWalletRef, feeDest, feeAmountProto)
if err != nil { if err != nil {
return err return err
} }
@@ -103,7 +111,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
var topUpFee *moneyv1.Money var topUpFee *moneyv1.Money
topUpPositive := false topUpPositive := false
if estimatedTotalFee != nil { if estimatedTotalFee != nil {
computeResp, err := s.deps.gateway.client.ComputeGasTopUp(ctx, &chainv1.ComputeGasTopUpRequest{ computeResp, err := chainClient.ComputeGasTopUp(ctx, &chainv1.ComputeGasTopUpRequest{
WalletRef: sourceWalletRef, WalletRef: sourceWalletRef,
EstimatedTotalFee: estimatedTotalFee, EstimatedTotalFee: estimatedTotalFee,
}) })
@@ -131,7 +139,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
topUpDest := &chainv1.TransferDestination{ topUpDest := &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: sourceWalletRef}, Destination: &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: sourceWalletRef},
} }
topUpFee, err = s.estimateTransferNetworkFee(ctx, feeWalletRef, topUpDest, topUpMoney) topUpFee, err = s.estimateTransferNetworkFee(ctx, chainClient, feeWalletRef, topUpDest, topUpMoney)
if err != nil { if err != nil {
return err return err
} }
@@ -191,7 +199,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
} }
if topUpMoney != nil && topUpPositive { if topUpMoney != nil && topUpPositive {
ensureResp, gasErr := s.deps.gateway.client.EnsureGasTopUp(ctx, &chainv1.EnsureGasTopUpRequest{ ensureResp, gasErr := chainClient.EnsureGasTopUp(ctx, &chainv1.EnsureGasTopUpRequest{
IdempotencyKey: payment.IdempotencyKey + ":card:gas", IdempotencyKey: payment.IdempotencyKey + ":card:gas",
OrganizationRef: payment.OrganizationRef.Hex(), OrganizationRef: payment.OrganizationRef.Hex(),
SourceWalletRef: feeWalletRef, SourceWalletRef: feeWalletRef,
@@ -228,7 +236,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
topUpDest := &chainv1.TransferDestination{ topUpDest := &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: sourceWalletRef}, Destination: &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: sourceWalletRef},
} }
topUpFee, err = s.estimateTransferNetworkFee(ctx, feeWalletRef, topUpDest, actual) topUpFee, err = s.estimateTransferNetworkFee(ctx, chainClient, feeWalletRef, topUpDest, actual)
if err != nil { if err != nil {
return err return err
} }
@@ -247,7 +255,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
updateExecutionPlanTotalNetworkFee(plan) updateExecutionPlanTotalNetworkFee(plan)
} }
fundResp, err := s.deps.gateway.client.SubmitTransfer(ctx, &chainv1.SubmitTransferRequest{ fundResp, err := chainClient.SubmitTransfer(ctx, &chainv1.SubmitTransferRequest{
IdempotencyKey: payment.IdempotencyKey + ":card:fund", IdempotencyKey: payment.IdempotencyKey + ":card:fund",
OrganizationRef: payment.OrganizationRef.Hex(), OrganizationRef: payment.OrganizationRef.Hex(),
SourceWalletRef: sourceWalletRef, SourceWalletRef: sourceWalletRef,
@@ -267,7 +275,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
updateExecutionPlanTotalNetworkFee(plan) updateExecutionPlanTotalNetworkFee(plan)
if feeRequired { if feeRequired {
feeResp, err := s.deps.gateway.client.SubmitTransfer(ctx, &chainv1.SubmitTransferRequest{ feeResp, err := chainClient.SubmitTransfer(ctx, &chainv1.SubmitTransferRequest{
IdempotencyKey: payment.IdempotencyKey + ":card:fee", IdempotencyKey: payment.IdempotencyKey + ":card:fee",
OrganizationRef: payment.OrganizationRef.Hex(), OrganizationRef: payment.OrganizationRef.Hex(),
SourceWalletRef: sourceWalletRef, SourceWalletRef: sourceWalletRef,
@@ -293,8 +301,8 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
return nil return nil
} }
func (s *Service) estimateTransferNetworkFee(ctx context.Context, sourceWalletRef string, destination *chainv1.TransferDestination, amount *moneyv1.Money) (*moneyv1.Money, error) { func (s *Service) estimateTransferNetworkFee(ctx context.Context, client chainclient.Client, sourceWalletRef string, destination *chainv1.TransferDestination, amount *moneyv1.Money) (*moneyv1.Money, error) {
if !s.deps.gateway.available() { if client == nil {
return nil, merrors.InvalidArgument("chain gateway unavailable") return nil, merrors.InvalidArgument("chain gateway unavailable")
} }
sourceWalletRef = strings.TrimSpace(sourceWalletRef) sourceWalletRef = strings.TrimSpace(sourceWalletRef)
@@ -305,7 +313,7 @@ func (s *Service) estimateTransferNetworkFee(ctx context.Context, sourceWalletRe
return nil, merrors.InvalidArgument("amount is required") return nil, merrors.InvalidArgument("amount is required")
} }
resp, err := s.deps.gateway.client.EstimateTransferFee(ctx, &chainv1.EstimateTransferFeeRequest{ resp, err := client.EstimateTransferFee(ctx, &chainv1.EstimateTransferFeeRequest{
SourceWalletRef: sourceWalletRef, SourceWalletRef: sourceWalletRef,
Destination: destination, Destination: destination,
Amount: cloneProtoMoney(amount), Amount: cloneProtoMoney(amount),

View File

@@ -74,7 +74,7 @@ func TestSubmitCardFundingTransfers_PlansTopUpAndFunding(t *testing.T) {
svc := &Service{ svc := &Service{
logger: zap.NewNop(), logger: zap.NewNop(),
deps: serviceDependencies{ deps: serviceDependencies{
gateway: gatewayDependency{client: gateway}, gateway: gatewayDependency{resolver: staticChainGatewayResolver{client: gateway}},
cardRoutes: map[string]CardGatewayRoute{ cardRoutes: map[string]CardGatewayRoute{
defaultCardGateway: { defaultCardGateway: {
FundingAddress: fundingAddress, FundingAddress: fundingAddress,
@@ -241,7 +241,7 @@ func TestSubmitCardPayout_UsesSettlementAmount(t *testing.T) {
svc := &Service{ svc := &Service{
logger: zap.NewNop(), logger: zap.NewNop(),
deps: serviceDependencies{ deps: serviceDependencies{
gateway: gatewayDependency{client: gateway}, gateway: gatewayDependency{resolver: staticChainGatewayResolver{client: gateway}},
mntx: mntxDependency{client: mntx}, mntx: mntxDependency{client: mntx},
}, },
} }
@@ -326,7 +326,7 @@ func TestSubmitCardFundingTransfers_RequiresFeeWalletRef(t *testing.T) {
svc := &Service{ svc := &Service{
logger: zap.NewNop(), logger: zap.NewNop(),
deps: serviceDependencies{ deps: serviceDependencies{
gateway: gatewayDependency{client: gateway}, gateway: gatewayDependency{resolver: staticChainGatewayResolver{client: gateway}},
cardRoutes: map[string]CardGatewayRoute{ cardRoutes: map[string]CardGatewayRoute{
defaultCardGateway: { defaultCardGateway: {
FundingAddress: "0xfunding", FundingAddress: "0xfunding",

View File

@@ -475,6 +475,10 @@ func protoRailOperationFromModel(action model.RailOperation) gatewayv1.RailOpera
return gatewayv1.RailOperation_RAIL_OPERATION_OBSERVE_CONFIRM return gatewayv1.RailOperation_RAIL_OPERATION_OBSERVE_CONFIRM
case string(model.RailOperationFXConvert): case string(model.RailOperationFXConvert):
return gatewayv1.RailOperation_RAIL_OPERATION_FX_CONVERT return gatewayv1.RailOperation_RAIL_OPERATION_FX_CONVERT
case string(model.RailOperationBlock):
return gatewayv1.RailOperation_RAIL_OPERATION_BLOCK
case string(model.RailOperationRelease):
return gatewayv1.RailOperation_RAIL_OPERATION_RELEASE
default: default:
return gatewayv1.RailOperation_RAIL_OPERATION_UNSPECIFIED return gatewayv1.RailOperation_RAIL_OPERATION_UNSPECIFIED
} }

View File

@@ -48,6 +48,7 @@ func (r *discoveryGatewayRegistry) List(_ context.Context) ([]*model.GatewayInst
InstanceID: entry.InstanceID, InstanceID: entry.InstanceID,
Rail: rail, Rail: rail,
Network: entry.Network, Network: entry.Network,
InvokeURI: strings.TrimSpace(entry.InvokeURI),
Currencies: normalizeCurrencies(entry.Currencies), Currencies: normalizeCurrencies(entry.Currencies),
Capabilities: capabilitiesFromOps(entry.Operations), Capabilities: capabilitiesFromOps(entry.Operations),
Limits: limitsFromDiscovery(entry.Limits), Limits: limitsFromDiscovery(entry.Limits),
@@ -92,6 +93,10 @@ func capabilitiesFromOps(ops []string) model.RailCapabilities {
cap.CanSendFee = true cap.CanSendFee = true
case "observe.confirm", "observe.confirmation": case "observe.confirm", "observe.confirmation":
cap.RequiresObserveConfirm = true cap.RequiresObserveConfirm = true
case "block", "funds.block", "balance.block", "ledger.block":
cap.CanBlock = true
case "release", "funds.release", "balance.release", "ledger.release":
cap.CanRelease = true
} }
} }
return cap return cap

View File

@@ -60,10 +60,6 @@ func isSourceExecutionStep(step *model.ExecutionStep) bool {
return executionStepRole(step) == executionStepRoleSource return executionStepRole(step) == executionStepRoleSource
} }
func isConsumerExecutionStep(step *model.ExecutionStep) bool {
return executionStepRole(step) == executionStepRoleConsumer
}
func sourceStepsConfirmed(plan *model.ExecutionPlan) bool { func sourceStepsConfirmed(plan *model.ExecutionPlan) bool {
if plan == nil || len(plan.Steps) == 0 { if plan == nil || len(plan.Steps) == 0 {
return false return false

View File

@@ -7,7 +7,6 @@ import (
"github.com/tech/sendico/payments/orchestrator/storage/model" "github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mlogger"
gatewayv1 "github.com/tech/sendico/pkg/proto/common/gateway/v1"
) )
type gatewayRegistry struct { type gatewayRegistry struct {
@@ -52,108 +51,6 @@ func (r *gatewayRegistry) List(ctx context.Context) ([]*model.GatewayInstanceDes
return result, nil return result, nil
} }
func modelGatewayFromProto(src *gatewayv1.GatewayInstanceDescriptor) *model.GatewayInstanceDescriptor {
if src == nil {
return nil
}
limits := modelLimitsFromProto(src.GetLimits())
return &model.GatewayInstanceDescriptor{
ID: strings.TrimSpace(src.GetId()),
Rail: modelRailFromProto(src.GetRail()),
Network: strings.ToUpper(strings.TrimSpace(src.GetNetwork())),
Currencies: normalizeCurrencies(src.GetCurrencies()),
Capabilities: modelCapabilitiesFromProto(src.GetCapabilities()),
Limits: limits,
Version: strings.TrimSpace(src.GetVersion()),
IsEnabled: src.GetIsEnabled(),
}
}
func modelRailFromProto(rail gatewayv1.Rail) model.Rail {
switch rail {
case gatewayv1.Rail_RAIL_CRYPTO:
return model.RailCrypto
case gatewayv1.Rail_RAIL_PROVIDER_SETTLEMENT:
return model.RailProviderSettlement
case gatewayv1.Rail_RAIL_LEDGER:
return model.RailLedger
case gatewayv1.Rail_RAIL_CARD_PAYOUT:
return model.RailCardPayout
case gatewayv1.Rail_RAIL_FIAT_ONRAMP:
return model.RailFiatOnRamp
default:
return model.RailUnspecified
}
}
func modelCapabilitiesFromProto(src *gatewayv1.RailCapabilities) model.RailCapabilities {
if src == nil {
return model.RailCapabilities{}
}
return model.RailCapabilities{
CanPayIn: src.GetCanPayIn(),
CanPayOut: src.GetCanPayOut(),
CanReadBalance: src.GetCanReadBalance(),
CanSendFee: src.GetCanSendFee(),
RequiresObserveConfirm: src.GetRequiresObserveConfirm(),
}
}
func modelLimitsFromProto(src *gatewayv1.Limits) model.Limits {
if src == nil {
return model.Limits{}
}
limits := model.Limits{
MinAmount: strings.TrimSpace(src.GetMinAmount()),
MaxAmount: strings.TrimSpace(src.GetMaxAmount()),
PerTxMaxFee: strings.TrimSpace(src.GetPerTxMaxFee()),
PerTxMinAmount: strings.TrimSpace(src.GetPerTxMinAmount()),
PerTxMaxAmount: strings.TrimSpace(src.GetPerTxMaxAmount()),
}
if len(src.GetVolumeLimit()) > 0 {
limits.VolumeLimit = map[string]string{}
for key, value := range src.GetVolumeLimit() {
bucket := strings.TrimSpace(key)
amount := strings.TrimSpace(value)
if bucket == "" || amount == "" {
continue
}
limits.VolumeLimit[bucket] = amount
}
}
if len(src.GetVelocityLimit()) > 0 {
limits.VelocityLimit = map[string]int{}
for key, value := range src.GetVelocityLimit() {
bucket := strings.TrimSpace(key)
if bucket == "" {
continue
}
limits.VelocityLimit[bucket] = int(value)
}
}
if len(src.GetCurrencyLimits()) > 0 {
limits.CurrencyLimits = map[string]model.LimitsOverride{}
for key, override := range src.GetCurrencyLimits() {
currency := strings.ToUpper(strings.TrimSpace(key))
if currency == "" || override == nil {
continue
}
limits.CurrencyLimits[currency] = model.LimitsOverride{
MaxVolume: strings.TrimSpace(override.GetMaxVolume()),
MinAmount: strings.TrimSpace(override.GetMinAmount()),
MaxAmount: strings.TrimSpace(override.GetMaxAmount()),
MaxFee: strings.TrimSpace(override.GetMaxFee()),
MaxOps: int(override.GetMaxOps()),
}
}
}
return limits
}
func normalizeCurrencies(values []string) []string { func normalizeCurrencies(values []string) []string {
if len(values) == 0 { if len(values) == 0 {
return nil return nil

View File

@@ -0,0 +1,133 @@
package orchestrator
import (
"context"
"sort"
"strings"
"github.com/shopspring/decimal"
chainclient "github.com/tech/sendico/gateway/chain/client"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/merrors"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.uber.org/zap"
)
func (s *Service) resolveChainGatewayClient(ctx context.Context, network string, amount *paymenttypes.Money, actions []model.RailOperation, instanceID string, paymentRef string) (chainclient.Client, *model.GatewayInstanceDescriptor, error) {
if s.deps.gatewayRegistry != nil && s.deps.gatewayInvokeResolver != nil {
entry, err := selectGatewayForActions(ctx, s.deps.gatewayRegistry, model.RailCrypto, network, amount, actions, instanceID, sendDirectionForRail(model.RailCrypto))
if err != nil {
return nil, nil, err
}
invokeURI := strings.TrimSpace(entry.InvokeURI)
if invokeURI == "" {
return nil, nil, merrors.InvalidArgument("chain gateway: invoke uri is required")
}
client, err := s.deps.gatewayInvokeResolver.Resolve(ctx, invokeURI)
if err != nil {
return nil, nil, err
}
if s.logger != nil {
fields := []zap.Field{
zap.String("gateway_id", entry.ID),
zap.String("instance_id", entry.InstanceID),
zap.String("rail", string(entry.Rail)),
zap.String("network", entry.Network),
zap.String("invoke_uri", invokeURI),
}
if paymentRef != "" {
fields = append(fields, zap.String("payment_ref", paymentRef))
}
if len(actions) > 0 {
fields = append(fields, zap.Strings("actions", railActionNames(actions)))
}
s.logger.Info("Chain gateway selected", fields...)
}
return client, entry, nil
}
if s.deps.gateway.resolver != nil {
client, err := s.deps.gateway.resolver.Resolve(ctx, network)
if err != nil {
return nil, nil, err
}
return client, nil, nil
}
return nil, nil, merrors.NoData("chain gateway unavailable")
}
func selectGatewayForActions(ctx context.Context, registry GatewayRegistry, rail model.Rail, network string, amount *paymenttypes.Money, actions []model.RailOperation, instanceID string, dir sendDirection) (*model.GatewayInstanceDescriptor, error) {
if registry == nil {
return nil, merrors.NoData("gateway registry unavailable")
}
all, err := registry.List(ctx)
if err != nil {
return nil, err
}
if len(all) == 0 {
return nil, merrors.NoData("no gateway instances available")
}
if len(actions) == 0 {
actions = []model.RailOperation{model.RailOperationSend}
}
currency := ""
amt := decimal.Zero
if amount != nil && strings.TrimSpace(amount.GetAmount()) != "" {
amt, err = decimalFromMoney(amount)
if err != nil {
return nil, err
}
currency = strings.ToUpper(strings.TrimSpace(amount.GetCurrency()))
}
network = strings.ToUpper(strings.TrimSpace(network))
eligible := make([]*model.GatewayInstanceDescriptor, 0)
for _, entry := range all {
if entry == nil || !entry.IsEnabled {
continue
}
if entry.Rail != rail {
continue
}
if instanceID != "" && !strings.EqualFold(strings.TrimSpace(entry.InstanceID), strings.TrimSpace(instanceID)) {
continue
}
ok := true
for _, action := range actions {
if !isGatewayEligible(entry, rail, network, currency, action, dir, amt) {
ok = false
break
}
}
if !ok {
continue
}
eligible = append(eligible, entry)
}
if len(eligible) == 0 {
return nil, merrors.NoData("no eligible gateway instance found")
}
sort.Slice(eligible, func(i, j int) bool {
return eligible[i].ID < eligible[j].ID
})
return eligible[0], nil
}
func railActionNames(actions []model.RailOperation) []string {
if len(actions) == 0 {
return nil
}
names := make([]string, 0, len(actions))
for _, action := range actions {
name := strings.TrimSpace(string(action))
if name == "" {
continue
}
names = append(names, name)
}
if len(names) == 0 {
return nil
}
return names
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice" "github.com/tech/sendico/pkg/mservice"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1" chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1" orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -21,15 +22,17 @@ type paymentEventHandler struct {
logger mlogger.Logger logger mlogger.Logger
submitCardPayout func(ctx context.Context, payment *model.Payment) error submitCardPayout func(ctx context.Context, payment *model.Payment) error
resumePlan func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error resumePlan func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error
releaseHold func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error
} }
func newPaymentEventHandler(repo storage.Repository, ensure func(ctx context.Context) error, logger mlogger.Logger, submitCardPayout func(ctx context.Context, payment *model.Payment) error, resumePlan func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error) *paymentEventHandler { func newPaymentEventHandler(repo storage.Repository, ensure func(ctx context.Context) error, logger mlogger.Logger, submitCardPayout func(ctx context.Context, payment *model.Payment) error, resumePlan func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error, releaseHold func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error) *paymentEventHandler {
return &paymentEventHandler{ return &paymentEventHandler{
repo: repo, repo: repo,
ensureRepo: ensure, ensureRepo: ensure,
logger: logger, logger: logger,
submitCardPayout: submitCardPayout, submitCardPayout: submitCardPayout,
resumePlan: resumePlan, resumePlan: resumePlan,
releaseHold: releaseHold,
} }
} }
@@ -248,6 +251,21 @@ func (h *paymentEventHandler) processCardPayoutUpdate(ctx context.Context, req *
} }
applyCardPayoutUpdate(payment, payout) applyCardPayoutUpdate(payment, payout)
switch payout.GetStatus() {
case mntxv1.PayoutStatus_PAYOUT_STATUS_PROCESSED:
if h.resumePlan != nil && payment.PaymentPlan != nil && len(payment.PaymentPlan.Steps) > 0 {
if err := h.resumePlan(ctx, store, payment); err != nil {
return gsresponse.Auto[orchestratorv1.ProcessCardPayoutUpdateResponse](h.logger, mservice.PaymentOrchestrator, err)
}
}
case mntxv1.PayoutStatus_PAYOUT_STATUS_FAILED:
if h.releaseHold != nil && payment.PaymentPlan != nil && len(payment.PaymentPlan.Steps) > 0 {
if err := h.releaseHold(ctx, store, payment); err != nil {
return gsresponse.Auto[orchestratorv1.ProcessCardPayoutUpdateResponse](h.logger, mservice.PaymentOrchestrator, err)
}
}
}
if err := store.Update(ctx, payment); err != nil { if err := store.Update(ctx, payment); err != nil {
return gsresponse.Auto[orchestratorv1.ProcessCardPayoutUpdateResponse](h.logger, mservice.PaymentOrchestrator, err) return gsresponse.Auto[orchestratorv1.ProcessCardPayoutUpdateResponse](h.logger, mservice.PaymentOrchestrator, err)
} }

View File

@@ -6,6 +6,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/shopspring/decimal"
oracleclient "github.com/tech/sendico/fx/oracle/client" oracleclient "github.com/tech/sendico/fx/oracle/client"
chainclient "github.com/tech/sendico/gateway/chain/client" chainclient "github.com/tech/sendico/gateway/chain/client"
mntxclient "github.com/tech/sendico/gateway/mntx/client" mntxclient "github.com/tech/sendico/gateway/mntx/client"
@@ -14,20 +15,38 @@ import (
clockpkg "github.com/tech/sendico/pkg/clock" clockpkg "github.com/tech/sendico/pkg/clock"
"github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/merrors"
mb "github.com/tech/sendico/pkg/messaging/broker" mb "github.com/tech/sendico/pkg/messaging/broker"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/payments/rail" "github.com/tech/sendico/pkg/payments/rail"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1" feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
"go.uber.org/zap"
) )
// Option configures service dependencies. // Option configures service dependencies.
type Option func(*Service) type Option func(*Service)
// GatewayInvokeResolver resolves gateway invoke URIs into chain gateway clients.
type GatewayInvokeResolver interface {
Resolve(ctx context.Context, invokeURI string) (chainclient.Client, error)
}
// ChainGatewayResolver resolves chain gateway clients by network.
type ChainGatewayResolver interface {
Resolve(ctx context.Context, network string) (chainclient.Client, error)
}
type feesDependency struct { type feesDependency struct {
client feesv1.FeeEngineClient client feesv1.FeeEngineClient
timeout time.Duration timeout time.Duration
} }
func (f feesDependency) available() bool { func (f feesDependency) available() bool {
return f.client != nil if f.client == nil {
return false
}
if checker, ok := f.client.(interface{ Available() bool }); ok {
return checker.Available()
}
return true
} }
type ledgerDependency struct { type ledgerDependency struct {
@@ -35,28 +54,25 @@ type ledgerDependency struct {
internal rail.InternalLedger internal rail.InternalLedger
} }
func (l ledgerDependency) available() bool {
return l.client != nil
}
type gatewayDependency struct { type gatewayDependency struct {
client chainclient.Client resolver ChainGatewayResolver
} }
func (g gatewayDependency) available() bool { func (g gatewayDependency) available() bool {
return g.client != nil return g.resolver != nil
} }
type railGatewayDependency struct { type railGatewayDependency struct {
byID map[string]rail.RailGateway byID map[string]rail.RailGateway
byRail map[model.Rail][]rail.RailGateway byRail map[model.Rail][]rail.RailGateway
registry GatewayRegistry registry GatewayRegistry
chainClient chainclient.Client chainResolver GatewayInvokeResolver
providerClient chainclient.Client providerResolver GatewayInvokeResolver
logger mlogger.Logger
} }
func (g railGatewayDependency) available() bool { func (g railGatewayDependency) available() bool {
return len(g.byID) > 0 || len(g.byRail) > 0 || (g.registry != nil && (g.chainClient != nil || g.providerClient != nil)) return len(g.byID) > 0 || len(g.byRail) > 0 || (g.registry != nil && (g.chainResolver != nil || g.providerResolver != nil))
} }
func (g railGatewayDependency) resolve(ctx context.Context, step *model.PaymentStep) (rail.RailGateway, error) { func (g railGatewayDependency) resolve(ctx context.Context, step *model.PaymentStep) (rail.RailGateway, error) {
@@ -64,11 +80,10 @@ func (g railGatewayDependency) resolve(ctx context.Context, step *model.PaymentS
return nil, merrors.InvalidArgument("rail gateway: step is required") return nil, merrors.InvalidArgument("rail gateway: step is required")
} }
if id := strings.TrimSpace(step.GatewayID); id != "" { if id := strings.TrimSpace(step.GatewayID); id != "" {
gw, ok := g.byID[id] if gw, ok := g.byID[id]; ok {
if !ok { return gw, nil
return nil, merrors.InvalidArgument("rail gateway: unknown gateway id")
} }
return gw, nil return g.resolveDynamic(ctx, step)
} }
if len(g.byRail) == 0 { if len(g.byRail) == 0 {
return g.resolveDynamic(ctx, step) return g.resolveDynamic(ctx, step)
@@ -81,13 +96,32 @@ func (g railGatewayDependency) resolve(ctx context.Context, step *model.PaymentS
} }
func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.PaymentStep) (rail.RailGateway, error) { func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.PaymentStep) (rail.RailGateway, error) {
if g.registry == nil || (g.chainClient == nil && g.providerClient == nil) { if g.registry == nil {
return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail") return nil, merrors.InvalidArgument("rail gateway: registry is required")
}
if g.chainResolver == nil && g.providerResolver == nil {
return nil, merrors.InvalidArgument("rail gateway: gateway resolver is required")
} }
items, err := g.registry.List(ctx) items, err := g.registry.List(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(items) == 0 {
return nil, merrors.InvalidArgument("rail gateway: no gateway instances available")
}
currency := ""
amount := decimal.Zero
if step.Amount != nil && strings.TrimSpace(step.Amount.GetAmount()) != "" {
value, err := decimalFromMoney(step.Amount)
if err != nil {
return nil, err
}
amount = value
currency = strings.ToUpper(strings.TrimSpace(step.Amount.GetCurrency()))
}
candidates := make([]*model.GatewayInstanceDescriptor, 0)
for _, entry := range items { for _, entry := range items {
if entry == nil || !entry.IsEnabled { if entry == nil || !entry.IsEnabled {
continue continue
@@ -98,31 +132,73 @@ func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.P
if step.GatewayID != "" && entry.ID != step.GatewayID { if step.GatewayID != "" && entry.ID != step.GatewayID {
continue continue
} }
cfg := chainclient.RailGatewayConfig{ if step.InstanceID != "" && !strings.EqualFold(strings.TrimSpace(entry.InstanceID), strings.TrimSpace(step.InstanceID)) {
Rail: string(entry.Rail), continue
Network: entry.Network,
Capabilities: rail.RailCapabilities{
CanPayIn: entry.Capabilities.CanPayIn,
CanPayOut: entry.Capabilities.CanPayOut,
CanReadBalance: entry.Capabilities.CanReadBalance,
CanSendFee: entry.Capabilities.CanSendFee,
RequiresObserveConfirm: entry.Capabilities.RequiresObserveConfirm,
},
} }
switch entry.Rail { if step.Action != model.RailOperationUnspecified {
case model.RailProviderSettlement: if !isGatewayEligible(entry, step.Rail, "", currency, step.Action, sendDirectionForRail(step.Rail), amount) {
if g.providerClient == nil { continue
return nil, merrors.InvalidArgument("rail gateway: missing provider settlement client")
} }
return NewProviderSettlementGateway(g.providerClient, cfg), nil
default:
if g.chainClient == nil {
return nil, merrors.InvalidArgument("rail gateway: missing gateway client")
}
return chainclient.NewRailGateway(g.chainClient, cfg), nil
} }
candidates = append(candidates, entry)
}
if len(candidates) == 0 {
return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail")
}
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].ID < candidates[j].ID
})
entry := candidates[0]
invokeURI := strings.TrimSpace(entry.InvokeURI)
if invokeURI == "" {
return nil, merrors.InvalidArgument("rail gateway: invoke uri is required")
}
cfg := chainclient.RailGatewayConfig{
Rail: string(entry.Rail),
Network: entry.Network,
Capabilities: rail.RailCapabilities{
CanPayIn: entry.Capabilities.CanPayIn,
CanPayOut: entry.Capabilities.CanPayOut,
CanReadBalance: entry.Capabilities.CanReadBalance,
CanSendFee: entry.Capabilities.CanSendFee,
RequiresObserveConfirm: entry.Capabilities.RequiresObserveConfirm,
CanBlock: entry.Capabilities.CanBlock,
CanRelease: entry.Capabilities.CanRelease,
},
}
if g.logger != nil {
g.logger.Info("Rail gateway resolved",
zap.String("step_id", strings.TrimSpace(step.StepID)),
zap.String("action", string(step.Action)),
zap.String("gateway_id", entry.ID),
zap.String("instance_id", entry.InstanceID),
zap.String("rail", string(entry.Rail)),
zap.String("network", entry.Network),
zap.String("invoke_uri", invokeURI))
}
switch entry.Rail {
case model.RailProviderSettlement:
if g.providerResolver == nil {
return nil, merrors.InvalidArgument("rail gateway: provider settlement resolver required")
}
client, err := g.providerResolver.Resolve(ctx, invokeURI)
if err != nil {
return nil, err
}
return NewProviderSettlementGateway(client, cfg), nil
default:
if g.chainResolver == nil {
return nil, merrors.InvalidArgument("rail gateway: chain gateway resolver required")
}
client, err := g.chainResolver.Resolve(ctx, invokeURI)
if err != nil {
return nil, err
}
return chainclient.NewRailGateway(client, cfg), nil
} }
return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail")
} }
type oracleDependency struct { type oracleDependency struct {
@@ -130,7 +206,13 @@ type oracleDependency struct {
} }
func (o oracleDependency) available() bool { func (o oracleDependency) available() bool {
return o.client != nil if o.client == nil {
return false
}
if checker, ok := o.client.(interface{ Available() bool }); ok {
return checker.Available()
}
return true
} }
type mntxDependency struct { type mntxDependency struct {
@@ -138,23 +220,32 @@ type mntxDependency struct {
} }
func (m mntxDependency) available() bool { func (m mntxDependency) available() bool {
return m.client != nil if m.client == nil {
} return false
}
type gatewayRegistryDependency struct { if checker, ok := m.client.(interface{ Available() bool }); ok {
registry GatewayRegistry return checker.Available()
} }
return true
func (g gatewayRegistryDependency) available() bool {
return g.registry != nil
} }
type providerGatewayDependency struct { type providerGatewayDependency struct {
client chainclient.Client resolver ChainGatewayResolver
} }
func (p providerGatewayDependency) available() bool { func (p providerGatewayDependency) available() bool {
return p.client != nil return p.resolver != nil
}
type staticChainGatewayResolver struct {
client chainclient.Client
}
func (r staticChainGatewayResolver) Resolve(ctx context.Context, _ string) (chainclient.Client, error) {
if r.client == nil {
return nil, merrors.InvalidArgument("chain gateway client is required")
}
return r.client, nil
} }
// CardGatewayRoute maps a gateway to its funding and fee destinations. // CardGatewayRoute maps a gateway to its funding and fee destinations.
@@ -195,14 +286,44 @@ func WithLedgerClient(client ledgerclient.Client) Option {
// WithChainGatewayClient wires the chain gateway client. // WithChainGatewayClient wires the chain gateway client.
func WithChainGatewayClient(client chainclient.Client) Option { func WithChainGatewayClient(client chainclient.Client) Option {
return func(s *Service) { return func(s *Service) {
s.deps.gateway = gatewayDependency{client: client} s.deps.gateway = gatewayDependency{resolver: staticChainGatewayResolver{client: client}}
}
}
// WithChainGatewayResolver wires a resolver for chain gateway clients.
func WithChainGatewayResolver(resolver ChainGatewayResolver) Option {
return func(s *Service) {
if resolver != nil {
s.deps.gateway = gatewayDependency{resolver: resolver}
}
} }
} }
// WithProviderSettlementGatewayClient wires the provider settlement gateway client. // WithProviderSettlementGatewayClient wires the provider settlement gateway client.
func WithProviderSettlementGatewayClient(client chainclient.Client) Option { func WithProviderSettlementGatewayClient(client chainclient.Client) Option {
return func(s *Service) { return func(s *Service) {
s.deps.providerGateway = providerGatewayDependency{client: client} s.deps.providerGateway = providerGatewayDependency{resolver: staticChainGatewayResolver{client: client}}
}
}
// WithProviderSettlementGatewayResolver wires a resolver for provider settlement gateway clients.
func WithProviderSettlementGatewayResolver(resolver ChainGatewayResolver) Option {
return func(s *Service) {
if resolver != nil {
s.deps.providerGateway = providerGatewayDependency{resolver: resolver}
}
}
}
// WithGatewayInvokeResolver wires a resolver for gateway invoke URIs.
func WithGatewayInvokeResolver(resolver GatewayInvokeResolver) Option {
return func(s *Service) {
if resolver == nil {
return
}
s.deps.gatewayInvokeResolver = resolver
s.deps.railGateways.chainResolver = resolver
s.deps.railGateways.providerResolver = resolver
} }
} }
@@ -212,7 +333,7 @@ func WithRailGateways(gateways map[string]rail.RailGateway) Option {
if len(gateways) == 0 { if len(gateways) == 0 {
return return
} }
s.deps.railGateways = buildRailGatewayDependency(gateways, s.deps.gatewayRegistry, s.deps.gateway.client, s.deps.providerGateway.client) s.deps.railGateways = buildRailGatewayDependency(gateways, s.deps.gatewayRegistry, s.deps.gatewayInvokeResolver, s.deps.gatewayInvokeResolver, s.logger)
} }
} }
@@ -276,8 +397,9 @@ func WithGatewayRegistry(registry GatewayRegistry) Option {
if registry != nil { if registry != nil {
s.deps.gatewayRegistry = registry s.deps.gatewayRegistry = registry
s.deps.railGateways.registry = registry s.deps.railGateways.registry = registry
s.deps.railGateways.chainClient = s.deps.gateway.client s.deps.railGateways.chainResolver = s.deps.gatewayInvokeResolver
s.deps.railGateways.providerClient = s.deps.providerGateway.client s.deps.railGateways.providerResolver = s.deps.gatewayInvokeResolver
s.deps.railGateways.logger = s.logger.Named("rail_gateways")
if s.deps.planBuilder == nil { if s.deps.planBuilder == nil {
s.deps.planBuilder = &defaultPlanBuilder{} s.deps.planBuilder = &defaultPlanBuilder{}
} }
@@ -294,13 +416,14 @@ func WithClock(clock clockpkg.Clock) Option {
} }
} }
func buildRailGatewayDependency(gateways map[string]rail.RailGateway, registry GatewayRegistry, chainClient chainclient.Client, providerClient chainclient.Client) railGatewayDependency { func buildRailGatewayDependency(gateways map[string]rail.RailGateway, registry GatewayRegistry, chainResolver GatewayInvokeResolver, providerResolver GatewayInvokeResolver, logger mlogger.Logger) railGatewayDependency {
result := railGatewayDependency{ result := railGatewayDependency{
byID: map[string]rail.RailGateway{}, byID: map[string]rail.RailGateway{},
byRail: map[model.Rail][]rail.RailGateway{}, byRail: map[model.Rail][]rail.RailGateway{},
registry: registry, registry: registry,
chainClient: chainClient, chainResolver: chainResolver,
providerClient: providerClient, providerResolver: providerResolver,
logger: logger,
} }
if len(gateways) == 0 { if len(gateways) == 0 {
return result return result

View File

@@ -46,6 +46,10 @@ func (p *paymentExecutor) executePaymentPlan(ctx context.Context, store storage.
execStep = &model.ExecutionStep{Code: stepID} execStep = &model.ExecutionStep{Code: stepID}
execSteps[stepID] = execStep execSteps[stepID] = execStep
} }
if step.Action == model.RailOperationRelease {
setExecutionStepStatus(execStep, executionStepStatusSkipped)
continue
}
status := executionStepStatus(execStep) status := executionStepStatus(execStep)
switch status { switch status {
case executionStepStatusConfirmed, executionStepStatusSkipped: case executionStepStatusConfirmed, executionStepStatusSkipped:
@@ -86,7 +90,11 @@ func (p *paymentExecutor) executePaymentPlan(ctx context.Context, store storage.
} }
if asyncSubmitted && !executionPlanComplete(execPlan) { if asyncSubmitted && !executionPlanComplete(execPlan) {
payment.State = model.PaymentStateSubmitted if blockStepConfirmed(plan, execPlan) {
payment.State = model.PaymentStateFundsReserved
} else {
payment.State = model.PaymentStateSubmitted
}
return p.persistPayment(ctx, store, payment) return p.persistPayment(ctx, store, payment)
} }
payment.State = model.PaymentStateSettled payment.State = model.PaymentStateSettled

View File

@@ -12,6 +12,7 @@ import (
"github.com/tech/sendico/pkg/payments/rail" "github.com/tech/sendico/pkg/payments/rail"
paymenttypes "github.com/tech/sendico/pkg/payments/types" paymenttypes "github.com/tech/sendico/pkg/payments/types"
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1" mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1" orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap" "go.uber.org/zap"
@@ -64,7 +65,7 @@ func TestExecutePaymentPlan_SourceBeforeDestination(t *testing.T) {
deps: serviceDependencies{ deps: serviceDependencies{
railGateways: buildRailGatewayDependency(map[string]rail.RailGateway{ railGateways: buildRailGatewayDependency(map[string]rail.RailGateway{
"crypto-default": railGateway, "crypto-default": railGateway,
}, nil, nil, nil), }, nil, nil, nil, nil),
ledger: ledgerDependency{ ledger: ledgerDependency{
client: ledgerFake, client: ledgerFake,
internal: ledgerFake, internal: ledgerFake,
@@ -196,3 +197,146 @@ func TestExecutePaymentPlan_SourceBeforeDestination(t *testing.T) {
t.Fatalf("expected ledger debit after payout confirmation, debit=%d credit=%d", debitCalls, creditCalls) t.Fatalf("expected ledger debit after payout confirmation, debit=%d credit=%d", debitCalls, creditCalls)
} }
} }
func TestExecutePaymentPlan_BlockThenDebitFromHold(t *testing.T) {
ctx := context.Background()
store := newStubPaymentsStore()
repo := &stubRepository{store: store}
blockCalls := 0
var blockReq *ledgerv1.TransferRequest
debitCalls := 0
var debitTx rail.LedgerTx
ledgerFake := &ledgerclient.Fake{
TransferInternalFn: func(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) {
blockCalls++
blockReq = req
return &ledgerv1.PostResponse{JournalEntryRef: "hold-1"}, nil
},
CreateTransactionFn: func(ctx context.Context, tx rail.LedgerTx) (string, error) {
debitCalls++
debitTx = tx
return "debit-1", nil
},
}
mntxFake := &mntxclient.Fake{
CreateCardPayoutFn: func(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) {
return &mntxv1.CardPayoutResponse{Payout: &mntxv1.CardPayoutState{PayoutId: "payout-1"}}, nil
},
}
svc := &Service{
logger: zap.NewNop(),
storage: repo,
deps: serviceDependencies{
ledger: ledgerDependency{
client: ledgerFake,
internal: ledgerFake,
},
mntx: mntxDependency{client: mntxFake},
cardRoutes: map[string]CardGatewayRoute{
defaultCardGateway: {
FundingAddress: "funding-address",
FeeWalletRef: "fee-wallet",
},
},
},
}
executor := newPaymentExecutor(&svc.deps, svc.logger, svc)
payment := &model.Payment{
PaymentRef: "pay-block-1",
IdempotencyKey: "pay-block-1",
OrganizationBoundBase: mo.OrganizationBoundBase{
OrganizationRef: primitive.NewObjectID(),
},
Intent: model.PaymentIntent{
Kind: model.PaymentKindPayout,
Source: model.PaymentEndpoint{
Type: model.EndpointTypeManagedWallet,
ManagedWallet: &model.ManagedWalletEndpoint{
ManagedWalletRef: "wallet-src",
},
},
Destination: model.PaymentEndpoint{
Type: model.EndpointTypeCard,
Card: &model.CardEndpoint{
Pan: "4111111111111111",
Cardholder: "Ada",
CardholderSurname: "Lovelace",
ExpMonth: 1,
ExpYear: 2030,
MaskedPan: "4111",
},
},
Attributes: map[string]string{
"ledger_debit_account_ref": "ledger:debit",
"ledger_block_account_ref": "ledger:block",
},
Customer: &model.Customer{
ID: "cust-1",
FirstName: "Ada",
LastName: "Lovelace",
IP: "1.2.3.4",
},
},
PaymentPlan: &model.PaymentPlan{
ID: "pay-block-1",
IdempotencyKey: "pay-block-1",
Steps: []*model.PaymentStep{
{StepID: "ledger_block", Rail: model.RailLedger, Action: model.RailOperationBlock, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
{StepID: "card_payout", Rail: model.RailCardPayout, Action: model.RailOperationSend, DependsOn: []string{"ledger_block"}, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
{StepID: "ledger_debit", Rail: model.RailLedger, Action: model.RailOperationDebit, DependsOn: []string{"card_payout"}, CommitPolicy: model.CommitPolicyAfterSuccess, CommitAfter: []string{"card_payout"}, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
{StepID: "ledger_release", Rail: model.RailLedger, Action: model.RailOperationRelease, DependsOn: []string{"card_payout"}, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
},
},
}
store.payments[payment.PaymentRef] = payment
if err := executor.executePaymentPlan(ctx, store, payment, &orchestratorv1.PaymentQuote{}); err != nil {
t.Fatalf("executePaymentPlan error: %v", err)
}
if blockCalls != 1 || blockReq == nil {
t.Fatalf("expected ledger block transfer, calls=%d", blockCalls)
}
if blockReq.GetFromLedgerAccountRef() != "ledger:debit" {
t.Fatalf("unexpected block from account: %s", blockReq.GetFromLedgerAccountRef())
}
if blockReq.GetToLedgerAccountRef() != "ledger:block" {
t.Fatalf("unexpected block to account: %s", blockReq.GetToLedgerAccountRef())
}
if debitCalls != 0 {
t.Fatalf("expected no debit before payout confirmation, got %d", debitCalls)
}
if payment.State != model.PaymentStateFundsReserved {
t.Fatalf("expected funds reserved state, got %s", payment.State)
}
steps := executionStepsByCode(payment.ExecutionPlan)
cardStep := steps["card_payout"]
if cardStep == nil {
t.Fatalf("expected card payout step in execution plan")
}
setExecutionStepStatus(cardStep, executionStepStatusConfirmed)
if err := executor.executePaymentPlan(ctx, store, payment, &orchestratorv1.PaymentQuote{}); err != nil {
t.Fatalf("executePaymentPlan resume error: %v", err)
}
if debitCalls != 1 {
t.Fatalf("expected ledger debit after payout confirmation, got %d", debitCalls)
}
if debitTx.LedgerAccountRef != "ledger:block" {
t.Fatalf("expected debit from block account, got %s", debitTx.LedgerAccountRef)
}
if debitTx.ContraLedgerAccountRef != "" {
t.Fatalf("expected contra to be cleared after block, got %s", debitTx.ContraLedgerAccountRef)
}
if payment.State != model.PaymentStateSettled {
t.Fatalf("expected settled state, got %s", payment.State)
}
}

View File

@@ -80,6 +80,26 @@ func executionPlanComplete(plan *model.ExecutionPlan) bool {
return true return true
} }
func blockStepConfirmed(plan *model.PaymentPlan, execPlan *model.ExecutionPlan) bool {
if plan == nil || execPlan == nil || len(plan.Steps) == 0 {
return false
}
execSteps := executionStepsByCode(execPlan)
for idx, step := range plan.Steps {
if step == nil || step.Action != model.RailOperationBlock {
continue
}
execStep := execSteps[planStepID(step, idx)]
if execStep == nil {
continue
}
if executionStepStatus(execStep) == executionStepStatusConfirmed {
return true
}
}
return false
}
func planStepID(step *model.PaymentStep, idx int) string { func planStepID(step *model.PaymentStep, idx int) string {
if step != nil { if step != nil {
if val := strings.TrimSpace(step.StepID); val != "" { if val := strings.TrimSpace(step.StepID); val != "" {

View File

@@ -11,28 +11,167 @@ import (
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1" ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1" orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
) )
func (p *paymentExecutor) postLedgerDebit(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, charges []*ledgerv1.PostingLine, idempotencyKey string, idx int, quote *orchestratorv1.PaymentQuote) (string, error) { func (p *paymentExecutor) postLedgerDebit(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, charges []*ledgerv1.PostingLine, idempotencyKey string, idx int, quote *orchestratorv1.PaymentQuote) (string, error) {
paymentRef := ""
if payment != nil {
paymentRef = strings.TrimSpace(payment.PaymentRef)
}
if p.deps.ledger.internal == nil { if p.deps.ledger.internal == nil {
p.logger.Error("Ledger client unavailable", zap.String("action", "debit"), zap.String("payment_ref", paymentRef))
return "", merrors.Internal("ledger_client_unavailable") return "", merrors.Internal("ledger_client_unavailable")
} }
tx, err := p.ledgerTxForAction(payment, amount, charges, idempotencyKey, idx, model.RailOperationDebit, quote) tx, err := p.ledgerTxForAction(payment, amount, charges, idempotencyKey, idx, model.RailOperationDebit, quote)
if err != nil { if err != nil {
p.logger.Warn("Ledger debit preparation failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err return "", err
} }
return p.deps.ledger.internal.CreateTransaction(ctx, tx) ref, err := p.deps.ledger.internal.CreateTransaction(ctx, tx)
if err != nil {
p.logger.Warn("Ledger debit failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err
}
return ref, nil
} }
func (p *paymentExecutor) postLedgerCredit(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, idempotencyKey string, idx int, quote *orchestratorv1.PaymentQuote) (string, error) { func (p *paymentExecutor) postLedgerCredit(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, idempotencyKey string, idx int, quote *orchestratorv1.PaymentQuote) (string, error) {
paymentRef := ""
if payment != nil {
paymentRef = strings.TrimSpace(payment.PaymentRef)
}
if p.deps.ledger.internal == nil { if p.deps.ledger.internal == nil {
p.logger.Error("Ledger client unavailable", zap.String("action", "credit"), zap.String("payment_ref", paymentRef))
return "", merrors.Internal("ledger_client_unavailable") return "", merrors.Internal("ledger_client_unavailable")
} }
tx, err := p.ledgerTxForAction(payment, amount, nil, idempotencyKey, idx, model.RailOperationCredit, quote) tx, err := p.ledgerTxForAction(payment, amount, nil, idempotencyKey, idx, model.RailOperationCredit, quote)
if err != nil { if err != nil {
p.logger.Warn("Ledger credit preparation failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err return "", err
} }
return p.deps.ledger.internal.CreateTransaction(ctx, tx) ref, err := p.deps.ledger.internal.CreateTransaction(ctx, tx)
if err != nil {
p.logger.Warn("Ledger credit failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err
}
return ref, nil
}
func (p *paymentExecutor) postLedgerBlock(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, idempotencyKey string, idx int) (string, error) {
paymentRef := ""
if payment != nil {
paymentRef = strings.TrimSpace(payment.PaymentRef)
}
if p.deps.ledger.internal == nil {
p.logger.Error("Ledger client unavailable", zap.String("action", "block"), zap.String("payment_ref", paymentRef))
return "", merrors.Internal("ledger_client_unavailable")
}
if payment == nil {
return "", merrors.InvalidArgument("ledger: payment is required")
}
if payment.OrganizationRef == primitive.NilObjectID {
return "", merrors.InvalidArgument("ledger: organization_ref is required")
}
if amount == nil || strings.TrimSpace(amount.GetAmount()) == "" || strings.TrimSpace(amount.GetCurrency()) == "" {
return "", merrors.InvalidArgument("ledger: amount is required")
}
sourceAccount, err := ledgerDebitAccountRef(payment)
if err != nil {
return "", err
}
blockAccount, err := ledgerBlockAccount(payment)
if err != nil {
return "", err
}
resp, err := p.deps.ledger.internal.TransferInternal(ctx, &ledgerv1.TransferRequest{
IdempotencyKey: strings.TrimSpace(idempotencyKey),
OrganizationRef: payment.OrganizationRef.Hex(),
FromLedgerAccountRef: strings.TrimSpace(sourceAccount),
ToLedgerAccountRef: strings.TrimSpace(blockAccount),
Money: cloneProtoMoney(amount),
Description: paymentDescription(payment),
Metadata: cloneMetadata(payment.Metadata),
})
if err != nil {
p.logger.Warn("Ledger block failed",
zap.String("payment_ref", paymentRef),
zap.Int("step_index", idx),
zap.String("from_account", strings.TrimSpace(sourceAccount)),
zap.String("to_account", strings.TrimSpace(blockAccount)),
zap.String("amount", strings.TrimSpace(amount.GetAmount())),
zap.String("currency", strings.TrimSpace(amount.GetCurrency())),
zap.Error(err))
return "", err
}
entryRef := strings.TrimSpace(resp.GetJournalEntryRef())
p.logger.Info("Ledger block posted",
zap.String("payment_ref", paymentRef),
zap.Int("step_index", idx),
zap.String("entry_ref", entryRef),
zap.String("from_account", strings.TrimSpace(sourceAccount)),
zap.String("to_account", strings.TrimSpace(blockAccount)),
zap.String("amount", strings.TrimSpace(amount.GetAmount())),
zap.String("currency", strings.TrimSpace(amount.GetCurrency())))
return entryRef, nil
}
func (p *paymentExecutor) postLedgerRelease(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, idempotencyKey string, idx int) (string, error) {
paymentRef := ""
if payment != nil {
paymentRef = strings.TrimSpace(payment.PaymentRef)
}
if p.deps.ledger.internal == nil {
p.logger.Error("Ledger client unavailable", zap.String("action", "release"), zap.String("payment_ref", paymentRef))
return "", merrors.Internal("ledger_client_unavailable")
}
if payment == nil {
return "", merrors.InvalidArgument("ledger: payment is required")
}
if payment.OrganizationRef == primitive.NilObjectID {
return "", merrors.InvalidArgument("ledger: organization_ref is required")
}
if amount == nil || strings.TrimSpace(amount.GetAmount()) == "" || strings.TrimSpace(amount.GetCurrency()) == "" {
return "", merrors.InvalidArgument("ledger: amount is required")
}
sourceAccount, err := ledgerDebitAccountRef(payment)
if err != nil {
return "", err
}
blockAccount, err := ledgerBlockAccount(payment)
if err != nil {
return "", err
}
resp, err := p.deps.ledger.internal.TransferInternal(ctx, &ledgerv1.TransferRequest{
IdempotencyKey: strings.TrimSpace(idempotencyKey),
OrganizationRef: payment.OrganizationRef.Hex(),
FromLedgerAccountRef: strings.TrimSpace(blockAccount),
ToLedgerAccountRef: strings.TrimSpace(sourceAccount),
Money: cloneProtoMoney(amount),
Description: paymentDescription(payment),
Metadata: cloneMetadata(payment.Metadata),
})
if err != nil {
p.logger.Warn("Ledger release failed",
zap.String("payment_ref", paymentRef),
zap.Int("step_index", idx),
zap.String("from_account", strings.TrimSpace(blockAccount)),
zap.String("to_account", strings.TrimSpace(sourceAccount)),
zap.String("amount", strings.TrimSpace(amount.GetAmount())),
zap.String("currency", strings.TrimSpace(amount.GetCurrency())),
zap.Error(err))
return "", err
}
entryRef := strings.TrimSpace(resp.GetJournalEntryRef())
p.logger.Info("Ledger release posted",
zap.String("payment_ref", paymentRef),
zap.Int("step_index", idx),
zap.String("entry_ref", entryRef),
zap.String("from_account", strings.TrimSpace(blockAccount)),
zap.String("to_account", strings.TrimSpace(sourceAccount)),
zap.String("amount", strings.TrimSpace(amount.GetAmount())),
zap.String("currency", strings.TrimSpace(amount.GetCurrency())))
return entryRef, nil
} }
func (p *paymentExecutor) ledgerTxForAction(payment *model.Payment, amount *moneyv1.Money, charges []*ledgerv1.PostingLine, idempotencyKey string, idx int, action model.RailOperation, quote *orchestratorv1.PaymentQuote) (rail.LedgerTx, error) { func (p *paymentExecutor) ledgerTxForAction(payment *model.Payment, amount *moneyv1.Money, charges []*ledgerv1.PostingLine, idempotencyKey string, idx int, action model.RailOperation, quote *orchestratorv1.PaymentQuote) (rail.LedgerTx, error) {
@@ -66,6 +205,12 @@ func (p *paymentExecutor) ledgerTxForAction(payment *model.Payment, amount *mone
fromRail = model.RailLedger fromRail = model.RailLedger
toRail = ledgerStepToRail(payment.PaymentPlan, idx, destRail) toRail = ledgerStepToRail(payment.PaymentPlan, idx, destRail)
accountRef, contraRef, err = ledgerDebitAccount(payment) accountRef, contraRef, err = ledgerDebitAccount(payment)
if err == nil {
if blockRef := ledgerBlockAccountIfConfirmed(payment); blockRef != "" {
accountRef = blockRef
contraRef = ""
}
}
case model.RailOperationCredit: case model.RailOperationCredit:
fromRail = ledgerStepFromRail(payment.PaymentPlan, idx, sourceRail) fromRail = ledgerStepFromRail(payment.PaymentPlan, idx, sourceRail)
toRail = model.RailLedger toRail = model.RailLedger
@@ -190,6 +335,48 @@ func ledgerDebitAccount(payment *model.Payment) (string, string, error) {
return "", "", merrors.InvalidArgument("ledger: source account is required") return "", "", merrors.InvalidArgument("ledger: source account is required")
} }
func ledgerDebitAccountRef(payment *model.Payment) (string, error) {
account, _, err := ledgerDebitAccount(payment)
return account, err
}
func ledgerBlockAccount(payment *model.Payment) (string, error) {
if payment == nil {
return "", merrors.InvalidArgument("ledger: payment is required")
}
intent := payment.Intent
if intent.Source.Ledger != nil {
if ref := strings.TrimSpace(intent.Source.Ledger.ContraLedgerAccountRef); ref != "" {
return ref, nil
}
}
if ref := attributeLookup(intent.Attributes,
"ledger_block_account_ref",
"ledgerBlockAccountRef",
"ledger_hold_account_ref",
"ledgerHoldAccountRef",
"ledger_debit_contra_account_ref",
"ledgerDebitContraAccountRef",
); ref != "" {
return ref, nil
}
return "", merrors.InvalidArgument("ledger: block account is required")
}
func ledgerBlockAccountIfConfirmed(payment *model.Payment) string {
if payment == nil {
return ""
}
if !blockStepConfirmed(payment.PaymentPlan, payment.ExecutionPlan) {
return ""
}
ref, err := ledgerBlockAccount(payment)
if err != nil {
return ""
}
return ref
}
func ledgerCreditAccount(payment *model.Payment) (string, string, error) { func ledgerCreditAccount(payment *model.Payment) (string, string, error) {
if payment == nil { if payment == nil {
return "", "", merrors.InvalidArgument("ledger: payment is required") return "", "", merrors.InvalidArgument("ledger: payment is required")

View File

@@ -0,0 +1,51 @@
package orchestrator
import (
"context"
"github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"go.uber.org/zap"
)
func (p *paymentExecutor) releasePaymentHold(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error {
if store == nil {
return errStorageUnavailable
}
if payment == nil || payment.PaymentPlan == nil || len(payment.PaymentPlan.Steps) == 0 {
return nil
}
execPlan := ensureExecutionPlanForPlan(payment, payment.PaymentPlan)
if execPlan == nil || !blockStepConfirmed(payment.PaymentPlan, execPlan) {
return nil
}
execSteps := executionStepsByCode(execPlan)
execQuote := executionQuote(payment, nil)
for idx, step := range payment.PaymentPlan.Steps {
if step == nil || step.Action != model.RailOperationRelease {
continue
}
stepID := planStepID(step, idx)
execStep := execSteps[stepID]
if execStep == nil {
execStep = &model.ExecutionStep{Code: stepID}
execSteps[stepID] = execStep
if idx < len(execPlan.Steps) {
execPlan.Steps[idx] = execStep
}
}
status := executionStepStatus(execStep)
if status == executionStepStatusConfirmed {
p.logger.Debug("Payment step already confirmed, skipping", zap.String("step_id", stepID), zap.String("quutation", execQuote.QuoteRef))
continue
}
if _, err := p.executePlanStep(ctx, payment, step, execStep, execQuote, nil, idx); err != nil {
p.logger.Warn("Failed to execute payment step", zap.Error(err),
zap.String("step_id", stepID), zap.String("quutation", execQuote.QuoteRef))
return err
}
}
return p.persistPayment(ctx, store, payment)
}

View File

@@ -0,0 +1,107 @@
package orchestrator
import (
"context"
"testing"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/storage/model"
mo "github.com/tech/sendico/pkg/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
)
func TestReleasePaymentHold_TransfersFromHoldAccount(t *testing.T) {
ctx := context.Background()
store := newStubPaymentsStore()
repo := &stubRepository{store: store}
var releaseReq *ledgerv1.TransferRequest
ledgerFake := &ledgerclient.Fake{
TransferInternalFn: func(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) {
releaseReq = req
return &ledgerv1.PostResponse{JournalEntryRef: "release-1"}, nil
},
}
svc := &Service{
logger: zap.NewNop(),
storage: repo,
deps: serviceDependencies{
ledger: ledgerDependency{
client: ledgerFake,
internal: ledgerFake,
},
},
}
executor := newPaymentExecutor(&svc.deps, svc.logger, svc)
payment := &model.Payment{
PaymentRef: "pay-release-1",
IdempotencyKey: "pay-release-1",
OrganizationBoundBase: mo.OrganizationBoundBase{
OrganizationRef: primitive.NewObjectID(),
},
Intent: model.PaymentIntent{
Kind: model.PaymentKindPayout,
Source: model.PaymentEndpoint{
Type: model.EndpointTypeManagedWallet,
ManagedWallet: &model.ManagedWalletEndpoint{
ManagedWalletRef: "wallet-src",
},
},
Attributes: map[string]string{
"ledger_debit_account_ref": "ledger:debit",
"ledger_block_account_ref": "ledger:block",
},
},
PaymentPlan: &model.PaymentPlan{
ID: "pay-release-1",
IdempotencyKey: "pay-release-1",
Steps: []*model.PaymentStep{
{StepID: "ledger_block", Rail: model.RailLedger, Action: model.RailOperationBlock, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
{StepID: "ledger_release", Rail: model.RailLedger, Action: model.RailOperationRelease, DependsOn: []string{"ledger_block"}, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
},
},
}
store.payments[payment.PaymentRef] = payment
execPlan := ensureExecutionPlanForPlan(payment, payment.PaymentPlan)
steps := executionStepsByCode(execPlan)
blockStep := steps["ledger_block"]
if blockStep == nil {
t.Fatalf("expected block step in execution plan")
}
setExecutionStepStatus(blockStep, executionStepStatusConfirmed)
if err := executor.releasePaymentHold(ctx, store, payment); err != nil {
t.Fatalf("releasePaymentHold error: %v", err)
}
if releaseReq == nil {
t.Fatalf("expected ledger release transfer")
}
if releaseReq.GetFromLedgerAccountRef() != "ledger:block" {
t.Fatalf("unexpected release from account: %s", releaseReq.GetFromLedgerAccountRef())
}
if releaseReq.GetToLedgerAccountRef() != "ledger:debit" {
t.Fatalf("unexpected release to account: %s", releaseReq.GetToLedgerAccountRef())
}
steps = executionStepsByCode(payment.ExecutionPlan)
releaseStep := steps["ledger_release"]
if releaseStep == nil {
t.Fatalf("expected release step in execution plan")
}
if executionStepStatus(releaseStep) != executionStepStatusConfirmed {
t.Fatalf("expected release step confirmed, got %s", executionStepStatus(releaseStep))
}
if releaseStep.TransferRef != "release-1" {
t.Fatalf("expected release transfer ref set, got %s", releaseStep.TransferRef)
}
}

View File

@@ -40,6 +40,36 @@ func (p *paymentExecutor) executePlanStep(ctx context.Context, payment *model.Pa
ensureExecutionRefs(payment).CreditEntryRef = ref ensureExecutionRefs(payment).CreditEntryRef = ref
setExecutionStepStatus(execStep, executionStepStatusConfirmed) setExecutionStepStatus(execStep, executionStepStatusConfirmed)
return false, nil return false, nil
case model.RailOperationBlock:
if step.Rail != model.RailLedger {
return false, merrors.InvalidArgument("payment plan: block requires ledger rail")
}
amount, err := requireMoney(cloneMoney(step.Amount), "ledger block amount")
if err != nil {
return false, err
}
ref, err := p.postLedgerBlock(ctx, payment, protoMoney(amount), planStepIdempotencyKey(payment, idx, step), idx)
if err != nil {
return false, err
}
execStep.TransferRef = strings.TrimSpace(ref)
setExecutionStepStatus(execStep, executionStepStatusConfirmed)
return false, nil
case model.RailOperationRelease:
if step.Rail != model.RailLedger {
return false, merrors.InvalidArgument("payment plan: release requires ledger rail")
}
amount, err := requireMoney(cloneMoney(step.Amount), "ledger release amount")
if err != nil {
return false, err
}
ref, err := p.postLedgerRelease(ctx, payment, protoMoney(amount), planStepIdempotencyKey(payment, idx, step), idx)
if err != nil {
return false, err
}
execStep.TransferRef = strings.TrimSpace(ref)
setExecutionStepStatus(execStep, executionStepStatusConfirmed)
return false, nil
case model.RailOperationFXConvert: case model.RailOperationFXConvert:
if err := p.applyFX(ctx, payment, quote, charges, paymentDescription(payment), cloneMetadata(payment.Metadata), ensureExecutionRefs(payment)); err != nil { if err := p.applyFX(ctx, payment, quote, charges, paymentDescription(payment), cloneMetadata(payment.Metadata), ensureExecutionRefs(payment)); err != nil {
return false, err return false, err

View File

@@ -125,7 +125,7 @@ func TestDefaultPlanBuilder_BuildsPlanFromRoutes_CryptoToCard(t *testing.T) {
t.Fatalf("expected 6 steps, got %d", len(plan.Steps)) t.Fatalf("expected 6 steps, got %d", len(plan.Steps))
} }
assertPlanStep(t, plan.Steps[0], "crypto_send", model.RailCrypto, model.RailOperationSend, "crypto-tron", "crypto-tron-1", "USDT", "100") assertPlanStep(t, plan.Steps[0], "crypto_send", model.RailCrypto, model.RailOperationSend, "crypto-tron", "crypto-tron-1", "USDT", "95")
assertPlanStep(t, plan.Steps[1], "crypto_fee", model.RailCrypto, model.RailOperationFee, "crypto-tron", "crypto-tron-1", "USDT", "5") assertPlanStep(t, plan.Steps[1], "crypto_fee", model.RailCrypto, model.RailOperationFee, "crypto-tron", "crypto-tron-1", "USDT", "5")
assertPlanStep(t, plan.Steps[2], "crypto_observe", model.RailCrypto, model.RailOperationObserveConfirm, "crypto-tron", "crypto-tron-1", "", "") assertPlanStep(t, plan.Steps[2], "crypto_observe", model.RailCrypto, model.RailOperationObserveConfirm, "crypto-tron", "crypto-tron-1", "", "")
assertPlanStep(t, plan.Steps[3], "ledger_credit", model.RailLedger, model.RailOperationCredit, "", "", "USDT", "95") assertPlanStep(t, plan.Steps[3], "ledger_credit", model.RailLedger, model.RailOperationCredit, "", "", "USDT", "95")

View File

@@ -160,6 +160,10 @@ func capabilityAllowsAction(cap model.RailCapabilities, action model.RailOperati
return cap.CanSendFee return cap.CanSendFee
case model.RailOperationObserveConfirm: case model.RailOperationObserveConfirm:
return cap.RequiresObserveConfirm return cap.RequiresObserveConfirm
case model.RailOperationBlock:
return cap.CanBlock
case model.RailOperationRelease:
return cap.CanRelease
default: default:
return true return true
} }

View File

@@ -69,6 +69,16 @@ func resolveSettlementAmount(payment *model.Payment, quote *orchestratorv1.Payme
return cloneMoney(fallback) return cloneMoney(fallback)
} }
func resolveDebitAmount(payment *model.Payment, quote *orchestratorv1.PaymentQuote, fallback *paymenttypes.Money) *paymenttypes.Money {
if quote != nil && quote.GetDebitAmount() != nil {
return moneyFromProto(quote.GetDebitAmount())
}
if payment != nil && payment.LastQuote != nil {
return cloneMoney(payment.LastQuote.DebitAmount)
}
return cloneMoney(fallback)
}
func resolveFeeAmount(payment *model.Payment, quote *orchestratorv1.PaymentQuote) *paymenttypes.Money { func resolveFeeAmount(payment *model.Payment, quote *orchestratorv1.PaymentQuote) *paymenttypes.Money {
if quote != nil && quote.GetExpectedFeeTotal() != nil { if quote != nil && quote.GetExpectedFeeTotal() != nil {
return moneyFromProto(quote.GetExpectedFeeTotal()) return moneyFromProto(quote.GetExpectedFeeTotal())

View File

@@ -15,7 +15,11 @@ func (b *defaultPlanBuilder) buildPlanFromTemplate(ctx context.Context, payment
return nil, merrors.InvalidArgument("plan builder: plan template is required") return nil, merrors.InvalidArgument("plan builder: plan template is required")
} }
sourceAmount, err := requireMoney(cloneMoney(payment.Intent.Amount), "amount") intentAmount, err := requireMoney(cloneMoney(payment.Intent.Amount), "amount")
if err != nil {
return nil, err
}
sourceAmount, err := requireMoney(resolveDebitAmount(payment, quote, intentAmount), "debit amount")
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -81,7 +85,11 @@ func (b *defaultPlanBuilder) buildPlanFromTemplate(ctx context.Context, payment
Amount: cloneMoney(amount), Amount: cloneMoney(amount),
} }
if action == model.RailOperationSend || action == model.RailOperationFee || action == model.RailOperationObserveConfirm { needsGateway := action == model.RailOperationSend || action == model.RailOperationFee || action == model.RailOperationObserveConfirm
if (action == model.RailOperationBlock || action == model.RailOperationRelease) && tpl.Rail != model.RailLedger {
needsGateway = true
}
if needsGateway {
network := gatewayNetworkForRail(tpl.Rail, sourceRail, destRail, sourceNetwork, destNetwork) network := gatewayNetworkForRail(tpl.Rail, sourceRail, destRail, sourceNetwork, destNetwork)
instanceID := stepInstanceIDForRail(payment.Intent, tpl.Rail, sourceRail, destRail) instanceID := stepInstanceIDForRail(payment.Intent, tpl.Rail, sourceRail, destRail)
checkAmount := amount checkAmount := amount
@@ -129,6 +137,10 @@ func actionForOperation(operation string) (model.RailOperation, error) {
return model.RailOperationFee, nil return model.RailOperationFee, nil
case "send", "payout.card", "payout.crypto", "payout.fiat", "payin.crypto", "payin.fiat", "fund.crypto", "fund.card": case "send", "payout.card", "payout.crypto", "payout.fiat", "payin.crypto", "payin.fiat", "fund.crypto", "fund.card":
return model.RailOperationSend, nil return model.RailOperationSend, nil
case "block", "hold", "reserve", "ledger.block", "ledger.hold", "ledger.reserve":
return model.RailOperationBlock, nil
case "release", "unblock", "ledger.release":
return model.RailOperationRelease, nil
} }
switch strings.ToUpper(strings.TrimSpace(operation)) { switch strings.ToUpper(strings.TrimSpace(operation)) {
@@ -144,6 +156,10 @@ func actionForOperation(operation string) (model.RailOperation, error) {
return model.RailOperationObserveConfirm, nil return model.RailOperationObserveConfirm, nil
case string(model.RailOperationFXConvert): case string(model.RailOperationFXConvert):
return model.RailOperationFXConvert, nil return model.RailOperationFXConvert, nil
case string(model.RailOperationBlock):
return model.RailOperationBlock, nil
case string(model.RailOperationRelease):
return model.RailOperationRelease, nil
} }
return model.RailOperationUnspecified, merrors.InvalidArgument("plan builder: unsupported operation") return model.RailOperationUnspecified, merrors.InvalidArgument("plan builder: unsupported operation")
@@ -164,12 +180,20 @@ func stepAmountForAction(action model.RailOperation, rail, sourceRail, destRail
case model.RailOperationSend: case model.RailOperationSend:
switch rail { switch rail {
case sourceRail: case sourceRail:
if feeRequired {
return cloneMoney(settlementAmount), nil
}
return cloneMoney(sourceAmount), nil return cloneMoney(sourceAmount), nil
case destRail: case destRail:
return cloneMoney(payoutAmount), nil return cloneMoney(payoutAmount), nil
default: default:
return cloneMoney(settlementAmount), nil return cloneMoney(settlementAmount), nil
} }
case model.RailOperationBlock, model.RailOperationRelease:
if rail == model.RailLedger {
return cloneMoney(ledgerDebitAmount), nil
}
return cloneMoney(settlementAmount), nil
case model.RailOperationFee: case model.RailOperationFee:
if !feeRequired { if !feeRequired {
return nil, nil return nil, nil
@@ -197,6 +221,9 @@ func stepInstanceIDForRail(intent model.PaymentIntent, rail, sourceRail, destRai
func observeAmountForRail(rail model.Rail, source, settlement, payout *paymenttypes.Money) *paymenttypes.Money { func observeAmountForRail(rail model.Rail, source, settlement, payout *paymenttypes.Money) *paymenttypes.Money {
switch rail { switch rail {
case model.RailCrypto, model.RailFiatOnRamp: case model.RailCrypto, model.RailFiatOnRamp:
if settlement != nil {
return settlement
}
if source != nil { if source != nil {
return source return source
} }

View File

@@ -123,6 +123,14 @@ func (g *providerSettlementGateway) Observe(ctx context.Context, referenceID str
}, nil }, nil
} }
func (g *providerSettlementGateway) Block(ctx context.Context, req rail.BlockRequest) (rail.RailResult, error) {
return rail.RailResult{}, merrors.NotImplemented("provider settlement gateway: block not supported")
}
func (g *providerSettlementGateway) Release(ctx context.Context, req rail.ReleaseRequest) (rail.RailResult, error) {
return rail.RailResult{}, merrors.NotImplemented("provider settlement gateway: release not supported")
}
func buildProviderSettlementDestination(req rail.TransferRequest) *chainv1.TransferDestination { func buildProviderSettlementDestination(req rail.TransferRequest) *chainv1.TransferDestination {
destRef := strings.TrimSpace(req.ToAccountID) destRef := strings.TrimSpace(req.ToAccountID)
memo := strings.TrimSpace(req.DestinationMemo) memo := strings.TrimSpace(req.DestinationMemo)

View File

@@ -2,6 +2,7 @@ package orchestrator
import ( import (
"context" "context"
"errors"
"strings" "strings"
"time" "time"
@@ -132,10 +133,6 @@ func (s *Service) quoteFees(ctx context.Context, orgRef string, req *orchestrato
} }
func (s *Service) estimateNetworkFee(ctx context.Context, intent *orchestratorv1.PaymentIntent) (*chainv1.EstimateTransferFeeResponse, error) { func (s *Service) estimateNetworkFee(ctx context.Context, intent *orchestratorv1.PaymentIntent) (*chainv1.EstimateTransferFeeResponse, error) {
if !s.deps.gateway.available() {
return nil, nil
}
req := &chainv1.EstimateTransferFeeRequest{ req := &chainv1.EstimateTransferFeeRequest{
Amount: cloneProtoMoney(intent.GetAmount()), Amount: cloneProtoMoney(intent.GetAmount()),
} }
@@ -160,7 +157,28 @@ func (s *Service) estimateNetworkFee(ctx context.Context, intent *orchestratorv1
} }
} }
resp, err := s.deps.gateway.client.EstimateTransferFee(ctx, req) network := ""
if req.Asset != nil {
network = strings.ToUpper(strings.TrimSpace(req.Asset.GetChain()))
}
instanceID := strings.TrimSpace(intent.GetSource().GetInstanceId())
if instanceID == "" {
instanceID = strings.TrimSpace(intent.GetDestination().GetInstanceId())
}
client, _, err := s.resolveChainGatewayClient(ctx, network, moneyFromProto(req.Amount), []model.RailOperation{model.RailOperationSend}, instanceID, "")
if err != nil {
if errors.Is(err, merrors.ErrNoData) {
s.logger.Debug("network fee estimation skipped: gateway unavailable", zap.Error(err))
return nil, nil
}
s.logger.Warn("chain gateway resolution failed", zap.Error(err))
return nil, err
}
if client == nil {
return nil, nil
}
resp, err := client.EstimateTransferFee(ctx, req)
if err != nil { if err != nil {
s.logger.Warn("chain gateway fee estimation failed", zap.Error(err)) s.logger.Warn("chain gateway fee estimation failed", zap.Error(err))
return nil, merrors.Internal("chain_gateway_fee_estimation_failed") return nil, merrors.Internal("chain_gateway_fee_estimation_failed")

View File

@@ -12,6 +12,8 @@ type fakeRailGateway struct {
capabilities rail.RailCapabilities capabilities rail.RailCapabilities
sendFn func(context.Context, rail.TransferRequest) (rail.RailResult, error) sendFn func(context.Context, rail.TransferRequest) (rail.RailResult, error)
observeFn func(context.Context, string) (rail.ObserveResult, error) observeFn func(context.Context, string) (rail.ObserveResult, error)
blockFn func(context.Context, rail.BlockRequest) (rail.RailResult, error)
releaseFn func(context.Context, rail.ReleaseRequest) (rail.RailResult, error)
} }
func (f *fakeRailGateway) Rail() string { func (f *fakeRailGateway) Rail() string {
@@ -39,3 +41,17 @@ func (f *fakeRailGateway) Observe(ctx context.Context, referenceID string) (rail
} }
return rail.ObserveResult{ReferenceID: referenceID, Status: rail.TransferStatusPending}, nil return rail.ObserveResult{ReferenceID: referenceID, Status: rail.TransferStatusPending}, nil
} }
func (f *fakeRailGateway) Block(ctx context.Context, req rail.BlockRequest) (rail.RailResult, error) {
if f.blockFn != nil {
return f.blockFn(ctx, req)
}
return rail.RailResult{ReferenceID: req.IdempotencyKey, Status: rail.TransferStatusPending}, nil
}
func (f *fakeRailGateway) Release(ctx context.Context, req rail.ReleaseRequest) (rail.RailResult, error) {
if f.releaseFn != nil {
return f.releaseFn(ctx, req)
}
return rail.RailResult{ReferenceID: req.ReferenceID, Status: rail.TransferStatusPending}, nil
}

View File

@@ -46,17 +46,18 @@ type Service struct {
} }
type serviceDependencies struct { type serviceDependencies struct {
fees feesDependency fees feesDependency
ledger ledgerDependency ledger ledgerDependency
gateway gatewayDependency gateway gatewayDependency
railGateways railGatewayDependency railGateways railGatewayDependency
providerGateway providerGatewayDependency providerGateway providerGatewayDependency
oracle oracleDependency oracle oracleDependency
mntx mntxDependency mntx mntxDependency
gatewayRegistry GatewayRegistry gatewayRegistry GatewayRegistry
cardRoutes map[string]CardGatewayRoute gatewayInvokeResolver GatewayInvokeResolver
feeLedgerAccounts map[string]string cardRoutes map[string]CardGatewayRoute
planBuilder PlanBuilder feeLedgerAccounts map[string]string
planBuilder PlanBuilder
} }
type handlerSet struct { type handlerSet struct {
@@ -92,7 +93,7 @@ func NewService(logger mlogger.Logger, repo storage.Repository, opts ...Option)
engine := defaultPaymentEngine{svc: svc} engine := defaultPaymentEngine{svc: svc}
svc.h.commands = newPaymentCommandFactory(engine, svc.logger) svc.h.commands = newPaymentCommandFactory(engine, svc.logger)
svc.h.queries = newPaymentQueryHandler(svc.storage, svc.ensureRepository, svc.logger.Named("queries")) svc.h.queries = newPaymentQueryHandler(svc.storage, svc.ensureRepository, svc.logger.Named("queries"))
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger.Named("events"), svc.submitCardPayout, svc.resumePaymentPlan) svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger.Named("events"), svc.submitCardPayout, svc.resumePaymentPlan, svc.releasePaymentHold)
svc.comp.executor = newPaymentExecutor(&svc.deps, svc.logger.Named("payment_executor"), svc) svc.comp.executor = newPaymentExecutor(&svc.deps, svc.logger.Named("payment_executor"), svc)
svc.startGatewayConsumers() svc.startGatewayConsumers()
@@ -107,7 +108,7 @@ func (s *Service) ensureHandlers() {
s.h.queries = newPaymentQueryHandler(s.storage, s.ensureRepository, s.logger.Named("queries")) s.h.queries = newPaymentQueryHandler(s.storage, s.ensureRepository, s.logger.Named("queries"))
} }
if s.h.events == nil { if s.h.events == nil {
s.h.events = newPaymentEventHandler(s.storage, s.ensureRepository, s.logger.Named("events"), s.submitCardPayout, s.resumePaymentPlan) s.h.events = newPaymentEventHandler(s.storage, s.ensureRepository, s.logger.Named("events"), s.submitCardPayout, s.resumePaymentPlan, s.releasePaymentHold)
} }
if s.comp.executor == nil { if s.comp.executor == nil {
s.comp.executor = newPaymentExecutor(&s.deps, s.logger.Named("payment_executor"), s) s.comp.executor = newPaymentExecutor(&s.deps, s.logger.Named("payment_executor"), s)
@@ -199,3 +200,11 @@ func (s *Service) resumePaymentPlan(ctx context.Context, store storage.PaymentsS
s.ensureHandlers() s.ensureHandlers()
return s.comp.executor.executePaymentPlan(ctx, store, payment, nil) return s.comp.executor.executePaymentPlan(ctx, store, payment, nil)
} }
func (s *Service) releasePaymentHold(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error {
if payment == nil || payment.PaymentPlan == nil || len(payment.PaymentPlan.Steps) == 0 {
return nil
}
s.ensureHandlers()
return s.comp.executor.releasePaymentHold(ctx, store, payment)
}

View File

@@ -125,7 +125,7 @@ func TestExecutePayment_ChainFailure(t *testing.T) {
return rail.RailResult{}, errors.New("chain failure") return rail.RailResult{}, errors.New("chain failure")
}, },
}, },
}, nil, nil, nil), }, nil, nil, nil, nil),
gatewayRegistry: &stubGatewayRegistry{ gatewayRegistry: &stubGatewayRegistry{
items: []*model.GatewayInstanceDescriptor{ items: []*model.GatewayInstanceDescriptor{
{ {
@@ -204,7 +204,7 @@ func TestProcessTransferUpdateHandler_Settled(t *testing.T) {
clock: testClock{now: time.Now()}, clock: testClock{now: time.Now()},
storage: &stubRepository{store: store}, storage: &stubRepository{store: store},
} }
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, nil, nil) svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, nil, nil, nil)
req := &orchestratorv1.ProcessTransferUpdateRequest{ req := &orchestratorv1.ProcessTransferUpdateRequest{
Event: &chainv1.TransferStatusChangedEvent{ Event: &chainv1.TransferStatusChangedEvent{
@@ -278,7 +278,7 @@ func TestProcessTransferUpdateHandler_CardFundingWaitsForSources(t *testing.T) {
setExecutionStepStatus(step, executionStepStatusSubmitted) setExecutionStepStatus(step, executionStepStatusSubmitted)
return nil return nil
} }
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, submit, nil) svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, submit, nil, nil)
req := &orchestratorv1.ProcessTransferUpdateRequest{ req := &orchestratorv1.ProcessTransferUpdateRequest{
Event: &chainv1.TransferStatusChangedEvent{ Event: &chainv1.TransferStatusChangedEvent{
@@ -349,7 +349,7 @@ func TestProcessDepositObservedHandler_MatchesPayment(t *testing.T) {
clock: testClock{now: time.Now()}, clock: testClock{now: time.Now()},
storage: &stubRepository{store: store}, storage: &stubRepository{store: store},
} }
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, nil, nil) svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, nil, nil, nil)
req := &orchestratorv1.ProcessDepositObservedRequest{ req := &orchestratorv1.ProcessDepositObservedRequest{
Event: &chainv1.WalletDepositObservedEvent{ Event: &chainv1.WalletDepositObservedEvent{

View File

@@ -87,6 +87,8 @@ const (
RailOperationFee RailOperation = "FEE" RailOperationFee RailOperation = "FEE"
RailOperationObserveConfirm RailOperation = "OBSERVE_CONFIRM" RailOperationObserveConfirm RailOperation = "OBSERVE_CONFIRM"
RailOperationFXConvert RailOperation = "FX_CONVERT" RailOperationFXConvert RailOperation = "FX_CONVERT"
RailOperationBlock RailOperation = "BLOCK"
RailOperationRelease RailOperation = "RELEASE"
) )
// RailCapabilities are declared per gateway instance. // RailCapabilities are declared per gateway instance.
@@ -96,6 +98,8 @@ type RailCapabilities struct {
CanReadBalance bool `bson:"canReadBalance,omitempty" json:"canReadBalance,omitempty"` CanReadBalance bool `bson:"canReadBalance,omitempty" json:"canReadBalance,omitempty"`
CanSendFee bool `bson:"canSendFee,omitempty" json:"canSendFee,omitempty"` CanSendFee bool `bson:"canSendFee,omitempty" json:"canSendFee,omitempty"`
RequiresObserveConfirm bool `bson:"requiresObserveConfirm,omitempty" json:"requiresObserveConfirm,omitempty"` RequiresObserveConfirm bool `bson:"requiresObserveConfirm,omitempty" json:"requiresObserveConfirm,omitempty"`
CanBlock bool `bson:"canBlock,omitempty" json:"canBlock,omitempty"`
CanRelease bool `bson:"canRelease,omitempty" json:"canRelease,omitempty"`
} }
// LimitsOverride applies per-currency overrides for limits. // LimitsOverride applies per-currency overrides for limits.
@@ -125,6 +129,7 @@ type GatewayInstanceDescriptor struct {
InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"` InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"`
Rail Rail `bson:"rail" json:"rail"` Rail Rail `bson:"rail" json:"rail"`
Network string `bson:"network,omitempty" json:"network,omitempty"` Network string `bson:"network,omitempty" json:"network,omitempty"`
InvokeURI string `bson:"invokeUri,omitempty" json:"invokeUri,omitempty"`
Currencies []string `bson:"currencies,omitempty" json:"currencies,omitempty"` Currencies []string `bson:"currencies,omitempty" json:"currencies,omitempty"`
Capabilities RailCapabilities `bson:"capabilities,omitempty" json:"capabilities,omitempty"` Capabilities RailCapabilities `bson:"capabilities,omitempty" json:"capabilities,omitempty"`
Limits Limits `bson:"limits,omitempty" json:"limits,omitempty"` Limits Limits `bson:"limits,omitempty" json:"limits,omitempty"`

View File

@@ -24,6 +24,8 @@ type RailCapabilities struct {
CanReadBalance bool CanReadBalance bool
CanSendFee bool CanSendFee bool
RequiresObserveConfirm bool RequiresObserveConfirm bool
CanBlock bool
CanRelease bool
} }
// FeeBreakdown provides a gateway-level fee description. // FeeBreakdown provides a gateway-level fee description.
@@ -49,6 +51,25 @@ type TransferRequest struct {
DestinationMemo string DestinationMemo string
} }
// BlockRequest defines the inputs for reserving value through a rail gateway.
type BlockRequest struct {
OrganizationRef string
AccountID string
Currency string
Amount string
IdempotencyKey string
Metadata map[string]string
ClientReference string
}
// ReleaseRequest defines the inputs for releasing a prior block.
type ReleaseRequest struct {
ReferenceID string
IdempotencyKey string
Metadata map[string]string
ClientReference string
}
// RailResult reports the outcome of a rail gateway operation. // RailResult reports the outcome of a rail gateway operation.
type RailResult struct { type RailResult struct {
ReferenceID string ReferenceID string
@@ -80,4 +101,6 @@ type RailGateway interface {
Capabilities() RailCapabilities Capabilities() RailCapabilities
Send(ctx context.Context, req TransferRequest) (RailResult, error) Send(ctx context.Context, req TransferRequest) (RailResult, error)
Observe(ctx context.Context, referenceID string) (ObserveResult, error) Observe(ctx context.Context, referenceID string) (ObserveResult, error)
Block(ctx context.Context, req BlockRequest) (RailResult, error)
Release(ctx context.Context, req ReleaseRequest) (RailResult, error)
} }

View File

@@ -12,6 +12,7 @@ import (
type InternalLedger interface { type InternalLedger interface {
ReadBalance(ctx context.Context, accountID string) (*moneyv1.Money, error) ReadBalance(ctx context.Context, accountID string) (*moneyv1.Money, error)
CreateTransaction(ctx context.Context, tx LedgerTx) (string, error) CreateTransaction(ctx context.Context, tx LedgerTx) (string, error)
TransferInternal(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error)
HoldBalance(ctx context.Context, accountID string, amount string) error HoldBalance(ctx context.Context, accountID string, amount string) error
} }

View File

@@ -45,6 +45,8 @@ enum RailOperation {
RAIL_OPERATION_FEE = 4; RAIL_OPERATION_FEE = 4;
RAIL_OPERATION_OBSERVE_CONFIRM = 5; RAIL_OPERATION_OBSERVE_CONFIRM = 5;
RAIL_OPERATION_FX_CONVERT = 6; RAIL_OPERATION_FX_CONVERT = 6;
RAIL_OPERATION_BLOCK = 7;
RAIL_OPERATION_RELEASE = 8;
} }
// Limits in minor units, e.g. cents // Limits in minor units, e.g. cents
@@ -124,6 +126,8 @@ message RailCapabilities {
bool can_read_balance = 3; bool can_read_balance = 3;
bool can_send_fee = 4; bool can_send_fee = 4;
bool requires_observe_confirm = 5; bool requires_observe_confirm = 5;
bool can_block = 6;
bool can_release = 7;
} }
message LimitsOverride { message LimitsOverride {

View File

@@ -7,6 +7,7 @@ environment:
# Add regular dependencies here. # Add regular dependencies here.
dependencies: dependencies:
analyzer: 9.0.0
json_annotation: ^4.9.0 json_annotation: ^4.9.0
http: ^1.1.0 http: ^1.1.0
provider: ^6.0.5 provider: ^6.0.5

View File

@@ -13,6 +13,7 @@ import 'package:pshared/provider/permissions.dart';
import 'package:pshared/provider/account.dart'; import 'package:pshared/provider/account.dart';
import 'package:pshared/provider/organizations.dart'; import 'package:pshared/provider/organizations.dart';
import 'package:pshared/provider/accounts/employees.dart'; import 'package:pshared/provider/accounts/employees.dart';
import 'package:pshared/provider/recipient/pmethods.dart';
import 'package:pshared/provider/recipient/provider.dart'; import 'package:pshared/provider/recipient/provider.dart';
import 'package:pshared/provider/payment/wallets.dart'; import 'package:pshared/provider/payment/wallets.dart';
import 'package:pshared/provider/invitations.dart'; import 'package:pshared/provider/invitations.dart';

View File

@@ -75,7 +75,7 @@ class InvitationFormFields extends StatelessWidget {
SizedBox( SizedBox(
width: 260, width: 260,
child: DropdownButtonFormField<String>( child: DropdownButtonFormField<String>(
value: selectedRoleRef, initialValue: selectedRoleRef,
items: roles.map((role) => DropdownMenuItem( items: roles.map((role) => DropdownMenuItem(
value: role.storable.id, value: role.storable.id,
child: Text(role.describable.name), child: Text(role.describable.name),

View File

@@ -28,6 +28,7 @@ environment:
# the latest version available on pub.dev. To see which dependencies have newer # the latest version available on pub.dev. To see which dependencies have newer
# versions available, run `flutter pub outdated`. # versions available, run `flutter pub outdated`.
dependencies: dependencies:
analyzer: 9.0.0
amplitude_flutter: ^4.0.1 amplitude_flutter: ^4.0.1
flutter: flutter:
sdk: flutter sdk: flutter
@@ -140,8 +141,3 @@ flutter_launcher_icons:
web: web:
generate: true generate: true
image_path: "resources/logo.png" image_path: "resources/logo.png"
# temporary
dependency_overrides:
analyzer: ">=9.0.0 <10.0.0"