fixed mntx discovery

This commit is contained in:
Stephan D
2026-03-03 13:15:42 +01:00
parent f9acb47ad7
commit 83745bcd10
24 changed files with 268 additions and 201 deletions

View File

@@ -3,8 +3,11 @@ package orchestrator
import (
"context"
"github.com/tech/sendico/pkg/discovery"
"net"
"net/url"
"strconv"
"strings"
"sync"
"github.com/shopspring/decimal"
mntxclient "github.com/tech/sendico/gateway/mntx/client"
@@ -20,7 +23,12 @@ import (
)
type gatewayCardPayoutExecutor struct {
mntxClient mntxclient.Client
gatewayRegistry GatewayRegistry
mu sync.Mutex
clients map[string]mntxclient.Client
dialClient func(ctx context.Context, address string) (mntxclient.Client, error)
}
type cardPayoutCustomer struct {
@@ -40,12 +48,20 @@ func (e *gatewayCardPayoutExecutor) ExecuteCardPayout(ctx context.Context, req s
if req.Payment == nil {
return nil, merrors.InvalidArgument("card payout send: payment is required")
}
if e == nil || e.mntxClient == nil {
return nil, merrors.InvalidArgument("card payout send: mntx client is required")
if e == nil || e.gatewayRegistry == nil {
return nil, merrors.InvalidArgument("card payout send: gateway registry is required")
}
if model.ParseRailOperation(string(req.Step.Action)) != discovery.RailOperationSend {
return nil, merrors.InvalidArgument("card payout send: unsupported action")
}
gateway, err := e.resolveGateway(ctx, req.Step)
if err != nil {
return nil, err
}
client, err := e.client(ctx, strings.TrimSpace(gateway.InvokeURI))
if err != nil {
return nil, err
}
card, err := payoutDestinationCard(req.Payment, req.Step.Metadata)
if err != nil {
@@ -68,7 +84,7 @@ func (e *gatewayCardPayoutExecutor) ExecuteCardPayout(ctx context.Context, req s
var responsePayout *mntxv1.CardPayoutState
if token := strings.TrimSpace(card.Token); token != "" {
resp, createErr := e.mntxClient.CreateCardTokenPayout(ctx, &mntxv1.CardTokenPayoutRequest{
resp, createErr := client.CreateCardTokenPayout(ctx, &mntxv1.CardTokenPayoutRequest{
PayoutId: payoutRef,
ProjectId: projectID,
CustomerId: customer.id,
@@ -106,7 +122,7 @@ func (e *gatewayCardPayoutExecutor) ExecuteCardPayout(ctx context.Context, req s
if card.ExpMonth == 0 || card.ExpYear == 0 {
return nil, merrors.InvalidArgument("card payout send: card expiry is required")
}
resp, createErr := e.mntxClient.CreateCardPayout(ctx, &mntxv1.CardPayoutRequest{
resp, createErr := client.CreateCardPayout(ctx, &mntxv1.CardPayoutRequest{
PayoutId: payoutRef,
ProjectId: projectID,
CustomerId: customer.id,
@@ -141,7 +157,12 @@ func (e *gatewayCardPayoutExecutor) ExecuteCardPayout(ctx context.Context, req s
resolvedPayoutRef := firstNonEmpty(strings.TrimSpace(responsePayout.GetPayoutId()), payoutRef)
resolvedOperationRef := firstNonEmpty(strings.TrimSpace(responsePayout.GetOperationRef()), operationRef)
gatewayInstanceID := firstNonEmpty(strings.TrimSpace(req.Step.InstanceID), strings.TrimSpace(req.Step.Gateway))
gatewayInstanceID := firstNonEmpty(
strings.TrimSpace(req.Step.InstanceID),
strings.TrimSpace(gateway.InstanceID),
strings.TrimSpace(req.Step.Gateway),
strings.TrimSpace(gateway.ID),
)
externalRefs, refsErr := cardPayoutExternalRefs(resolvedPayoutRef, resolvedOperationRef, gatewayInstanceID)
if refsErr != nil {
return nil, refsErr
@@ -155,6 +176,122 @@ func (e *gatewayCardPayoutExecutor) ExecuteCardPayout(ctx context.Context, req s
return &sexec.ExecuteOutput{StepExecution: step}, nil
}
func (e *gatewayCardPayoutExecutor) resolveGateway(ctx context.Context, step xplan.Step) (*model.GatewayInstanceDescriptor, error) {
if e.gatewayRegistry == nil {
return nil, merrors.InvalidArgument("card payout send: gateway registry is required")
}
items, err := e.gatewayRegistry.List(ctx)
if err != nil {
return nil, err
}
stepGateway := strings.TrimSpace(step.Gateway)
stepInstance := strings.TrimSpace(step.InstanceID)
var byInstance *model.GatewayInstanceDescriptor
var byGateway *model.GatewayInstanceDescriptor
var single *model.GatewayInstanceDescriptor
cardCount := 0
for i := range items {
item := items[i]
if item == nil || model.ParseRail(string(item.Rail)) != discovery.RailCardPayout || !item.IsEnabled {
continue
}
cardCount++
single = item
if stepInstance != "" && (strings.EqualFold(strings.TrimSpace(item.InstanceID), stepInstance) || strings.EqualFold(strings.TrimSpace(item.ID), stepInstance)) {
byInstance = item
break
}
if stepGateway != "" && (strings.EqualFold(strings.TrimSpace(item.ID), stepGateway) || strings.EqualFold(strings.TrimSpace(item.InstanceID), stepGateway)) {
byGateway = item
}
}
switch {
case byInstance != nil:
if strings.TrimSpace(byInstance.InvokeURI) == "" {
return nil, merrors.InvalidArgument("card payout send: gateway invoke uri is missing")
}
return byInstance, nil
case byGateway != nil:
if strings.TrimSpace(byGateway.InvokeURI) == "" {
return nil, merrors.InvalidArgument("card payout send: gateway invoke uri is missing")
}
return byGateway, nil
case stepGateway == "" && stepInstance == "" && cardCount == 1:
if strings.TrimSpace(single.InvokeURI) == "" {
return nil, merrors.InvalidArgument("card payout send: gateway invoke uri is missing")
}
return single, nil
default:
return nil, merrors.InvalidArgument("card payout send: gateway instance not found")
}
}
func (e *gatewayCardPayoutExecutor) client(ctx context.Context, invokeURI string) (mntxclient.Client, error) {
address, err := parseGatewayAddress(invokeURI)
if err != nil {
return nil, err
}
e.mu.Lock()
defer e.mu.Unlock()
if e.clients == nil {
e.clients = map[string]mntxclient.Client{}
}
if c := e.clients[address]; c != nil {
return c, nil
}
dial := e.dialClient
if dial == nil {
dial = func(ctx context.Context, address string) (mntxclient.Client, error) {
return mntxclient.New(ctx, mntxclient.Config{Address: address})
}
}
client, dialErr := dial(ctx, address)
if dialErr != nil {
return nil, dialErr
}
e.clients[address] = client
return client, nil
}
func parseGatewayAddress(raw string) (string, error) {
raw = strings.TrimSpace(raw)
if raw == "" {
return "", merrors.InvalidArgument("card payout send: gateway invoke uri is required")
}
if !strings.Contains(raw, "://") {
if _, _, splitErr := net.SplitHostPort(raw); splitErr != nil {
return "", merrors.InvalidArgument("card payout send: gateway invoke uri must include host:port")
}
return raw, nil
}
parsed, err := url.Parse(raw)
if err != nil || parsed.Scheme == "" {
return "", merrors.InvalidArgument("card payout send: gateway invoke uri must include host:port")
}
switch strings.ToLower(strings.TrimSpace(parsed.Scheme)) {
case "grpc", "grpcs":
address := strings.TrimSpace(parsed.Host)
if _, _, splitErr := net.SplitHostPort(address); splitErr != nil {
return "", merrors.InvalidArgument("card payout send: gateway invoke uri must include host:port")
}
return address, nil
case "dns", "passthrough":
return raw, nil
default:
return "", merrors.InvalidArgument("card payout send: unsupported gateway invoke uri scheme")
}
}
func payoutDestinationCard(payment *agg.Payment, metadata map[string]string) (*model.CardEndpoint, error) {
if card, ok := batchmeta.CardFromMetadata(metadata); ok && card != nil {
return card, nil

View File

@@ -23,17 +23,32 @@ func TestGatewayCardPayoutExecutor_ExecuteCardPayout_SubmitsCardPayout(t *testin
orgID := bson.NewObjectID()
var payoutReq *mntxv1.CardPayoutRequest
var dialAddress string
executor := &gatewayCardPayoutExecutor{
mntxClient: &mntxclient.Fake{
CreateCardPayoutFn: func(_ context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) {
payoutReq = req
return &mntxv1.CardPayoutResponse{
Payout: &mntxv1.CardPayoutState{
PayoutId: "payout-remote-1",
},
}, nil
gatewayRegistry: &fakeGatewayRegistry{
items: []*model.GatewayInstanceDescriptor{
{
ID: paymenttypes.DefaultCardsGatewayID,
InstanceID: paymenttypes.DefaultCardsGatewayID,
Rail: discovery.RailCardPayout,
InvokeURI: "grpc://mntx-gateway:50051",
IsEnabled: true,
},
},
},
dialClient: func(_ context.Context, address string) (mntxclient.Client, error) {
dialAddress = address
return &mntxclient.Fake{
CreateCardPayoutFn: func(_ context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) {
payoutReq = req
return &mntxv1.CardPayoutResponse{
Payout: &mntxv1.CardPayoutState{
PayoutId: "payout-remote-1",
},
}, nil
},
}, nil
},
}
req := sexec.StepRequest{
@@ -104,6 +119,9 @@ func TestGatewayCardPayoutExecutor_ExecuteCardPayout_SubmitsCardPayout(t *testin
if payoutReq == nil {
t.Fatal("expected payout request to be submitted")
}
if got, want := dialAddress, "mntx-gateway:50051"; got != want {
t.Fatalf("dial address mismatch: got=%q want=%q", got, want)
}
if got, want := payoutReq.GetPayoutId(), "payment-1"; got != want {
t.Fatalf("payout_id mismatch: got=%q want=%q", got, want)
}
@@ -146,17 +164,32 @@ func TestGatewayCardPayoutExecutor_ExecuteCardPayout_UsesStepMetadataOverrides(t
orgID := bson.NewObjectID()
var payoutReq *mntxv1.CardPayoutRequest
var dialAddress string
executor := &gatewayCardPayoutExecutor{
mntxClient: &mntxclient.Fake{
CreateCardPayoutFn: func(_ context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) {
payoutReq = req
return &mntxv1.CardPayoutResponse{
Payout: &mntxv1.CardPayoutState{
PayoutId: "payout-remote-2",
},
}, nil
gatewayRegistry: &fakeGatewayRegistry{
items: []*model.GatewayInstanceDescriptor{
{
ID: paymenttypes.DefaultCardsGatewayID,
InstanceID: paymenttypes.DefaultCardsGatewayID,
Rail: discovery.RailCardPayout,
InvokeURI: "grpc://mntx-gateway:50051",
IsEnabled: true,
},
},
},
dialClient: func(_ context.Context, address string) (mntxclient.Client, error) {
dialAddress = address
return &mntxclient.Fake{
CreateCardPayoutFn: func(_ context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) {
payoutReq = req
return &mntxv1.CardPayoutResponse{
Payout: &mntxv1.CardPayoutState{
PayoutId: "payout-remote-2",
},
}, nil
},
}, nil
},
}
req := sexec.StepRequest{
@@ -221,6 +254,9 @@ func TestGatewayCardPayoutExecutor_ExecuteCardPayout_UsesStepMetadataOverrides(t
if payoutReq == nil {
t.Fatal("expected payout request to be submitted")
}
if got, want := dialAddress, "mntx-gateway:50051"; got != want {
t.Fatalf("dial address mismatch: got=%q want=%q", got, want)
}
if got, want := payoutReq.GetAmountMinor(), int64(15000); got != want {
t.Fatalf("amount_minor mismatch: got=%d want=%d", got, want)
}
@@ -235,7 +271,7 @@ func TestGatewayCardPayoutExecutor_ExecuteCardPayout_UsesStepMetadataOverrides(t
}
}
func TestGatewayCardPayoutExecutor_ExecuteCardPayout_RequiresMntxClient(t *testing.T) {
func TestGatewayCardPayoutExecutor_ExecuteCardPayout_RequiresGatewayRegistry(t *testing.T) {
orgID := bson.NewObjectID()
executor := &gatewayCardPayoutExecutor{}
@@ -270,7 +306,7 @@ func TestGatewayCardPayoutExecutor_ExecuteCardPayout_RequiresMntxClient(t *testi
if err == nil {
t.Fatal("expected error")
}
if !strings.Contains(err.Error(), "mntx client is required") {
if !strings.Contains(err.Error(), "gateway registry is required") {
t.Fatalf("unexpected error: %v", err)
}
}

View File

@@ -6,7 +6,6 @@ import (
"time"
chainclient "github.com/tech/sendico/gateway/chain/client"
mntxclient "github.com/tech/sendico/gateway/mntx/client"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/storage/model"
clockpkg "github.com/tech/sendico/pkg/clock"
@@ -54,16 +53,6 @@ func WithLedgerClient(client ledgerclient.Client) Option {
}
}
// WithMntxGateway configures card payout execution for card-bound steps.
func WithMntxGateway(client mntxclient.Client) Option {
return func(s *Service) {
if s == nil {
return
}
s.mntxClient = client
}
}
// WithPaymentGatewayBroker wires broker subscription for payment gateway execution events.
func WithPaymentGatewayBroker(broker mb.Broker) Option {
return func(s *Service) {

View File

@@ -3,7 +3,6 @@ package orchestrator
import (
"context"
mntxclient "github.com/tech/sendico/gateway/mntx/client"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc"
@@ -26,7 +25,6 @@ type Service struct {
producer msg.Producer
ledgerClient ledgerclient.Client
mntxClient mntxclient.Client
gatewayInvokeResolver GatewayInvokeResolver
gatewayRegistry GatewayRegistry
cardGatewayRoutes map[string]CardGatewayRoute
@@ -56,7 +54,6 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro
var err error
svc.v2, svc.paymentRepo, err = newOrchestrationV2Service(svc.logger, repo, v2RuntimeDeps{
LedgerClient: svc.ledgerClient,
MntxClient: svc.mntxClient,
GatewayInvokeResolver: svc.gatewayInvokeResolver,
GatewayRegistry: svc.gatewayRegistry,
CardGatewayRoutes: svc.cardGatewayRoutes,

View File

@@ -3,7 +3,6 @@ package orchestrator
import (
"context"
mntxclient "github.com/tech/sendico/gateway/mntx/client"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/oobs"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/pquery"
@@ -27,7 +26,6 @@ type v2MongoDBProvider interface {
type v2RuntimeDeps struct {
LedgerClient ledgerclient.Client
MntxClient mntxclient.Client
GatewayInvokeResolver GatewayInvokeResolver
GatewayRegistry GatewayRegistry
CardGatewayRoutes map[string]CardGatewayRoute
@@ -114,9 +112,9 @@ func buildOrchestrationV2Executors(logger mlogger.Logger, runtimeDeps v2RuntimeD
ledgerClient: runtimeDeps.LedgerClient,
}
var cardPayoutExecutor sexec.CardPayoutExecutor
if runtimeDeps.MntxClient != nil {
if runtimeDeps.GatewayRegistry != nil {
cardPayoutExecutor = &gatewayCardPayoutExecutor{
mntxClient: runtimeDeps.MntxClient,
gatewayRegistry: runtimeDeps.GatewayRegistry,
}
}
return psvc.NewDefaultExecutors(execLogger, sexec.Dependencies{