Merge pull request 'hr-263' (#264) from hr-263 into main
Some checks failed
ci/woodpecker/push/billing_fees Pipeline was successful
ci/woodpecker/push/db Pipeline was successful
ci/woodpecker/push/bff Pipeline was successful
ci/woodpecker/push/discovery Pipeline was successful
ci/woodpecker/push/fx_ingestor Pipeline was successful
ci/woodpecker/push/fx_oracle Pipeline was successful
ci/woodpecker/push/gateway_chain Pipeline was successful
ci/woodpecker/push/gateway_mntx Pipeline was successful
ci/woodpecker/push/gateway_tgsettle Pipeline was successful
ci/woodpecker/push/ledger Pipeline was successful
ci/woodpecker/push/nats Pipeline was successful
ci/woodpecker/push/notification Pipeline was successful
ci/woodpecker/push/payments_orchestrator Pipeline failed
ci/woodpecker/push/frontend Pipeline failed

Reviewed-on: #264
This commit was merged in pull request #264.
This commit is contained in:
2026-01-16 13:38:43 +00:00
46 changed files with 2089 additions and 351 deletions

View File

@@ -147,6 +147,14 @@ func (g *chainRailGateway) Observe(ctx context.Context, referenceID string) (rai
}, nil
}
func (g *chainRailGateway) Block(ctx context.Context, req rail.BlockRequest) (rail.RailResult, error) {
return rail.RailResult{}, merrors.NotImplemented("chain gateway: block not supported")
}
func (g *chainRailGateway) Release(ctx context.Context, req rail.ReleaseRequest) (rail.RailResult, error) {
return rail.RailResult{}, merrors.NotImplemented("chain gateway: release not supported")
}
func (g *chainRailGateway) resolveDestination(ctx context.Context, destRef, memo string) (*chainv1.TransferDestination, error) {
managed, err := g.isManagedWallet(ctx, destRef)
if err != nil {

View File

@@ -22,7 +22,7 @@ require (
require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260112020553-64c30dda3cfd // indirect
github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260116123424-9c8b2bad3688 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.24.4 // indirect
github.com/bmatcuk/doublestar/v4 v4.9.2 // indirect

View File

@@ -6,8 +6,8 @@ github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260112020553-64c30dda3cfd h1:ifR6oQZU+7Lqemu0dqf6X4pVWuzmMeKX6WtwZ87rH+M=
github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260112020553-64c30dda3cfd/go.mod h1:ioLG6R+5bUSO1oeGSDxOV3FADARuMoytZCSX6MEMQkI=
github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260116123424-9c8b2bad3688 h1:heyVIwznmKcWu1dhijYnRnHTvRzVgWduNlLcRXFVxP8=
github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20260116123424-9c8b2bad3688/go.mod h1:ioLG6R+5bUSO1oeGSDxOV3FADARuMoytZCSX6MEMQkI=
github.com/VictoriaMetrics/fastcache v1.13.0 h1:AW4mheMR5Vd9FkAPUv+NH6Nhw+fmbTMGMsNAoA/+4G0=
github.com/VictoriaMetrics/fastcache v1.13.0/go.mod h1:hHXhl4DA2fTL2HTZDJFXWgW0LNjo6B+4aj2Wmng3TjU=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=

View File

@@ -0,0 +1,90 @@
package client
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
"google.golang.org/grpc"
)
type stubConnector struct {
submitFn func(ctx context.Context, req *connectorv1.SubmitOperationRequest) (*connectorv1.SubmitOperationResponse, error)
}
func (s *stubConnector) OpenAccount(context.Context, *connectorv1.OpenAccountRequest, ...grpc.CallOption) (*connectorv1.OpenAccountResponse, error) {
return nil, nil
}
func (s *stubConnector) GetAccount(context.Context, *connectorv1.GetAccountRequest, ...grpc.CallOption) (*connectorv1.GetAccountResponse, error) {
return nil, nil
}
func (s *stubConnector) ListAccounts(context.Context, *connectorv1.ListAccountsRequest, ...grpc.CallOption) (*connectorv1.ListAccountsResponse, error) {
return nil, nil
}
func (s *stubConnector) GetBalance(context.Context, *connectorv1.GetBalanceRequest, ...grpc.CallOption) (*connectorv1.GetBalanceResponse, error) {
return nil, nil
}
func (s *stubConnector) SubmitOperation(ctx context.Context, req *connectorv1.SubmitOperationRequest, _ ...grpc.CallOption) (*connectorv1.SubmitOperationResponse, error) {
if s.submitFn != nil {
return s.submitFn(ctx, req)
}
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{OperationId: "op-1"}}, nil
}
func (s *stubConnector) GetOperation(context.Context, *connectorv1.GetOperationRequest, ...grpc.CallOption) (*connectorv1.GetOperationResponse, error) {
return nil, nil
}
func (s *stubConnector) ListOperations(context.Context, *connectorv1.ListOperationsRequest, ...grpc.CallOption) (*connectorv1.ListOperationsResponse, error) {
return nil, nil
}
func TestTransferInternal_SubmitsTransferOperation(t *testing.T) {
ctx := context.Background()
var captured *connectorv1.Operation
stub := &stubConnector{
submitFn: func(ctx context.Context, req *connectorv1.SubmitOperationRequest) (*connectorv1.SubmitOperationResponse, error) {
captured = req.GetOperation()
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{OperationId: "op-1"}}, nil
},
}
client := NewWithClient(Config{}, stub)
resp, err := client.TransferInternal(ctx, &ledgerv1.TransferRequest{
IdempotencyKey: "id-1",
OrganizationRef: "org-1",
FromLedgerAccountRef: "acct-from",
ToLedgerAccountRef: "acct-to",
Money: &moneyv1.Money{Currency: "USD", Amount: "10"},
Description: "hold",
})
require.NoError(t, err)
require.NotNil(t, resp)
require.NotNil(t, captured)
assert.Equal(t, connectorv1.OperationType_TRANSFER, captured.GetType())
assert.Equal(t, "id-1", captured.GetIdempotencyKey())
assert.Equal(t, "acct-from", captured.GetFrom().GetAccount().GetAccountId())
assert.Equal(t, "acct-to", captured.GetTo().GetAccount().GetAccountId())
assert.Equal(t, ledgerConnectorID, captured.GetFrom().GetAccount().GetConnectorId())
assert.Equal(t, ledgerConnectorID, captured.GetTo().GetAccount().GetConnectorId())
assert.Equal(t, "USD", captured.GetMoney().GetCurrency())
assert.Equal(t, "10", captured.GetMoney().GetAmount())
params := captured.GetParams().AsMap()
assert.Equal(t, "org-1", params["organization_ref"])
assert.Equal(t, "hold", params["description"])
assert.Equal(t, "op-1", resp.GetJournalEntryRef())
assert.Equal(t, ledgerv1.EntryType_ENTRY_TRANSFER, resp.GetEntryType())
}

View File

@@ -34,42 +34,7 @@ messaging:
reconnect_wait: 5
buffer_size: 1024
fees:
address: "sendico_billing_fees:50060"
dial_timeout_seconds: 5
call_timeout_seconds: 3
insecure: true
ledger:
address: "sendico_ledger:50052"
dial_timeout_seconds: 5
call_timeout_seconds: 3
insecure: true
gateway:
address: "sendico_chain_gateway:50070"
dial_timeout_seconds: 5
call_timeout_seconds: 3
insecure: true
payment_gateway:
address: "sendico_tgsettle_gateway:50080"
dial_timeout_seconds: 5
call_timeout_seconds: 3
insecure: true
mntx:
address: "sendico_mntx_gateway:50075"
dial_timeout_seconds: 5
call_timeout_seconds: 15
insecure: true
oracle:
address: "sendico_fx_oracle:50051"
dial_timeout_seconds: 5
call_timeout_seconds: 3
insecure: true
# Service endpoints are sourced from discovery; no static overrides.
card_gateways:
monetix:
funding_address: "TGBDXEg9rxSqGFJDcb889zqTjDwx1bmLRF"
@@ -78,14 +43,4 @@ card_gateways:
fee_ledger_accounts:
monetix: "ledger:fees:monetix"
# gateway_instances:
# - id: "crypto-tron"
# rail: "CRYPTO"
# network: "TRON"
# currencies: ["USDT"]
# capabilities:
# can_pay_out: true
# can_send_fee: true
# limits:
# min_amount: "0"
# max_amount: "100000"
# Gateway instances and capabilities are sourced from service discovery.

View File

@@ -48,10 +48,19 @@ func buildFeeLedgerAccounts(src map[string]string) map[string]string {
}
func buildGatewayRegistry(logger mlogger.Logger, src []gatewayInstanceConfig, registry *discovery.Registry) orchestrator.GatewayRegistry {
static := buildGatewayInstances(logger, src)
staticRegistry := orchestrator.NewGatewayRegistry(logger, static)
discoveryRegistry := orchestrator.NewDiscoveryGatewayRegistry(logger, registry)
return orchestrator.NewCompositeGatewayRegistry(logger, staticRegistry, discoveryRegistry)
if logger != nil {
logger = logger.Named("gateway_registry")
}
if len(src) > 0 && logger != nil {
logger.Warn("Static gateway configuration ignored; using discovery registry only")
}
if registry == nil {
if logger != nil {
logger.Warn("Discovery registry unavailable; gateway routing disabled")
}
return nil
}
return orchestrator.NewDiscoveryGatewayRegistry(logger, registry)
}
func buildRailGateways(chainClient chainclient.Client, paymentGatewayClient chainclient.Client, src []gatewayInstanceConfig) map[string]rail.RailGateway {
@@ -76,6 +85,8 @@ func buildRailGateways(chainClient chainclient.Client, paymentGatewayClient chai
CanReadBalance: inst.Capabilities.CanReadBalance,
CanSendFee: inst.Capabilities.CanSendFee,
RequiresObserveConfirm: inst.Capabilities.RequiresObserveConfirm,
CanBlock: inst.Capabilities.CanBlock,
CanRelease: inst.Capabilities.CanRelease,
},
}
switch inst.Rail {
@@ -135,6 +146,8 @@ func buildGatewayInstances(logger mlogger.Logger, src []gatewayInstanceConfig) [
CanReadBalance: cfg.Capabilities.CanReadBalance,
CanSendFee: cfg.Capabilities.CanSendFee,
RequiresObserveConfirm: cfg.Capabilities.RequiresObserveConfirm,
CanBlock: cfg.Capabilities.CanBlock,
CanRelease: cfg.Capabilities.CanRelease,
},
Limits: buildGatewayLimits(cfg.Limits),
Version: strings.TrimSpace(cfg.Version),

View File

@@ -155,22 +155,8 @@ func (i *Imp) initOracleClient(cfg clientConfig) oracleclient.Client {
}
func (i *Imp) closeClients() {
if i.ledgerClient != nil {
_ = i.ledgerClient.Close()
}
if i.gatewayClient != nil {
_ = i.gatewayClient.Close()
}
if i.paymentGatewayClient != nil {
_ = i.paymentGatewayClient.Close()
}
if i.mntxClient != nil {
_ = i.mntxClient.Close()
}
if i.oracleClient != nil {
_ = i.oracleClient.Close()
}
if i.feesConn != nil {
_ = i.feesConn.Close()
if i.discoveryClients != nil {
i.discoveryClients.Close()
i.discoveryClients = nil
}
}

View File

@@ -54,6 +54,8 @@ type gatewayCapabilitiesConfig struct {
CanReadBalance bool `yaml:"can_read_balance"`
CanSendFee bool `yaml:"can_send_fee"`
RequiresObserveConfirm bool `yaml:"requires_observe_confirm"`
CanBlock bool `yaml:"can_block"`
CanRelease bool `yaml:"can_release"`
}
type limitsConfig struct {

View File

@@ -2,7 +2,6 @@ package serverimp
import (
oracleclient "github.com/tech/sendico/fx/oracle/client"
chainclient "github.com/tech/sendico/gateway/chain/client"
mntxclient "github.com/tech/sendico/gateway/mntx/client"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator"
@@ -12,45 +11,26 @@ import (
type orchestratorDeps struct {
feesClient feesv1.FeeEngineClient
ledgerClient ledgerclient.Client
gatewayClient chainclient.Client
paymentGatewayClient chainclient.Client
mntxClient mntxclient.Client
oracleClient oracleclient.Client
gatewayInvokeResolver orchestrator.GatewayInvokeResolver
}
func (i *Imp) initDependencies(cfg *config) *orchestratorDeps {
deps := &orchestratorDeps{}
if cfg == nil {
if i.discoveryReg == nil {
if i.logger != nil {
i.logger.Warn("Discovery registry unavailable; downstream clients disabled")
}
return deps
}
deps.feesClient, i.feesConn = i.initFeesClient(cfg.Fees)
deps.ledgerClient = i.initLedgerClient(cfg.Ledger)
if deps.ledgerClient != nil {
i.ledgerClient = deps.ledgerClient
}
deps.gatewayClient = i.initGatewayClient(cfg.Gateway)
if deps.gatewayClient != nil {
i.gatewayClient = deps.gatewayClient
}
deps.paymentGatewayClient = i.initPaymentGatewayClient(cfg.PaymentGateway)
if deps.paymentGatewayClient != nil {
i.paymentGatewayClient = deps.paymentGatewayClient
}
deps.mntxClient = i.initMntxClient(cfg.Mntx)
if deps.mntxClient != nil {
i.mntxClient = deps.mntxClient
}
deps.oracleClient = i.initOracleClient(cfg.Oracle)
if deps.oracleClient != nil {
i.oracleClient = deps.oracleClient
}
i.discoveryClients = newDiscoveryClientResolver(i.logger, i.discoveryReg)
deps.feesClient = &discoveryFeeClient{resolver: i.discoveryClients}
deps.ledgerClient = &discoveryLedgerClient{resolver: i.discoveryClients}
deps.oracleClient = &discoveryOracleClient{resolver: i.discoveryClients}
deps.mntxClient = &discoveryMntxClient{resolver: i.discoveryClients}
deps.gatewayInvokeResolver = discoveryGatewayInvokeResolver{resolver: i.discoveryClients}
return deps
}
@@ -65,21 +45,15 @@ func (i *Imp) buildServiceOptions(cfg *config, deps *orchestratorDeps) []orchest
if deps.ledgerClient != nil {
opts = append(opts, orchestrator.WithLedgerClient(deps.ledgerClient))
}
if deps.gatewayClient != nil {
opts = append(opts, orchestrator.WithChainGatewayClient(deps.gatewayClient))
}
if deps.paymentGatewayClient != nil {
opts = append(opts, orchestrator.WithProviderSettlementGatewayClient(deps.paymentGatewayClient))
}
if railGateways := buildRailGateways(deps.gatewayClient, deps.paymentGatewayClient, cfg.GatewayInstances); len(railGateways) > 0 {
opts = append(opts, orchestrator.WithRailGateways(railGateways))
}
if deps.mntxClient != nil {
opts = append(opts, orchestrator.WithMntxGateway(deps.mntxClient))
}
if deps.oracleClient != nil {
opts = append(opts, orchestrator.WithOracleClient(deps.oracleClient))
}
if deps.gatewayInvokeResolver != nil {
opts = append(opts, orchestrator.WithGatewayInvokeResolver(deps.gatewayInvokeResolver))
}
if routes := buildCardGatewayRoutes(cfg.CardGateways); len(routes) > 0 {
opts = append(opts, orchestrator.WithCardGatewayRoutes(routes))
}

View File

@@ -0,0 +1,477 @@
package serverimp
import (
"context"
"fmt"
"net"
"net/url"
"sort"
"strings"
"sync"
"time"
oracleclient "github.com/tech/sendico/fx/oracle/client"
chainclient "github.com/tech/sendico/gateway/chain/client"
mntxclient "github.com/tech/sendico/gateway/mntx/client"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)
const discoveryLogThrottle = 30 * time.Second
var (
feesServiceNames = []string{"BILLING_FEES", string(mservice.FeePlans)}
ledgerServiceNames = []string{"LEDGER", string(mservice.Ledger)}
oracleServiceNames = []string{"FX_ORACLE", string(mservice.FXOracle)}
mntxServiceNames = []string{"CARD_PAYOUT_RAIL_GATEWAY", string(mservice.MntxGateway)}
)
type discoveryEndpoint struct {
address string
insecure bool
raw string
}
func (e discoveryEndpoint) key() string {
return fmt.Sprintf("%s|%t", e.address, e.insecure)
}
type discoveryClientResolver struct {
logger mlogger.Logger
registry *discovery.Registry
mu sync.Mutex
feesConn *grpc.ClientConn
feesEndpoint discoveryEndpoint
ledgerClient ledgerclient.Client
ledgerEndpoint discoveryEndpoint
oracleClient oracleclient.Client
oracleEndpoint discoveryEndpoint
mntxClient mntxclient.Client
mntxEndpoint discoveryEndpoint
chainClients map[string]chainclient.Client
lastSelection map[string]string
lastMissing map[string]time.Time
}
func newDiscoveryClientResolver(logger mlogger.Logger, registry *discovery.Registry) *discoveryClientResolver {
if logger != nil {
logger = logger.Named("discovery_clients")
}
return &discoveryClientResolver{
logger: logger,
registry: registry,
chainClients: map[string]chainclient.Client{},
lastSelection: map[string]string{},
lastMissing: map[string]time.Time{},
}
}
func (r *discoveryClientResolver) Close() {
r.mu.Lock()
defer r.mu.Unlock()
if r.feesConn != nil {
_ = r.feesConn.Close()
r.feesConn = nil
}
if r.ledgerClient != nil {
_ = r.ledgerClient.Close()
r.ledgerClient = nil
}
if r.oracleClient != nil {
_ = r.oracleClient.Close()
r.oracleClient = nil
}
if r.mntxClient != nil {
_ = r.mntxClient.Close()
r.mntxClient = nil
}
for key, client := range r.chainClients {
if client != nil {
_ = client.Close()
}
delete(r.chainClients, key)
}
}
func (r *discoveryClientResolver) FeesAvailable() bool {
_, ok := r.findEntry("fees", feesServiceNames, "", "")
return ok
}
func (r *discoveryClientResolver) LedgerAvailable() bool {
_, ok := r.findEntry("ledger", ledgerServiceNames, "", "")
return ok
}
func (r *discoveryClientResolver) OracleAvailable() bool {
_, ok := r.findEntry("oracle", oracleServiceNames, "", "")
return ok
}
func (r *discoveryClientResolver) MntxAvailable() bool {
_, ok := r.findEntry("mntx", mntxServiceNames, "", "")
return ok
}
func (r *discoveryClientResolver) FeesClient(ctx context.Context) (feesv1.FeeEngineClient, error) {
entry, ok := r.findEntry("fees", feesServiceNames, "", "")
if !ok {
return nil, merrors.NoData("discovery: fees service unavailable")
}
endpoint, err := parseDiscoveryEndpoint(entry.InvokeURI)
if err != nil {
r.logMissing("fees", "invalid fees invoke uri", entry.InvokeURI, err)
return nil, err
}
r.mu.Lock()
defer r.mu.Unlock()
if r.feesConn == nil || r.feesEndpoint.key() != endpoint.key() || r.feesEndpoint.address != endpoint.address {
if r.feesConn != nil {
_ = r.feesConn.Close()
r.feesConn = nil
}
conn, dialErr := dialGrpc(ctx, endpoint)
if dialErr != nil {
r.logMissing("fees", "failed to dial fees service", endpoint.raw, dialErr)
return nil, dialErr
}
r.feesConn = conn
r.feesEndpoint = endpoint
}
return feesv1.NewFeeEngineClient(r.feesConn), nil
}
func (r *discoveryClientResolver) LedgerClient(ctx context.Context) (ledgerclient.Client, error) {
entry, ok := r.findEntry("ledger", ledgerServiceNames, "", "")
if !ok {
return nil, merrors.NoData("discovery: ledger service unavailable")
}
endpoint, err := parseDiscoveryEndpoint(entry.InvokeURI)
if err != nil {
r.logMissing("ledger", "invalid ledger invoke uri", entry.InvokeURI, err)
return nil, err
}
r.mu.Lock()
defer r.mu.Unlock()
if r.ledgerClient == nil || r.ledgerEndpoint.key() != endpoint.key() || r.ledgerEndpoint.address != endpoint.address {
if r.ledgerClient != nil {
_ = r.ledgerClient.Close()
r.ledgerClient = nil
}
client, dialErr := ledgerclient.New(ctx, ledgerclient.Config{
Address: endpoint.address,
Insecure: endpoint.insecure,
})
if dialErr != nil {
r.logMissing("ledger", "failed to dial ledger service", endpoint.raw, dialErr)
return nil, dialErr
}
r.ledgerClient = client
r.ledgerEndpoint = endpoint
}
return r.ledgerClient, nil
}
func (r *discoveryClientResolver) OracleClient(ctx context.Context) (oracleclient.Client, error) {
entry, ok := r.findEntry("oracle", oracleServiceNames, "", "")
if !ok {
return nil, merrors.NoData("discovery: oracle service unavailable")
}
endpoint, err := parseDiscoveryEndpoint(entry.InvokeURI)
if err != nil {
r.logMissing("oracle", "invalid oracle invoke uri", entry.InvokeURI, err)
return nil, err
}
r.mu.Lock()
defer r.mu.Unlock()
if r.oracleClient == nil || r.oracleEndpoint.key() != endpoint.key() || r.oracleEndpoint.address != endpoint.address {
if r.oracleClient != nil {
_ = r.oracleClient.Close()
r.oracleClient = nil
}
client, dialErr := oracleclient.New(ctx, oracleclient.Config{
Address: endpoint.address,
Insecure: endpoint.insecure,
})
if dialErr != nil {
r.logMissing("oracle", "failed to dial oracle service", endpoint.raw, dialErr)
return nil, dialErr
}
r.oracleClient = client
r.oracleEndpoint = endpoint
}
return r.oracleClient, nil
}
func (r *discoveryClientResolver) MntxClient(ctx context.Context) (mntxclient.Client, error) {
entry, ok := r.findEntry("mntx", mntxServiceNames, "", "")
if !ok {
return nil, merrors.NoData("discovery: mntx service unavailable")
}
endpoint, err := parseDiscoveryEndpoint(entry.InvokeURI)
if err != nil {
r.logMissing("mntx", "invalid mntx invoke uri", entry.InvokeURI, err)
return nil, err
}
r.mu.Lock()
defer r.mu.Unlock()
if r.mntxClient == nil || r.mntxEndpoint.key() != endpoint.key() || r.mntxEndpoint.address != endpoint.address {
if r.mntxClient != nil {
_ = r.mntxClient.Close()
r.mntxClient = nil
}
if !endpoint.insecure && r.logger != nil {
r.logger.Warn("Mntx gateway does not support TLS, falling back to insecure transport",
zap.String("invoke_uri", endpoint.raw))
}
client, dialErr := mntxclient.New(ctx, mntxclient.Config{
Address: endpoint.address,
})
if dialErr != nil {
r.logMissing("mntx", "failed to dial mntx service", endpoint.raw, dialErr)
return nil, dialErr
}
r.mntxClient = client
r.mntxEndpoint = endpoint
}
return r.mntxClient, nil
}
func (r *discoveryClientResolver) ChainClient(ctx context.Context, invokeURI string) (chainclient.Client, error) {
endpoint, err := parseDiscoveryEndpoint(invokeURI)
if err != nil {
r.logMissing("chain", "invalid chain gateway invoke uri", invokeURI, err)
return nil, err
}
r.mu.Lock()
defer r.mu.Unlock()
if client, ok := r.chainClients[endpoint.key()]; ok && client != nil {
return client, nil
}
client, dialErr := chainclient.New(ctx, chainclient.Config{
Address: endpoint.address,
Insecure: endpoint.insecure,
})
if dialErr != nil {
r.logMissing("chain", "failed to dial chain gateway", endpoint.raw, dialErr)
return nil, dialErr
}
r.chainClients[endpoint.key()] = client
return client, nil
}
func (r *discoveryClientResolver) PaymentGatewayClient(ctx context.Context, invokeURI string) (chainclient.Client, error) {
endpoint, err := parseDiscoveryEndpoint(invokeURI)
if err != nil {
r.logMissing("payment_gateway", "invalid payment gateway invoke uri", invokeURI, err)
return nil, err
}
r.mu.Lock()
defer r.mu.Unlock()
if client, ok := r.chainClients[endpoint.key()]; ok && client != nil {
return client, nil
}
client, dialErr := chainclient.New(ctx, chainclient.Config{
Address: endpoint.address,
Insecure: endpoint.insecure,
})
if dialErr != nil {
r.logMissing("payment_gateway", "failed to dial payment gateway", endpoint.raw, dialErr)
return nil, dialErr
}
r.chainClients[endpoint.key()] = client
return client, nil
}
func (r *discoveryClientResolver) findEntry(key string, services []string, rail string, network string) (*discovery.RegistryEntry, bool) {
if r == nil || r.registry == nil {
r.logMissing(key, "discovery registry unavailable", "", nil)
return nil, false
}
entries := r.registry.List(time.Now(), true)
matches := make([]discovery.RegistryEntry, 0)
for _, entry := range entries {
if !matchesService(entry.Service, services) {
continue
}
if rail != "" && !strings.EqualFold(strings.TrimSpace(entry.Rail), rail) {
continue
}
if network != "" && !strings.EqualFold(strings.TrimSpace(entry.Network), network) {
continue
}
matches = append(matches, entry)
}
if len(matches) == 0 {
r.logMissing(key, "discovery entry missing", "", nil)
return nil, false
}
sort.Slice(matches, func(i, j int) bool {
if matches[i].RoutingPriority != matches[j].RoutingPriority {
return matches[i].RoutingPriority > matches[j].RoutingPriority
}
if matches[i].ID != matches[j].ID {
return matches[i].ID < matches[j].ID
}
return matches[i].InstanceID < matches[j].InstanceID
})
entry := matches[0]
entryKey := discoveryEntryKey(entry)
r.logSelection(key, entryKey, entry)
return &entry, true
}
func (r *discoveryClientResolver) logSelection(key, entryKey string, entry discovery.RegistryEntry) {
if r.logger == nil {
return
}
r.mu.Lock()
last := r.lastSelection[key]
if last == entryKey {
r.mu.Unlock()
return
}
r.lastSelection[key] = entryKey
r.mu.Unlock()
r.logger.Info("Discovery endpoint selected",
zap.String("service_key", key),
zap.String("service", entry.Service),
zap.String("rail", entry.Rail),
zap.String("network", entry.Network),
zap.String("entry_id", entry.ID),
zap.String("instance_id", entry.InstanceID),
zap.String("invoke_uri", entry.InvokeURI))
}
func (r *discoveryClientResolver) logMissing(key, message, invokeURI string, err error) {
if r.logger == nil {
return
}
now := time.Now()
r.mu.Lock()
last := r.lastMissing[key]
if !last.IsZero() && now.Sub(last) < discoveryLogThrottle {
r.mu.Unlock()
return
}
r.lastMissing[key] = now
r.mu.Unlock()
fields := []zap.Field{zap.String("service_key", key)}
if invokeURI != "" {
fields = append(fields, zap.String("invoke_uri", strings.TrimSpace(invokeURI)))
}
if err != nil {
fields = append(fields, zap.Error(err))
}
r.logger.Warn(message, fields...)
}
func discoveryEntryKey(entry discovery.RegistryEntry) string {
return fmt.Sprintf("%s|%s|%s|%s|%s|%s",
strings.TrimSpace(entry.Service),
strings.TrimSpace(entry.ID),
strings.TrimSpace(entry.InstanceID),
strings.TrimSpace(entry.Rail),
strings.TrimSpace(entry.Network),
strings.TrimSpace(entry.InvokeURI))
}
func matchesService(service string, candidates []string) bool {
service = strings.TrimSpace(service)
if service == "" || len(candidates) == 0 {
return false
}
for _, candidate := range candidates {
if strings.EqualFold(service, strings.TrimSpace(candidate)) {
return true
}
}
return false
}
func parseDiscoveryEndpoint(raw string) (discoveryEndpoint, error) {
raw = strings.TrimSpace(raw)
if raw == "" {
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri is required")
}
parsed, err := url.Parse(raw)
if err != nil || parsed.Scheme == "" {
if _, _, splitErr := net.SplitHostPort(raw); splitErr != nil {
if err != nil {
return discoveryEndpoint{}, err
}
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
}
return discoveryEndpoint{address: raw, insecure: true, raw: raw}, nil
}
scheme := strings.ToLower(strings.TrimSpace(parsed.Scheme))
switch scheme {
case "grpc":
address := strings.TrimSpace(parsed.Host)
if _, _, splitErr := net.SplitHostPort(address); splitErr != nil {
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
}
return discoveryEndpoint{address: address, insecure: true, raw: raw}, nil
case "grpcs":
address := strings.TrimSpace(parsed.Host)
if _, _, splitErr := net.SplitHostPort(address); splitErr != nil {
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
}
return discoveryEndpoint{address: address, insecure: false, raw: raw}, nil
case "dns", "passthrough":
return discoveryEndpoint{address: raw, insecure: true, raw: raw}, nil
default:
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: unsupported invoke uri scheme")
}
}
func dialGrpc(ctx context.Context, endpoint discoveryEndpoint) (*grpc.ClientConn, error) {
dialOpts := []grpc.DialOption{}
if endpoint.insecure {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(nil)))
}
if ctx == nil {
ctx = context.Background()
}
return grpc.DialContext(ctx, endpoint.address, dialOpts...)
}

View File

@@ -0,0 +1,19 @@
package serverimp
import (
"context"
chainclient "github.com/tech/sendico/gateway/chain/client"
"github.com/tech/sendico/pkg/merrors"
)
type discoveryGatewayInvokeResolver struct {
resolver *discoveryClientResolver
}
func (r discoveryGatewayInvokeResolver) Resolve(ctx context.Context, invokeURI string) (chainclient.Client, error) {
if r.resolver == nil {
return nil, merrors.Internal("discovery gateway resolver unavailable")
}
return r.resolver.ChainClient(ctx, invokeURI)
}

View File

@@ -0,0 +1,370 @@
package serverimp
import (
"context"
oracleclient "github.com/tech/sendico/fx/oracle/client"
chainclient "github.com/tech/sendico/gateway/chain/client"
mntxclient "github.com/tech/sendico/gateway/mntx/client"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/payments/rail"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
"google.golang.org/grpc"
)
type discoveryFeeClient struct {
resolver *discoveryClientResolver
}
func (c *discoveryFeeClient) Available() bool {
if c == nil || c.resolver == nil {
return false
}
return c.resolver.FeesAvailable()
}
func (c *discoveryFeeClient) QuoteFees(ctx context.Context, req *feesv1.QuoteFeesRequest, opts ...grpc.CallOption) (*feesv1.QuoteFeesResponse, error) {
client, err := c.resolver.FeesClient(ctx)
if err != nil {
return nil, err
}
return client.QuoteFees(ctx, req, opts...)
}
func (c *discoveryFeeClient) PrecomputeFees(ctx context.Context, req *feesv1.PrecomputeFeesRequest, opts ...grpc.CallOption) (*feesv1.PrecomputeFeesResponse, error) {
client, err := c.resolver.FeesClient(ctx)
if err != nil {
return nil, err
}
return client.PrecomputeFees(ctx, req, opts...)
}
func (c *discoveryFeeClient) ValidateFeeToken(ctx context.Context, req *feesv1.ValidateFeeTokenRequest, opts ...grpc.CallOption) (*feesv1.ValidateFeeTokenResponse, error) {
client, err := c.resolver.FeesClient(ctx)
if err != nil {
return nil, err
}
return client.ValidateFeeToken(ctx, req, opts...)
}
type discoveryLedgerClient struct {
resolver *discoveryClientResolver
}
func (c *discoveryLedgerClient) Available() bool {
if c == nil || c.resolver == nil {
return false
}
return c.resolver.LedgerAvailable()
}
func (c *discoveryLedgerClient) ReadBalance(ctx context.Context, accountID string) (*moneyv1.Money, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.ReadBalance(ctx, accountID)
}
func (c *discoveryLedgerClient) CreateTransaction(ctx context.Context, tx rail.LedgerTx) (string, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return "", err
}
return client.CreateTransaction(ctx, tx)
}
func (c *discoveryLedgerClient) TransferInternal(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.TransferInternal(ctx, req)
}
func (c *discoveryLedgerClient) HoldBalance(ctx context.Context, accountID string, amount string) error {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return err
}
return client.HoldBalance(ctx, accountID, amount)
}
func (c *discoveryLedgerClient) CreateAccount(ctx context.Context, req *ledgerv1.CreateAccountRequest) (*ledgerv1.CreateAccountResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.CreateAccount(ctx, req)
}
func (c *discoveryLedgerClient) ListAccounts(ctx context.Context, req *ledgerv1.ListAccountsRequest) (*ledgerv1.ListAccountsResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.ListAccounts(ctx, req)
}
func (c *discoveryLedgerClient) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.PostCreditWithCharges(ctx, req)
}
func (c *discoveryLedgerClient) PostDebitWithCharges(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.PostDebitWithCharges(ctx, req)
}
func (c *discoveryLedgerClient) ApplyFXWithCharges(ctx context.Context, req *ledgerv1.FXRequest) (*ledgerv1.PostResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.ApplyFXWithCharges(ctx, req)
}
func (c *discoveryLedgerClient) GetBalance(ctx context.Context, req *ledgerv1.GetBalanceRequest) (*ledgerv1.BalanceResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.GetBalance(ctx, req)
}
func (c *discoveryLedgerClient) GetJournalEntry(ctx context.Context, req *ledgerv1.GetEntryRequest) (*ledgerv1.JournalEntryResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.GetJournalEntry(ctx, req)
}
func (c *discoveryLedgerClient) GetStatement(ctx context.Context, req *ledgerv1.GetStatementRequest) (*ledgerv1.StatementResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.GetStatement(ctx, req)
}
func (c *discoveryLedgerClient) Close() error {
if c == nil || c.resolver == nil {
return nil
}
client, err := c.resolver.LedgerClient(context.Background())
if err != nil {
return nil
}
return client.Close()
}
type discoveryOracleClient struct {
resolver *discoveryClientResolver
}
func (c *discoveryOracleClient) Available() bool {
if c == nil || c.resolver == nil {
return false
}
return c.resolver.OracleAvailable()
}
func (c *discoveryOracleClient) LatestRate(ctx context.Context, req oracleclient.LatestRateParams) (*oracleclient.RateSnapshot, error) {
client, err := c.resolver.OracleClient(ctx)
if err != nil {
return nil, err
}
return client.LatestRate(ctx, req)
}
func (c *discoveryOracleClient) GetQuote(ctx context.Context, req oracleclient.GetQuoteParams) (*oracleclient.Quote, error) {
client, err := c.resolver.OracleClient(ctx)
if err != nil {
return nil, err
}
return client.GetQuote(ctx, req)
}
func (c *discoveryOracleClient) Close() error {
if c == nil || c.resolver == nil {
return nil
}
client, err := c.resolver.OracleClient(context.Background())
if err != nil {
return nil
}
return client.Close()
}
type discoveryMntxClient struct {
resolver *discoveryClientResolver
}
func (c *discoveryMntxClient) Available() bool {
if c == nil || c.resolver == nil {
return false
}
return c.resolver.MntxAvailable()
}
func (c *discoveryMntxClient) CreateCardPayout(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) {
client, err := c.resolver.MntxClient(ctx)
if err != nil {
return nil, err
}
return client.CreateCardPayout(ctx, req)
}
func (c *discoveryMntxClient) CreateCardTokenPayout(ctx context.Context, req *mntxv1.CardTokenPayoutRequest) (*mntxv1.CardTokenPayoutResponse, error) {
client, err := c.resolver.MntxClient(ctx)
if err != nil {
return nil, err
}
return client.CreateCardTokenPayout(ctx, req)
}
func (c *discoveryMntxClient) GetCardPayoutStatus(ctx context.Context, req *mntxv1.GetCardPayoutStatusRequest) (*mntxv1.GetCardPayoutStatusResponse, error) {
client, err := c.resolver.MntxClient(ctx)
if err != nil {
return nil, err
}
return client.GetCardPayoutStatus(ctx, req)
}
func (c *discoveryMntxClient) ListGatewayInstances(ctx context.Context, req *mntxv1.ListGatewayInstancesRequest) (*mntxv1.ListGatewayInstancesResponse, error) {
client, err := c.resolver.MntxClient(ctx)
if err != nil {
return nil, err
}
return client.ListGatewayInstances(ctx, req)
}
func (c *discoveryMntxClient) Close() error {
if c == nil || c.resolver == nil {
return nil
}
client, err := c.resolver.MntxClient(context.Background())
if err != nil {
return nil
}
return client.Close()
}
type discoveryChainClient struct {
resolver *discoveryClientResolver
invokeURI string
}
func (c *discoveryChainClient) Available() bool {
return c != nil && c.resolver != nil && c.invokeURI != ""
}
func (c *discoveryChainClient) client(ctx context.Context) (chainclient.Client, error) {
if c == nil || c.resolver == nil {
return nil, merrors.Internal("discovery chain client unavailable")
}
return c.resolver.ChainClient(ctx, c.invokeURI)
}
func (c *discoveryChainClient) CreateManagedWallet(ctx context.Context, req *chainv1.CreateManagedWalletRequest) (*chainv1.CreateManagedWalletResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.CreateManagedWallet(ctx, req)
}
func (c *discoveryChainClient) GetManagedWallet(ctx context.Context, req *chainv1.GetManagedWalletRequest) (*chainv1.GetManagedWalletResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.GetManagedWallet(ctx, req)
}
func (c *discoveryChainClient) ListManagedWallets(ctx context.Context, req *chainv1.ListManagedWalletsRequest) (*chainv1.ListManagedWalletsResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.ListManagedWallets(ctx, req)
}
func (c *discoveryChainClient) GetWalletBalance(ctx context.Context, req *chainv1.GetWalletBalanceRequest) (*chainv1.GetWalletBalanceResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.GetWalletBalance(ctx, req)
}
func (c *discoveryChainClient) SubmitTransfer(ctx context.Context, req *chainv1.SubmitTransferRequest) (*chainv1.SubmitTransferResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.SubmitTransfer(ctx, req)
}
func (c *discoveryChainClient) GetTransfer(ctx context.Context, req *chainv1.GetTransferRequest) (*chainv1.GetTransferResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.GetTransfer(ctx, req)
}
func (c *discoveryChainClient) ListTransfers(ctx context.Context, req *chainv1.ListTransfersRequest) (*chainv1.ListTransfersResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.ListTransfers(ctx, req)
}
func (c *discoveryChainClient) EstimateTransferFee(ctx context.Context, req *chainv1.EstimateTransferFeeRequest) (*chainv1.EstimateTransferFeeResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.EstimateTransferFee(ctx, req)
}
func (c *discoveryChainClient) ComputeGasTopUp(ctx context.Context, req *chainv1.ComputeGasTopUpRequest) (*chainv1.ComputeGasTopUpResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.ComputeGasTopUp(ctx, req)
}
func (c *discoveryChainClient) EnsureGasTopUp(ctx context.Context, req *chainv1.EnsureGasTopUpRequest) (*chainv1.EnsureGasTopUpResponse, error) {
client, err := c.client(ctx)
if err != nil {
return nil, err
}
return client.EnsureGasTopUp(ctx, req)
}
func (c *discoveryChainClient) Close() error {
if c == nil || c.resolver == nil {
return nil
}
client, err := c.resolver.ChainClient(context.Background(), c.invokeURI)
if err != nil {
return nil
}
return client.Close()
}

View File

@@ -1,16 +1,11 @@
package serverimp
import (
oracleclient "github.com/tech/sendico/fx/oracle/client"
chainclient "github.com/tech/sendico/gateway/chain/client"
mntxclient "github.com/tech/sendico/gateway/mntx/client"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator"
"github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/server/grpcapp"
"google.golang.org/grpc"
)
type Imp struct {
@@ -23,11 +18,6 @@ type Imp struct {
discoveryWatcher *discovery.RegistryWatcher
discoveryReg *discovery.Registry
discoveryAnnouncer *discovery.Announcer
discoveryClients *discoveryClientResolver
service *orchestrator.Service
feesConn *grpc.ClientConn
ledgerClient ledgerclient.Client
gatewayClient chainclient.Client
paymentGatewayClient chainclient.Client
mntxClient mntxclient.Client
oracleClient oracleclient.Client
}

View File

@@ -5,6 +5,7 @@ import (
"strings"
"github.com/shopspring/decimal"
chainclient "github.com/tech/sendico/gateway/chain/client"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/merrors"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
@@ -23,11 +24,6 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
if source == nil || strings.TrimSpace(source.ManagedWalletRef) == "" {
return merrors.InvalidArgument("card funding: source managed wallet is required")
}
if !s.deps.gateway.available() {
s.logger.Warn("card funding aborted: chain gateway unavailable")
return merrors.InvalidArgument("card funding: chain gateway unavailable")
}
route, err := s.cardRoute(defaultCardGateway)
if err != nil {
return err
@@ -67,10 +63,22 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
feeRequired := feeDecimal.IsPositive()
feeAmountProto := protoMoney(feeAmount)
network := networkFromEndpoint(intent.Source)
instanceID := strings.TrimSpace(intent.Source.InstanceID)
actions := []model.RailOperation{model.RailOperationSend}
if feeRequired {
actions = append(actions, model.RailOperationFee)
}
chainClient, _, err := s.resolveChainGatewayClient(ctx, network, intentAmount, actions, instanceID, payment.PaymentRef)
if err != nil {
s.logger.Warn("card funding gateway resolution failed", zap.Error(err), zap.String("payment_ref", payment.PaymentRef))
return err
}
fundingDest := &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ExternalAddress{ExternalAddress: fundingAddress},
}
fundingFee, err := s.estimateTransferNetworkFee(ctx, sourceWalletRef, fundingDest, intentAmountProto)
fundingFee, err := s.estimateTransferNetworkFee(ctx, chainClient, sourceWalletRef, fundingDest, intentAmountProto)
if err != nil {
return err
}
@@ -83,7 +91,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
feeDest := &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: feeWalletRef},
}
feeTransferFee, err = s.estimateTransferNetworkFee(ctx, sourceWalletRef, feeDest, feeAmountProto)
feeTransferFee, err = s.estimateTransferNetworkFee(ctx, chainClient, sourceWalletRef, feeDest, feeAmountProto)
if err != nil {
return err
}
@@ -103,7 +111,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
var topUpFee *moneyv1.Money
topUpPositive := false
if estimatedTotalFee != nil {
computeResp, err := s.deps.gateway.client.ComputeGasTopUp(ctx, &chainv1.ComputeGasTopUpRequest{
computeResp, err := chainClient.ComputeGasTopUp(ctx, &chainv1.ComputeGasTopUpRequest{
WalletRef: sourceWalletRef,
EstimatedTotalFee: estimatedTotalFee,
})
@@ -131,7 +139,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
topUpDest := &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: sourceWalletRef},
}
topUpFee, err = s.estimateTransferNetworkFee(ctx, feeWalletRef, topUpDest, topUpMoney)
topUpFee, err = s.estimateTransferNetworkFee(ctx, chainClient, feeWalletRef, topUpDest, topUpMoney)
if err != nil {
return err
}
@@ -191,7 +199,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
}
if topUpMoney != nil && topUpPositive {
ensureResp, gasErr := s.deps.gateway.client.EnsureGasTopUp(ctx, &chainv1.EnsureGasTopUpRequest{
ensureResp, gasErr := chainClient.EnsureGasTopUp(ctx, &chainv1.EnsureGasTopUpRequest{
IdempotencyKey: payment.IdempotencyKey + ":card:gas",
OrganizationRef: payment.OrganizationRef.Hex(),
SourceWalletRef: feeWalletRef,
@@ -228,7 +236,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
topUpDest := &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: sourceWalletRef},
}
topUpFee, err = s.estimateTransferNetworkFee(ctx, feeWalletRef, topUpDest, actual)
topUpFee, err = s.estimateTransferNetworkFee(ctx, chainClient, feeWalletRef, topUpDest, actual)
if err != nil {
return err
}
@@ -247,7 +255,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
updateExecutionPlanTotalNetworkFee(plan)
}
fundResp, err := s.deps.gateway.client.SubmitTransfer(ctx, &chainv1.SubmitTransferRequest{
fundResp, err := chainClient.SubmitTransfer(ctx, &chainv1.SubmitTransferRequest{
IdempotencyKey: payment.IdempotencyKey + ":card:fund",
OrganizationRef: payment.OrganizationRef.Hex(),
SourceWalletRef: sourceWalletRef,
@@ -267,7 +275,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
updateExecutionPlanTotalNetworkFee(plan)
if feeRequired {
feeResp, err := s.deps.gateway.client.SubmitTransfer(ctx, &chainv1.SubmitTransferRequest{
feeResp, err := chainClient.SubmitTransfer(ctx, &chainv1.SubmitTransferRequest{
IdempotencyKey: payment.IdempotencyKey + ":card:fee",
OrganizationRef: payment.OrganizationRef.Hex(),
SourceWalletRef: sourceWalletRef,
@@ -293,8 +301,8 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model
return nil
}
func (s *Service) estimateTransferNetworkFee(ctx context.Context, sourceWalletRef string, destination *chainv1.TransferDestination, amount *moneyv1.Money) (*moneyv1.Money, error) {
if !s.deps.gateway.available() {
func (s *Service) estimateTransferNetworkFee(ctx context.Context, client chainclient.Client, sourceWalletRef string, destination *chainv1.TransferDestination, amount *moneyv1.Money) (*moneyv1.Money, error) {
if client == nil {
return nil, merrors.InvalidArgument("chain gateway unavailable")
}
sourceWalletRef = strings.TrimSpace(sourceWalletRef)
@@ -305,7 +313,7 @@ func (s *Service) estimateTransferNetworkFee(ctx context.Context, sourceWalletRe
return nil, merrors.InvalidArgument("amount is required")
}
resp, err := s.deps.gateway.client.EstimateTransferFee(ctx, &chainv1.EstimateTransferFeeRequest{
resp, err := client.EstimateTransferFee(ctx, &chainv1.EstimateTransferFeeRequest{
SourceWalletRef: sourceWalletRef,
Destination: destination,
Amount: cloneProtoMoney(amount),

View File

@@ -74,7 +74,7 @@ func TestSubmitCardFundingTransfers_PlansTopUpAndFunding(t *testing.T) {
svc := &Service{
logger: zap.NewNop(),
deps: serviceDependencies{
gateway: gatewayDependency{client: gateway},
gateway: gatewayDependency{resolver: staticChainGatewayResolver{client: gateway}},
cardRoutes: map[string]CardGatewayRoute{
defaultCardGateway: {
FundingAddress: fundingAddress,
@@ -241,7 +241,7 @@ func TestSubmitCardPayout_UsesSettlementAmount(t *testing.T) {
svc := &Service{
logger: zap.NewNop(),
deps: serviceDependencies{
gateway: gatewayDependency{client: gateway},
gateway: gatewayDependency{resolver: staticChainGatewayResolver{client: gateway}},
mntx: mntxDependency{client: mntx},
},
}
@@ -326,7 +326,7 @@ func TestSubmitCardFundingTransfers_RequiresFeeWalletRef(t *testing.T) {
svc := &Service{
logger: zap.NewNop(),
deps: serviceDependencies{
gateway: gatewayDependency{client: gateway},
gateway: gatewayDependency{resolver: staticChainGatewayResolver{client: gateway}},
cardRoutes: map[string]CardGatewayRoute{
defaultCardGateway: {
FundingAddress: "0xfunding",

View File

@@ -475,6 +475,10 @@ func protoRailOperationFromModel(action model.RailOperation) gatewayv1.RailOpera
return gatewayv1.RailOperation_RAIL_OPERATION_OBSERVE_CONFIRM
case string(model.RailOperationFXConvert):
return gatewayv1.RailOperation_RAIL_OPERATION_FX_CONVERT
case string(model.RailOperationBlock):
return gatewayv1.RailOperation_RAIL_OPERATION_BLOCK
case string(model.RailOperationRelease):
return gatewayv1.RailOperation_RAIL_OPERATION_RELEASE
default:
return gatewayv1.RailOperation_RAIL_OPERATION_UNSPECIFIED
}

View File

@@ -48,6 +48,7 @@ func (r *discoveryGatewayRegistry) List(_ context.Context) ([]*model.GatewayInst
InstanceID: entry.InstanceID,
Rail: rail,
Network: entry.Network,
InvokeURI: strings.TrimSpace(entry.InvokeURI),
Currencies: normalizeCurrencies(entry.Currencies),
Capabilities: capabilitiesFromOps(entry.Operations),
Limits: limitsFromDiscovery(entry.Limits),
@@ -92,6 +93,10 @@ func capabilitiesFromOps(ops []string) model.RailCapabilities {
cap.CanSendFee = true
case "observe.confirm", "observe.confirmation":
cap.RequiresObserveConfirm = true
case "block", "funds.block", "balance.block", "ledger.block":
cap.CanBlock = true
case "release", "funds.release", "balance.release", "ledger.release":
cap.CanRelease = true
}
}
return cap

View File

@@ -60,10 +60,6 @@ func isSourceExecutionStep(step *model.ExecutionStep) bool {
return executionStepRole(step) == executionStepRoleSource
}
func isConsumerExecutionStep(step *model.ExecutionStep) bool {
return executionStepRole(step) == executionStepRoleConsumer
}
func sourceStepsConfirmed(plan *model.ExecutionPlan) bool {
if plan == nil || len(plan.Steps) == 0 {
return false

View File

@@ -7,7 +7,6 @@ import (
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/mlogger"
gatewayv1 "github.com/tech/sendico/pkg/proto/common/gateway/v1"
)
type gatewayRegistry struct {
@@ -52,108 +51,6 @@ func (r *gatewayRegistry) List(ctx context.Context) ([]*model.GatewayInstanceDes
return result, nil
}
func modelGatewayFromProto(src *gatewayv1.GatewayInstanceDescriptor) *model.GatewayInstanceDescriptor {
if src == nil {
return nil
}
limits := modelLimitsFromProto(src.GetLimits())
return &model.GatewayInstanceDescriptor{
ID: strings.TrimSpace(src.GetId()),
Rail: modelRailFromProto(src.GetRail()),
Network: strings.ToUpper(strings.TrimSpace(src.GetNetwork())),
Currencies: normalizeCurrencies(src.GetCurrencies()),
Capabilities: modelCapabilitiesFromProto(src.GetCapabilities()),
Limits: limits,
Version: strings.TrimSpace(src.GetVersion()),
IsEnabled: src.GetIsEnabled(),
}
}
func modelRailFromProto(rail gatewayv1.Rail) model.Rail {
switch rail {
case gatewayv1.Rail_RAIL_CRYPTO:
return model.RailCrypto
case gatewayv1.Rail_RAIL_PROVIDER_SETTLEMENT:
return model.RailProviderSettlement
case gatewayv1.Rail_RAIL_LEDGER:
return model.RailLedger
case gatewayv1.Rail_RAIL_CARD_PAYOUT:
return model.RailCardPayout
case gatewayv1.Rail_RAIL_FIAT_ONRAMP:
return model.RailFiatOnRamp
default:
return model.RailUnspecified
}
}
func modelCapabilitiesFromProto(src *gatewayv1.RailCapabilities) model.RailCapabilities {
if src == nil {
return model.RailCapabilities{}
}
return model.RailCapabilities{
CanPayIn: src.GetCanPayIn(),
CanPayOut: src.GetCanPayOut(),
CanReadBalance: src.GetCanReadBalance(),
CanSendFee: src.GetCanSendFee(),
RequiresObserveConfirm: src.GetRequiresObserveConfirm(),
}
}
func modelLimitsFromProto(src *gatewayv1.Limits) model.Limits {
if src == nil {
return model.Limits{}
}
limits := model.Limits{
MinAmount: strings.TrimSpace(src.GetMinAmount()),
MaxAmount: strings.TrimSpace(src.GetMaxAmount()),
PerTxMaxFee: strings.TrimSpace(src.GetPerTxMaxFee()),
PerTxMinAmount: strings.TrimSpace(src.GetPerTxMinAmount()),
PerTxMaxAmount: strings.TrimSpace(src.GetPerTxMaxAmount()),
}
if len(src.GetVolumeLimit()) > 0 {
limits.VolumeLimit = map[string]string{}
for key, value := range src.GetVolumeLimit() {
bucket := strings.TrimSpace(key)
amount := strings.TrimSpace(value)
if bucket == "" || amount == "" {
continue
}
limits.VolumeLimit[bucket] = amount
}
}
if len(src.GetVelocityLimit()) > 0 {
limits.VelocityLimit = map[string]int{}
for key, value := range src.GetVelocityLimit() {
bucket := strings.TrimSpace(key)
if bucket == "" {
continue
}
limits.VelocityLimit[bucket] = int(value)
}
}
if len(src.GetCurrencyLimits()) > 0 {
limits.CurrencyLimits = map[string]model.LimitsOverride{}
for key, override := range src.GetCurrencyLimits() {
currency := strings.ToUpper(strings.TrimSpace(key))
if currency == "" || override == nil {
continue
}
limits.CurrencyLimits[currency] = model.LimitsOverride{
MaxVolume: strings.TrimSpace(override.GetMaxVolume()),
MinAmount: strings.TrimSpace(override.GetMinAmount()),
MaxAmount: strings.TrimSpace(override.GetMaxAmount()),
MaxFee: strings.TrimSpace(override.GetMaxFee()),
MaxOps: int(override.GetMaxOps()),
}
}
}
return limits
}
func normalizeCurrencies(values []string) []string {
if len(values) == 0 {
return nil

View File

@@ -0,0 +1,133 @@
package orchestrator
import (
"context"
"sort"
"strings"
"github.com/shopspring/decimal"
chainclient "github.com/tech/sendico/gateway/chain/client"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/merrors"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.uber.org/zap"
)
func (s *Service) resolveChainGatewayClient(ctx context.Context, network string, amount *paymenttypes.Money, actions []model.RailOperation, instanceID string, paymentRef string) (chainclient.Client, *model.GatewayInstanceDescriptor, error) {
if s.deps.gatewayRegistry != nil && s.deps.gatewayInvokeResolver != nil {
entry, err := selectGatewayForActions(ctx, s.deps.gatewayRegistry, model.RailCrypto, network, amount, actions, instanceID, sendDirectionForRail(model.RailCrypto))
if err != nil {
return nil, nil, err
}
invokeURI := strings.TrimSpace(entry.InvokeURI)
if invokeURI == "" {
return nil, nil, merrors.InvalidArgument("chain gateway: invoke uri is required")
}
client, err := s.deps.gatewayInvokeResolver.Resolve(ctx, invokeURI)
if err != nil {
return nil, nil, err
}
if s.logger != nil {
fields := []zap.Field{
zap.String("gateway_id", entry.ID),
zap.String("instance_id", entry.InstanceID),
zap.String("rail", string(entry.Rail)),
zap.String("network", entry.Network),
zap.String("invoke_uri", invokeURI),
}
if paymentRef != "" {
fields = append(fields, zap.String("payment_ref", paymentRef))
}
if len(actions) > 0 {
fields = append(fields, zap.Strings("actions", railActionNames(actions)))
}
s.logger.Info("Chain gateway selected", fields...)
}
return client, entry, nil
}
if s.deps.gateway.resolver != nil {
client, err := s.deps.gateway.resolver.Resolve(ctx, network)
if err != nil {
return nil, nil, err
}
return client, nil, nil
}
return nil, nil, merrors.NoData("chain gateway unavailable")
}
func selectGatewayForActions(ctx context.Context, registry GatewayRegistry, rail model.Rail, network string, amount *paymenttypes.Money, actions []model.RailOperation, instanceID string, dir sendDirection) (*model.GatewayInstanceDescriptor, error) {
if registry == nil {
return nil, merrors.NoData("gateway registry unavailable")
}
all, err := registry.List(ctx)
if err != nil {
return nil, err
}
if len(all) == 0 {
return nil, merrors.NoData("no gateway instances available")
}
if len(actions) == 0 {
actions = []model.RailOperation{model.RailOperationSend}
}
currency := ""
amt := decimal.Zero
if amount != nil && strings.TrimSpace(amount.GetAmount()) != "" {
amt, err = decimalFromMoney(amount)
if err != nil {
return nil, err
}
currency = strings.ToUpper(strings.TrimSpace(amount.GetCurrency()))
}
network = strings.ToUpper(strings.TrimSpace(network))
eligible := make([]*model.GatewayInstanceDescriptor, 0)
for _, entry := range all {
if entry == nil || !entry.IsEnabled {
continue
}
if entry.Rail != rail {
continue
}
if instanceID != "" && !strings.EqualFold(strings.TrimSpace(entry.InstanceID), strings.TrimSpace(instanceID)) {
continue
}
ok := true
for _, action := range actions {
if !isGatewayEligible(entry, rail, network, currency, action, dir, amt) {
ok = false
break
}
}
if !ok {
continue
}
eligible = append(eligible, entry)
}
if len(eligible) == 0 {
return nil, merrors.NoData("no eligible gateway instance found")
}
sort.Slice(eligible, func(i, j int) bool {
return eligible[i].ID < eligible[j].ID
})
return eligible[0], nil
}
func railActionNames(actions []model.RailOperation) []string {
if len(actions) == 0 {
return nil
}
names := make([]string, 0, len(actions))
for _, action := range actions {
name := strings.TrimSpace(string(action))
if name == "" {
continue
}
names = append(names, name)
}
if len(names) == 0 {
return nil
}
return names
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.uber.org/zap"
)
@@ -21,15 +22,17 @@ type paymentEventHandler struct {
logger mlogger.Logger
submitCardPayout func(ctx context.Context, payment *model.Payment) error
resumePlan func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error
releaseHold func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error
}
func newPaymentEventHandler(repo storage.Repository, ensure func(ctx context.Context) error, logger mlogger.Logger, submitCardPayout func(ctx context.Context, payment *model.Payment) error, resumePlan func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error) *paymentEventHandler {
func newPaymentEventHandler(repo storage.Repository, ensure func(ctx context.Context) error, logger mlogger.Logger, submitCardPayout func(ctx context.Context, payment *model.Payment) error, resumePlan func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error, releaseHold func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error) *paymentEventHandler {
return &paymentEventHandler{
repo: repo,
ensureRepo: ensure,
logger: logger,
submitCardPayout: submitCardPayout,
resumePlan: resumePlan,
releaseHold: releaseHold,
}
}
@@ -248,6 +251,21 @@ func (h *paymentEventHandler) processCardPayoutUpdate(ctx context.Context, req *
}
applyCardPayoutUpdate(payment, payout)
switch payout.GetStatus() {
case mntxv1.PayoutStatus_PAYOUT_STATUS_PROCESSED:
if h.resumePlan != nil && payment.PaymentPlan != nil && len(payment.PaymentPlan.Steps) > 0 {
if err := h.resumePlan(ctx, store, payment); err != nil {
return gsresponse.Auto[orchestratorv1.ProcessCardPayoutUpdateResponse](h.logger, mservice.PaymentOrchestrator, err)
}
}
case mntxv1.PayoutStatus_PAYOUT_STATUS_FAILED:
if h.releaseHold != nil && payment.PaymentPlan != nil && len(payment.PaymentPlan.Steps) > 0 {
if err := h.releaseHold(ctx, store, payment); err != nil {
return gsresponse.Auto[orchestratorv1.ProcessCardPayoutUpdateResponse](h.logger, mservice.PaymentOrchestrator, err)
}
}
}
if err := store.Update(ctx, payment); err != nil {
return gsresponse.Auto[orchestratorv1.ProcessCardPayoutUpdateResponse](h.logger, mservice.PaymentOrchestrator, err)
}

View File

@@ -6,6 +6,7 @@ import (
"strings"
"time"
"github.com/shopspring/decimal"
oracleclient "github.com/tech/sendico/fx/oracle/client"
chainclient "github.com/tech/sendico/gateway/chain/client"
mntxclient "github.com/tech/sendico/gateway/mntx/client"
@@ -14,20 +15,38 @@ import (
clockpkg "github.com/tech/sendico/pkg/clock"
"github.com/tech/sendico/pkg/merrors"
mb "github.com/tech/sendico/pkg/messaging/broker"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/payments/rail"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
"go.uber.org/zap"
)
// Option configures service dependencies.
type Option func(*Service)
// GatewayInvokeResolver resolves gateway invoke URIs into chain gateway clients.
type GatewayInvokeResolver interface {
Resolve(ctx context.Context, invokeURI string) (chainclient.Client, error)
}
// ChainGatewayResolver resolves chain gateway clients by network.
type ChainGatewayResolver interface {
Resolve(ctx context.Context, network string) (chainclient.Client, error)
}
type feesDependency struct {
client feesv1.FeeEngineClient
timeout time.Duration
}
func (f feesDependency) available() bool {
return f.client != nil
if f.client == nil {
return false
}
if checker, ok := f.client.(interface{ Available() bool }); ok {
return checker.Available()
}
return true
}
type ledgerDependency struct {
@@ -35,28 +54,25 @@ type ledgerDependency struct {
internal rail.InternalLedger
}
func (l ledgerDependency) available() bool {
return l.client != nil
}
type gatewayDependency struct {
client chainclient.Client
resolver ChainGatewayResolver
}
func (g gatewayDependency) available() bool {
return g.client != nil
return g.resolver != nil
}
type railGatewayDependency struct {
byID map[string]rail.RailGateway
byRail map[model.Rail][]rail.RailGateway
registry GatewayRegistry
chainClient chainclient.Client
providerClient chainclient.Client
chainResolver GatewayInvokeResolver
providerResolver GatewayInvokeResolver
logger mlogger.Logger
}
func (g railGatewayDependency) available() bool {
return len(g.byID) > 0 || len(g.byRail) > 0 || (g.registry != nil && (g.chainClient != nil || g.providerClient != nil))
return len(g.byID) > 0 || len(g.byRail) > 0 || (g.registry != nil && (g.chainResolver != nil || g.providerResolver != nil))
}
func (g railGatewayDependency) resolve(ctx context.Context, step *model.PaymentStep) (rail.RailGateway, error) {
@@ -64,12 +80,11 @@ func (g railGatewayDependency) resolve(ctx context.Context, step *model.PaymentS
return nil, merrors.InvalidArgument("rail gateway: step is required")
}
if id := strings.TrimSpace(step.GatewayID); id != "" {
gw, ok := g.byID[id]
if !ok {
return nil, merrors.InvalidArgument("rail gateway: unknown gateway id")
}
if gw, ok := g.byID[id]; ok {
return gw, nil
}
return g.resolveDynamic(ctx, step)
}
if len(g.byRail) == 0 {
return g.resolveDynamic(ctx, step)
}
@@ -81,13 +96,32 @@ func (g railGatewayDependency) resolve(ctx context.Context, step *model.PaymentS
}
func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.PaymentStep) (rail.RailGateway, error) {
if g.registry == nil || (g.chainClient == nil && g.providerClient == nil) {
return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail")
if g.registry == nil {
return nil, merrors.InvalidArgument("rail gateway: registry is required")
}
if g.chainResolver == nil && g.providerResolver == nil {
return nil, merrors.InvalidArgument("rail gateway: gateway resolver is required")
}
items, err := g.registry.List(ctx)
if err != nil {
return nil, err
}
if len(items) == 0 {
return nil, merrors.InvalidArgument("rail gateway: no gateway instances available")
}
currency := ""
amount := decimal.Zero
if step.Amount != nil && strings.TrimSpace(step.Amount.GetAmount()) != "" {
value, err := decimalFromMoney(step.Amount)
if err != nil {
return nil, err
}
amount = value
currency = strings.ToUpper(strings.TrimSpace(step.Amount.GetCurrency()))
}
candidates := make([]*model.GatewayInstanceDescriptor, 0)
for _, entry := range items {
if entry == nil || !entry.IsEnabled {
continue
@@ -98,6 +132,28 @@ func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.P
if step.GatewayID != "" && entry.ID != step.GatewayID {
continue
}
if step.InstanceID != "" && !strings.EqualFold(strings.TrimSpace(entry.InstanceID), strings.TrimSpace(step.InstanceID)) {
continue
}
if step.Action != model.RailOperationUnspecified {
if !isGatewayEligible(entry, step.Rail, "", currency, step.Action, sendDirectionForRail(step.Rail), amount) {
continue
}
}
candidates = append(candidates, entry)
}
if len(candidates) == 0 {
return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail")
}
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].ID < candidates[j].ID
})
entry := candidates[0]
invokeURI := strings.TrimSpace(entry.InvokeURI)
if invokeURI == "" {
return nil, merrors.InvalidArgument("rail gateway: invoke uri is required")
}
cfg := chainclient.RailGatewayConfig{
Rail: string(entry.Rail),
Network: entry.Network,
@@ -107,22 +163,42 @@ func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.P
CanReadBalance: entry.Capabilities.CanReadBalance,
CanSendFee: entry.Capabilities.CanSendFee,
RequiresObserveConfirm: entry.Capabilities.RequiresObserveConfirm,
CanBlock: entry.Capabilities.CanBlock,
CanRelease: entry.Capabilities.CanRelease,
},
}
if g.logger != nil {
g.logger.Info("Rail gateway resolved",
zap.String("step_id", strings.TrimSpace(step.StepID)),
zap.String("action", string(step.Action)),
zap.String("gateway_id", entry.ID),
zap.String("instance_id", entry.InstanceID),
zap.String("rail", string(entry.Rail)),
zap.String("network", entry.Network),
zap.String("invoke_uri", invokeURI))
}
switch entry.Rail {
case model.RailProviderSettlement:
if g.providerClient == nil {
return nil, merrors.InvalidArgument("rail gateway: missing provider settlement client")
if g.providerResolver == nil {
return nil, merrors.InvalidArgument("rail gateway: provider settlement resolver required")
}
return NewProviderSettlementGateway(g.providerClient, cfg), nil
client, err := g.providerResolver.Resolve(ctx, invokeURI)
if err != nil {
return nil, err
}
return NewProviderSettlementGateway(client, cfg), nil
default:
if g.chainClient == nil {
return nil, merrors.InvalidArgument("rail gateway: missing gateway client")
if g.chainResolver == nil {
return nil, merrors.InvalidArgument("rail gateway: chain gateway resolver required")
}
return chainclient.NewRailGateway(g.chainClient, cfg), nil
client, err := g.chainResolver.Resolve(ctx, invokeURI)
if err != nil {
return nil, err
}
return chainclient.NewRailGateway(client, cfg), nil
}
return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail")
}
type oracleDependency struct {
@@ -130,7 +206,13 @@ type oracleDependency struct {
}
func (o oracleDependency) available() bool {
return o.client != nil
if o.client == nil {
return false
}
if checker, ok := o.client.(interface{ Available() bool }); ok {
return checker.Available()
}
return true
}
type mntxDependency struct {
@@ -138,23 +220,32 @@ type mntxDependency struct {
}
func (m mntxDependency) available() bool {
return m.client != nil
}
type gatewayRegistryDependency struct {
registry GatewayRegistry
}
func (g gatewayRegistryDependency) available() bool {
return g.registry != nil
if m.client == nil {
return false
}
if checker, ok := m.client.(interface{ Available() bool }); ok {
return checker.Available()
}
return true
}
type providerGatewayDependency struct {
client chainclient.Client
resolver ChainGatewayResolver
}
func (p providerGatewayDependency) available() bool {
return p.client != nil
return p.resolver != nil
}
type staticChainGatewayResolver struct {
client chainclient.Client
}
func (r staticChainGatewayResolver) Resolve(ctx context.Context, _ string) (chainclient.Client, error) {
if r.client == nil {
return nil, merrors.InvalidArgument("chain gateway client is required")
}
return r.client, nil
}
// CardGatewayRoute maps a gateway to its funding and fee destinations.
@@ -195,14 +286,44 @@ func WithLedgerClient(client ledgerclient.Client) Option {
// WithChainGatewayClient wires the chain gateway client.
func WithChainGatewayClient(client chainclient.Client) Option {
return func(s *Service) {
s.deps.gateway = gatewayDependency{client: client}
s.deps.gateway = gatewayDependency{resolver: staticChainGatewayResolver{client: client}}
}
}
// WithChainGatewayResolver wires a resolver for chain gateway clients.
func WithChainGatewayResolver(resolver ChainGatewayResolver) Option {
return func(s *Service) {
if resolver != nil {
s.deps.gateway = gatewayDependency{resolver: resolver}
}
}
}
// WithProviderSettlementGatewayClient wires the provider settlement gateway client.
func WithProviderSettlementGatewayClient(client chainclient.Client) Option {
return func(s *Service) {
s.deps.providerGateway = providerGatewayDependency{client: client}
s.deps.providerGateway = providerGatewayDependency{resolver: staticChainGatewayResolver{client: client}}
}
}
// WithProviderSettlementGatewayResolver wires a resolver for provider settlement gateway clients.
func WithProviderSettlementGatewayResolver(resolver ChainGatewayResolver) Option {
return func(s *Service) {
if resolver != nil {
s.deps.providerGateway = providerGatewayDependency{resolver: resolver}
}
}
}
// WithGatewayInvokeResolver wires a resolver for gateway invoke URIs.
func WithGatewayInvokeResolver(resolver GatewayInvokeResolver) Option {
return func(s *Service) {
if resolver == nil {
return
}
s.deps.gatewayInvokeResolver = resolver
s.deps.railGateways.chainResolver = resolver
s.deps.railGateways.providerResolver = resolver
}
}
@@ -212,7 +333,7 @@ func WithRailGateways(gateways map[string]rail.RailGateway) Option {
if len(gateways) == 0 {
return
}
s.deps.railGateways = buildRailGatewayDependency(gateways, s.deps.gatewayRegistry, s.deps.gateway.client, s.deps.providerGateway.client)
s.deps.railGateways = buildRailGatewayDependency(gateways, s.deps.gatewayRegistry, s.deps.gatewayInvokeResolver, s.deps.gatewayInvokeResolver, s.logger)
}
}
@@ -276,8 +397,9 @@ func WithGatewayRegistry(registry GatewayRegistry) Option {
if registry != nil {
s.deps.gatewayRegistry = registry
s.deps.railGateways.registry = registry
s.deps.railGateways.chainClient = s.deps.gateway.client
s.deps.railGateways.providerClient = s.deps.providerGateway.client
s.deps.railGateways.chainResolver = s.deps.gatewayInvokeResolver
s.deps.railGateways.providerResolver = s.deps.gatewayInvokeResolver
s.deps.railGateways.logger = s.logger.Named("rail_gateways")
if s.deps.planBuilder == nil {
s.deps.planBuilder = &defaultPlanBuilder{}
}
@@ -294,13 +416,14 @@ func WithClock(clock clockpkg.Clock) Option {
}
}
func buildRailGatewayDependency(gateways map[string]rail.RailGateway, registry GatewayRegistry, chainClient chainclient.Client, providerClient chainclient.Client) railGatewayDependency {
func buildRailGatewayDependency(gateways map[string]rail.RailGateway, registry GatewayRegistry, chainResolver GatewayInvokeResolver, providerResolver GatewayInvokeResolver, logger mlogger.Logger) railGatewayDependency {
result := railGatewayDependency{
byID: map[string]rail.RailGateway{},
byRail: map[model.Rail][]rail.RailGateway{},
registry: registry,
chainClient: chainClient,
providerClient: providerClient,
chainResolver: chainResolver,
providerResolver: providerResolver,
logger: logger,
}
if len(gateways) == 0 {
return result

View File

@@ -46,6 +46,10 @@ func (p *paymentExecutor) executePaymentPlan(ctx context.Context, store storage.
execStep = &model.ExecutionStep{Code: stepID}
execSteps[stepID] = execStep
}
if step.Action == model.RailOperationRelease {
setExecutionStepStatus(execStep, executionStepStatusSkipped)
continue
}
status := executionStepStatus(execStep)
switch status {
case executionStepStatusConfirmed, executionStepStatusSkipped:
@@ -86,7 +90,11 @@ func (p *paymentExecutor) executePaymentPlan(ctx context.Context, store storage.
}
if asyncSubmitted && !executionPlanComplete(execPlan) {
if blockStepConfirmed(plan, execPlan) {
payment.State = model.PaymentStateFundsReserved
} else {
payment.State = model.PaymentStateSubmitted
}
return p.persistPayment(ctx, store, payment)
}
payment.State = model.PaymentStateSettled

View File

@@ -12,6 +12,7 @@ import (
"github.com/tech/sendico/pkg/payments/rail"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
@@ -64,7 +65,7 @@ func TestExecutePaymentPlan_SourceBeforeDestination(t *testing.T) {
deps: serviceDependencies{
railGateways: buildRailGatewayDependency(map[string]rail.RailGateway{
"crypto-default": railGateway,
}, nil, nil, nil),
}, nil, nil, nil, nil),
ledger: ledgerDependency{
client: ledgerFake,
internal: ledgerFake,
@@ -196,3 +197,146 @@ func TestExecutePaymentPlan_SourceBeforeDestination(t *testing.T) {
t.Fatalf("expected ledger debit after payout confirmation, debit=%d credit=%d", debitCalls, creditCalls)
}
}
func TestExecutePaymentPlan_BlockThenDebitFromHold(t *testing.T) {
ctx := context.Background()
store := newStubPaymentsStore()
repo := &stubRepository{store: store}
blockCalls := 0
var blockReq *ledgerv1.TransferRequest
debitCalls := 0
var debitTx rail.LedgerTx
ledgerFake := &ledgerclient.Fake{
TransferInternalFn: func(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) {
blockCalls++
blockReq = req
return &ledgerv1.PostResponse{JournalEntryRef: "hold-1"}, nil
},
CreateTransactionFn: func(ctx context.Context, tx rail.LedgerTx) (string, error) {
debitCalls++
debitTx = tx
return "debit-1", nil
},
}
mntxFake := &mntxclient.Fake{
CreateCardPayoutFn: func(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) {
return &mntxv1.CardPayoutResponse{Payout: &mntxv1.CardPayoutState{PayoutId: "payout-1"}}, nil
},
}
svc := &Service{
logger: zap.NewNop(),
storage: repo,
deps: serviceDependencies{
ledger: ledgerDependency{
client: ledgerFake,
internal: ledgerFake,
},
mntx: mntxDependency{client: mntxFake},
cardRoutes: map[string]CardGatewayRoute{
defaultCardGateway: {
FundingAddress: "funding-address",
FeeWalletRef: "fee-wallet",
},
},
},
}
executor := newPaymentExecutor(&svc.deps, svc.logger, svc)
payment := &model.Payment{
PaymentRef: "pay-block-1",
IdempotencyKey: "pay-block-1",
OrganizationBoundBase: mo.OrganizationBoundBase{
OrganizationRef: primitive.NewObjectID(),
},
Intent: model.PaymentIntent{
Kind: model.PaymentKindPayout,
Source: model.PaymentEndpoint{
Type: model.EndpointTypeManagedWallet,
ManagedWallet: &model.ManagedWalletEndpoint{
ManagedWalletRef: "wallet-src",
},
},
Destination: model.PaymentEndpoint{
Type: model.EndpointTypeCard,
Card: &model.CardEndpoint{
Pan: "4111111111111111",
Cardholder: "Ada",
CardholderSurname: "Lovelace",
ExpMonth: 1,
ExpYear: 2030,
MaskedPan: "4111",
},
},
Attributes: map[string]string{
"ledger_debit_account_ref": "ledger:debit",
"ledger_block_account_ref": "ledger:block",
},
Customer: &model.Customer{
ID: "cust-1",
FirstName: "Ada",
LastName: "Lovelace",
IP: "1.2.3.4",
},
},
PaymentPlan: &model.PaymentPlan{
ID: "pay-block-1",
IdempotencyKey: "pay-block-1",
Steps: []*model.PaymentStep{
{StepID: "ledger_block", Rail: model.RailLedger, Action: model.RailOperationBlock, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
{StepID: "card_payout", Rail: model.RailCardPayout, Action: model.RailOperationSend, DependsOn: []string{"ledger_block"}, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
{StepID: "ledger_debit", Rail: model.RailLedger, Action: model.RailOperationDebit, DependsOn: []string{"card_payout"}, CommitPolicy: model.CommitPolicyAfterSuccess, CommitAfter: []string{"card_payout"}, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
{StepID: "ledger_release", Rail: model.RailLedger, Action: model.RailOperationRelease, DependsOn: []string{"card_payout"}, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
},
},
}
store.payments[payment.PaymentRef] = payment
if err := executor.executePaymentPlan(ctx, store, payment, &orchestratorv1.PaymentQuote{}); err != nil {
t.Fatalf("executePaymentPlan error: %v", err)
}
if blockCalls != 1 || blockReq == nil {
t.Fatalf("expected ledger block transfer, calls=%d", blockCalls)
}
if blockReq.GetFromLedgerAccountRef() != "ledger:debit" {
t.Fatalf("unexpected block from account: %s", blockReq.GetFromLedgerAccountRef())
}
if blockReq.GetToLedgerAccountRef() != "ledger:block" {
t.Fatalf("unexpected block to account: %s", blockReq.GetToLedgerAccountRef())
}
if debitCalls != 0 {
t.Fatalf("expected no debit before payout confirmation, got %d", debitCalls)
}
if payment.State != model.PaymentStateFundsReserved {
t.Fatalf("expected funds reserved state, got %s", payment.State)
}
steps := executionStepsByCode(payment.ExecutionPlan)
cardStep := steps["card_payout"]
if cardStep == nil {
t.Fatalf("expected card payout step in execution plan")
}
setExecutionStepStatus(cardStep, executionStepStatusConfirmed)
if err := executor.executePaymentPlan(ctx, store, payment, &orchestratorv1.PaymentQuote{}); err != nil {
t.Fatalf("executePaymentPlan resume error: %v", err)
}
if debitCalls != 1 {
t.Fatalf("expected ledger debit after payout confirmation, got %d", debitCalls)
}
if debitTx.LedgerAccountRef != "ledger:block" {
t.Fatalf("expected debit from block account, got %s", debitTx.LedgerAccountRef)
}
if debitTx.ContraLedgerAccountRef != "" {
t.Fatalf("expected contra to be cleared after block, got %s", debitTx.ContraLedgerAccountRef)
}
if payment.State != model.PaymentStateSettled {
t.Fatalf("expected settled state, got %s", payment.State)
}
}

View File

@@ -80,6 +80,26 @@ func executionPlanComplete(plan *model.ExecutionPlan) bool {
return true
}
func blockStepConfirmed(plan *model.PaymentPlan, execPlan *model.ExecutionPlan) bool {
if plan == nil || execPlan == nil || len(plan.Steps) == 0 {
return false
}
execSteps := executionStepsByCode(execPlan)
for idx, step := range plan.Steps {
if step == nil || step.Action != model.RailOperationBlock {
continue
}
execStep := execSteps[planStepID(step, idx)]
if execStep == nil {
continue
}
if executionStepStatus(execStep) == executionStepStatusConfirmed {
return true
}
}
return false
}
func planStepID(step *model.PaymentStep, idx int) string {
if step != nil {
if val := strings.TrimSpace(step.StepID); val != "" {

View File

@@ -11,28 +11,167 @@ import (
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
)
func (p *paymentExecutor) postLedgerDebit(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, charges []*ledgerv1.PostingLine, idempotencyKey string, idx int, quote *orchestratorv1.PaymentQuote) (string, error) {
paymentRef := ""
if payment != nil {
paymentRef = strings.TrimSpace(payment.PaymentRef)
}
if p.deps.ledger.internal == nil {
p.logger.Error("Ledger client unavailable", zap.String("action", "debit"), zap.String("payment_ref", paymentRef))
return "", merrors.Internal("ledger_client_unavailable")
}
tx, err := p.ledgerTxForAction(payment, amount, charges, idempotencyKey, idx, model.RailOperationDebit, quote)
if err != nil {
p.logger.Warn("Ledger debit preparation failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err
}
return p.deps.ledger.internal.CreateTransaction(ctx, tx)
ref, err := p.deps.ledger.internal.CreateTransaction(ctx, tx)
if err != nil {
p.logger.Warn("Ledger debit failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err
}
return ref, nil
}
func (p *paymentExecutor) postLedgerCredit(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, idempotencyKey string, idx int, quote *orchestratorv1.PaymentQuote) (string, error) {
paymentRef := ""
if payment != nil {
paymentRef = strings.TrimSpace(payment.PaymentRef)
}
if p.deps.ledger.internal == nil {
p.logger.Error("Ledger client unavailable", zap.String("action", "credit"), zap.String("payment_ref", paymentRef))
return "", merrors.Internal("ledger_client_unavailable")
}
tx, err := p.ledgerTxForAction(payment, amount, nil, idempotencyKey, idx, model.RailOperationCredit, quote)
if err != nil {
p.logger.Warn("Ledger credit preparation failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err
}
return p.deps.ledger.internal.CreateTransaction(ctx, tx)
ref, err := p.deps.ledger.internal.CreateTransaction(ctx, tx)
if err != nil {
p.logger.Warn("Ledger credit failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err
}
return ref, nil
}
func (p *paymentExecutor) postLedgerBlock(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, idempotencyKey string, idx int) (string, error) {
paymentRef := ""
if payment != nil {
paymentRef = strings.TrimSpace(payment.PaymentRef)
}
if p.deps.ledger.internal == nil {
p.logger.Error("Ledger client unavailable", zap.String("action", "block"), zap.String("payment_ref", paymentRef))
return "", merrors.Internal("ledger_client_unavailable")
}
if payment == nil {
return "", merrors.InvalidArgument("ledger: payment is required")
}
if payment.OrganizationRef == primitive.NilObjectID {
return "", merrors.InvalidArgument("ledger: organization_ref is required")
}
if amount == nil || strings.TrimSpace(amount.GetAmount()) == "" || strings.TrimSpace(amount.GetCurrency()) == "" {
return "", merrors.InvalidArgument("ledger: amount is required")
}
sourceAccount, err := ledgerDebitAccountRef(payment)
if err != nil {
return "", err
}
blockAccount, err := ledgerBlockAccount(payment)
if err != nil {
return "", err
}
resp, err := p.deps.ledger.internal.TransferInternal(ctx, &ledgerv1.TransferRequest{
IdempotencyKey: strings.TrimSpace(idempotencyKey),
OrganizationRef: payment.OrganizationRef.Hex(),
FromLedgerAccountRef: strings.TrimSpace(sourceAccount),
ToLedgerAccountRef: strings.TrimSpace(blockAccount),
Money: cloneProtoMoney(amount),
Description: paymentDescription(payment),
Metadata: cloneMetadata(payment.Metadata),
})
if err != nil {
p.logger.Warn("Ledger block failed",
zap.String("payment_ref", paymentRef),
zap.Int("step_index", idx),
zap.String("from_account", strings.TrimSpace(sourceAccount)),
zap.String("to_account", strings.TrimSpace(blockAccount)),
zap.String("amount", strings.TrimSpace(amount.GetAmount())),
zap.String("currency", strings.TrimSpace(amount.GetCurrency())),
zap.Error(err))
return "", err
}
entryRef := strings.TrimSpace(resp.GetJournalEntryRef())
p.logger.Info("Ledger block posted",
zap.String("payment_ref", paymentRef),
zap.Int("step_index", idx),
zap.String("entry_ref", entryRef),
zap.String("from_account", strings.TrimSpace(sourceAccount)),
zap.String("to_account", strings.TrimSpace(blockAccount)),
zap.String("amount", strings.TrimSpace(amount.GetAmount())),
zap.String("currency", strings.TrimSpace(amount.GetCurrency())))
return entryRef, nil
}
func (p *paymentExecutor) postLedgerRelease(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, idempotencyKey string, idx int) (string, error) {
paymentRef := ""
if payment != nil {
paymentRef = strings.TrimSpace(payment.PaymentRef)
}
if p.deps.ledger.internal == nil {
p.logger.Error("Ledger client unavailable", zap.String("action", "release"), zap.String("payment_ref", paymentRef))
return "", merrors.Internal("ledger_client_unavailable")
}
if payment == nil {
return "", merrors.InvalidArgument("ledger: payment is required")
}
if payment.OrganizationRef == primitive.NilObjectID {
return "", merrors.InvalidArgument("ledger: organization_ref is required")
}
if amount == nil || strings.TrimSpace(amount.GetAmount()) == "" || strings.TrimSpace(amount.GetCurrency()) == "" {
return "", merrors.InvalidArgument("ledger: amount is required")
}
sourceAccount, err := ledgerDebitAccountRef(payment)
if err != nil {
return "", err
}
blockAccount, err := ledgerBlockAccount(payment)
if err != nil {
return "", err
}
resp, err := p.deps.ledger.internal.TransferInternal(ctx, &ledgerv1.TransferRequest{
IdempotencyKey: strings.TrimSpace(idempotencyKey),
OrganizationRef: payment.OrganizationRef.Hex(),
FromLedgerAccountRef: strings.TrimSpace(blockAccount),
ToLedgerAccountRef: strings.TrimSpace(sourceAccount),
Money: cloneProtoMoney(amount),
Description: paymentDescription(payment),
Metadata: cloneMetadata(payment.Metadata),
})
if err != nil {
p.logger.Warn("Ledger release failed",
zap.String("payment_ref", paymentRef),
zap.Int("step_index", idx),
zap.String("from_account", strings.TrimSpace(blockAccount)),
zap.String("to_account", strings.TrimSpace(sourceAccount)),
zap.String("amount", strings.TrimSpace(amount.GetAmount())),
zap.String("currency", strings.TrimSpace(amount.GetCurrency())),
zap.Error(err))
return "", err
}
entryRef := strings.TrimSpace(resp.GetJournalEntryRef())
p.logger.Info("Ledger release posted",
zap.String("payment_ref", paymentRef),
zap.Int("step_index", idx),
zap.String("entry_ref", entryRef),
zap.String("from_account", strings.TrimSpace(blockAccount)),
zap.String("to_account", strings.TrimSpace(sourceAccount)),
zap.String("amount", strings.TrimSpace(amount.GetAmount())),
zap.String("currency", strings.TrimSpace(amount.GetCurrency())))
return entryRef, nil
}
func (p *paymentExecutor) ledgerTxForAction(payment *model.Payment, amount *moneyv1.Money, charges []*ledgerv1.PostingLine, idempotencyKey string, idx int, action model.RailOperation, quote *orchestratorv1.PaymentQuote) (rail.LedgerTx, error) {
@@ -66,6 +205,12 @@ func (p *paymentExecutor) ledgerTxForAction(payment *model.Payment, amount *mone
fromRail = model.RailLedger
toRail = ledgerStepToRail(payment.PaymentPlan, idx, destRail)
accountRef, contraRef, err = ledgerDebitAccount(payment)
if err == nil {
if blockRef := ledgerBlockAccountIfConfirmed(payment); blockRef != "" {
accountRef = blockRef
contraRef = ""
}
}
case model.RailOperationCredit:
fromRail = ledgerStepFromRail(payment.PaymentPlan, idx, sourceRail)
toRail = model.RailLedger
@@ -190,6 +335,48 @@ func ledgerDebitAccount(payment *model.Payment) (string, string, error) {
return "", "", merrors.InvalidArgument("ledger: source account is required")
}
func ledgerDebitAccountRef(payment *model.Payment) (string, error) {
account, _, err := ledgerDebitAccount(payment)
return account, err
}
func ledgerBlockAccount(payment *model.Payment) (string, error) {
if payment == nil {
return "", merrors.InvalidArgument("ledger: payment is required")
}
intent := payment.Intent
if intent.Source.Ledger != nil {
if ref := strings.TrimSpace(intent.Source.Ledger.ContraLedgerAccountRef); ref != "" {
return ref, nil
}
}
if ref := attributeLookup(intent.Attributes,
"ledger_block_account_ref",
"ledgerBlockAccountRef",
"ledger_hold_account_ref",
"ledgerHoldAccountRef",
"ledger_debit_contra_account_ref",
"ledgerDebitContraAccountRef",
); ref != "" {
return ref, nil
}
return "", merrors.InvalidArgument("ledger: block account is required")
}
func ledgerBlockAccountIfConfirmed(payment *model.Payment) string {
if payment == nil {
return ""
}
if !blockStepConfirmed(payment.PaymentPlan, payment.ExecutionPlan) {
return ""
}
ref, err := ledgerBlockAccount(payment)
if err != nil {
return ""
}
return ref
}
func ledgerCreditAccount(payment *model.Payment) (string, string, error) {
if payment == nil {
return "", "", merrors.InvalidArgument("ledger: payment is required")

View File

@@ -0,0 +1,51 @@
package orchestrator
import (
"context"
"github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"go.uber.org/zap"
)
func (p *paymentExecutor) releasePaymentHold(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error {
if store == nil {
return errStorageUnavailable
}
if payment == nil || payment.PaymentPlan == nil || len(payment.PaymentPlan.Steps) == 0 {
return nil
}
execPlan := ensureExecutionPlanForPlan(payment, payment.PaymentPlan)
if execPlan == nil || !blockStepConfirmed(payment.PaymentPlan, execPlan) {
return nil
}
execSteps := executionStepsByCode(execPlan)
execQuote := executionQuote(payment, nil)
for idx, step := range payment.PaymentPlan.Steps {
if step == nil || step.Action != model.RailOperationRelease {
continue
}
stepID := planStepID(step, idx)
execStep := execSteps[stepID]
if execStep == nil {
execStep = &model.ExecutionStep{Code: stepID}
execSteps[stepID] = execStep
if idx < len(execPlan.Steps) {
execPlan.Steps[idx] = execStep
}
}
status := executionStepStatus(execStep)
if status == executionStepStatusConfirmed {
p.logger.Debug("Payment step already confirmed, skipping", zap.String("step_id", stepID), zap.String("quutation", execQuote.QuoteRef))
continue
}
if _, err := p.executePlanStep(ctx, payment, step, execStep, execQuote, nil, idx); err != nil {
p.logger.Warn("Failed to execute payment step", zap.Error(err),
zap.String("step_id", stepID), zap.String("quutation", execQuote.QuoteRef))
return err
}
}
return p.persistPayment(ctx, store, payment)
}

View File

@@ -0,0 +1,107 @@
package orchestrator
import (
"context"
"testing"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/storage/model"
mo "github.com/tech/sendico/pkg/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
)
func TestReleasePaymentHold_TransfersFromHoldAccount(t *testing.T) {
ctx := context.Background()
store := newStubPaymentsStore()
repo := &stubRepository{store: store}
var releaseReq *ledgerv1.TransferRequest
ledgerFake := &ledgerclient.Fake{
TransferInternalFn: func(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) {
releaseReq = req
return &ledgerv1.PostResponse{JournalEntryRef: "release-1"}, nil
},
}
svc := &Service{
logger: zap.NewNop(),
storage: repo,
deps: serviceDependencies{
ledger: ledgerDependency{
client: ledgerFake,
internal: ledgerFake,
},
},
}
executor := newPaymentExecutor(&svc.deps, svc.logger, svc)
payment := &model.Payment{
PaymentRef: "pay-release-1",
IdempotencyKey: "pay-release-1",
OrganizationBoundBase: mo.OrganizationBoundBase{
OrganizationRef: primitive.NewObjectID(),
},
Intent: model.PaymentIntent{
Kind: model.PaymentKindPayout,
Source: model.PaymentEndpoint{
Type: model.EndpointTypeManagedWallet,
ManagedWallet: &model.ManagedWalletEndpoint{
ManagedWalletRef: "wallet-src",
},
},
Attributes: map[string]string{
"ledger_debit_account_ref": "ledger:debit",
"ledger_block_account_ref": "ledger:block",
},
},
PaymentPlan: &model.PaymentPlan{
ID: "pay-release-1",
IdempotencyKey: "pay-release-1",
Steps: []*model.PaymentStep{
{StepID: "ledger_block", Rail: model.RailLedger, Action: model.RailOperationBlock, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
{StepID: "ledger_release", Rail: model.RailLedger, Action: model.RailOperationRelease, DependsOn: []string{"ledger_block"}, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}},
},
},
}
store.payments[payment.PaymentRef] = payment
execPlan := ensureExecutionPlanForPlan(payment, payment.PaymentPlan)
steps := executionStepsByCode(execPlan)
blockStep := steps["ledger_block"]
if blockStep == nil {
t.Fatalf("expected block step in execution plan")
}
setExecutionStepStatus(blockStep, executionStepStatusConfirmed)
if err := executor.releasePaymentHold(ctx, store, payment); err != nil {
t.Fatalf("releasePaymentHold error: %v", err)
}
if releaseReq == nil {
t.Fatalf("expected ledger release transfer")
}
if releaseReq.GetFromLedgerAccountRef() != "ledger:block" {
t.Fatalf("unexpected release from account: %s", releaseReq.GetFromLedgerAccountRef())
}
if releaseReq.GetToLedgerAccountRef() != "ledger:debit" {
t.Fatalf("unexpected release to account: %s", releaseReq.GetToLedgerAccountRef())
}
steps = executionStepsByCode(payment.ExecutionPlan)
releaseStep := steps["ledger_release"]
if releaseStep == nil {
t.Fatalf("expected release step in execution plan")
}
if executionStepStatus(releaseStep) != executionStepStatusConfirmed {
t.Fatalf("expected release step confirmed, got %s", executionStepStatus(releaseStep))
}
if releaseStep.TransferRef != "release-1" {
t.Fatalf("expected release transfer ref set, got %s", releaseStep.TransferRef)
}
}

View File

@@ -40,6 +40,36 @@ func (p *paymentExecutor) executePlanStep(ctx context.Context, payment *model.Pa
ensureExecutionRefs(payment).CreditEntryRef = ref
setExecutionStepStatus(execStep, executionStepStatusConfirmed)
return false, nil
case model.RailOperationBlock:
if step.Rail != model.RailLedger {
return false, merrors.InvalidArgument("payment plan: block requires ledger rail")
}
amount, err := requireMoney(cloneMoney(step.Amount), "ledger block amount")
if err != nil {
return false, err
}
ref, err := p.postLedgerBlock(ctx, payment, protoMoney(amount), planStepIdempotencyKey(payment, idx, step), idx)
if err != nil {
return false, err
}
execStep.TransferRef = strings.TrimSpace(ref)
setExecutionStepStatus(execStep, executionStepStatusConfirmed)
return false, nil
case model.RailOperationRelease:
if step.Rail != model.RailLedger {
return false, merrors.InvalidArgument("payment plan: release requires ledger rail")
}
amount, err := requireMoney(cloneMoney(step.Amount), "ledger release amount")
if err != nil {
return false, err
}
ref, err := p.postLedgerRelease(ctx, payment, protoMoney(amount), planStepIdempotencyKey(payment, idx, step), idx)
if err != nil {
return false, err
}
execStep.TransferRef = strings.TrimSpace(ref)
setExecutionStepStatus(execStep, executionStepStatusConfirmed)
return false, nil
case model.RailOperationFXConvert:
if err := p.applyFX(ctx, payment, quote, charges, paymentDescription(payment), cloneMetadata(payment.Metadata), ensureExecutionRefs(payment)); err != nil {
return false, err

View File

@@ -125,7 +125,7 @@ func TestDefaultPlanBuilder_BuildsPlanFromRoutes_CryptoToCard(t *testing.T) {
t.Fatalf("expected 6 steps, got %d", len(plan.Steps))
}
assertPlanStep(t, plan.Steps[0], "crypto_send", model.RailCrypto, model.RailOperationSend, "crypto-tron", "crypto-tron-1", "USDT", "100")
assertPlanStep(t, plan.Steps[0], "crypto_send", model.RailCrypto, model.RailOperationSend, "crypto-tron", "crypto-tron-1", "USDT", "95")
assertPlanStep(t, plan.Steps[1], "crypto_fee", model.RailCrypto, model.RailOperationFee, "crypto-tron", "crypto-tron-1", "USDT", "5")
assertPlanStep(t, plan.Steps[2], "crypto_observe", model.RailCrypto, model.RailOperationObserveConfirm, "crypto-tron", "crypto-tron-1", "", "")
assertPlanStep(t, plan.Steps[3], "ledger_credit", model.RailLedger, model.RailOperationCredit, "", "", "USDT", "95")

View File

@@ -160,6 +160,10 @@ func capabilityAllowsAction(cap model.RailCapabilities, action model.RailOperati
return cap.CanSendFee
case model.RailOperationObserveConfirm:
return cap.RequiresObserveConfirm
case model.RailOperationBlock:
return cap.CanBlock
case model.RailOperationRelease:
return cap.CanRelease
default:
return true
}

View File

@@ -69,6 +69,16 @@ func resolveSettlementAmount(payment *model.Payment, quote *orchestratorv1.Payme
return cloneMoney(fallback)
}
func resolveDebitAmount(payment *model.Payment, quote *orchestratorv1.PaymentQuote, fallback *paymenttypes.Money) *paymenttypes.Money {
if quote != nil && quote.GetDebitAmount() != nil {
return moneyFromProto(quote.GetDebitAmount())
}
if payment != nil && payment.LastQuote != nil {
return cloneMoney(payment.LastQuote.DebitAmount)
}
return cloneMoney(fallback)
}
func resolveFeeAmount(payment *model.Payment, quote *orchestratorv1.PaymentQuote) *paymenttypes.Money {
if quote != nil && quote.GetExpectedFeeTotal() != nil {
return moneyFromProto(quote.GetExpectedFeeTotal())

View File

@@ -15,7 +15,11 @@ func (b *defaultPlanBuilder) buildPlanFromTemplate(ctx context.Context, payment
return nil, merrors.InvalidArgument("plan builder: plan template is required")
}
sourceAmount, err := requireMoney(cloneMoney(payment.Intent.Amount), "amount")
intentAmount, err := requireMoney(cloneMoney(payment.Intent.Amount), "amount")
if err != nil {
return nil, err
}
sourceAmount, err := requireMoney(resolveDebitAmount(payment, quote, intentAmount), "debit amount")
if err != nil {
return nil, err
}
@@ -81,7 +85,11 @@ func (b *defaultPlanBuilder) buildPlanFromTemplate(ctx context.Context, payment
Amount: cloneMoney(amount),
}
if action == model.RailOperationSend || action == model.RailOperationFee || action == model.RailOperationObserveConfirm {
needsGateway := action == model.RailOperationSend || action == model.RailOperationFee || action == model.RailOperationObserveConfirm
if (action == model.RailOperationBlock || action == model.RailOperationRelease) && tpl.Rail != model.RailLedger {
needsGateway = true
}
if needsGateway {
network := gatewayNetworkForRail(tpl.Rail, sourceRail, destRail, sourceNetwork, destNetwork)
instanceID := stepInstanceIDForRail(payment.Intent, tpl.Rail, sourceRail, destRail)
checkAmount := amount
@@ -129,6 +137,10 @@ func actionForOperation(operation string) (model.RailOperation, error) {
return model.RailOperationFee, nil
case "send", "payout.card", "payout.crypto", "payout.fiat", "payin.crypto", "payin.fiat", "fund.crypto", "fund.card":
return model.RailOperationSend, nil
case "block", "hold", "reserve", "ledger.block", "ledger.hold", "ledger.reserve":
return model.RailOperationBlock, nil
case "release", "unblock", "ledger.release":
return model.RailOperationRelease, nil
}
switch strings.ToUpper(strings.TrimSpace(operation)) {
@@ -144,6 +156,10 @@ func actionForOperation(operation string) (model.RailOperation, error) {
return model.RailOperationObserveConfirm, nil
case string(model.RailOperationFXConvert):
return model.RailOperationFXConvert, nil
case string(model.RailOperationBlock):
return model.RailOperationBlock, nil
case string(model.RailOperationRelease):
return model.RailOperationRelease, nil
}
return model.RailOperationUnspecified, merrors.InvalidArgument("plan builder: unsupported operation")
@@ -164,12 +180,20 @@ func stepAmountForAction(action model.RailOperation, rail, sourceRail, destRail
case model.RailOperationSend:
switch rail {
case sourceRail:
if feeRequired {
return cloneMoney(settlementAmount), nil
}
return cloneMoney(sourceAmount), nil
case destRail:
return cloneMoney(payoutAmount), nil
default:
return cloneMoney(settlementAmount), nil
}
case model.RailOperationBlock, model.RailOperationRelease:
if rail == model.RailLedger {
return cloneMoney(ledgerDebitAmount), nil
}
return cloneMoney(settlementAmount), nil
case model.RailOperationFee:
if !feeRequired {
return nil, nil
@@ -197,6 +221,9 @@ func stepInstanceIDForRail(intent model.PaymentIntent, rail, sourceRail, destRai
func observeAmountForRail(rail model.Rail, source, settlement, payout *paymenttypes.Money) *paymenttypes.Money {
switch rail {
case model.RailCrypto, model.RailFiatOnRamp:
if settlement != nil {
return settlement
}
if source != nil {
return source
}

View File

@@ -123,6 +123,14 @@ func (g *providerSettlementGateway) Observe(ctx context.Context, referenceID str
}, nil
}
func (g *providerSettlementGateway) Block(ctx context.Context, req rail.BlockRequest) (rail.RailResult, error) {
return rail.RailResult{}, merrors.NotImplemented("provider settlement gateway: block not supported")
}
func (g *providerSettlementGateway) Release(ctx context.Context, req rail.ReleaseRequest) (rail.RailResult, error) {
return rail.RailResult{}, merrors.NotImplemented("provider settlement gateway: release not supported")
}
func buildProviderSettlementDestination(req rail.TransferRequest) *chainv1.TransferDestination {
destRef := strings.TrimSpace(req.ToAccountID)
memo := strings.TrimSpace(req.DestinationMemo)

View File

@@ -2,6 +2,7 @@ package orchestrator
import (
"context"
"errors"
"strings"
"time"
@@ -132,10 +133,6 @@ func (s *Service) quoteFees(ctx context.Context, orgRef string, req *orchestrato
}
func (s *Service) estimateNetworkFee(ctx context.Context, intent *orchestratorv1.PaymentIntent) (*chainv1.EstimateTransferFeeResponse, error) {
if !s.deps.gateway.available() {
return nil, nil
}
req := &chainv1.EstimateTransferFeeRequest{
Amount: cloneProtoMoney(intent.GetAmount()),
}
@@ -160,7 +157,28 @@ func (s *Service) estimateNetworkFee(ctx context.Context, intent *orchestratorv1
}
}
resp, err := s.deps.gateway.client.EstimateTransferFee(ctx, req)
network := ""
if req.Asset != nil {
network = strings.ToUpper(strings.TrimSpace(req.Asset.GetChain()))
}
instanceID := strings.TrimSpace(intent.GetSource().GetInstanceId())
if instanceID == "" {
instanceID = strings.TrimSpace(intent.GetDestination().GetInstanceId())
}
client, _, err := s.resolveChainGatewayClient(ctx, network, moneyFromProto(req.Amount), []model.RailOperation{model.RailOperationSend}, instanceID, "")
if err != nil {
if errors.Is(err, merrors.ErrNoData) {
s.logger.Debug("network fee estimation skipped: gateway unavailable", zap.Error(err))
return nil, nil
}
s.logger.Warn("chain gateway resolution failed", zap.Error(err))
return nil, err
}
if client == nil {
return nil, nil
}
resp, err := client.EstimateTransferFee(ctx, req)
if err != nil {
s.logger.Warn("chain gateway fee estimation failed", zap.Error(err))
return nil, merrors.Internal("chain_gateway_fee_estimation_failed")

View File

@@ -12,6 +12,8 @@ type fakeRailGateway struct {
capabilities rail.RailCapabilities
sendFn func(context.Context, rail.TransferRequest) (rail.RailResult, error)
observeFn func(context.Context, string) (rail.ObserveResult, error)
blockFn func(context.Context, rail.BlockRequest) (rail.RailResult, error)
releaseFn func(context.Context, rail.ReleaseRequest) (rail.RailResult, error)
}
func (f *fakeRailGateway) Rail() string {
@@ -39,3 +41,17 @@ func (f *fakeRailGateway) Observe(ctx context.Context, referenceID string) (rail
}
return rail.ObserveResult{ReferenceID: referenceID, Status: rail.TransferStatusPending}, nil
}
func (f *fakeRailGateway) Block(ctx context.Context, req rail.BlockRequest) (rail.RailResult, error) {
if f.blockFn != nil {
return f.blockFn(ctx, req)
}
return rail.RailResult{ReferenceID: req.IdempotencyKey, Status: rail.TransferStatusPending}, nil
}
func (f *fakeRailGateway) Release(ctx context.Context, req rail.ReleaseRequest) (rail.RailResult, error) {
if f.releaseFn != nil {
return f.releaseFn(ctx, req)
}
return rail.RailResult{ReferenceID: req.ReferenceID, Status: rail.TransferStatusPending}, nil
}

View File

@@ -54,6 +54,7 @@ type serviceDependencies struct {
oracle oracleDependency
mntx mntxDependency
gatewayRegistry GatewayRegistry
gatewayInvokeResolver GatewayInvokeResolver
cardRoutes map[string]CardGatewayRoute
feeLedgerAccounts map[string]string
planBuilder PlanBuilder
@@ -92,7 +93,7 @@ func NewService(logger mlogger.Logger, repo storage.Repository, opts ...Option)
engine := defaultPaymentEngine{svc: svc}
svc.h.commands = newPaymentCommandFactory(engine, svc.logger)
svc.h.queries = newPaymentQueryHandler(svc.storage, svc.ensureRepository, svc.logger.Named("queries"))
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger.Named("events"), svc.submitCardPayout, svc.resumePaymentPlan)
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger.Named("events"), svc.submitCardPayout, svc.resumePaymentPlan, svc.releasePaymentHold)
svc.comp.executor = newPaymentExecutor(&svc.deps, svc.logger.Named("payment_executor"), svc)
svc.startGatewayConsumers()
@@ -107,7 +108,7 @@ func (s *Service) ensureHandlers() {
s.h.queries = newPaymentQueryHandler(s.storage, s.ensureRepository, s.logger.Named("queries"))
}
if s.h.events == nil {
s.h.events = newPaymentEventHandler(s.storage, s.ensureRepository, s.logger.Named("events"), s.submitCardPayout, s.resumePaymentPlan)
s.h.events = newPaymentEventHandler(s.storage, s.ensureRepository, s.logger.Named("events"), s.submitCardPayout, s.resumePaymentPlan, s.releasePaymentHold)
}
if s.comp.executor == nil {
s.comp.executor = newPaymentExecutor(&s.deps, s.logger.Named("payment_executor"), s)
@@ -199,3 +200,11 @@ func (s *Service) resumePaymentPlan(ctx context.Context, store storage.PaymentsS
s.ensureHandlers()
return s.comp.executor.executePaymentPlan(ctx, store, payment, nil)
}
func (s *Service) releasePaymentHold(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error {
if payment == nil || payment.PaymentPlan == nil || len(payment.PaymentPlan.Steps) == 0 {
return nil
}
s.ensureHandlers()
return s.comp.executor.releasePaymentHold(ctx, store, payment)
}

View File

@@ -125,7 +125,7 @@ func TestExecutePayment_ChainFailure(t *testing.T) {
return rail.RailResult{}, errors.New("chain failure")
},
},
}, nil, nil, nil),
}, nil, nil, nil, nil),
gatewayRegistry: &stubGatewayRegistry{
items: []*model.GatewayInstanceDescriptor{
{
@@ -204,7 +204,7 @@ func TestProcessTransferUpdateHandler_Settled(t *testing.T) {
clock: testClock{now: time.Now()},
storage: &stubRepository{store: store},
}
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, nil, nil)
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, nil, nil, nil)
req := &orchestratorv1.ProcessTransferUpdateRequest{
Event: &chainv1.TransferStatusChangedEvent{
@@ -278,7 +278,7 @@ func TestProcessTransferUpdateHandler_CardFundingWaitsForSources(t *testing.T) {
setExecutionStepStatus(step, executionStepStatusSubmitted)
return nil
}
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, submit, nil)
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, submit, nil, nil)
req := &orchestratorv1.ProcessTransferUpdateRequest{
Event: &chainv1.TransferStatusChangedEvent{
@@ -349,7 +349,7 @@ func TestProcessDepositObservedHandler_MatchesPayment(t *testing.T) {
clock: testClock{now: time.Now()},
storage: &stubRepository{store: store},
}
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, nil, nil)
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, nil, nil, nil)
req := &orchestratorv1.ProcessDepositObservedRequest{
Event: &chainv1.WalletDepositObservedEvent{

View File

@@ -87,6 +87,8 @@ const (
RailOperationFee RailOperation = "FEE"
RailOperationObserveConfirm RailOperation = "OBSERVE_CONFIRM"
RailOperationFXConvert RailOperation = "FX_CONVERT"
RailOperationBlock RailOperation = "BLOCK"
RailOperationRelease RailOperation = "RELEASE"
)
// RailCapabilities are declared per gateway instance.
@@ -96,6 +98,8 @@ type RailCapabilities struct {
CanReadBalance bool `bson:"canReadBalance,omitempty" json:"canReadBalance,omitempty"`
CanSendFee bool `bson:"canSendFee,omitempty" json:"canSendFee,omitempty"`
RequiresObserveConfirm bool `bson:"requiresObserveConfirm,omitempty" json:"requiresObserveConfirm,omitempty"`
CanBlock bool `bson:"canBlock,omitempty" json:"canBlock,omitempty"`
CanRelease bool `bson:"canRelease,omitempty" json:"canRelease,omitempty"`
}
// LimitsOverride applies per-currency overrides for limits.
@@ -125,6 +129,7 @@ type GatewayInstanceDescriptor struct {
InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"`
Rail Rail `bson:"rail" json:"rail"`
Network string `bson:"network,omitempty" json:"network,omitempty"`
InvokeURI string `bson:"invokeUri,omitempty" json:"invokeUri,omitempty"`
Currencies []string `bson:"currencies,omitempty" json:"currencies,omitempty"`
Capabilities RailCapabilities `bson:"capabilities,omitempty" json:"capabilities,omitempty"`
Limits Limits `bson:"limits,omitempty" json:"limits,omitempty"`

View File

@@ -24,6 +24,8 @@ type RailCapabilities struct {
CanReadBalance bool
CanSendFee bool
RequiresObserveConfirm bool
CanBlock bool
CanRelease bool
}
// FeeBreakdown provides a gateway-level fee description.
@@ -49,6 +51,25 @@ type TransferRequest struct {
DestinationMemo string
}
// BlockRequest defines the inputs for reserving value through a rail gateway.
type BlockRequest struct {
OrganizationRef string
AccountID string
Currency string
Amount string
IdempotencyKey string
Metadata map[string]string
ClientReference string
}
// ReleaseRequest defines the inputs for releasing a prior block.
type ReleaseRequest struct {
ReferenceID string
IdempotencyKey string
Metadata map[string]string
ClientReference string
}
// RailResult reports the outcome of a rail gateway operation.
type RailResult struct {
ReferenceID string
@@ -80,4 +101,6 @@ type RailGateway interface {
Capabilities() RailCapabilities
Send(ctx context.Context, req TransferRequest) (RailResult, error)
Observe(ctx context.Context, referenceID string) (ObserveResult, error)
Block(ctx context.Context, req BlockRequest) (RailResult, error)
Release(ctx context.Context, req ReleaseRequest) (RailResult, error)
}

View File

@@ -12,6 +12,7 @@ import (
type InternalLedger interface {
ReadBalance(ctx context.Context, accountID string) (*moneyv1.Money, error)
CreateTransaction(ctx context.Context, tx LedgerTx) (string, error)
TransferInternal(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error)
HoldBalance(ctx context.Context, accountID string, amount string) error
}

View File

@@ -45,6 +45,8 @@ enum RailOperation {
RAIL_OPERATION_FEE = 4;
RAIL_OPERATION_OBSERVE_CONFIRM = 5;
RAIL_OPERATION_FX_CONVERT = 6;
RAIL_OPERATION_BLOCK = 7;
RAIL_OPERATION_RELEASE = 8;
}
// Limits in minor units, e.g. cents
@@ -124,6 +126,8 @@ message RailCapabilities {
bool can_read_balance = 3;
bool can_send_fee = 4;
bool requires_observe_confirm = 5;
bool can_block = 6;
bool can_release = 7;
}
message LimitsOverride {

View File

@@ -7,6 +7,7 @@ environment:
# Add regular dependencies here.
dependencies:
analyzer: ^10.0.0
json_annotation: ^4.9.0
http: ^1.1.0
provider: ^6.0.5

View File

@@ -13,6 +13,7 @@ import 'package:pshared/provider/permissions.dart';
import 'package:pshared/provider/account.dart';
import 'package:pshared/provider/organizations.dart';
import 'package:pshared/provider/accounts/employees.dart';
import 'package:pshared/provider/recipient/pmethods.dart';
import 'package:pshared/provider/recipient/provider.dart';
import 'package:pshared/provider/payment/wallets.dart';
import 'package:pshared/provider/invitations.dart';

View File

@@ -75,7 +75,7 @@ class InvitationFormFields extends StatelessWidget {
SizedBox(
width: 260,
child: DropdownButtonFormField<String>(
value: selectedRoleRef,
initialValue: selectedRoleRef,
items: roles.map((role) => DropdownMenuItem(
value: role.storable.id,
child: Text(role.describable.name),

View File

@@ -28,6 +28,7 @@ environment:
# the latest version available on pub.dev. To see which dependencies have newer
# versions available, run `flutter pub outdated`.
dependencies:
analyzer: 9.0.0
amplitude_flutter: ^4.0.1
flutter:
sdk: flutter
@@ -140,8 +141,3 @@ flutter_launcher_icons:
web:
generate: true
image_path: "resources/logo.png"
# temporary
dependency_overrides:
analyzer: ">=9.0.0 <10.0.0"