diff --git a/api/gateway/chain/client/rail_gateway.go b/api/gateway/chain/client/rail_gateway.go index 2f427f1..a3ead4f 100644 --- a/api/gateway/chain/client/rail_gateway.go +++ b/api/gateway/chain/client/rail_gateway.go @@ -147,6 +147,14 @@ func (g *chainRailGateway) Observe(ctx context.Context, referenceID string) (rai }, nil } +func (g *chainRailGateway) Block(ctx context.Context, req rail.BlockRequest) (rail.RailResult, error) { + return rail.RailResult{}, merrors.NotImplemented("chain gateway: block not supported") +} + +func (g *chainRailGateway) Release(ctx context.Context, req rail.ReleaseRequest) (rail.RailResult, error) { + return rail.RailResult{}, merrors.NotImplemented("chain gateway: release not supported") +} + func (g *chainRailGateway) resolveDestination(ctx context.Context, destRef, memo string) (*chainv1.TransferDestination, error) { managed, err := g.isManagedWallet(ctx, destRef) if err != nil { diff --git a/api/ledger/client/client_test.go b/api/ledger/client/client_test.go new file mode 100644 index 0000000..1e64006 --- /dev/null +++ b/api/ledger/client/client_test.go @@ -0,0 +1,90 @@ +package client + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1" + connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1" + ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1" + "google.golang.org/grpc" +) + +type stubConnector struct { + submitFn func(ctx context.Context, req *connectorv1.SubmitOperationRequest) (*connectorv1.SubmitOperationResponse, error) +} + +func (s *stubConnector) OpenAccount(context.Context, *connectorv1.OpenAccountRequest, ...grpc.CallOption) (*connectorv1.OpenAccountResponse, error) { + return nil, nil +} + +func (s *stubConnector) GetAccount(context.Context, *connectorv1.GetAccountRequest, ...grpc.CallOption) (*connectorv1.GetAccountResponse, error) { + return nil, nil +} + +func (s *stubConnector) ListAccounts(context.Context, *connectorv1.ListAccountsRequest, ...grpc.CallOption) (*connectorv1.ListAccountsResponse, error) { + return nil, nil +} + +func (s *stubConnector) GetBalance(context.Context, *connectorv1.GetBalanceRequest, ...grpc.CallOption) (*connectorv1.GetBalanceResponse, error) { + return nil, nil +} + +func (s *stubConnector) SubmitOperation(ctx context.Context, req *connectorv1.SubmitOperationRequest, _ ...grpc.CallOption) (*connectorv1.SubmitOperationResponse, error) { + if s.submitFn != nil { + return s.submitFn(ctx, req) + } + return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{OperationId: "op-1"}}, nil +} + +func (s *stubConnector) GetOperation(context.Context, *connectorv1.GetOperationRequest, ...grpc.CallOption) (*connectorv1.GetOperationResponse, error) { + return nil, nil +} + +func (s *stubConnector) ListOperations(context.Context, *connectorv1.ListOperationsRequest, ...grpc.CallOption) (*connectorv1.ListOperationsResponse, error) { + return nil, nil +} + +func TestTransferInternal_SubmitsTransferOperation(t *testing.T) { + ctx := context.Background() + + var captured *connectorv1.Operation + stub := &stubConnector{ + submitFn: func(ctx context.Context, req *connectorv1.SubmitOperationRequest) (*connectorv1.SubmitOperationResponse, error) { + captured = req.GetOperation() + return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{OperationId: "op-1"}}, nil + }, + } + + client := NewWithClient(Config{}, stub) + resp, err := client.TransferInternal(ctx, &ledgerv1.TransferRequest{ + IdempotencyKey: "id-1", + OrganizationRef: "org-1", + FromLedgerAccountRef: "acct-from", + ToLedgerAccountRef: "acct-to", + Money: &moneyv1.Money{Currency: "USD", Amount: "10"}, + Description: "hold", + }) + + require.NoError(t, err) + require.NotNil(t, resp) + require.NotNil(t, captured) + + assert.Equal(t, connectorv1.OperationType_TRANSFER, captured.GetType()) + assert.Equal(t, "id-1", captured.GetIdempotencyKey()) + assert.Equal(t, "acct-from", captured.GetFrom().GetAccount().GetAccountId()) + assert.Equal(t, "acct-to", captured.GetTo().GetAccount().GetAccountId()) + assert.Equal(t, ledgerConnectorID, captured.GetFrom().GetAccount().GetConnectorId()) + assert.Equal(t, ledgerConnectorID, captured.GetTo().GetAccount().GetConnectorId()) + assert.Equal(t, "USD", captured.GetMoney().GetCurrency()) + assert.Equal(t, "10", captured.GetMoney().GetAmount()) + + params := captured.GetParams().AsMap() + assert.Equal(t, "org-1", params["organization_ref"]) + assert.Equal(t, "hold", params["description"]) + + assert.Equal(t, "op-1", resp.GetJournalEntryRef()) + assert.Equal(t, ledgerv1.EntryType_ENTRY_TRANSFER, resp.GetEntryType()) +} diff --git a/api/payments/orchestrator/config.yml b/api/payments/orchestrator/config.yml index 5f00e3a..fff2c07 100644 --- a/api/payments/orchestrator/config.yml +++ b/api/payments/orchestrator/config.yml @@ -34,42 +34,7 @@ messaging: reconnect_wait: 5 buffer_size: 1024 -fees: - address: "sendico_billing_fees:50060" - dial_timeout_seconds: 5 - call_timeout_seconds: 3 - insecure: true - -ledger: - address: "sendico_ledger:50052" - dial_timeout_seconds: 5 - call_timeout_seconds: 3 - insecure: true - -gateway: - address: "sendico_chain_gateway:50070" - dial_timeout_seconds: 5 - call_timeout_seconds: 3 - insecure: true - -payment_gateway: - address: "sendico_tgsettle_gateway:50080" - dial_timeout_seconds: 5 - call_timeout_seconds: 3 - insecure: true - -mntx: - address: "sendico_mntx_gateway:50075" - dial_timeout_seconds: 5 - call_timeout_seconds: 15 - insecure: true - -oracle: - address: "sendico_fx_oracle:50051" - dial_timeout_seconds: 5 - call_timeout_seconds: 3 - insecure: true - +# Service endpoints are sourced from discovery; no static overrides. card_gateways: monetix: funding_address: "TGBDXEg9rxSqGFJDcb889zqTjDwx1bmLRF" @@ -78,14 +43,4 @@ card_gateways: fee_ledger_accounts: monetix: "ledger:fees:monetix" -# gateway_instances: -# - id: "crypto-tron" -# rail: "CRYPTO" -# network: "TRON" -# currencies: ["USDT"] -# capabilities: -# can_pay_out: true -# can_send_fee: true -# limits: -# min_amount: "0" -# max_amount: "100000" +# Gateway instances and capabilities are sourced from service discovery. diff --git a/api/payments/orchestrator/internal/server/internal/builders.go b/api/payments/orchestrator/internal/server/internal/builders.go index 636dde8..1b5c892 100644 --- a/api/payments/orchestrator/internal/server/internal/builders.go +++ b/api/payments/orchestrator/internal/server/internal/builders.go @@ -48,10 +48,19 @@ func buildFeeLedgerAccounts(src map[string]string) map[string]string { } func buildGatewayRegistry(logger mlogger.Logger, src []gatewayInstanceConfig, registry *discovery.Registry) orchestrator.GatewayRegistry { - static := buildGatewayInstances(logger, src) - staticRegistry := orchestrator.NewGatewayRegistry(logger, static) - discoveryRegistry := orchestrator.NewDiscoveryGatewayRegistry(logger, registry) - return orchestrator.NewCompositeGatewayRegistry(logger, staticRegistry, discoveryRegistry) + if logger != nil { + logger = logger.Named("gateway_registry") + } + if len(src) > 0 && logger != nil { + logger.Warn("Static gateway configuration ignored; using discovery registry only") + } + if registry == nil { + if logger != nil { + logger.Warn("Discovery registry unavailable; gateway routing disabled") + } + return nil + } + return orchestrator.NewDiscoveryGatewayRegistry(logger, registry) } func buildRailGateways(chainClient chainclient.Client, paymentGatewayClient chainclient.Client, src []gatewayInstanceConfig) map[string]rail.RailGateway { @@ -76,6 +85,8 @@ func buildRailGateways(chainClient chainclient.Client, paymentGatewayClient chai CanReadBalance: inst.Capabilities.CanReadBalance, CanSendFee: inst.Capabilities.CanSendFee, RequiresObserveConfirm: inst.Capabilities.RequiresObserveConfirm, + CanBlock: inst.Capabilities.CanBlock, + CanRelease: inst.Capabilities.CanRelease, }, } switch inst.Rail { @@ -135,6 +146,8 @@ func buildGatewayInstances(logger mlogger.Logger, src []gatewayInstanceConfig) [ CanReadBalance: cfg.Capabilities.CanReadBalance, CanSendFee: cfg.Capabilities.CanSendFee, RequiresObserveConfirm: cfg.Capabilities.RequiresObserveConfirm, + CanBlock: cfg.Capabilities.CanBlock, + CanRelease: cfg.Capabilities.CanRelease, }, Limits: buildGatewayLimits(cfg.Limits), Version: strings.TrimSpace(cfg.Version), diff --git a/api/payments/orchestrator/internal/server/internal/clients.go b/api/payments/orchestrator/internal/server/internal/clients.go index afd8652..d0ff991 100644 --- a/api/payments/orchestrator/internal/server/internal/clients.go +++ b/api/payments/orchestrator/internal/server/internal/clients.go @@ -155,22 +155,8 @@ func (i *Imp) initOracleClient(cfg clientConfig) oracleclient.Client { } func (i *Imp) closeClients() { - if i.ledgerClient != nil { - _ = i.ledgerClient.Close() - } - if i.gatewayClient != nil { - _ = i.gatewayClient.Close() - } - if i.paymentGatewayClient != nil { - _ = i.paymentGatewayClient.Close() - } - if i.mntxClient != nil { - _ = i.mntxClient.Close() - } - if i.oracleClient != nil { - _ = i.oracleClient.Close() - } - if i.feesConn != nil { - _ = i.feesConn.Close() + if i.discoveryClients != nil { + i.discoveryClients.Close() + i.discoveryClients = nil } } diff --git a/api/payments/orchestrator/internal/server/internal/config.go b/api/payments/orchestrator/internal/server/internal/config.go index 9ee1f0a..121ef82 100644 --- a/api/payments/orchestrator/internal/server/internal/config.go +++ b/api/payments/orchestrator/internal/server/internal/config.go @@ -54,6 +54,8 @@ type gatewayCapabilitiesConfig struct { CanReadBalance bool `yaml:"can_read_balance"` CanSendFee bool `yaml:"can_send_fee"` RequiresObserveConfirm bool `yaml:"requires_observe_confirm"` + CanBlock bool `yaml:"can_block"` + CanRelease bool `yaml:"can_release"` } type limitsConfig struct { diff --git a/api/payments/orchestrator/internal/server/internal/dependencies.go b/api/payments/orchestrator/internal/server/internal/dependencies.go index 37259c9..15b948b 100644 --- a/api/payments/orchestrator/internal/server/internal/dependencies.go +++ b/api/payments/orchestrator/internal/server/internal/dependencies.go @@ -2,7 +2,6 @@ package serverimp import ( oracleclient "github.com/tech/sendico/fx/oracle/client" - chainclient "github.com/tech/sendico/gateway/chain/client" mntxclient "github.com/tech/sendico/gateway/mntx/client" ledgerclient "github.com/tech/sendico/ledger/client" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator" @@ -10,47 +9,28 @@ import ( ) type orchestratorDeps struct { - feesClient feesv1.FeeEngineClient - ledgerClient ledgerclient.Client - gatewayClient chainclient.Client - paymentGatewayClient chainclient.Client - mntxClient mntxclient.Client - oracleClient oracleclient.Client + feesClient feesv1.FeeEngineClient + ledgerClient ledgerclient.Client + mntxClient mntxclient.Client + oracleClient oracleclient.Client + gatewayInvokeResolver orchestrator.GatewayInvokeResolver } func (i *Imp) initDependencies(cfg *config) *orchestratorDeps { deps := &orchestratorDeps{} - if cfg == nil { + if i.discoveryReg == nil { + if i.logger != nil { + i.logger.Warn("Discovery registry unavailable; downstream clients disabled") + } return deps } - deps.feesClient, i.feesConn = i.initFeesClient(cfg.Fees) - - deps.ledgerClient = i.initLedgerClient(cfg.Ledger) - if deps.ledgerClient != nil { - i.ledgerClient = deps.ledgerClient - } - - deps.gatewayClient = i.initGatewayClient(cfg.Gateway) - if deps.gatewayClient != nil { - i.gatewayClient = deps.gatewayClient - } - - deps.paymentGatewayClient = i.initPaymentGatewayClient(cfg.PaymentGateway) - if deps.paymentGatewayClient != nil { - i.paymentGatewayClient = deps.paymentGatewayClient - } - - deps.mntxClient = i.initMntxClient(cfg.Mntx) - if deps.mntxClient != nil { - i.mntxClient = deps.mntxClient - } - - deps.oracleClient = i.initOracleClient(cfg.Oracle) - if deps.oracleClient != nil { - i.oracleClient = deps.oracleClient - } - + i.discoveryClients = newDiscoveryClientResolver(i.logger, i.discoveryReg) + deps.feesClient = &discoveryFeeClient{resolver: i.discoveryClients} + deps.ledgerClient = &discoveryLedgerClient{resolver: i.discoveryClients} + deps.oracleClient = &discoveryOracleClient{resolver: i.discoveryClients} + deps.mntxClient = &discoveryMntxClient{resolver: i.discoveryClients} + deps.gatewayInvokeResolver = discoveryGatewayInvokeResolver{resolver: i.discoveryClients} return deps } @@ -65,21 +45,15 @@ func (i *Imp) buildServiceOptions(cfg *config, deps *orchestratorDeps) []orchest if deps.ledgerClient != nil { opts = append(opts, orchestrator.WithLedgerClient(deps.ledgerClient)) } - if deps.gatewayClient != nil { - opts = append(opts, orchestrator.WithChainGatewayClient(deps.gatewayClient)) - } - if deps.paymentGatewayClient != nil { - opts = append(opts, orchestrator.WithProviderSettlementGatewayClient(deps.paymentGatewayClient)) - } - if railGateways := buildRailGateways(deps.gatewayClient, deps.paymentGatewayClient, cfg.GatewayInstances); len(railGateways) > 0 { - opts = append(opts, orchestrator.WithRailGateways(railGateways)) - } if deps.mntxClient != nil { opts = append(opts, orchestrator.WithMntxGateway(deps.mntxClient)) } if deps.oracleClient != nil { opts = append(opts, orchestrator.WithOracleClient(deps.oracleClient)) } + if deps.gatewayInvokeResolver != nil { + opts = append(opts, orchestrator.WithGatewayInvokeResolver(deps.gatewayInvokeResolver)) + } if routes := buildCardGatewayRoutes(cfg.CardGateways); len(routes) > 0 { opts = append(opts, orchestrator.WithCardGatewayRoutes(routes)) } diff --git a/api/payments/orchestrator/internal/server/internal/discovery_clients.go b/api/payments/orchestrator/internal/server/internal/discovery_clients.go new file mode 100644 index 0000000..9d7264d --- /dev/null +++ b/api/payments/orchestrator/internal/server/internal/discovery_clients.go @@ -0,0 +1,477 @@ +package serverimp + +import ( + "context" + "fmt" + "net" + "net/url" + "sort" + "strings" + "sync" + "time" + + oracleclient "github.com/tech/sendico/fx/oracle/client" + chainclient "github.com/tech/sendico/gateway/chain/client" + mntxclient "github.com/tech/sendico/gateway/mntx/client" + ledgerclient "github.com/tech/sendico/ledger/client" + "github.com/tech/sendico/pkg/discovery" + "github.com/tech/sendico/pkg/merrors" + "github.com/tech/sendico/pkg/mlogger" + "github.com/tech/sendico/pkg/mservice" + feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" +) + +const discoveryLogThrottle = 30 * time.Second + +var ( + feesServiceNames = []string{"BILLING_FEES", string(mservice.FeePlans)} + ledgerServiceNames = []string{"LEDGER", string(mservice.Ledger)} + oracleServiceNames = []string{"FX_ORACLE", string(mservice.FXOracle)} + mntxServiceNames = []string{"CARD_PAYOUT_RAIL_GATEWAY", string(mservice.MntxGateway)} +) + +type discoveryEndpoint struct { + address string + insecure bool + raw string +} + +func (e discoveryEndpoint) key() string { + return fmt.Sprintf("%s|%t", e.address, e.insecure) +} + +type discoveryClientResolver struct { + logger mlogger.Logger + registry *discovery.Registry + + mu sync.Mutex + + feesConn *grpc.ClientConn + feesEndpoint discoveryEndpoint + + ledgerClient ledgerclient.Client + ledgerEndpoint discoveryEndpoint + + oracleClient oracleclient.Client + oracleEndpoint discoveryEndpoint + + mntxClient mntxclient.Client + mntxEndpoint discoveryEndpoint + + chainClients map[string]chainclient.Client + + lastSelection map[string]string + lastMissing map[string]time.Time +} + +func newDiscoveryClientResolver(logger mlogger.Logger, registry *discovery.Registry) *discoveryClientResolver { + if logger != nil { + logger = logger.Named("discovery_clients") + } + return &discoveryClientResolver{ + logger: logger, + registry: registry, + chainClients: map[string]chainclient.Client{}, + lastSelection: map[string]string{}, + lastMissing: map[string]time.Time{}, + } +} + +func (r *discoveryClientResolver) Close() { + r.mu.Lock() + defer r.mu.Unlock() + if r.feesConn != nil { + _ = r.feesConn.Close() + r.feesConn = nil + } + if r.ledgerClient != nil { + _ = r.ledgerClient.Close() + r.ledgerClient = nil + } + if r.oracleClient != nil { + _ = r.oracleClient.Close() + r.oracleClient = nil + } + if r.mntxClient != nil { + _ = r.mntxClient.Close() + r.mntxClient = nil + } + for key, client := range r.chainClients { + if client != nil { + _ = client.Close() + } + delete(r.chainClients, key) + } +} + +func (r *discoveryClientResolver) FeesAvailable() bool { + _, ok := r.findEntry("fees", feesServiceNames, "", "") + return ok +} + +func (r *discoveryClientResolver) LedgerAvailable() bool { + _, ok := r.findEntry("ledger", ledgerServiceNames, "", "") + return ok +} + +func (r *discoveryClientResolver) OracleAvailable() bool { + _, ok := r.findEntry("oracle", oracleServiceNames, "", "") + return ok +} + +func (r *discoveryClientResolver) MntxAvailable() bool { + _, ok := r.findEntry("mntx", mntxServiceNames, "", "") + return ok +} + +func (r *discoveryClientResolver) FeesClient(ctx context.Context) (feesv1.FeeEngineClient, error) { + entry, ok := r.findEntry("fees", feesServiceNames, "", "") + if !ok { + return nil, merrors.NoData("discovery: fees service unavailable") + } + endpoint, err := parseDiscoveryEndpoint(entry.InvokeURI) + if err != nil { + r.logMissing("fees", "invalid fees invoke uri", entry.InvokeURI, err) + return nil, err + } + + r.mu.Lock() + defer r.mu.Unlock() + + if r.feesConn == nil || r.feesEndpoint.key() != endpoint.key() || r.feesEndpoint.address != endpoint.address { + if r.feesConn != nil { + _ = r.feesConn.Close() + r.feesConn = nil + } + conn, dialErr := dialGrpc(ctx, endpoint) + if dialErr != nil { + r.logMissing("fees", "failed to dial fees service", endpoint.raw, dialErr) + return nil, dialErr + } + r.feesConn = conn + r.feesEndpoint = endpoint + } + + return feesv1.NewFeeEngineClient(r.feesConn), nil +} + +func (r *discoveryClientResolver) LedgerClient(ctx context.Context) (ledgerclient.Client, error) { + entry, ok := r.findEntry("ledger", ledgerServiceNames, "", "") + if !ok { + return nil, merrors.NoData("discovery: ledger service unavailable") + } + endpoint, err := parseDiscoveryEndpoint(entry.InvokeURI) + if err != nil { + r.logMissing("ledger", "invalid ledger invoke uri", entry.InvokeURI, err) + return nil, err + } + + r.mu.Lock() + defer r.mu.Unlock() + + if r.ledgerClient == nil || r.ledgerEndpoint.key() != endpoint.key() || r.ledgerEndpoint.address != endpoint.address { + if r.ledgerClient != nil { + _ = r.ledgerClient.Close() + r.ledgerClient = nil + } + client, dialErr := ledgerclient.New(ctx, ledgerclient.Config{ + Address: endpoint.address, + Insecure: endpoint.insecure, + }) + if dialErr != nil { + r.logMissing("ledger", "failed to dial ledger service", endpoint.raw, dialErr) + return nil, dialErr + } + r.ledgerClient = client + r.ledgerEndpoint = endpoint + } + + return r.ledgerClient, nil +} + +func (r *discoveryClientResolver) OracleClient(ctx context.Context) (oracleclient.Client, error) { + entry, ok := r.findEntry("oracle", oracleServiceNames, "", "") + if !ok { + return nil, merrors.NoData("discovery: oracle service unavailable") + } + endpoint, err := parseDiscoveryEndpoint(entry.InvokeURI) + if err != nil { + r.logMissing("oracle", "invalid oracle invoke uri", entry.InvokeURI, err) + return nil, err + } + + r.mu.Lock() + defer r.mu.Unlock() + + if r.oracleClient == nil || r.oracleEndpoint.key() != endpoint.key() || r.oracleEndpoint.address != endpoint.address { + if r.oracleClient != nil { + _ = r.oracleClient.Close() + r.oracleClient = nil + } + client, dialErr := oracleclient.New(ctx, oracleclient.Config{ + Address: endpoint.address, + Insecure: endpoint.insecure, + }) + if dialErr != nil { + r.logMissing("oracle", "failed to dial oracle service", endpoint.raw, dialErr) + return nil, dialErr + } + r.oracleClient = client + r.oracleEndpoint = endpoint + } + + return r.oracleClient, nil +} + +func (r *discoveryClientResolver) MntxClient(ctx context.Context) (mntxclient.Client, error) { + entry, ok := r.findEntry("mntx", mntxServiceNames, "", "") + if !ok { + return nil, merrors.NoData("discovery: mntx service unavailable") + } + endpoint, err := parseDiscoveryEndpoint(entry.InvokeURI) + if err != nil { + r.logMissing("mntx", "invalid mntx invoke uri", entry.InvokeURI, err) + return nil, err + } + + r.mu.Lock() + defer r.mu.Unlock() + + if r.mntxClient == nil || r.mntxEndpoint.key() != endpoint.key() || r.mntxEndpoint.address != endpoint.address { + if r.mntxClient != nil { + _ = r.mntxClient.Close() + r.mntxClient = nil + } + if !endpoint.insecure && r.logger != nil { + r.logger.Warn("Mntx gateway does not support TLS, falling back to insecure transport", + zap.String("invoke_uri", endpoint.raw)) + } + client, dialErr := mntxclient.New(ctx, mntxclient.Config{ + Address: endpoint.address, + }) + if dialErr != nil { + r.logMissing("mntx", "failed to dial mntx service", endpoint.raw, dialErr) + return nil, dialErr + } + r.mntxClient = client + r.mntxEndpoint = endpoint + } + + return r.mntxClient, nil +} + +func (r *discoveryClientResolver) ChainClient(ctx context.Context, invokeURI string) (chainclient.Client, error) { + endpoint, err := parseDiscoveryEndpoint(invokeURI) + if err != nil { + r.logMissing("chain", "invalid chain gateway invoke uri", invokeURI, err) + return nil, err + } + + r.mu.Lock() + defer r.mu.Unlock() + + if client, ok := r.chainClients[endpoint.key()]; ok && client != nil { + return client, nil + } + + client, dialErr := chainclient.New(ctx, chainclient.Config{ + Address: endpoint.address, + Insecure: endpoint.insecure, + }) + if dialErr != nil { + r.logMissing("chain", "failed to dial chain gateway", endpoint.raw, dialErr) + return nil, dialErr + } + r.chainClients[endpoint.key()] = client + return client, nil +} + +func (r *discoveryClientResolver) PaymentGatewayClient(ctx context.Context, invokeURI string) (chainclient.Client, error) { + endpoint, err := parseDiscoveryEndpoint(invokeURI) + if err != nil { + r.logMissing("payment_gateway", "invalid payment gateway invoke uri", invokeURI, err) + return nil, err + } + + r.mu.Lock() + defer r.mu.Unlock() + + if client, ok := r.chainClients[endpoint.key()]; ok && client != nil { + return client, nil + } + + client, dialErr := chainclient.New(ctx, chainclient.Config{ + Address: endpoint.address, + Insecure: endpoint.insecure, + }) + if dialErr != nil { + r.logMissing("payment_gateway", "failed to dial payment gateway", endpoint.raw, dialErr) + return nil, dialErr + } + r.chainClients[endpoint.key()] = client + return client, nil +} + +func (r *discoveryClientResolver) findEntry(key string, services []string, rail string, network string) (*discovery.RegistryEntry, bool) { + if r == nil || r.registry == nil { + r.logMissing(key, "discovery registry unavailable", "", nil) + return nil, false + } + + entries := r.registry.List(time.Now(), true) + matches := make([]discovery.RegistryEntry, 0) + for _, entry := range entries { + if !matchesService(entry.Service, services) { + continue + } + if rail != "" && !strings.EqualFold(strings.TrimSpace(entry.Rail), rail) { + continue + } + if network != "" && !strings.EqualFold(strings.TrimSpace(entry.Network), network) { + continue + } + matches = append(matches, entry) + } + + if len(matches) == 0 { + r.logMissing(key, "discovery entry missing", "", nil) + return nil, false + } + + sort.Slice(matches, func(i, j int) bool { + if matches[i].RoutingPriority != matches[j].RoutingPriority { + return matches[i].RoutingPriority > matches[j].RoutingPriority + } + if matches[i].ID != matches[j].ID { + return matches[i].ID < matches[j].ID + } + return matches[i].InstanceID < matches[j].InstanceID + }) + + entry := matches[0] + entryKey := discoveryEntryKey(entry) + r.logSelection(key, entryKey, entry) + return &entry, true +} + +func (r *discoveryClientResolver) logSelection(key, entryKey string, entry discovery.RegistryEntry) { + if r.logger == nil { + return + } + r.mu.Lock() + last := r.lastSelection[key] + if last == entryKey { + r.mu.Unlock() + return + } + r.lastSelection[key] = entryKey + r.mu.Unlock() + r.logger.Info("Discovery endpoint selected", + zap.String("service_key", key), + zap.String("service", entry.Service), + zap.String("rail", entry.Rail), + zap.String("network", entry.Network), + zap.String("entry_id", entry.ID), + zap.String("instance_id", entry.InstanceID), + zap.String("invoke_uri", entry.InvokeURI)) +} + +func (r *discoveryClientResolver) logMissing(key, message, invokeURI string, err error) { + if r.logger == nil { + return + } + now := time.Now() + r.mu.Lock() + last := r.lastMissing[key] + if !last.IsZero() && now.Sub(last) < discoveryLogThrottle { + r.mu.Unlock() + return + } + r.lastMissing[key] = now + r.mu.Unlock() + fields := []zap.Field{zap.String("service_key", key)} + if invokeURI != "" { + fields = append(fields, zap.String("invoke_uri", strings.TrimSpace(invokeURI))) + } + if err != nil { + fields = append(fields, zap.Error(err)) + } + r.logger.Warn(message, fields...) +} + +func discoveryEntryKey(entry discovery.RegistryEntry) string { + return fmt.Sprintf("%s|%s|%s|%s|%s|%s", + strings.TrimSpace(entry.Service), + strings.TrimSpace(entry.ID), + strings.TrimSpace(entry.InstanceID), + strings.TrimSpace(entry.Rail), + strings.TrimSpace(entry.Network), + strings.TrimSpace(entry.InvokeURI)) +} + +func matchesService(service string, candidates []string) bool { + service = strings.TrimSpace(service) + if service == "" || len(candidates) == 0 { + return false + } + for _, candidate := range candidates { + if strings.EqualFold(service, strings.TrimSpace(candidate)) { + return true + } + } + return false +} + +func parseDiscoveryEndpoint(raw string) (discoveryEndpoint, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri is required") + } + parsed, err := url.Parse(raw) + if err != nil || parsed.Scheme == "" { + if _, _, splitErr := net.SplitHostPort(raw); splitErr != nil { + if err != nil { + return discoveryEndpoint{}, err + } + return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port") + } + return discoveryEndpoint{address: raw, insecure: true, raw: raw}, nil + } + + scheme := strings.ToLower(strings.TrimSpace(parsed.Scheme)) + switch scheme { + case "grpc": + address := strings.TrimSpace(parsed.Host) + if _, _, splitErr := net.SplitHostPort(address); splitErr != nil { + return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port") + } + return discoveryEndpoint{address: address, insecure: true, raw: raw}, nil + case "grpcs": + address := strings.TrimSpace(parsed.Host) + if _, _, splitErr := net.SplitHostPort(address); splitErr != nil { + return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port") + } + return discoveryEndpoint{address: address, insecure: false, raw: raw}, nil + case "dns", "passthrough": + return discoveryEndpoint{address: raw, insecure: true, raw: raw}, nil + default: + return discoveryEndpoint{}, merrors.InvalidArgument("discovery: unsupported invoke uri scheme") + } +} + +func dialGrpc(ctx context.Context, endpoint discoveryEndpoint) (*grpc.ClientConn, error) { + dialOpts := []grpc.DialOption{} + if endpoint.insecure { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } else { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(nil))) + } + if ctx == nil { + ctx = context.Background() + } + return grpc.DialContext(ctx, endpoint.address, dialOpts...) +} diff --git a/api/payments/orchestrator/internal/server/internal/discovery_resolvers.go b/api/payments/orchestrator/internal/server/internal/discovery_resolvers.go new file mode 100644 index 0000000..0aa1659 --- /dev/null +++ b/api/payments/orchestrator/internal/server/internal/discovery_resolvers.go @@ -0,0 +1,19 @@ +package serverimp + +import ( + "context" + + chainclient "github.com/tech/sendico/gateway/chain/client" + "github.com/tech/sendico/pkg/merrors" +) + +type discoveryGatewayInvokeResolver struct { + resolver *discoveryClientResolver +} + +func (r discoveryGatewayInvokeResolver) Resolve(ctx context.Context, invokeURI string) (chainclient.Client, error) { + if r.resolver == nil { + return nil, merrors.Internal("discovery gateway resolver unavailable") + } + return r.resolver.ChainClient(ctx, invokeURI) +} diff --git a/api/payments/orchestrator/internal/server/internal/discovery_wrappers.go b/api/payments/orchestrator/internal/server/internal/discovery_wrappers.go new file mode 100644 index 0000000..7652ad7 --- /dev/null +++ b/api/payments/orchestrator/internal/server/internal/discovery_wrappers.go @@ -0,0 +1,370 @@ +package serverimp + +import ( + "context" + + oracleclient "github.com/tech/sendico/fx/oracle/client" + chainclient "github.com/tech/sendico/gateway/chain/client" + mntxclient "github.com/tech/sendico/gateway/mntx/client" + ledgerclient "github.com/tech/sendico/ledger/client" + "github.com/tech/sendico/pkg/merrors" + "github.com/tech/sendico/pkg/payments/rail" + feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1" + moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1" + chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1" + mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1" + ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1" + "google.golang.org/grpc" +) + +type discoveryFeeClient struct { + resolver *discoveryClientResolver +} + +func (c *discoveryFeeClient) Available() bool { + if c == nil || c.resolver == nil { + return false + } + return c.resolver.FeesAvailable() +} + +func (c *discoveryFeeClient) QuoteFees(ctx context.Context, req *feesv1.QuoteFeesRequest, opts ...grpc.CallOption) (*feesv1.QuoteFeesResponse, error) { + client, err := c.resolver.FeesClient(ctx) + if err != nil { + return nil, err + } + return client.QuoteFees(ctx, req, opts...) +} + +func (c *discoveryFeeClient) PrecomputeFees(ctx context.Context, req *feesv1.PrecomputeFeesRequest, opts ...grpc.CallOption) (*feesv1.PrecomputeFeesResponse, error) { + client, err := c.resolver.FeesClient(ctx) + if err != nil { + return nil, err + } + return client.PrecomputeFees(ctx, req, opts...) +} + +func (c *discoveryFeeClient) ValidateFeeToken(ctx context.Context, req *feesv1.ValidateFeeTokenRequest, opts ...grpc.CallOption) (*feesv1.ValidateFeeTokenResponse, error) { + client, err := c.resolver.FeesClient(ctx) + if err != nil { + return nil, err + } + return client.ValidateFeeToken(ctx, req, opts...) +} + +type discoveryLedgerClient struct { + resolver *discoveryClientResolver +} + +func (c *discoveryLedgerClient) Available() bool { + if c == nil || c.resolver == nil { + return false + } + return c.resolver.LedgerAvailable() +} + +func (c *discoveryLedgerClient) ReadBalance(ctx context.Context, accountID string) (*moneyv1.Money, error) { + client, err := c.resolver.LedgerClient(ctx) + if err != nil { + return nil, err + } + return client.ReadBalance(ctx, accountID) +} + +func (c *discoveryLedgerClient) CreateTransaction(ctx context.Context, tx rail.LedgerTx) (string, error) { + client, err := c.resolver.LedgerClient(ctx) + if err != nil { + return "", err + } + return client.CreateTransaction(ctx, tx) +} + +func (c *discoveryLedgerClient) TransferInternal(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) { + client, err := c.resolver.LedgerClient(ctx) + if err != nil { + return nil, err + } + return client.TransferInternal(ctx, req) +} + +func (c *discoveryLedgerClient) HoldBalance(ctx context.Context, accountID string, amount string) error { + client, err := c.resolver.LedgerClient(ctx) + if err != nil { + return err + } + return client.HoldBalance(ctx, accountID, amount) +} + +func (c *discoveryLedgerClient) CreateAccount(ctx context.Context, req *ledgerv1.CreateAccountRequest) (*ledgerv1.CreateAccountResponse, error) { + client, err := c.resolver.LedgerClient(ctx) + if err != nil { + return nil, err + } + return client.CreateAccount(ctx, req) +} + +func (c *discoveryLedgerClient) ListAccounts(ctx context.Context, req *ledgerv1.ListAccountsRequest) (*ledgerv1.ListAccountsResponse, error) { + client, err := c.resolver.LedgerClient(ctx) + if err != nil { + return nil, err + } + return client.ListAccounts(ctx, req) +} + +func (c *discoveryLedgerClient) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) { + client, err := c.resolver.LedgerClient(ctx) + if err != nil { + return nil, err + } + return client.PostCreditWithCharges(ctx, req) +} + +func (c *discoveryLedgerClient) PostDebitWithCharges(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error) { + client, err := c.resolver.LedgerClient(ctx) + if err != nil { + return nil, err + } + return client.PostDebitWithCharges(ctx, req) +} + +func (c *discoveryLedgerClient) ApplyFXWithCharges(ctx context.Context, req *ledgerv1.FXRequest) (*ledgerv1.PostResponse, error) { + client, err := c.resolver.LedgerClient(ctx) + if err != nil { + return nil, err + } + return client.ApplyFXWithCharges(ctx, req) +} + +func (c *discoveryLedgerClient) GetBalance(ctx context.Context, req *ledgerv1.GetBalanceRequest) (*ledgerv1.BalanceResponse, error) { + client, err := c.resolver.LedgerClient(ctx) + if err != nil { + return nil, err + } + return client.GetBalance(ctx, req) +} + +func (c *discoveryLedgerClient) GetJournalEntry(ctx context.Context, req *ledgerv1.GetEntryRequest) (*ledgerv1.JournalEntryResponse, error) { + client, err := c.resolver.LedgerClient(ctx) + if err != nil { + return nil, err + } + return client.GetJournalEntry(ctx, req) +} + +func (c *discoveryLedgerClient) GetStatement(ctx context.Context, req *ledgerv1.GetStatementRequest) (*ledgerv1.StatementResponse, error) { + client, err := c.resolver.LedgerClient(ctx) + if err != nil { + return nil, err + } + return client.GetStatement(ctx, req) +} + +func (c *discoveryLedgerClient) Close() error { + if c == nil || c.resolver == nil { + return nil + } + client, err := c.resolver.LedgerClient(context.Background()) + if err != nil { + return nil + } + return client.Close() +} + +type discoveryOracleClient struct { + resolver *discoveryClientResolver +} + +func (c *discoveryOracleClient) Available() bool { + if c == nil || c.resolver == nil { + return false + } + return c.resolver.OracleAvailable() +} + +func (c *discoveryOracleClient) LatestRate(ctx context.Context, req oracleclient.LatestRateParams) (*oracleclient.RateSnapshot, error) { + client, err := c.resolver.OracleClient(ctx) + if err != nil { + return nil, err + } + return client.LatestRate(ctx, req) +} + +func (c *discoveryOracleClient) GetQuote(ctx context.Context, req oracleclient.GetQuoteParams) (*oracleclient.Quote, error) { + client, err := c.resolver.OracleClient(ctx) + if err != nil { + return nil, err + } + return client.GetQuote(ctx, req) +} + +func (c *discoveryOracleClient) Close() error { + if c == nil || c.resolver == nil { + return nil + } + client, err := c.resolver.OracleClient(context.Background()) + if err != nil { + return nil + } + return client.Close() +} + +type discoveryMntxClient struct { + resolver *discoveryClientResolver +} + +func (c *discoveryMntxClient) Available() bool { + if c == nil || c.resolver == nil { + return false + } + return c.resolver.MntxAvailable() +} + +func (c *discoveryMntxClient) CreateCardPayout(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) { + client, err := c.resolver.MntxClient(ctx) + if err != nil { + return nil, err + } + return client.CreateCardPayout(ctx, req) +} + +func (c *discoveryMntxClient) CreateCardTokenPayout(ctx context.Context, req *mntxv1.CardTokenPayoutRequest) (*mntxv1.CardTokenPayoutResponse, error) { + client, err := c.resolver.MntxClient(ctx) + if err != nil { + return nil, err + } + return client.CreateCardTokenPayout(ctx, req) +} + +func (c *discoveryMntxClient) GetCardPayoutStatus(ctx context.Context, req *mntxv1.GetCardPayoutStatusRequest) (*mntxv1.GetCardPayoutStatusResponse, error) { + client, err := c.resolver.MntxClient(ctx) + if err != nil { + return nil, err + } + return client.GetCardPayoutStatus(ctx, req) +} + +func (c *discoveryMntxClient) ListGatewayInstances(ctx context.Context, req *mntxv1.ListGatewayInstancesRequest) (*mntxv1.ListGatewayInstancesResponse, error) { + client, err := c.resolver.MntxClient(ctx) + if err != nil { + return nil, err + } + return client.ListGatewayInstances(ctx, req) +} + +func (c *discoveryMntxClient) Close() error { + if c == nil || c.resolver == nil { + return nil + } + client, err := c.resolver.MntxClient(context.Background()) + if err != nil { + return nil + } + return client.Close() +} + +type discoveryChainClient struct { + resolver *discoveryClientResolver + invokeURI string +} + +func (c *discoveryChainClient) Available() bool { + return c != nil && c.resolver != nil && c.invokeURI != "" +} + +func (c *discoveryChainClient) client(ctx context.Context) (chainclient.Client, error) { + if c == nil || c.resolver == nil { + return nil, merrors.Internal("discovery chain client unavailable") + } + return c.resolver.ChainClient(ctx, c.invokeURI) +} + +func (c *discoveryChainClient) CreateManagedWallet(ctx context.Context, req *chainv1.CreateManagedWalletRequest) (*chainv1.CreateManagedWalletResponse, error) { + client, err := c.client(ctx) + if err != nil { + return nil, err + } + return client.CreateManagedWallet(ctx, req) +} + +func (c *discoveryChainClient) GetManagedWallet(ctx context.Context, req *chainv1.GetManagedWalletRequest) (*chainv1.GetManagedWalletResponse, error) { + client, err := c.client(ctx) + if err != nil { + return nil, err + } + return client.GetManagedWallet(ctx, req) +} + +func (c *discoveryChainClient) ListManagedWallets(ctx context.Context, req *chainv1.ListManagedWalletsRequest) (*chainv1.ListManagedWalletsResponse, error) { + client, err := c.client(ctx) + if err != nil { + return nil, err + } + return client.ListManagedWallets(ctx, req) +} + +func (c *discoveryChainClient) GetWalletBalance(ctx context.Context, req *chainv1.GetWalletBalanceRequest) (*chainv1.GetWalletBalanceResponse, error) { + client, err := c.client(ctx) + if err != nil { + return nil, err + } + return client.GetWalletBalance(ctx, req) +} + +func (c *discoveryChainClient) SubmitTransfer(ctx context.Context, req *chainv1.SubmitTransferRequest) (*chainv1.SubmitTransferResponse, error) { + client, err := c.client(ctx) + if err != nil { + return nil, err + } + return client.SubmitTransfer(ctx, req) +} + +func (c *discoveryChainClient) GetTransfer(ctx context.Context, req *chainv1.GetTransferRequest) (*chainv1.GetTransferResponse, error) { + client, err := c.client(ctx) + if err != nil { + return nil, err + } + return client.GetTransfer(ctx, req) +} + +func (c *discoveryChainClient) ListTransfers(ctx context.Context, req *chainv1.ListTransfersRequest) (*chainv1.ListTransfersResponse, error) { + client, err := c.client(ctx) + if err != nil { + return nil, err + } + return client.ListTransfers(ctx, req) +} + +func (c *discoveryChainClient) EstimateTransferFee(ctx context.Context, req *chainv1.EstimateTransferFeeRequest) (*chainv1.EstimateTransferFeeResponse, error) { + client, err := c.client(ctx) + if err != nil { + return nil, err + } + return client.EstimateTransferFee(ctx, req) +} + +func (c *discoveryChainClient) ComputeGasTopUp(ctx context.Context, req *chainv1.ComputeGasTopUpRequest) (*chainv1.ComputeGasTopUpResponse, error) { + client, err := c.client(ctx) + if err != nil { + return nil, err + } + return client.ComputeGasTopUp(ctx, req) +} + +func (c *discoveryChainClient) EnsureGasTopUp(ctx context.Context, req *chainv1.EnsureGasTopUpRequest) (*chainv1.EnsureGasTopUpResponse, error) { + client, err := c.client(ctx) + if err != nil { + return nil, err + } + return client.EnsureGasTopUp(ctx, req) +} + +func (c *discoveryChainClient) Close() error { + if c == nil || c.resolver == nil { + return nil + } + client, err := c.resolver.ChainClient(context.Background(), c.invokeURI) + if err != nil { + return nil + } + return client.Close() +} diff --git a/api/payments/orchestrator/internal/server/internal/types.go b/api/payments/orchestrator/internal/server/internal/types.go index 315b39f..de5a155 100644 --- a/api/payments/orchestrator/internal/server/internal/types.go +++ b/api/payments/orchestrator/internal/server/internal/types.go @@ -1,16 +1,11 @@ package serverimp import ( - oracleclient "github.com/tech/sendico/fx/oracle/client" - chainclient "github.com/tech/sendico/gateway/chain/client" - mntxclient "github.com/tech/sendico/gateway/mntx/client" - ledgerclient "github.com/tech/sendico/ledger/client" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator" "github.com/tech/sendico/payments/orchestrator/storage" "github.com/tech/sendico/pkg/discovery" "github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/server/grpcapp" - "google.golang.org/grpc" ) type Imp struct { @@ -23,11 +18,6 @@ type Imp struct { discoveryWatcher *discovery.RegistryWatcher discoveryReg *discovery.Registry discoveryAnnouncer *discovery.Announcer + discoveryClients *discoveryClientResolver service *orchestrator.Service - feesConn *grpc.ClientConn - ledgerClient ledgerclient.Client - gatewayClient chainclient.Client - paymentGatewayClient chainclient.Client - mntxClient mntxclient.Client - oracleClient oracleclient.Client } diff --git a/api/payments/orchestrator/internal/service/orchestrator/card_payout_funding.go b/api/payments/orchestrator/internal/service/orchestrator/card_payout_funding.go index 83abef0..51813e3 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/card_payout_funding.go +++ b/api/payments/orchestrator/internal/service/orchestrator/card_payout_funding.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/shopspring/decimal" + chainclient "github.com/tech/sendico/gateway/chain/client" "github.com/tech/sendico/payments/orchestrator/storage/model" "github.com/tech/sendico/pkg/merrors" paymenttypes "github.com/tech/sendico/pkg/payments/types" @@ -23,11 +24,6 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model if source == nil || strings.TrimSpace(source.ManagedWalletRef) == "" { return merrors.InvalidArgument("card funding: source managed wallet is required") } - if !s.deps.gateway.available() { - s.logger.Warn("card funding aborted: chain gateway unavailable") - return merrors.InvalidArgument("card funding: chain gateway unavailable") - } - route, err := s.cardRoute(defaultCardGateway) if err != nil { return err @@ -67,10 +63,22 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model feeRequired := feeDecimal.IsPositive() feeAmountProto := protoMoney(feeAmount) + network := networkFromEndpoint(intent.Source) + instanceID := strings.TrimSpace(intent.Source.InstanceID) + actions := []model.RailOperation{model.RailOperationSend} + if feeRequired { + actions = append(actions, model.RailOperationFee) + } + chainClient, _, err := s.resolveChainGatewayClient(ctx, network, intentAmount, actions, instanceID, payment.PaymentRef) + if err != nil { + s.logger.Warn("card funding gateway resolution failed", zap.Error(err), zap.String("payment_ref", payment.PaymentRef)) + return err + } + fundingDest := &chainv1.TransferDestination{ Destination: &chainv1.TransferDestination_ExternalAddress{ExternalAddress: fundingAddress}, } - fundingFee, err := s.estimateTransferNetworkFee(ctx, sourceWalletRef, fundingDest, intentAmountProto) + fundingFee, err := s.estimateTransferNetworkFee(ctx, chainClient, sourceWalletRef, fundingDest, intentAmountProto) if err != nil { return err } @@ -83,7 +91,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model feeDest := &chainv1.TransferDestination{ Destination: &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: feeWalletRef}, } - feeTransferFee, err = s.estimateTransferNetworkFee(ctx, sourceWalletRef, feeDest, feeAmountProto) + feeTransferFee, err = s.estimateTransferNetworkFee(ctx, chainClient, sourceWalletRef, feeDest, feeAmountProto) if err != nil { return err } @@ -103,7 +111,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model var topUpFee *moneyv1.Money topUpPositive := false if estimatedTotalFee != nil { - computeResp, err := s.deps.gateway.client.ComputeGasTopUp(ctx, &chainv1.ComputeGasTopUpRequest{ + computeResp, err := chainClient.ComputeGasTopUp(ctx, &chainv1.ComputeGasTopUpRequest{ WalletRef: sourceWalletRef, EstimatedTotalFee: estimatedTotalFee, }) @@ -131,7 +139,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model topUpDest := &chainv1.TransferDestination{ Destination: &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: sourceWalletRef}, } - topUpFee, err = s.estimateTransferNetworkFee(ctx, feeWalletRef, topUpDest, topUpMoney) + topUpFee, err = s.estimateTransferNetworkFee(ctx, chainClient, feeWalletRef, topUpDest, topUpMoney) if err != nil { return err } @@ -191,7 +199,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model } if topUpMoney != nil && topUpPositive { - ensureResp, gasErr := s.deps.gateway.client.EnsureGasTopUp(ctx, &chainv1.EnsureGasTopUpRequest{ + ensureResp, gasErr := chainClient.EnsureGasTopUp(ctx, &chainv1.EnsureGasTopUpRequest{ IdempotencyKey: payment.IdempotencyKey + ":card:gas", OrganizationRef: payment.OrganizationRef.Hex(), SourceWalletRef: feeWalletRef, @@ -228,7 +236,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model topUpDest := &chainv1.TransferDestination{ Destination: &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: sourceWalletRef}, } - topUpFee, err = s.estimateTransferNetworkFee(ctx, feeWalletRef, topUpDest, actual) + topUpFee, err = s.estimateTransferNetworkFee(ctx, chainClient, feeWalletRef, topUpDest, actual) if err != nil { return err } @@ -247,7 +255,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model updateExecutionPlanTotalNetworkFee(plan) } - fundResp, err := s.deps.gateway.client.SubmitTransfer(ctx, &chainv1.SubmitTransferRequest{ + fundResp, err := chainClient.SubmitTransfer(ctx, &chainv1.SubmitTransferRequest{ IdempotencyKey: payment.IdempotencyKey + ":card:fund", OrganizationRef: payment.OrganizationRef.Hex(), SourceWalletRef: sourceWalletRef, @@ -267,7 +275,7 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model updateExecutionPlanTotalNetworkFee(plan) if feeRequired { - feeResp, err := s.deps.gateway.client.SubmitTransfer(ctx, &chainv1.SubmitTransferRequest{ + feeResp, err := chainClient.SubmitTransfer(ctx, &chainv1.SubmitTransferRequest{ IdempotencyKey: payment.IdempotencyKey + ":card:fee", OrganizationRef: payment.OrganizationRef.Hex(), SourceWalletRef: sourceWalletRef, @@ -293,8 +301,8 @@ func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model return nil } -func (s *Service) estimateTransferNetworkFee(ctx context.Context, sourceWalletRef string, destination *chainv1.TransferDestination, amount *moneyv1.Money) (*moneyv1.Money, error) { - if !s.deps.gateway.available() { +func (s *Service) estimateTransferNetworkFee(ctx context.Context, client chainclient.Client, sourceWalletRef string, destination *chainv1.TransferDestination, amount *moneyv1.Money) (*moneyv1.Money, error) { + if client == nil { return nil, merrors.InvalidArgument("chain gateway unavailable") } sourceWalletRef = strings.TrimSpace(sourceWalletRef) @@ -305,7 +313,7 @@ func (s *Service) estimateTransferNetworkFee(ctx context.Context, sourceWalletRe return nil, merrors.InvalidArgument("amount is required") } - resp, err := s.deps.gateway.client.EstimateTransferFee(ctx, &chainv1.EstimateTransferFeeRequest{ + resp, err := client.EstimateTransferFee(ctx, &chainv1.EstimateTransferFeeRequest{ SourceWalletRef: sourceWalletRef, Destination: destination, Amount: cloneProtoMoney(amount), diff --git a/api/payments/orchestrator/internal/service/orchestrator/card_payout_test.go b/api/payments/orchestrator/internal/service/orchestrator/card_payout_test.go index 6cb4d2b..3da1ee7 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/card_payout_test.go +++ b/api/payments/orchestrator/internal/service/orchestrator/card_payout_test.go @@ -74,7 +74,7 @@ func TestSubmitCardFundingTransfers_PlansTopUpAndFunding(t *testing.T) { svc := &Service{ logger: zap.NewNop(), deps: serviceDependencies{ - gateway: gatewayDependency{client: gateway}, + gateway: gatewayDependency{resolver: staticChainGatewayResolver{client: gateway}}, cardRoutes: map[string]CardGatewayRoute{ defaultCardGateway: { FundingAddress: fundingAddress, @@ -241,7 +241,7 @@ func TestSubmitCardPayout_UsesSettlementAmount(t *testing.T) { svc := &Service{ logger: zap.NewNop(), deps: serviceDependencies{ - gateway: gatewayDependency{client: gateway}, + gateway: gatewayDependency{resolver: staticChainGatewayResolver{client: gateway}}, mntx: mntxDependency{client: mntx}, }, } @@ -326,7 +326,7 @@ func TestSubmitCardFundingTransfers_RequiresFeeWalletRef(t *testing.T) { svc := &Service{ logger: zap.NewNop(), deps: serviceDependencies{ - gateway: gatewayDependency{client: gateway}, + gateway: gatewayDependency{resolver: staticChainGatewayResolver{client: gateway}}, cardRoutes: map[string]CardGatewayRoute{ defaultCardGateway: { FundingAddress: "0xfunding", diff --git a/api/payments/orchestrator/internal/service/orchestrator/convert.go b/api/payments/orchestrator/internal/service/orchestrator/convert.go index 0a45891..b23f500 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/convert.go +++ b/api/payments/orchestrator/internal/service/orchestrator/convert.go @@ -475,6 +475,10 @@ func protoRailOperationFromModel(action model.RailOperation) gatewayv1.RailOpera return gatewayv1.RailOperation_RAIL_OPERATION_OBSERVE_CONFIRM case string(model.RailOperationFXConvert): return gatewayv1.RailOperation_RAIL_OPERATION_FX_CONVERT + case string(model.RailOperationBlock): + return gatewayv1.RailOperation_RAIL_OPERATION_BLOCK + case string(model.RailOperationRelease): + return gatewayv1.RailOperation_RAIL_OPERATION_RELEASE default: return gatewayv1.RailOperation_RAIL_OPERATION_UNSPECIFIED } diff --git a/api/payments/orchestrator/internal/service/orchestrator/discovery_gateway_registry.go b/api/payments/orchestrator/internal/service/orchestrator/discovery_gateway_registry.go index 606ae52..1fe95a2 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/discovery_gateway_registry.go +++ b/api/payments/orchestrator/internal/service/orchestrator/discovery_gateway_registry.go @@ -48,6 +48,7 @@ func (r *discoveryGatewayRegistry) List(_ context.Context) ([]*model.GatewayInst InstanceID: entry.InstanceID, Rail: rail, Network: entry.Network, + InvokeURI: strings.TrimSpace(entry.InvokeURI), Currencies: normalizeCurrencies(entry.Currencies), Capabilities: capabilitiesFromOps(entry.Operations), Limits: limitsFromDiscovery(entry.Limits), @@ -92,6 +93,10 @@ func capabilitiesFromOps(ops []string) model.RailCapabilities { cap.CanSendFee = true case "observe.confirm", "observe.confirmation": cap.RequiresObserveConfirm = true + case "block", "funds.block", "balance.block", "ledger.block": + cap.CanBlock = true + case "release", "funds.release", "balance.release", "ledger.release": + cap.CanRelease = true } } return cap diff --git a/api/payments/orchestrator/internal/service/orchestrator/execution_plan.go b/api/payments/orchestrator/internal/service/orchestrator/execution_plan.go index b40976e..2419450 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/execution_plan.go +++ b/api/payments/orchestrator/internal/service/orchestrator/execution_plan.go @@ -60,10 +60,6 @@ func isSourceExecutionStep(step *model.ExecutionStep) bool { return executionStepRole(step) == executionStepRoleSource } -func isConsumerExecutionStep(step *model.ExecutionStep) bool { - return executionStepRole(step) == executionStepRoleConsumer -} - func sourceStepsConfirmed(plan *model.ExecutionPlan) bool { if plan == nil || len(plan.Steps) == 0 { return false diff --git a/api/payments/orchestrator/internal/service/orchestrator/gateway_registry.go b/api/payments/orchestrator/internal/service/orchestrator/gateway_registry.go index 86a0bae..ac0e65f 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/gateway_registry.go +++ b/api/payments/orchestrator/internal/service/orchestrator/gateway_registry.go @@ -7,7 +7,6 @@ import ( "github.com/tech/sendico/payments/orchestrator/storage/model" "github.com/tech/sendico/pkg/mlogger" - gatewayv1 "github.com/tech/sendico/pkg/proto/common/gateway/v1" ) type gatewayRegistry struct { @@ -52,108 +51,6 @@ func (r *gatewayRegistry) List(ctx context.Context) ([]*model.GatewayInstanceDes return result, nil } -func modelGatewayFromProto(src *gatewayv1.GatewayInstanceDescriptor) *model.GatewayInstanceDescriptor { - if src == nil { - return nil - } - limits := modelLimitsFromProto(src.GetLimits()) - return &model.GatewayInstanceDescriptor{ - ID: strings.TrimSpace(src.GetId()), - Rail: modelRailFromProto(src.GetRail()), - Network: strings.ToUpper(strings.TrimSpace(src.GetNetwork())), - Currencies: normalizeCurrencies(src.GetCurrencies()), - Capabilities: modelCapabilitiesFromProto(src.GetCapabilities()), - Limits: limits, - Version: strings.TrimSpace(src.GetVersion()), - IsEnabled: src.GetIsEnabled(), - } -} - -func modelRailFromProto(rail gatewayv1.Rail) model.Rail { - switch rail { - case gatewayv1.Rail_RAIL_CRYPTO: - return model.RailCrypto - case gatewayv1.Rail_RAIL_PROVIDER_SETTLEMENT: - return model.RailProviderSettlement - case gatewayv1.Rail_RAIL_LEDGER: - return model.RailLedger - case gatewayv1.Rail_RAIL_CARD_PAYOUT: - return model.RailCardPayout - case gatewayv1.Rail_RAIL_FIAT_ONRAMP: - return model.RailFiatOnRamp - default: - return model.RailUnspecified - } -} - -func modelCapabilitiesFromProto(src *gatewayv1.RailCapabilities) model.RailCapabilities { - if src == nil { - return model.RailCapabilities{} - } - return model.RailCapabilities{ - CanPayIn: src.GetCanPayIn(), - CanPayOut: src.GetCanPayOut(), - CanReadBalance: src.GetCanReadBalance(), - CanSendFee: src.GetCanSendFee(), - RequiresObserveConfirm: src.GetRequiresObserveConfirm(), - } -} - -func modelLimitsFromProto(src *gatewayv1.Limits) model.Limits { - if src == nil { - return model.Limits{} - } - limits := model.Limits{ - MinAmount: strings.TrimSpace(src.GetMinAmount()), - MaxAmount: strings.TrimSpace(src.GetMaxAmount()), - PerTxMaxFee: strings.TrimSpace(src.GetPerTxMaxFee()), - PerTxMinAmount: strings.TrimSpace(src.GetPerTxMinAmount()), - PerTxMaxAmount: strings.TrimSpace(src.GetPerTxMaxAmount()), - } - - if len(src.GetVolumeLimit()) > 0 { - limits.VolumeLimit = map[string]string{} - for key, value := range src.GetVolumeLimit() { - bucket := strings.TrimSpace(key) - amount := strings.TrimSpace(value) - if bucket == "" || amount == "" { - continue - } - limits.VolumeLimit[bucket] = amount - } - } - - if len(src.GetVelocityLimit()) > 0 { - limits.VelocityLimit = map[string]int{} - for key, value := range src.GetVelocityLimit() { - bucket := strings.TrimSpace(key) - if bucket == "" { - continue - } - limits.VelocityLimit[bucket] = int(value) - } - } - - if len(src.GetCurrencyLimits()) > 0 { - limits.CurrencyLimits = map[string]model.LimitsOverride{} - for key, override := range src.GetCurrencyLimits() { - currency := strings.ToUpper(strings.TrimSpace(key)) - if currency == "" || override == nil { - continue - } - limits.CurrencyLimits[currency] = model.LimitsOverride{ - MaxVolume: strings.TrimSpace(override.GetMaxVolume()), - MinAmount: strings.TrimSpace(override.GetMinAmount()), - MaxAmount: strings.TrimSpace(override.GetMaxAmount()), - MaxFee: strings.TrimSpace(override.GetMaxFee()), - MaxOps: int(override.GetMaxOps()), - } - } - } - - return limits -} - func normalizeCurrencies(values []string) []string { if len(values) == 0 { return nil diff --git a/api/payments/orchestrator/internal/service/orchestrator/gateway_resolution.go b/api/payments/orchestrator/internal/service/orchestrator/gateway_resolution.go new file mode 100644 index 0000000..2527a0b --- /dev/null +++ b/api/payments/orchestrator/internal/service/orchestrator/gateway_resolution.go @@ -0,0 +1,133 @@ +package orchestrator + +import ( + "context" + "sort" + "strings" + + "github.com/shopspring/decimal" + chainclient "github.com/tech/sendico/gateway/chain/client" + "github.com/tech/sendico/payments/orchestrator/storage/model" + "github.com/tech/sendico/pkg/merrors" + paymenttypes "github.com/tech/sendico/pkg/payments/types" + "go.uber.org/zap" +) + +func (s *Service) resolveChainGatewayClient(ctx context.Context, network string, amount *paymenttypes.Money, actions []model.RailOperation, instanceID string, paymentRef string) (chainclient.Client, *model.GatewayInstanceDescriptor, error) { + if s.deps.gatewayRegistry != nil && s.deps.gatewayInvokeResolver != nil { + entry, err := selectGatewayForActions(ctx, s.deps.gatewayRegistry, model.RailCrypto, network, amount, actions, instanceID, sendDirectionForRail(model.RailCrypto)) + if err != nil { + return nil, nil, err + } + invokeURI := strings.TrimSpace(entry.InvokeURI) + if invokeURI == "" { + return nil, nil, merrors.InvalidArgument("chain gateway: invoke uri is required") + } + client, err := s.deps.gatewayInvokeResolver.Resolve(ctx, invokeURI) + if err != nil { + return nil, nil, err + } + if s.logger != nil { + fields := []zap.Field{ + zap.String("gateway_id", entry.ID), + zap.String("instance_id", entry.InstanceID), + zap.String("rail", string(entry.Rail)), + zap.String("network", entry.Network), + zap.String("invoke_uri", invokeURI), + } + if paymentRef != "" { + fields = append(fields, zap.String("payment_ref", paymentRef)) + } + if len(actions) > 0 { + fields = append(fields, zap.Strings("actions", railActionNames(actions))) + } + s.logger.Info("Chain gateway selected", fields...) + } + return client, entry, nil + } + if s.deps.gateway.resolver != nil { + client, err := s.deps.gateway.resolver.Resolve(ctx, network) + if err != nil { + return nil, nil, err + } + return client, nil, nil + } + return nil, nil, merrors.NoData("chain gateway unavailable") +} + +func selectGatewayForActions(ctx context.Context, registry GatewayRegistry, rail model.Rail, network string, amount *paymenttypes.Money, actions []model.RailOperation, instanceID string, dir sendDirection) (*model.GatewayInstanceDescriptor, error) { + if registry == nil { + return nil, merrors.NoData("gateway registry unavailable") + } + all, err := registry.List(ctx) + if err != nil { + return nil, err + } + if len(all) == 0 { + return nil, merrors.NoData("no gateway instances available") + } + if len(actions) == 0 { + actions = []model.RailOperation{model.RailOperationSend} + } + + currency := "" + amt := decimal.Zero + if amount != nil && strings.TrimSpace(amount.GetAmount()) != "" { + amt, err = decimalFromMoney(amount) + if err != nil { + return nil, err + } + currency = strings.ToUpper(strings.TrimSpace(amount.GetCurrency())) + } + network = strings.ToUpper(strings.TrimSpace(network)) + + eligible := make([]*model.GatewayInstanceDescriptor, 0) + for _, entry := range all { + if entry == nil || !entry.IsEnabled { + continue + } + if entry.Rail != rail { + continue + } + if instanceID != "" && !strings.EqualFold(strings.TrimSpace(entry.InstanceID), strings.TrimSpace(instanceID)) { + continue + } + ok := true + for _, action := range actions { + if !isGatewayEligible(entry, rail, network, currency, action, dir, amt) { + ok = false + break + } + } + if !ok { + continue + } + eligible = append(eligible, entry) + } + + if len(eligible) == 0 { + return nil, merrors.NoData("no eligible gateway instance found") + } + sort.Slice(eligible, func(i, j int) bool { + return eligible[i].ID < eligible[j].ID + }) + return eligible[0], nil +} + +func railActionNames(actions []model.RailOperation) []string { + if len(actions) == 0 { + return nil + } + names := make([]string, 0, len(actions)) + for _, action := range actions { + name := strings.TrimSpace(string(action)) + if name == "" { + continue + } + names = append(names, name) + } + if len(names) == 0 { + return nil + } + return names +} diff --git a/api/payments/orchestrator/internal/service/orchestrator/handlers_events.go b/api/payments/orchestrator/internal/service/orchestrator/handlers_events.go index acc23da..92617c6 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/handlers_events.go +++ b/api/payments/orchestrator/internal/service/orchestrator/handlers_events.go @@ -11,6 +11,7 @@ import ( "github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mservice" chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1" + mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1" orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1" "go.uber.org/zap" ) @@ -21,15 +22,17 @@ type paymentEventHandler struct { logger mlogger.Logger submitCardPayout func(ctx context.Context, payment *model.Payment) error resumePlan func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error + releaseHold func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error } -func newPaymentEventHandler(repo storage.Repository, ensure func(ctx context.Context) error, logger mlogger.Logger, submitCardPayout func(ctx context.Context, payment *model.Payment) error, resumePlan func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error) *paymentEventHandler { +func newPaymentEventHandler(repo storage.Repository, ensure func(ctx context.Context) error, logger mlogger.Logger, submitCardPayout func(ctx context.Context, payment *model.Payment) error, resumePlan func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error, releaseHold func(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error) *paymentEventHandler { return &paymentEventHandler{ repo: repo, ensureRepo: ensure, logger: logger, submitCardPayout: submitCardPayout, resumePlan: resumePlan, + releaseHold: releaseHold, } } @@ -248,6 +251,21 @@ func (h *paymentEventHandler) processCardPayoutUpdate(ctx context.Context, req * } applyCardPayoutUpdate(payment, payout) + switch payout.GetStatus() { + case mntxv1.PayoutStatus_PAYOUT_STATUS_PROCESSED: + if h.resumePlan != nil && payment.PaymentPlan != nil && len(payment.PaymentPlan.Steps) > 0 { + if err := h.resumePlan(ctx, store, payment); err != nil { + return gsresponse.Auto[orchestratorv1.ProcessCardPayoutUpdateResponse](h.logger, mservice.PaymentOrchestrator, err) + } + } + case mntxv1.PayoutStatus_PAYOUT_STATUS_FAILED: + if h.releaseHold != nil && payment.PaymentPlan != nil && len(payment.PaymentPlan.Steps) > 0 { + if err := h.releaseHold(ctx, store, payment); err != nil { + return gsresponse.Auto[orchestratorv1.ProcessCardPayoutUpdateResponse](h.logger, mservice.PaymentOrchestrator, err) + } + } + } + if err := store.Update(ctx, payment); err != nil { return gsresponse.Auto[orchestratorv1.ProcessCardPayoutUpdateResponse](h.logger, mservice.PaymentOrchestrator, err) } diff --git a/api/payments/orchestrator/internal/service/orchestrator/options.go b/api/payments/orchestrator/internal/service/orchestrator/options.go index 91ef614..3503f10 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/options.go +++ b/api/payments/orchestrator/internal/service/orchestrator/options.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/shopspring/decimal" oracleclient "github.com/tech/sendico/fx/oracle/client" chainclient "github.com/tech/sendico/gateway/chain/client" mntxclient "github.com/tech/sendico/gateway/mntx/client" @@ -14,20 +15,38 @@ import ( clockpkg "github.com/tech/sendico/pkg/clock" "github.com/tech/sendico/pkg/merrors" mb "github.com/tech/sendico/pkg/messaging/broker" + "github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/payments/rail" feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1" + "go.uber.org/zap" ) // Option configures service dependencies. type Option func(*Service) +// GatewayInvokeResolver resolves gateway invoke URIs into chain gateway clients. +type GatewayInvokeResolver interface { + Resolve(ctx context.Context, invokeURI string) (chainclient.Client, error) +} + +// ChainGatewayResolver resolves chain gateway clients by network. +type ChainGatewayResolver interface { + Resolve(ctx context.Context, network string) (chainclient.Client, error) +} + type feesDependency struct { client feesv1.FeeEngineClient timeout time.Duration } func (f feesDependency) available() bool { - return f.client != nil + if f.client == nil { + return false + } + if checker, ok := f.client.(interface{ Available() bool }); ok { + return checker.Available() + } + return true } type ledgerDependency struct { @@ -35,28 +54,25 @@ type ledgerDependency struct { internal rail.InternalLedger } -func (l ledgerDependency) available() bool { - return l.client != nil -} - type gatewayDependency struct { - client chainclient.Client + resolver ChainGatewayResolver } func (g gatewayDependency) available() bool { - return g.client != nil + return g.resolver != nil } type railGatewayDependency struct { - byID map[string]rail.RailGateway - byRail map[model.Rail][]rail.RailGateway - registry GatewayRegistry - chainClient chainclient.Client - providerClient chainclient.Client + byID map[string]rail.RailGateway + byRail map[model.Rail][]rail.RailGateway + registry GatewayRegistry + chainResolver GatewayInvokeResolver + providerResolver GatewayInvokeResolver + logger mlogger.Logger } func (g railGatewayDependency) available() bool { - return len(g.byID) > 0 || len(g.byRail) > 0 || (g.registry != nil && (g.chainClient != nil || g.providerClient != nil)) + return len(g.byID) > 0 || len(g.byRail) > 0 || (g.registry != nil && (g.chainResolver != nil || g.providerResolver != nil)) } func (g railGatewayDependency) resolve(ctx context.Context, step *model.PaymentStep) (rail.RailGateway, error) { @@ -64,11 +80,10 @@ func (g railGatewayDependency) resolve(ctx context.Context, step *model.PaymentS return nil, merrors.InvalidArgument("rail gateway: step is required") } if id := strings.TrimSpace(step.GatewayID); id != "" { - gw, ok := g.byID[id] - if !ok { - return nil, merrors.InvalidArgument("rail gateway: unknown gateway id") + if gw, ok := g.byID[id]; ok { + return gw, nil } - return gw, nil + return g.resolveDynamic(ctx, step) } if len(g.byRail) == 0 { return g.resolveDynamic(ctx, step) @@ -81,13 +96,32 @@ func (g railGatewayDependency) resolve(ctx context.Context, step *model.PaymentS } func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.PaymentStep) (rail.RailGateway, error) { - if g.registry == nil || (g.chainClient == nil && g.providerClient == nil) { - return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail") + if g.registry == nil { + return nil, merrors.InvalidArgument("rail gateway: registry is required") + } + if g.chainResolver == nil && g.providerResolver == nil { + return nil, merrors.InvalidArgument("rail gateway: gateway resolver is required") } items, err := g.registry.List(ctx) if err != nil { return nil, err } + if len(items) == 0 { + return nil, merrors.InvalidArgument("rail gateway: no gateway instances available") + } + + currency := "" + amount := decimal.Zero + if step.Amount != nil && strings.TrimSpace(step.Amount.GetAmount()) != "" { + value, err := decimalFromMoney(step.Amount) + if err != nil { + return nil, err + } + amount = value + currency = strings.ToUpper(strings.TrimSpace(step.Amount.GetCurrency())) + } + + candidates := make([]*model.GatewayInstanceDescriptor, 0) for _, entry := range items { if entry == nil || !entry.IsEnabled { continue @@ -98,31 +132,73 @@ func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.P if step.GatewayID != "" && entry.ID != step.GatewayID { continue } - cfg := chainclient.RailGatewayConfig{ - Rail: string(entry.Rail), - Network: entry.Network, - Capabilities: rail.RailCapabilities{ - CanPayIn: entry.Capabilities.CanPayIn, - CanPayOut: entry.Capabilities.CanPayOut, - CanReadBalance: entry.Capabilities.CanReadBalance, - CanSendFee: entry.Capabilities.CanSendFee, - RequiresObserveConfirm: entry.Capabilities.RequiresObserveConfirm, - }, + if step.InstanceID != "" && !strings.EqualFold(strings.TrimSpace(entry.InstanceID), strings.TrimSpace(step.InstanceID)) { + continue } - switch entry.Rail { - case model.RailProviderSettlement: - if g.providerClient == nil { - return nil, merrors.InvalidArgument("rail gateway: missing provider settlement client") + if step.Action != model.RailOperationUnspecified { + if !isGatewayEligible(entry, step.Rail, "", currency, step.Action, sendDirectionForRail(step.Rail), amount) { + continue } - return NewProviderSettlementGateway(g.providerClient, cfg), nil - default: - if g.chainClient == nil { - return nil, merrors.InvalidArgument("rail gateway: missing gateway client") - } - return chainclient.NewRailGateway(g.chainClient, cfg), nil } + candidates = append(candidates, entry) + } + if len(candidates) == 0 { + return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail") + } + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].ID < candidates[j].ID + }) + entry := candidates[0] + invokeURI := strings.TrimSpace(entry.InvokeURI) + if invokeURI == "" { + return nil, merrors.InvalidArgument("rail gateway: invoke uri is required") + } + + cfg := chainclient.RailGatewayConfig{ + Rail: string(entry.Rail), + Network: entry.Network, + Capabilities: rail.RailCapabilities{ + CanPayIn: entry.Capabilities.CanPayIn, + CanPayOut: entry.Capabilities.CanPayOut, + CanReadBalance: entry.Capabilities.CanReadBalance, + CanSendFee: entry.Capabilities.CanSendFee, + RequiresObserveConfirm: entry.Capabilities.RequiresObserveConfirm, + CanBlock: entry.Capabilities.CanBlock, + CanRelease: entry.Capabilities.CanRelease, + }, + } + + if g.logger != nil { + g.logger.Info("Rail gateway resolved", + zap.String("step_id", strings.TrimSpace(step.StepID)), + zap.String("action", string(step.Action)), + zap.String("gateway_id", entry.ID), + zap.String("instance_id", entry.InstanceID), + zap.String("rail", string(entry.Rail)), + zap.String("network", entry.Network), + zap.String("invoke_uri", invokeURI)) + } + + switch entry.Rail { + case model.RailProviderSettlement: + if g.providerResolver == nil { + return nil, merrors.InvalidArgument("rail gateway: provider settlement resolver required") + } + client, err := g.providerResolver.Resolve(ctx, invokeURI) + if err != nil { + return nil, err + } + return NewProviderSettlementGateway(client, cfg), nil + default: + if g.chainResolver == nil { + return nil, merrors.InvalidArgument("rail gateway: chain gateway resolver required") + } + client, err := g.chainResolver.Resolve(ctx, invokeURI) + if err != nil { + return nil, err + } + return chainclient.NewRailGateway(client, cfg), nil } - return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail") } type oracleDependency struct { @@ -130,7 +206,13 @@ type oracleDependency struct { } func (o oracleDependency) available() bool { - return o.client != nil + if o.client == nil { + return false + } + if checker, ok := o.client.(interface{ Available() bool }); ok { + return checker.Available() + } + return true } type mntxDependency struct { @@ -138,23 +220,32 @@ type mntxDependency struct { } func (m mntxDependency) available() bool { - return m.client != nil -} - -type gatewayRegistryDependency struct { - registry GatewayRegistry -} - -func (g gatewayRegistryDependency) available() bool { - return g.registry != nil + if m.client == nil { + return false + } + if checker, ok := m.client.(interface{ Available() bool }); ok { + return checker.Available() + } + return true } type providerGatewayDependency struct { - client chainclient.Client + resolver ChainGatewayResolver } func (p providerGatewayDependency) available() bool { - return p.client != nil + return p.resolver != nil +} + +type staticChainGatewayResolver struct { + client chainclient.Client +} + +func (r staticChainGatewayResolver) Resolve(ctx context.Context, _ string) (chainclient.Client, error) { + if r.client == nil { + return nil, merrors.InvalidArgument("chain gateway client is required") + } + return r.client, nil } // CardGatewayRoute maps a gateway to its funding and fee destinations. @@ -195,14 +286,44 @@ func WithLedgerClient(client ledgerclient.Client) Option { // WithChainGatewayClient wires the chain gateway client. func WithChainGatewayClient(client chainclient.Client) Option { return func(s *Service) { - s.deps.gateway = gatewayDependency{client: client} + s.deps.gateway = gatewayDependency{resolver: staticChainGatewayResolver{client: client}} + } +} + +// WithChainGatewayResolver wires a resolver for chain gateway clients. +func WithChainGatewayResolver(resolver ChainGatewayResolver) Option { + return func(s *Service) { + if resolver != nil { + s.deps.gateway = gatewayDependency{resolver: resolver} + } } } // WithProviderSettlementGatewayClient wires the provider settlement gateway client. func WithProviderSettlementGatewayClient(client chainclient.Client) Option { return func(s *Service) { - s.deps.providerGateway = providerGatewayDependency{client: client} + s.deps.providerGateway = providerGatewayDependency{resolver: staticChainGatewayResolver{client: client}} + } +} + +// WithProviderSettlementGatewayResolver wires a resolver for provider settlement gateway clients. +func WithProviderSettlementGatewayResolver(resolver ChainGatewayResolver) Option { + return func(s *Service) { + if resolver != nil { + s.deps.providerGateway = providerGatewayDependency{resolver: resolver} + } + } +} + +// WithGatewayInvokeResolver wires a resolver for gateway invoke URIs. +func WithGatewayInvokeResolver(resolver GatewayInvokeResolver) Option { + return func(s *Service) { + if resolver == nil { + return + } + s.deps.gatewayInvokeResolver = resolver + s.deps.railGateways.chainResolver = resolver + s.deps.railGateways.providerResolver = resolver } } @@ -212,7 +333,7 @@ func WithRailGateways(gateways map[string]rail.RailGateway) Option { if len(gateways) == 0 { return } - s.deps.railGateways = buildRailGatewayDependency(gateways, s.deps.gatewayRegistry, s.deps.gateway.client, s.deps.providerGateway.client) + s.deps.railGateways = buildRailGatewayDependency(gateways, s.deps.gatewayRegistry, s.deps.gatewayInvokeResolver, s.deps.gatewayInvokeResolver, s.logger) } } @@ -276,8 +397,9 @@ func WithGatewayRegistry(registry GatewayRegistry) Option { if registry != nil { s.deps.gatewayRegistry = registry s.deps.railGateways.registry = registry - s.deps.railGateways.chainClient = s.deps.gateway.client - s.deps.railGateways.providerClient = s.deps.providerGateway.client + s.deps.railGateways.chainResolver = s.deps.gatewayInvokeResolver + s.deps.railGateways.providerResolver = s.deps.gatewayInvokeResolver + s.deps.railGateways.logger = s.logger.Named("rail_gateways") if s.deps.planBuilder == nil { s.deps.planBuilder = &defaultPlanBuilder{} } @@ -294,13 +416,14 @@ func WithClock(clock clockpkg.Clock) Option { } } -func buildRailGatewayDependency(gateways map[string]rail.RailGateway, registry GatewayRegistry, chainClient chainclient.Client, providerClient chainclient.Client) railGatewayDependency { +func buildRailGatewayDependency(gateways map[string]rail.RailGateway, registry GatewayRegistry, chainResolver GatewayInvokeResolver, providerResolver GatewayInvokeResolver, logger mlogger.Logger) railGatewayDependency { result := railGatewayDependency{ - byID: map[string]rail.RailGateway{}, - byRail: map[model.Rail][]rail.RailGateway{}, - registry: registry, - chainClient: chainClient, - providerClient: providerClient, + byID: map[string]rail.RailGateway{}, + byRail: map[model.Rail][]rail.RailGateway{}, + registry: registry, + chainResolver: chainResolver, + providerResolver: providerResolver, + logger: logger, } if len(gateways) == 0 { return result diff --git a/api/payments/orchestrator/internal/service/orchestrator/payment_plan_executor.go b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_executor.go index f363996..276259c 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/payment_plan_executor.go +++ b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_executor.go @@ -46,6 +46,10 @@ func (p *paymentExecutor) executePaymentPlan(ctx context.Context, store storage. execStep = &model.ExecutionStep{Code: stepID} execSteps[stepID] = execStep } + if step.Action == model.RailOperationRelease { + setExecutionStepStatus(execStep, executionStepStatusSkipped) + continue + } status := executionStepStatus(execStep) switch status { case executionStepStatusConfirmed, executionStepStatusSkipped: @@ -86,7 +90,11 @@ func (p *paymentExecutor) executePaymentPlan(ctx context.Context, store storage. } if asyncSubmitted && !executionPlanComplete(execPlan) { - payment.State = model.PaymentStateSubmitted + if blockStepConfirmed(plan, execPlan) { + payment.State = model.PaymentStateFundsReserved + } else { + payment.State = model.PaymentStateSubmitted + } return p.persistPayment(ctx, store, payment) } payment.State = model.PaymentStateSettled diff --git a/api/payments/orchestrator/internal/service/orchestrator/payment_plan_executor_test.go b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_executor_test.go index ecf226a..2204aa6 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/payment_plan_executor_test.go +++ b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_executor_test.go @@ -12,6 +12,7 @@ import ( "github.com/tech/sendico/pkg/payments/rail" paymenttypes "github.com/tech/sendico/pkg/payments/types" mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1" + ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1" orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1" "go.mongodb.org/mongo-driver/bson/primitive" "go.uber.org/zap" @@ -64,7 +65,7 @@ func TestExecutePaymentPlan_SourceBeforeDestination(t *testing.T) { deps: serviceDependencies{ railGateways: buildRailGatewayDependency(map[string]rail.RailGateway{ "crypto-default": railGateway, - }, nil, nil, nil), + }, nil, nil, nil, nil), ledger: ledgerDependency{ client: ledgerFake, internal: ledgerFake, @@ -196,3 +197,146 @@ func TestExecutePaymentPlan_SourceBeforeDestination(t *testing.T) { t.Fatalf("expected ledger debit after payout confirmation, debit=%d credit=%d", debitCalls, creditCalls) } } + +func TestExecutePaymentPlan_BlockThenDebitFromHold(t *testing.T) { + ctx := context.Background() + + store := newStubPaymentsStore() + repo := &stubRepository{store: store} + + blockCalls := 0 + var blockReq *ledgerv1.TransferRequest + debitCalls := 0 + var debitTx rail.LedgerTx + ledgerFake := &ledgerclient.Fake{ + TransferInternalFn: func(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) { + blockCalls++ + blockReq = req + return &ledgerv1.PostResponse{JournalEntryRef: "hold-1"}, nil + }, + CreateTransactionFn: func(ctx context.Context, tx rail.LedgerTx) (string, error) { + debitCalls++ + debitTx = tx + return "debit-1", nil + }, + } + + mntxFake := &mntxclient.Fake{ + CreateCardPayoutFn: func(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) { + return &mntxv1.CardPayoutResponse{Payout: &mntxv1.CardPayoutState{PayoutId: "payout-1"}}, nil + }, + } + + svc := &Service{ + logger: zap.NewNop(), + storage: repo, + deps: serviceDependencies{ + ledger: ledgerDependency{ + client: ledgerFake, + internal: ledgerFake, + }, + mntx: mntxDependency{client: mntxFake}, + cardRoutes: map[string]CardGatewayRoute{ + defaultCardGateway: { + FundingAddress: "funding-address", + FeeWalletRef: "fee-wallet", + }, + }, + }, + } + + executor := newPaymentExecutor(&svc.deps, svc.logger, svc) + + payment := &model.Payment{ + PaymentRef: "pay-block-1", + IdempotencyKey: "pay-block-1", + OrganizationBoundBase: mo.OrganizationBoundBase{ + OrganizationRef: primitive.NewObjectID(), + }, + Intent: model.PaymentIntent{ + Kind: model.PaymentKindPayout, + Source: model.PaymentEndpoint{ + Type: model.EndpointTypeManagedWallet, + ManagedWallet: &model.ManagedWalletEndpoint{ + ManagedWalletRef: "wallet-src", + }, + }, + Destination: model.PaymentEndpoint{ + Type: model.EndpointTypeCard, + Card: &model.CardEndpoint{ + Pan: "4111111111111111", + Cardholder: "Ada", + CardholderSurname: "Lovelace", + ExpMonth: 1, + ExpYear: 2030, + MaskedPan: "4111", + }, + }, + Attributes: map[string]string{ + "ledger_debit_account_ref": "ledger:debit", + "ledger_block_account_ref": "ledger:block", + }, + Customer: &model.Customer{ + ID: "cust-1", + FirstName: "Ada", + LastName: "Lovelace", + IP: "1.2.3.4", + }, + }, + PaymentPlan: &model.PaymentPlan{ + ID: "pay-block-1", + IdempotencyKey: "pay-block-1", + Steps: []*model.PaymentStep{ + {StepID: "ledger_block", Rail: model.RailLedger, Action: model.RailOperationBlock, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}}, + {StepID: "card_payout", Rail: model.RailCardPayout, Action: model.RailOperationSend, DependsOn: []string{"ledger_block"}, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}}, + {StepID: "ledger_debit", Rail: model.RailLedger, Action: model.RailOperationDebit, DependsOn: []string{"card_payout"}, CommitPolicy: model.CommitPolicyAfterSuccess, CommitAfter: []string{"card_payout"}, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}}, + {StepID: "ledger_release", Rail: model.RailLedger, Action: model.RailOperationRelease, DependsOn: []string{"card_payout"}, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}}, + }, + }, + } + + store.payments[payment.PaymentRef] = payment + + if err := executor.executePaymentPlan(ctx, store, payment, &orchestratorv1.PaymentQuote{}); err != nil { + t.Fatalf("executePaymentPlan error: %v", err) + } + + if blockCalls != 1 || blockReq == nil { + t.Fatalf("expected ledger block transfer, calls=%d", blockCalls) + } + if blockReq.GetFromLedgerAccountRef() != "ledger:debit" { + t.Fatalf("unexpected block from account: %s", blockReq.GetFromLedgerAccountRef()) + } + if blockReq.GetToLedgerAccountRef() != "ledger:block" { + t.Fatalf("unexpected block to account: %s", blockReq.GetToLedgerAccountRef()) + } + if debitCalls != 0 { + t.Fatalf("expected no debit before payout confirmation, got %d", debitCalls) + } + if payment.State != model.PaymentStateFundsReserved { + t.Fatalf("expected funds reserved state, got %s", payment.State) + } + + steps := executionStepsByCode(payment.ExecutionPlan) + cardStep := steps["card_payout"] + if cardStep == nil { + t.Fatalf("expected card payout step in execution plan") + } + setExecutionStepStatus(cardStep, executionStepStatusConfirmed) + + if err := executor.executePaymentPlan(ctx, store, payment, &orchestratorv1.PaymentQuote{}); err != nil { + t.Fatalf("executePaymentPlan resume error: %v", err) + } + if debitCalls != 1 { + t.Fatalf("expected ledger debit after payout confirmation, got %d", debitCalls) + } + if debitTx.LedgerAccountRef != "ledger:block" { + t.Fatalf("expected debit from block account, got %s", debitTx.LedgerAccountRef) + } + if debitTx.ContraLedgerAccountRef != "" { + t.Fatalf("expected contra to be cleared after block, got %s", debitTx.ContraLedgerAccountRef) + } + if payment.State != model.PaymentStateSettled { + t.Fatalf("expected settled state, got %s", payment.State) + } +} diff --git a/api/payments/orchestrator/internal/service/orchestrator/payment_plan_helpers.go b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_helpers.go index 0565f4a..9a115b6 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/payment_plan_helpers.go +++ b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_helpers.go @@ -80,6 +80,26 @@ func executionPlanComplete(plan *model.ExecutionPlan) bool { return true } +func blockStepConfirmed(plan *model.PaymentPlan, execPlan *model.ExecutionPlan) bool { + if plan == nil || execPlan == nil || len(plan.Steps) == 0 { + return false + } + execSteps := executionStepsByCode(execPlan) + for idx, step := range plan.Steps { + if step == nil || step.Action != model.RailOperationBlock { + continue + } + execStep := execSteps[planStepID(step, idx)] + if execStep == nil { + continue + } + if executionStepStatus(execStep) == executionStepStatusConfirmed { + return true + } + } + return false +} + func planStepID(step *model.PaymentStep, idx int) string { if step != nil { if val := strings.TrimSpace(step.StepID); val != "" { diff --git a/api/payments/orchestrator/internal/service/orchestrator/payment_plan_ledger.go b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_ledger.go index c32a86a..bc267f5 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/payment_plan_ledger.go +++ b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_ledger.go @@ -11,28 +11,167 @@ import ( ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1" orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1" "go.mongodb.org/mongo-driver/bson/primitive" + "go.uber.org/zap" ) func (p *paymentExecutor) postLedgerDebit(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, charges []*ledgerv1.PostingLine, idempotencyKey string, idx int, quote *orchestratorv1.PaymentQuote) (string, error) { + paymentRef := "" + if payment != nil { + paymentRef = strings.TrimSpace(payment.PaymentRef) + } if p.deps.ledger.internal == nil { + p.logger.Error("Ledger client unavailable", zap.String("action", "debit"), zap.String("payment_ref", paymentRef)) return "", merrors.Internal("ledger_client_unavailable") } tx, err := p.ledgerTxForAction(payment, amount, charges, idempotencyKey, idx, model.RailOperationDebit, quote) if err != nil { + p.logger.Warn("Ledger debit preparation failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err)) return "", err } - return p.deps.ledger.internal.CreateTransaction(ctx, tx) + ref, err := p.deps.ledger.internal.CreateTransaction(ctx, tx) + if err != nil { + p.logger.Warn("Ledger debit failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err)) + return "", err + } + return ref, nil } func (p *paymentExecutor) postLedgerCredit(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, idempotencyKey string, idx int, quote *orchestratorv1.PaymentQuote) (string, error) { + paymentRef := "" + if payment != nil { + paymentRef = strings.TrimSpace(payment.PaymentRef) + } if p.deps.ledger.internal == nil { + p.logger.Error("Ledger client unavailable", zap.String("action", "credit"), zap.String("payment_ref", paymentRef)) return "", merrors.Internal("ledger_client_unavailable") } tx, err := p.ledgerTxForAction(payment, amount, nil, idempotencyKey, idx, model.RailOperationCredit, quote) if err != nil { + p.logger.Warn("Ledger credit preparation failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err)) return "", err } - return p.deps.ledger.internal.CreateTransaction(ctx, tx) + ref, err := p.deps.ledger.internal.CreateTransaction(ctx, tx) + if err != nil { + p.logger.Warn("Ledger credit failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err)) + return "", err + } + return ref, nil +} + +func (p *paymentExecutor) postLedgerBlock(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, idempotencyKey string, idx int) (string, error) { + paymentRef := "" + if payment != nil { + paymentRef = strings.TrimSpace(payment.PaymentRef) + } + if p.deps.ledger.internal == nil { + p.logger.Error("Ledger client unavailable", zap.String("action", "block"), zap.String("payment_ref", paymentRef)) + return "", merrors.Internal("ledger_client_unavailable") + } + if payment == nil { + return "", merrors.InvalidArgument("ledger: payment is required") + } + if payment.OrganizationRef == primitive.NilObjectID { + return "", merrors.InvalidArgument("ledger: organization_ref is required") + } + if amount == nil || strings.TrimSpace(amount.GetAmount()) == "" || strings.TrimSpace(amount.GetCurrency()) == "" { + return "", merrors.InvalidArgument("ledger: amount is required") + } + sourceAccount, err := ledgerDebitAccountRef(payment) + if err != nil { + return "", err + } + blockAccount, err := ledgerBlockAccount(payment) + if err != nil { + return "", err + } + resp, err := p.deps.ledger.internal.TransferInternal(ctx, &ledgerv1.TransferRequest{ + IdempotencyKey: strings.TrimSpace(idempotencyKey), + OrganizationRef: payment.OrganizationRef.Hex(), + FromLedgerAccountRef: strings.TrimSpace(sourceAccount), + ToLedgerAccountRef: strings.TrimSpace(blockAccount), + Money: cloneProtoMoney(amount), + Description: paymentDescription(payment), + Metadata: cloneMetadata(payment.Metadata), + }) + if err != nil { + p.logger.Warn("Ledger block failed", + zap.String("payment_ref", paymentRef), + zap.Int("step_index", idx), + zap.String("from_account", strings.TrimSpace(sourceAccount)), + zap.String("to_account", strings.TrimSpace(blockAccount)), + zap.String("amount", strings.TrimSpace(amount.GetAmount())), + zap.String("currency", strings.TrimSpace(amount.GetCurrency())), + zap.Error(err)) + return "", err + } + entryRef := strings.TrimSpace(resp.GetJournalEntryRef()) + p.logger.Info("Ledger block posted", + zap.String("payment_ref", paymentRef), + zap.Int("step_index", idx), + zap.String("entry_ref", entryRef), + zap.String("from_account", strings.TrimSpace(sourceAccount)), + zap.String("to_account", strings.TrimSpace(blockAccount)), + zap.String("amount", strings.TrimSpace(amount.GetAmount())), + zap.String("currency", strings.TrimSpace(amount.GetCurrency()))) + return entryRef, nil +} + +func (p *paymentExecutor) postLedgerRelease(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, idempotencyKey string, idx int) (string, error) { + paymentRef := "" + if payment != nil { + paymentRef = strings.TrimSpace(payment.PaymentRef) + } + if p.deps.ledger.internal == nil { + p.logger.Error("Ledger client unavailable", zap.String("action", "release"), zap.String("payment_ref", paymentRef)) + return "", merrors.Internal("ledger_client_unavailable") + } + if payment == nil { + return "", merrors.InvalidArgument("ledger: payment is required") + } + if payment.OrganizationRef == primitive.NilObjectID { + return "", merrors.InvalidArgument("ledger: organization_ref is required") + } + if amount == nil || strings.TrimSpace(amount.GetAmount()) == "" || strings.TrimSpace(amount.GetCurrency()) == "" { + return "", merrors.InvalidArgument("ledger: amount is required") + } + sourceAccount, err := ledgerDebitAccountRef(payment) + if err != nil { + return "", err + } + blockAccount, err := ledgerBlockAccount(payment) + if err != nil { + return "", err + } + resp, err := p.deps.ledger.internal.TransferInternal(ctx, &ledgerv1.TransferRequest{ + IdempotencyKey: strings.TrimSpace(idempotencyKey), + OrganizationRef: payment.OrganizationRef.Hex(), + FromLedgerAccountRef: strings.TrimSpace(blockAccount), + ToLedgerAccountRef: strings.TrimSpace(sourceAccount), + Money: cloneProtoMoney(amount), + Description: paymentDescription(payment), + Metadata: cloneMetadata(payment.Metadata), + }) + if err != nil { + p.logger.Warn("Ledger release failed", + zap.String("payment_ref", paymentRef), + zap.Int("step_index", idx), + zap.String("from_account", strings.TrimSpace(blockAccount)), + zap.String("to_account", strings.TrimSpace(sourceAccount)), + zap.String("amount", strings.TrimSpace(amount.GetAmount())), + zap.String("currency", strings.TrimSpace(amount.GetCurrency())), + zap.Error(err)) + return "", err + } + entryRef := strings.TrimSpace(resp.GetJournalEntryRef()) + p.logger.Info("Ledger release posted", + zap.String("payment_ref", paymentRef), + zap.Int("step_index", idx), + zap.String("entry_ref", entryRef), + zap.String("from_account", strings.TrimSpace(blockAccount)), + zap.String("to_account", strings.TrimSpace(sourceAccount)), + zap.String("amount", strings.TrimSpace(amount.GetAmount())), + zap.String("currency", strings.TrimSpace(amount.GetCurrency()))) + return entryRef, nil } func (p *paymentExecutor) ledgerTxForAction(payment *model.Payment, amount *moneyv1.Money, charges []*ledgerv1.PostingLine, idempotencyKey string, idx int, action model.RailOperation, quote *orchestratorv1.PaymentQuote) (rail.LedgerTx, error) { @@ -66,6 +205,12 @@ func (p *paymentExecutor) ledgerTxForAction(payment *model.Payment, amount *mone fromRail = model.RailLedger toRail = ledgerStepToRail(payment.PaymentPlan, idx, destRail) accountRef, contraRef, err = ledgerDebitAccount(payment) + if err == nil { + if blockRef := ledgerBlockAccountIfConfirmed(payment); blockRef != "" { + accountRef = blockRef + contraRef = "" + } + } case model.RailOperationCredit: fromRail = ledgerStepFromRail(payment.PaymentPlan, idx, sourceRail) toRail = model.RailLedger @@ -190,6 +335,48 @@ func ledgerDebitAccount(payment *model.Payment) (string, string, error) { return "", "", merrors.InvalidArgument("ledger: source account is required") } +func ledgerDebitAccountRef(payment *model.Payment) (string, error) { + account, _, err := ledgerDebitAccount(payment) + return account, err +} + +func ledgerBlockAccount(payment *model.Payment) (string, error) { + if payment == nil { + return "", merrors.InvalidArgument("ledger: payment is required") + } + intent := payment.Intent + if intent.Source.Ledger != nil { + if ref := strings.TrimSpace(intent.Source.Ledger.ContraLedgerAccountRef); ref != "" { + return ref, nil + } + } + if ref := attributeLookup(intent.Attributes, + "ledger_block_account_ref", + "ledgerBlockAccountRef", + "ledger_hold_account_ref", + "ledgerHoldAccountRef", + "ledger_debit_contra_account_ref", + "ledgerDebitContraAccountRef", + ); ref != "" { + return ref, nil + } + return "", merrors.InvalidArgument("ledger: block account is required") +} + +func ledgerBlockAccountIfConfirmed(payment *model.Payment) string { + if payment == nil { + return "" + } + if !blockStepConfirmed(payment.PaymentPlan, payment.ExecutionPlan) { + return "" + } + ref, err := ledgerBlockAccount(payment) + if err != nil { + return "" + } + return ref +} + func ledgerCreditAccount(payment *model.Payment) (string, string, error) { if payment == nil { return "", "", merrors.InvalidArgument("ledger: payment is required") diff --git a/api/payments/orchestrator/internal/service/orchestrator/payment_plan_release.go b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_release.go new file mode 100644 index 0000000..a023ec2 --- /dev/null +++ b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_release.go @@ -0,0 +1,51 @@ +package orchestrator + +import ( + "context" + + "github.com/tech/sendico/payments/orchestrator/storage" + "github.com/tech/sendico/payments/orchestrator/storage/model" + "go.uber.org/zap" +) + +func (p *paymentExecutor) releasePaymentHold(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error { + if store == nil { + return errStorageUnavailable + } + if payment == nil || payment.PaymentPlan == nil || len(payment.PaymentPlan.Steps) == 0 { + return nil + } + execPlan := ensureExecutionPlanForPlan(payment, payment.PaymentPlan) + if execPlan == nil || !blockStepConfirmed(payment.PaymentPlan, execPlan) { + return nil + } + execSteps := executionStepsByCode(execPlan) + execQuote := executionQuote(payment, nil) + + for idx, step := range payment.PaymentPlan.Steps { + if step == nil || step.Action != model.RailOperationRelease { + continue + } + stepID := planStepID(step, idx) + execStep := execSteps[stepID] + if execStep == nil { + execStep = &model.ExecutionStep{Code: stepID} + execSteps[stepID] = execStep + if idx < len(execPlan.Steps) { + execPlan.Steps[idx] = execStep + } + } + status := executionStepStatus(execStep) + if status == executionStepStatusConfirmed { + p.logger.Debug("Payment step already confirmed, skipping", zap.String("step_id", stepID), zap.String("quutation", execQuote.QuoteRef)) + continue + } + if _, err := p.executePlanStep(ctx, payment, step, execStep, execQuote, nil, idx); err != nil { + p.logger.Warn("Failed to execute payment step", zap.Error(err), + zap.String("step_id", stepID), zap.String("quutation", execQuote.QuoteRef)) + return err + } + } + + return p.persistPayment(ctx, store, payment) +} diff --git a/api/payments/orchestrator/internal/service/orchestrator/payment_plan_release_test.go b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_release_test.go new file mode 100644 index 0000000..8d226af --- /dev/null +++ b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_release_test.go @@ -0,0 +1,107 @@ +package orchestrator + +import ( + "context" + "testing" + + ledgerclient "github.com/tech/sendico/ledger/client" + "github.com/tech/sendico/payments/orchestrator/storage/model" + mo "github.com/tech/sendico/pkg/model" + paymenttypes "github.com/tech/sendico/pkg/payments/types" + ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.uber.org/zap" +) + +func TestReleasePaymentHold_TransfersFromHoldAccount(t *testing.T) { + ctx := context.Background() + + store := newStubPaymentsStore() + repo := &stubRepository{store: store} + + var releaseReq *ledgerv1.TransferRequest + ledgerFake := &ledgerclient.Fake{ + TransferInternalFn: func(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) { + releaseReq = req + return &ledgerv1.PostResponse{JournalEntryRef: "release-1"}, nil + }, + } + + svc := &Service{ + logger: zap.NewNop(), + storage: repo, + deps: serviceDependencies{ + ledger: ledgerDependency{ + client: ledgerFake, + internal: ledgerFake, + }, + }, + } + + executor := newPaymentExecutor(&svc.deps, svc.logger, svc) + + payment := &model.Payment{ + PaymentRef: "pay-release-1", + IdempotencyKey: "pay-release-1", + OrganizationBoundBase: mo.OrganizationBoundBase{ + OrganizationRef: primitive.NewObjectID(), + }, + Intent: model.PaymentIntent{ + Kind: model.PaymentKindPayout, + Source: model.PaymentEndpoint{ + Type: model.EndpointTypeManagedWallet, + ManagedWallet: &model.ManagedWalletEndpoint{ + ManagedWalletRef: "wallet-src", + }, + }, + Attributes: map[string]string{ + "ledger_debit_account_ref": "ledger:debit", + "ledger_block_account_ref": "ledger:block", + }, + }, + PaymentPlan: &model.PaymentPlan{ + ID: "pay-release-1", + IdempotencyKey: "pay-release-1", + Steps: []*model.PaymentStep{ + {StepID: "ledger_block", Rail: model.RailLedger, Action: model.RailOperationBlock, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}}, + {StepID: "ledger_release", Rail: model.RailLedger, Action: model.RailOperationRelease, DependsOn: []string{"ledger_block"}, Amount: &paymenttypes.Money{Currency: "USD", Amount: "100"}}, + }, + }, + } + + store.payments[payment.PaymentRef] = payment + + execPlan := ensureExecutionPlanForPlan(payment, payment.PaymentPlan) + steps := executionStepsByCode(execPlan) + blockStep := steps["ledger_block"] + if blockStep == nil { + t.Fatalf("expected block step in execution plan") + } + setExecutionStepStatus(blockStep, executionStepStatusConfirmed) + + if err := executor.releasePaymentHold(ctx, store, payment); err != nil { + t.Fatalf("releasePaymentHold error: %v", err) + } + + if releaseReq == nil { + t.Fatalf("expected ledger release transfer") + } + if releaseReq.GetFromLedgerAccountRef() != "ledger:block" { + t.Fatalf("unexpected release from account: %s", releaseReq.GetFromLedgerAccountRef()) + } + if releaseReq.GetToLedgerAccountRef() != "ledger:debit" { + t.Fatalf("unexpected release to account: %s", releaseReq.GetToLedgerAccountRef()) + } + + steps = executionStepsByCode(payment.ExecutionPlan) + releaseStep := steps["ledger_release"] + if releaseStep == nil { + t.Fatalf("expected release step in execution plan") + } + if executionStepStatus(releaseStep) != executionStepStatusConfirmed { + t.Fatalf("expected release step confirmed, got %s", executionStepStatus(releaseStep)) + } + if releaseStep.TransferRef != "release-1" { + t.Fatalf("expected release transfer ref set, got %s", releaseStep.TransferRef) + } +} diff --git a/api/payments/orchestrator/internal/service/orchestrator/payment_plan_steps.go b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_steps.go index ae40ee9..b1edb7e 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/payment_plan_steps.go +++ b/api/payments/orchestrator/internal/service/orchestrator/payment_plan_steps.go @@ -40,6 +40,36 @@ func (p *paymentExecutor) executePlanStep(ctx context.Context, payment *model.Pa ensureExecutionRefs(payment).CreditEntryRef = ref setExecutionStepStatus(execStep, executionStepStatusConfirmed) return false, nil + case model.RailOperationBlock: + if step.Rail != model.RailLedger { + return false, merrors.InvalidArgument("payment plan: block requires ledger rail") + } + amount, err := requireMoney(cloneMoney(step.Amount), "ledger block amount") + if err != nil { + return false, err + } + ref, err := p.postLedgerBlock(ctx, payment, protoMoney(amount), planStepIdempotencyKey(payment, idx, step), idx) + if err != nil { + return false, err + } + execStep.TransferRef = strings.TrimSpace(ref) + setExecutionStepStatus(execStep, executionStepStatusConfirmed) + return false, nil + case model.RailOperationRelease: + if step.Rail != model.RailLedger { + return false, merrors.InvalidArgument("payment plan: release requires ledger rail") + } + amount, err := requireMoney(cloneMoney(step.Amount), "ledger release amount") + if err != nil { + return false, err + } + ref, err := p.postLedgerRelease(ctx, payment, protoMoney(amount), planStepIdempotencyKey(payment, idx, step), idx) + if err != nil { + return false, err + } + execStep.TransferRef = strings.TrimSpace(ref) + setExecutionStepStatus(execStep, executionStepStatusConfirmed) + return false, nil case model.RailOperationFXConvert: if err := p.applyFX(ctx, payment, quote, charges, paymentDescription(payment), cloneMetadata(payment.Metadata), ensureExecutionRefs(payment)); err != nil { return false, err diff --git a/api/payments/orchestrator/internal/service/orchestrator/plan_builder_default_test.go b/api/payments/orchestrator/internal/service/orchestrator/plan_builder_default_test.go index 1639889..5073c67 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/plan_builder_default_test.go +++ b/api/payments/orchestrator/internal/service/orchestrator/plan_builder_default_test.go @@ -125,7 +125,7 @@ func TestDefaultPlanBuilder_BuildsPlanFromRoutes_CryptoToCard(t *testing.T) { t.Fatalf("expected 6 steps, got %d", len(plan.Steps)) } - assertPlanStep(t, plan.Steps[0], "crypto_send", model.RailCrypto, model.RailOperationSend, "crypto-tron", "crypto-tron-1", "USDT", "100") + assertPlanStep(t, plan.Steps[0], "crypto_send", model.RailCrypto, model.RailOperationSend, "crypto-tron", "crypto-tron-1", "USDT", "95") assertPlanStep(t, plan.Steps[1], "crypto_fee", model.RailCrypto, model.RailOperationFee, "crypto-tron", "crypto-tron-1", "USDT", "5") assertPlanStep(t, plan.Steps[2], "crypto_observe", model.RailCrypto, model.RailOperationObserveConfirm, "crypto-tron", "crypto-tron-1", "", "") assertPlanStep(t, plan.Steps[3], "ledger_credit", model.RailLedger, model.RailOperationCredit, "", "", "USDT", "95") diff --git a/api/payments/orchestrator/internal/service/orchestrator/plan_builder_gateways.go b/api/payments/orchestrator/internal/service/orchestrator/plan_builder_gateways.go index a126820..45118a0 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/plan_builder_gateways.go +++ b/api/payments/orchestrator/internal/service/orchestrator/plan_builder_gateways.go @@ -160,6 +160,10 @@ func capabilityAllowsAction(cap model.RailCapabilities, action model.RailOperati return cap.CanSendFee case model.RailOperationObserveConfirm: return cap.RequiresObserveConfirm + case model.RailOperationBlock: + return cap.CanBlock + case model.RailOperationRelease: + return cap.CanRelease default: return true } diff --git a/api/payments/orchestrator/internal/service/orchestrator/plan_builder_plans.go b/api/payments/orchestrator/internal/service/orchestrator/plan_builder_plans.go index 96d88dd..5fdc054 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/plan_builder_plans.go +++ b/api/payments/orchestrator/internal/service/orchestrator/plan_builder_plans.go @@ -69,6 +69,16 @@ func resolveSettlementAmount(payment *model.Payment, quote *orchestratorv1.Payme return cloneMoney(fallback) } +func resolveDebitAmount(payment *model.Payment, quote *orchestratorv1.PaymentQuote, fallback *paymenttypes.Money) *paymenttypes.Money { + if quote != nil && quote.GetDebitAmount() != nil { + return moneyFromProto(quote.GetDebitAmount()) + } + if payment != nil && payment.LastQuote != nil { + return cloneMoney(payment.LastQuote.DebitAmount) + } + return cloneMoney(fallback) +} + func resolveFeeAmount(payment *model.Payment, quote *orchestratorv1.PaymentQuote) *paymenttypes.Money { if quote != nil && quote.GetExpectedFeeTotal() != nil { return moneyFromProto(quote.GetExpectedFeeTotal()) diff --git a/api/payments/orchestrator/internal/service/orchestrator/plan_builder_steps.go b/api/payments/orchestrator/internal/service/orchestrator/plan_builder_steps.go index 62ebe57..ff32449 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/plan_builder_steps.go +++ b/api/payments/orchestrator/internal/service/orchestrator/plan_builder_steps.go @@ -15,7 +15,11 @@ func (b *defaultPlanBuilder) buildPlanFromTemplate(ctx context.Context, payment return nil, merrors.InvalidArgument("plan builder: plan template is required") } - sourceAmount, err := requireMoney(cloneMoney(payment.Intent.Amount), "amount") + intentAmount, err := requireMoney(cloneMoney(payment.Intent.Amount), "amount") + if err != nil { + return nil, err + } + sourceAmount, err := requireMoney(resolveDebitAmount(payment, quote, intentAmount), "debit amount") if err != nil { return nil, err } @@ -81,7 +85,11 @@ func (b *defaultPlanBuilder) buildPlanFromTemplate(ctx context.Context, payment Amount: cloneMoney(amount), } - if action == model.RailOperationSend || action == model.RailOperationFee || action == model.RailOperationObserveConfirm { + needsGateway := action == model.RailOperationSend || action == model.RailOperationFee || action == model.RailOperationObserveConfirm + if (action == model.RailOperationBlock || action == model.RailOperationRelease) && tpl.Rail != model.RailLedger { + needsGateway = true + } + if needsGateway { network := gatewayNetworkForRail(tpl.Rail, sourceRail, destRail, sourceNetwork, destNetwork) instanceID := stepInstanceIDForRail(payment.Intent, tpl.Rail, sourceRail, destRail) checkAmount := amount @@ -129,6 +137,10 @@ func actionForOperation(operation string) (model.RailOperation, error) { return model.RailOperationFee, nil case "send", "payout.card", "payout.crypto", "payout.fiat", "payin.crypto", "payin.fiat", "fund.crypto", "fund.card": return model.RailOperationSend, nil + case "block", "hold", "reserve", "ledger.block", "ledger.hold", "ledger.reserve": + return model.RailOperationBlock, nil + case "release", "unblock", "ledger.release": + return model.RailOperationRelease, nil } switch strings.ToUpper(strings.TrimSpace(operation)) { @@ -144,6 +156,10 @@ func actionForOperation(operation string) (model.RailOperation, error) { return model.RailOperationObserveConfirm, nil case string(model.RailOperationFXConvert): return model.RailOperationFXConvert, nil + case string(model.RailOperationBlock): + return model.RailOperationBlock, nil + case string(model.RailOperationRelease): + return model.RailOperationRelease, nil } return model.RailOperationUnspecified, merrors.InvalidArgument("plan builder: unsupported operation") @@ -164,12 +180,20 @@ func stepAmountForAction(action model.RailOperation, rail, sourceRail, destRail case model.RailOperationSend: switch rail { case sourceRail: + if feeRequired { + return cloneMoney(settlementAmount), nil + } return cloneMoney(sourceAmount), nil case destRail: return cloneMoney(payoutAmount), nil default: return cloneMoney(settlementAmount), nil } + case model.RailOperationBlock, model.RailOperationRelease: + if rail == model.RailLedger { + return cloneMoney(ledgerDebitAmount), nil + } + return cloneMoney(settlementAmount), nil case model.RailOperationFee: if !feeRequired { return nil, nil @@ -197,6 +221,9 @@ func stepInstanceIDForRail(intent model.PaymentIntent, rail, sourceRail, destRai func observeAmountForRail(rail model.Rail, source, settlement, payout *paymenttypes.Money) *paymenttypes.Money { switch rail { case model.RailCrypto, model.RailFiatOnRamp: + if settlement != nil { + return settlement + } if source != nil { return source } diff --git a/api/payments/orchestrator/internal/service/orchestrator/provider_settlement_gateway.go b/api/payments/orchestrator/internal/service/orchestrator/provider_settlement_gateway.go index 40ef9d9..798ed95 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/provider_settlement_gateway.go +++ b/api/payments/orchestrator/internal/service/orchestrator/provider_settlement_gateway.go @@ -123,6 +123,14 @@ func (g *providerSettlementGateway) Observe(ctx context.Context, referenceID str }, nil } +func (g *providerSettlementGateway) Block(ctx context.Context, req rail.BlockRequest) (rail.RailResult, error) { + return rail.RailResult{}, merrors.NotImplemented("provider settlement gateway: block not supported") +} + +func (g *providerSettlementGateway) Release(ctx context.Context, req rail.ReleaseRequest) (rail.RailResult, error) { + return rail.RailResult{}, merrors.NotImplemented("provider settlement gateway: release not supported") +} + func buildProviderSettlementDestination(req rail.TransferRequest) *chainv1.TransferDestination { destRef := strings.TrimSpace(req.ToAccountID) memo := strings.TrimSpace(req.DestinationMemo) diff --git a/api/payments/orchestrator/internal/service/orchestrator/quote_engine.go b/api/payments/orchestrator/internal/service/orchestrator/quote_engine.go index bdaecb2..0f44aef 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/quote_engine.go +++ b/api/payments/orchestrator/internal/service/orchestrator/quote_engine.go @@ -2,6 +2,7 @@ package orchestrator import ( "context" + "errors" "strings" "time" @@ -132,10 +133,6 @@ func (s *Service) quoteFees(ctx context.Context, orgRef string, req *orchestrato } func (s *Service) estimateNetworkFee(ctx context.Context, intent *orchestratorv1.PaymentIntent) (*chainv1.EstimateTransferFeeResponse, error) { - if !s.deps.gateway.available() { - return nil, nil - } - req := &chainv1.EstimateTransferFeeRequest{ Amount: cloneProtoMoney(intent.GetAmount()), } @@ -160,7 +157,28 @@ func (s *Service) estimateNetworkFee(ctx context.Context, intent *orchestratorv1 } } - resp, err := s.deps.gateway.client.EstimateTransferFee(ctx, req) + network := "" + if req.Asset != nil { + network = strings.ToUpper(strings.TrimSpace(req.Asset.GetChain())) + } + instanceID := strings.TrimSpace(intent.GetSource().GetInstanceId()) + if instanceID == "" { + instanceID = strings.TrimSpace(intent.GetDestination().GetInstanceId()) + } + client, _, err := s.resolveChainGatewayClient(ctx, network, moneyFromProto(req.Amount), []model.RailOperation{model.RailOperationSend}, instanceID, "") + if err != nil { + if errors.Is(err, merrors.ErrNoData) { + s.logger.Debug("network fee estimation skipped: gateway unavailable", zap.Error(err)) + return nil, nil + } + s.logger.Warn("chain gateway resolution failed", zap.Error(err)) + return nil, err + } + if client == nil { + return nil, nil + } + + resp, err := client.EstimateTransferFee(ctx, req) if err != nil { s.logger.Warn("chain gateway fee estimation failed", zap.Error(err)) return nil, merrors.Internal("chain_gateway_fee_estimation_failed") diff --git a/api/payments/orchestrator/internal/service/orchestrator/rail_gateway_fake_test.go b/api/payments/orchestrator/internal/service/orchestrator/rail_gateway_fake_test.go index 3748f26..930319a 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/rail_gateway_fake_test.go +++ b/api/payments/orchestrator/internal/service/orchestrator/rail_gateway_fake_test.go @@ -12,6 +12,8 @@ type fakeRailGateway struct { capabilities rail.RailCapabilities sendFn func(context.Context, rail.TransferRequest) (rail.RailResult, error) observeFn func(context.Context, string) (rail.ObserveResult, error) + blockFn func(context.Context, rail.BlockRequest) (rail.RailResult, error) + releaseFn func(context.Context, rail.ReleaseRequest) (rail.RailResult, error) } func (f *fakeRailGateway) Rail() string { @@ -39,3 +41,17 @@ func (f *fakeRailGateway) Observe(ctx context.Context, referenceID string) (rail } return rail.ObserveResult{ReferenceID: referenceID, Status: rail.TransferStatusPending}, nil } + +func (f *fakeRailGateway) Block(ctx context.Context, req rail.BlockRequest) (rail.RailResult, error) { + if f.blockFn != nil { + return f.blockFn(ctx, req) + } + return rail.RailResult{ReferenceID: req.IdempotencyKey, Status: rail.TransferStatusPending}, nil +} + +func (f *fakeRailGateway) Release(ctx context.Context, req rail.ReleaseRequest) (rail.RailResult, error) { + if f.releaseFn != nil { + return f.releaseFn(ctx, req) + } + return rail.RailResult{ReferenceID: req.ReferenceID, Status: rail.TransferStatusPending}, nil +} diff --git a/api/payments/orchestrator/internal/service/orchestrator/service.go b/api/payments/orchestrator/internal/service/orchestrator/service.go index 93dbb15..0b23d20 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/service.go +++ b/api/payments/orchestrator/internal/service/orchestrator/service.go @@ -46,17 +46,18 @@ type Service struct { } type serviceDependencies struct { - fees feesDependency - ledger ledgerDependency - gateway gatewayDependency - railGateways railGatewayDependency - providerGateway providerGatewayDependency - oracle oracleDependency - mntx mntxDependency - gatewayRegistry GatewayRegistry - cardRoutes map[string]CardGatewayRoute - feeLedgerAccounts map[string]string - planBuilder PlanBuilder + fees feesDependency + ledger ledgerDependency + gateway gatewayDependency + railGateways railGatewayDependency + providerGateway providerGatewayDependency + oracle oracleDependency + mntx mntxDependency + gatewayRegistry GatewayRegistry + gatewayInvokeResolver GatewayInvokeResolver + cardRoutes map[string]CardGatewayRoute + feeLedgerAccounts map[string]string + planBuilder PlanBuilder } type handlerSet struct { @@ -92,7 +93,7 @@ func NewService(logger mlogger.Logger, repo storage.Repository, opts ...Option) engine := defaultPaymentEngine{svc: svc} svc.h.commands = newPaymentCommandFactory(engine, svc.logger) svc.h.queries = newPaymentQueryHandler(svc.storage, svc.ensureRepository, svc.logger.Named("queries")) - svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger.Named("events"), svc.submitCardPayout, svc.resumePaymentPlan) + svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger.Named("events"), svc.submitCardPayout, svc.resumePaymentPlan, svc.releasePaymentHold) svc.comp.executor = newPaymentExecutor(&svc.deps, svc.logger.Named("payment_executor"), svc) svc.startGatewayConsumers() @@ -107,7 +108,7 @@ func (s *Service) ensureHandlers() { s.h.queries = newPaymentQueryHandler(s.storage, s.ensureRepository, s.logger.Named("queries")) } if s.h.events == nil { - s.h.events = newPaymentEventHandler(s.storage, s.ensureRepository, s.logger.Named("events"), s.submitCardPayout, s.resumePaymentPlan) + s.h.events = newPaymentEventHandler(s.storage, s.ensureRepository, s.logger.Named("events"), s.submitCardPayout, s.resumePaymentPlan, s.releasePaymentHold) } if s.comp.executor == nil { s.comp.executor = newPaymentExecutor(&s.deps, s.logger.Named("payment_executor"), s) @@ -199,3 +200,11 @@ func (s *Service) resumePaymentPlan(ctx context.Context, store storage.PaymentsS s.ensureHandlers() return s.comp.executor.executePaymentPlan(ctx, store, payment, nil) } + +func (s *Service) releasePaymentHold(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error { + if payment == nil || payment.PaymentPlan == nil || len(payment.PaymentPlan.Steps) == 0 { + return nil + } + s.ensureHandlers() + return s.comp.executor.releasePaymentHold(ctx, store, payment) +} diff --git a/api/payments/orchestrator/internal/service/orchestrator/service_test.go b/api/payments/orchestrator/internal/service/orchestrator/service_test.go index a0d98ff..18ec092 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/service_test.go +++ b/api/payments/orchestrator/internal/service/orchestrator/service_test.go @@ -125,7 +125,7 @@ func TestExecutePayment_ChainFailure(t *testing.T) { return rail.RailResult{}, errors.New("chain failure") }, }, - }, nil, nil, nil), + }, nil, nil, nil, nil), gatewayRegistry: &stubGatewayRegistry{ items: []*model.GatewayInstanceDescriptor{ { @@ -204,7 +204,7 @@ func TestProcessTransferUpdateHandler_Settled(t *testing.T) { clock: testClock{now: time.Now()}, storage: &stubRepository{store: store}, } - svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, nil, nil) + svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, nil, nil, nil) req := &orchestratorv1.ProcessTransferUpdateRequest{ Event: &chainv1.TransferStatusChangedEvent{ @@ -278,7 +278,7 @@ func TestProcessTransferUpdateHandler_CardFundingWaitsForSources(t *testing.T) { setExecutionStepStatus(step, executionStepStatusSubmitted) return nil } - svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, submit, nil) + svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, submit, nil, nil) req := &orchestratorv1.ProcessTransferUpdateRequest{ Event: &chainv1.TransferStatusChangedEvent{ @@ -349,7 +349,7 @@ func TestProcessDepositObservedHandler_MatchesPayment(t *testing.T) { clock: testClock{now: time.Now()}, storage: &stubRepository{store: store}, } - svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, nil, nil) + svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger, nil, nil, nil) req := &orchestratorv1.ProcessDepositObservedRequest{ Event: &chainv1.WalletDepositObservedEvent{ diff --git a/api/payments/orchestrator/storage/model/payment.go b/api/payments/orchestrator/storage/model/payment.go index 3560006..aca2ff5 100644 --- a/api/payments/orchestrator/storage/model/payment.go +++ b/api/payments/orchestrator/storage/model/payment.go @@ -87,6 +87,8 @@ const ( RailOperationFee RailOperation = "FEE" RailOperationObserveConfirm RailOperation = "OBSERVE_CONFIRM" RailOperationFXConvert RailOperation = "FX_CONVERT" + RailOperationBlock RailOperation = "BLOCK" + RailOperationRelease RailOperation = "RELEASE" ) // RailCapabilities are declared per gateway instance. @@ -96,6 +98,8 @@ type RailCapabilities struct { CanReadBalance bool `bson:"canReadBalance,omitempty" json:"canReadBalance,omitempty"` CanSendFee bool `bson:"canSendFee,omitempty" json:"canSendFee,omitempty"` RequiresObserveConfirm bool `bson:"requiresObserveConfirm,omitempty" json:"requiresObserveConfirm,omitempty"` + CanBlock bool `bson:"canBlock,omitempty" json:"canBlock,omitempty"` + CanRelease bool `bson:"canRelease,omitempty" json:"canRelease,omitempty"` } // LimitsOverride applies per-currency overrides for limits. @@ -125,6 +129,7 @@ type GatewayInstanceDescriptor struct { InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"` Rail Rail `bson:"rail" json:"rail"` Network string `bson:"network,omitempty" json:"network,omitempty"` + InvokeURI string `bson:"invokeUri,omitempty" json:"invokeUri,omitempty"` Currencies []string `bson:"currencies,omitempty" json:"currencies,omitempty"` Capabilities RailCapabilities `bson:"capabilities,omitempty" json:"capabilities,omitempty"` Limits Limits `bson:"limits,omitempty" json:"limits,omitempty"` diff --git a/api/pkg/payments/rail/gateway.go b/api/pkg/payments/rail/gateway.go index 1bf2aee..6676660 100644 --- a/api/pkg/payments/rail/gateway.go +++ b/api/pkg/payments/rail/gateway.go @@ -24,6 +24,8 @@ type RailCapabilities struct { CanReadBalance bool CanSendFee bool RequiresObserveConfirm bool + CanBlock bool + CanRelease bool } // FeeBreakdown provides a gateway-level fee description. @@ -49,6 +51,25 @@ type TransferRequest struct { DestinationMemo string } +// BlockRequest defines the inputs for reserving value through a rail gateway. +type BlockRequest struct { + OrganizationRef string + AccountID string + Currency string + Amount string + IdempotencyKey string + Metadata map[string]string + ClientReference string +} + +// ReleaseRequest defines the inputs for releasing a prior block. +type ReleaseRequest struct { + ReferenceID string + IdempotencyKey string + Metadata map[string]string + ClientReference string +} + // RailResult reports the outcome of a rail gateway operation. type RailResult struct { ReferenceID string @@ -80,4 +101,6 @@ type RailGateway interface { Capabilities() RailCapabilities Send(ctx context.Context, req TransferRequest) (RailResult, error) Observe(ctx context.Context, referenceID string) (ObserveResult, error) + Block(ctx context.Context, req BlockRequest) (RailResult, error) + Release(ctx context.Context, req ReleaseRequest) (RailResult, error) } diff --git a/api/pkg/payments/rail/ledger.go b/api/pkg/payments/rail/ledger.go index 988c5bc..ea3dc9f 100644 --- a/api/pkg/payments/rail/ledger.go +++ b/api/pkg/payments/rail/ledger.go @@ -12,6 +12,7 @@ import ( type InternalLedger interface { ReadBalance(ctx context.Context, accountID string) (*moneyv1.Money, error) CreateTransaction(ctx context.Context, tx LedgerTx) (string, error) + TransferInternal(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) HoldBalance(ctx context.Context, accountID string, amount string) error } diff --git a/api/proto/common/gateway/v1/gateway.proto b/api/proto/common/gateway/v1/gateway.proto index 303e7cc..01568b1 100644 --- a/api/proto/common/gateway/v1/gateway.proto +++ b/api/proto/common/gateway/v1/gateway.proto @@ -45,6 +45,8 @@ enum RailOperation { RAIL_OPERATION_FEE = 4; RAIL_OPERATION_OBSERVE_CONFIRM = 5; RAIL_OPERATION_FX_CONVERT = 6; + RAIL_OPERATION_BLOCK = 7; + RAIL_OPERATION_RELEASE = 8; } // Limits in minor units, e.g. cents @@ -124,6 +126,8 @@ message RailCapabilities { bool can_read_balance = 3; bool can_send_fee = 4; bool requires_observe_confirm = 5; + bool can_block = 6; + bool can_release = 7; } message LimitsOverride { diff --git a/frontend/pshared/pubspec.yaml b/frontend/pshared/pubspec.yaml index 4d5bed2..65f17a1 100644 --- a/frontend/pshared/pubspec.yaml +++ b/frontend/pshared/pubspec.yaml @@ -7,6 +7,7 @@ environment: # Add regular dependencies here. dependencies: + analyzer: 9.0.0 json_annotation: ^4.9.0 http: ^1.1.0 provider: ^6.0.5 diff --git a/frontend/pweb/lib/main.dart b/frontend/pweb/lib/main.dart index 4e9c5c4..cd36f95 100644 --- a/frontend/pweb/lib/main.dart +++ b/frontend/pweb/lib/main.dart @@ -13,6 +13,7 @@ import 'package:pshared/provider/permissions.dart'; import 'package:pshared/provider/account.dart'; import 'package:pshared/provider/organizations.dart'; import 'package:pshared/provider/accounts/employees.dart'; +import 'package:pshared/provider/recipient/pmethods.dart'; import 'package:pshared/provider/recipient/provider.dart'; import 'package:pshared/provider/payment/wallets.dart'; import 'package:pshared/provider/invitations.dart'; diff --git a/frontend/pweb/lib/pages/invitations/widgets/form/fields.dart b/frontend/pweb/lib/pages/invitations/widgets/form/fields.dart index e9116ae..4512932 100644 --- a/frontend/pweb/lib/pages/invitations/widgets/form/fields.dart +++ b/frontend/pweb/lib/pages/invitations/widgets/form/fields.dart @@ -75,7 +75,7 @@ class InvitationFormFields extends StatelessWidget { SizedBox( width: 260, child: DropdownButtonFormField( - value: selectedRoleRef, + initialValue: selectedRoleRef, items: roles.map((role) => DropdownMenuItem( value: role.storable.id, child: Text(role.describable.name), diff --git a/frontend/pweb/pubspec.yaml b/frontend/pweb/pubspec.yaml index 7835131..5683751 100644 --- a/frontend/pweb/pubspec.yaml +++ b/frontend/pweb/pubspec.yaml @@ -28,6 +28,7 @@ environment: # the latest version available on pub.dev. To see which dependencies have newer # versions available, run `flutter pub outdated`. dependencies: + analyzer: 9.0.0 amplitude_flutter: ^4.0.1 flutter: sdk: flutter @@ -140,8 +141,3 @@ flutter_launcher_icons: web: generate: true image_path: "resources/logo.png" - - -# temporary -dependency_overrides: - analyzer: ">=9.0.0 <10.0.0"