unified gateway interfaces
This commit is contained in:
@@ -51,6 +51,12 @@ gateway:
|
||||
call_timeout_seconds: 3
|
||||
insecure: true
|
||||
|
||||
payment_gateway:
|
||||
address: "sendico_tgsettle_gateway:50080"
|
||||
dial_timeout_seconds: 5
|
||||
call_timeout_seconds: 3
|
||||
insecure: true
|
||||
|
||||
mntx:
|
||||
address: "sendico_mntx_gateway:50075"
|
||||
dial_timeout_seconds: 5
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"strings"
|
||||
|
||||
chainclient "github.com/tech/sendico/gateway/chain/client"
|
||||
mntxclient "github.com/tech/sendico/gateway/mntx/client"
|
||||
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator"
|
||||
"github.com/tech/sendico/payments/orchestrator/storage/model"
|
||||
"github.com/tech/sendico/pkg/discovery"
|
||||
@@ -48,15 +47,15 @@ func buildFeeLedgerAccounts(src map[string]string) map[string]string {
|
||||
return result
|
||||
}
|
||||
|
||||
func buildGatewayRegistry(logger mlogger.Logger, mntxClient mntxclient.Client, src []gatewayInstanceConfig, registry *discovery.Registry) orchestrator.GatewayRegistry {
|
||||
func buildGatewayRegistry(logger mlogger.Logger, src []gatewayInstanceConfig, registry *discovery.Registry) orchestrator.GatewayRegistry {
|
||||
static := buildGatewayInstances(logger, src)
|
||||
staticRegistry := orchestrator.NewGatewayRegistry(logger, mntxClient, static)
|
||||
staticRegistry := orchestrator.NewGatewayRegistry(logger, static)
|
||||
discoveryRegistry := orchestrator.NewDiscoveryGatewayRegistry(logger, registry)
|
||||
return orchestrator.NewCompositeGatewayRegistry(logger, staticRegistry, discoveryRegistry)
|
||||
}
|
||||
|
||||
func buildRailGateways(chainClient chainclient.Client, src []gatewayInstanceConfig) map[string]rail.RailGateway {
|
||||
if chainClient == nil || len(src) == 0 {
|
||||
func buildRailGateways(chainClient chainclient.Client, paymentGatewayClient chainclient.Client, src []gatewayInstanceConfig) map[string]rail.RailGateway {
|
||||
if len(src) == 0 || (chainClient == nil && paymentGatewayClient == nil) {
|
||||
return nil
|
||||
}
|
||||
instances := buildGatewayInstances(nil, src)
|
||||
@@ -68,9 +67,6 @@ func buildRailGateways(chainClient chainclient.Client, src []gatewayInstanceConf
|
||||
if inst == nil || !inst.IsEnabled {
|
||||
continue
|
||||
}
|
||||
if inst.Rail != model.RailCrypto {
|
||||
continue
|
||||
}
|
||||
cfg := chainclient.RailGatewayConfig{
|
||||
Rail: string(inst.Rail),
|
||||
Network: inst.Network,
|
||||
@@ -82,7 +78,18 @@ func buildRailGateways(chainClient chainclient.Client, src []gatewayInstanceConf
|
||||
RequiresObserveConfirm: inst.Capabilities.RequiresObserveConfirm,
|
||||
},
|
||||
}
|
||||
result[inst.ID] = chainclient.NewRailGateway(chainClient, cfg)
|
||||
switch inst.Rail {
|
||||
case model.RailCrypto:
|
||||
if chainClient == nil {
|
||||
continue
|
||||
}
|
||||
result[inst.ID] = chainclient.NewRailGateway(chainClient, cfg)
|
||||
case model.RailProviderSettlement:
|
||||
if paymentGatewayClient == nil {
|
||||
continue
|
||||
}
|
||||
result[inst.ID] = orchestrator.NewProviderSettlementGateway(paymentGatewayClient, cfg)
|
||||
}
|
||||
}
|
||||
if len(result) == 0 {
|
||||
return nil
|
||||
|
||||
@@ -85,6 +85,29 @@ func (i *Imp) initGatewayClient(cfg clientConfig) chainclient.Client {
|
||||
return client
|
||||
}
|
||||
|
||||
func (i *Imp) initPaymentGatewayClient(cfg clientConfig) chainclient.Client {
|
||||
addr := cfg.address()
|
||||
if addr == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), cfg.dialTimeout())
|
||||
defer cancel()
|
||||
|
||||
client, err := chainclient.New(ctx, chainclient.Config{
|
||||
Address: addr,
|
||||
DialTimeout: cfg.dialTimeout(),
|
||||
CallTimeout: cfg.callTimeout(),
|
||||
Insecure: cfg.InsecureTransport,
|
||||
})
|
||||
if err != nil {
|
||||
i.logger.Warn("failed to connect to payment gateway service", zap.String("address", addr), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
i.logger.Info("connected to payment gateway service", zap.String("address", addr))
|
||||
return client
|
||||
}
|
||||
|
||||
func (i *Imp) initMntxClient(cfg clientConfig) mntxclient.Client {
|
||||
addr := cfg.address()
|
||||
if addr == "" {
|
||||
@@ -138,6 +161,9 @@ func (i *Imp) closeClients() {
|
||||
if i.gatewayClient != nil {
|
||||
_ = i.gatewayClient.Close()
|
||||
}
|
||||
if i.paymentGatewayClient != nil {
|
||||
_ = i.paymentGatewayClient.Close()
|
||||
}
|
||||
if i.mntxClient != nil {
|
||||
_ = i.mntxClient.Close()
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ type config struct {
|
||||
Fees clientConfig `yaml:"fees"`
|
||||
Ledger clientConfig `yaml:"ledger"`
|
||||
Gateway clientConfig `yaml:"gateway"`
|
||||
PaymentGateway clientConfig `yaml:"payment_gateway"`
|
||||
Mntx clientConfig `yaml:"mntx"`
|
||||
Oracle clientConfig `yaml:"oracle"`
|
||||
CardGateways map[string]cardGatewayRouteConfig `yaml:"card_gateways"`
|
||||
|
||||
@@ -10,11 +10,12 @@ import (
|
||||
)
|
||||
|
||||
type orchestratorDeps struct {
|
||||
feesClient feesv1.FeeEngineClient
|
||||
ledgerClient ledgerclient.Client
|
||||
gatewayClient chainclient.Client
|
||||
mntxClient mntxclient.Client
|
||||
oracleClient oracleclient.Client
|
||||
feesClient feesv1.FeeEngineClient
|
||||
ledgerClient ledgerclient.Client
|
||||
gatewayClient chainclient.Client
|
||||
paymentGatewayClient chainclient.Client
|
||||
mntxClient mntxclient.Client
|
||||
oracleClient oracleclient.Client
|
||||
}
|
||||
|
||||
func (i *Imp) initDependencies(cfg *config) *orchestratorDeps {
|
||||
@@ -35,6 +36,11 @@ func (i *Imp) initDependencies(cfg *config) *orchestratorDeps {
|
||||
i.gatewayClient = deps.gatewayClient
|
||||
}
|
||||
|
||||
deps.paymentGatewayClient = i.initPaymentGatewayClient(cfg.PaymentGateway)
|
||||
if deps.paymentGatewayClient != nil {
|
||||
i.paymentGatewayClient = deps.paymentGatewayClient
|
||||
}
|
||||
|
||||
deps.mntxClient = i.initMntxClient(cfg.Mntx)
|
||||
if deps.mntxClient != nil {
|
||||
i.mntxClient = deps.mntxClient
|
||||
@@ -62,7 +68,10 @@ func (i *Imp) buildServiceOptions(cfg *config, deps *orchestratorDeps) []orchest
|
||||
if deps.gatewayClient != nil {
|
||||
opts = append(opts, orchestrator.WithChainGatewayClient(deps.gatewayClient))
|
||||
}
|
||||
if railGateways := buildRailGateways(deps.gatewayClient, cfg.GatewayInstances); len(railGateways) > 0 {
|
||||
if deps.paymentGatewayClient != nil {
|
||||
opts = append(opts, orchestrator.WithProviderSettlementGatewayClient(deps.paymentGatewayClient))
|
||||
}
|
||||
if railGateways := buildRailGateways(deps.gatewayClient, deps.paymentGatewayClient, cfg.GatewayInstances); len(railGateways) > 0 {
|
||||
opts = append(opts, orchestrator.WithRailGateways(railGateways))
|
||||
}
|
||||
if deps.mntxClient != nil {
|
||||
@@ -77,7 +86,7 @@ func (i *Imp) buildServiceOptions(cfg *config, deps *orchestratorDeps) []orchest
|
||||
if feeAccounts := buildFeeLedgerAccounts(cfg.FeeAccounts); len(feeAccounts) > 0 {
|
||||
opts = append(opts, orchestrator.WithFeeLedgerAccounts(feeAccounts))
|
||||
}
|
||||
if registry := buildGatewayRegistry(i.logger, deps.mntxClient, cfg.GatewayInstances, i.discoveryReg); registry != nil {
|
||||
if registry := buildGatewayRegistry(i.logger, cfg.GatewayInstances, i.discoveryReg); registry != nil {
|
||||
opts = append(opts, orchestrator.WithGatewayRegistry(registry))
|
||||
}
|
||||
return opts
|
||||
|
||||
@@ -18,15 +18,16 @@ type Imp struct {
|
||||
file string
|
||||
debug bool
|
||||
|
||||
config *config
|
||||
app *grpcapp.App[storage.Repository]
|
||||
discoveryWatcher *discovery.RegistryWatcher
|
||||
discoveryReg *discovery.Registry
|
||||
discoveryAnnouncer *discovery.Announcer
|
||||
service *orchestrator.Service
|
||||
feesConn *grpc.ClientConn
|
||||
ledgerClient ledgerclient.Client
|
||||
gatewayClient chainclient.Client
|
||||
mntxClient mntxclient.Client
|
||||
oracleClient oracleclient.Client
|
||||
config *config
|
||||
app *grpcapp.App[storage.Repository]
|
||||
discoveryWatcher *discovery.RegistryWatcher
|
||||
discoveryReg *discovery.Registry
|
||||
discoveryAnnouncer *discovery.Announcer
|
||||
service *orchestrator.Service
|
||||
feesConn *grpc.ClientConn
|
||||
ledgerClient ledgerclient.Client
|
||||
gatewayClient chainclient.Client
|
||||
paymentGatewayClient chainclient.Client
|
||||
mntxClient mntxclient.Client
|
||||
oracleClient oracleclient.Client
|
||||
}
|
||||
|
||||
@@ -5,10 +5,10 @@ import (
|
||||
"strings"
|
||||
|
||||
paymodel "github.com/tech/sendico/payments/orchestrator/storage/model"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
cons "github.com/tech/sendico/pkg/messaging/consumer"
|
||||
paymentgateway "github.com/tech/sendico/pkg/messaging/notifications/paymentgateway"
|
||||
np "github.com/tech/sendico/pkg/messaging/notifications/processor"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"github.com/tech/sendico/pkg/model"
|
||||
"github.com/tech/sendico/pkg/mservice"
|
||||
"go.uber.org/zap"
|
||||
@@ -47,7 +47,8 @@ func (s *Service) onGatewayExecution(ctx context.Context, exec *model.PaymentGat
|
||||
if s.storage == nil || s.storage.Payments() == nil {
|
||||
return errStorageUnavailable
|
||||
}
|
||||
payment, err := s.storage.Payments().GetByPaymentRef(ctx, paymentRef)
|
||||
store := s.storage.Payments()
|
||||
payment, err := store.GetByPaymentRef(ctx, paymentRef)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -66,8 +67,12 @@ func (s *Service) onGatewayExecution(ctx context.Context, exec *model.PaymentGat
|
||||
}
|
||||
payment.Metadata["gateway_confirmation_status"] = string(exec.Status)
|
||||
|
||||
updatedPlan := updateExecutionStepsFromGatewayExecution(payment, exec)
|
||||
switch exec.Status {
|
||||
case model.ConfirmationStatusConfirmed, model.ConfirmationStatusClarified:
|
||||
if payment.PaymentPlan != nil && updatedPlan && payment.ExecutionPlan != nil && !executionPlanComplete(payment.ExecutionPlan) {
|
||||
return s.resumePaymentPlan(ctx, store, payment)
|
||||
}
|
||||
payment.State = paymodel.PaymentStateSettled
|
||||
payment.FailureCode = paymodel.PaymentFailureCodeUnspecified
|
||||
payment.FailureReason = ""
|
||||
@@ -82,13 +87,69 @@ func (s *Service) onGatewayExecution(ctx context.Context, exec *model.PaymentGat
|
||||
default:
|
||||
s.logger.Warn("Unhandled gateway confirmation status", zap.String("status", string(exec.Status)), zap.String("payment_ref", paymentRef))
|
||||
}
|
||||
if err := s.storage.Payments().Update(ctx, payment); err != nil {
|
||||
if err := store.Update(ctx, payment); err != nil {
|
||||
return err
|
||||
}
|
||||
s.logger.Info("Payment gateway execution applied", zap.String("payment_ref", paymentRef), zap.String("status", string(exec.Status)), zap.String("service", string(mservice.PaymentGateway)))
|
||||
return nil
|
||||
}
|
||||
|
||||
func updateExecutionStepsFromGatewayExecution(payment *paymodel.Payment, exec *model.PaymentGatewayExecution) bool {
|
||||
if payment == nil || exec == nil || payment.PaymentPlan == nil {
|
||||
return false
|
||||
}
|
||||
requestID := strings.TrimSpace(exec.RequestID)
|
||||
if requestID == "" {
|
||||
return false
|
||||
}
|
||||
execPlan := ensureExecutionPlanForPlan(payment, payment.PaymentPlan)
|
||||
if execPlan == nil {
|
||||
return false
|
||||
}
|
||||
status := executionStepStatusFromGatewayStatus(exec.Status)
|
||||
if status == "" {
|
||||
return false
|
||||
}
|
||||
updated := false
|
||||
for idx, planStep := range payment.PaymentPlan.Steps {
|
||||
if planStep == nil {
|
||||
continue
|
||||
}
|
||||
if idx >= len(execPlan.Steps) {
|
||||
continue
|
||||
}
|
||||
execStep := execPlan.Steps[idx]
|
||||
if execStep == nil {
|
||||
execStep = &paymodel.ExecutionStep{Code: planStepID(planStep, idx), Description: describePlanStep(planStep)}
|
||||
execPlan.Steps[idx] = execStep
|
||||
}
|
||||
if strings.EqualFold(strings.TrimSpace(execStep.TransferRef), requestID) {
|
||||
setExecutionStepStatus(execStep, status)
|
||||
updated = true
|
||||
continue
|
||||
}
|
||||
if execStep.TransferRef == "" && planStep.Rail == paymodel.RailProviderSettlement {
|
||||
if planStep.Action == paymodel.RailOperationObserveConfirm || planStep.Action == paymodel.RailOperationSend {
|
||||
execStep.TransferRef = requestID
|
||||
setExecutionStepStatus(execStep, status)
|
||||
updated = true
|
||||
}
|
||||
}
|
||||
}
|
||||
return updated
|
||||
}
|
||||
|
||||
func executionStepStatusFromGatewayStatus(status model.ConfirmationStatus) string {
|
||||
switch status {
|
||||
case model.ConfirmationStatusConfirmed, model.ConfirmationStatusClarified:
|
||||
return executionStepStatusConfirmed
|
||||
case model.ConfirmationStatusRejected, model.ConfirmationStatusTimeout:
|
||||
return executionStepStatusFailed
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) Shutdown() {
|
||||
if s == nil {
|
||||
return
|
||||
|
||||
@@ -5,23 +5,19 @@ import (
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
mntxclient "github.com/tech/sendico/gateway/mntx/client"
|
||||
"github.com/tech/sendico/payments/orchestrator/storage/model"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
gatewayv1 "github.com/tech/sendico/pkg/proto/common/gateway/v1"
|
||||
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type gatewayRegistry struct {
|
||||
logger mlogger.Logger
|
||||
mntx mntxclient.Client
|
||||
static []*model.GatewayInstanceDescriptor
|
||||
}
|
||||
|
||||
// NewGatewayRegistry aggregates static and remote gateway descriptors.
|
||||
func NewGatewayRegistry(logger mlogger.Logger, mntxClient mntxclient.Client, static []*model.GatewayInstanceDescriptor) GatewayRegistry {
|
||||
if mntxClient == nil && len(static) == 0 {
|
||||
// NewGatewayRegistry aggregates static gateway descriptors.
|
||||
func NewGatewayRegistry(logger mlogger.Logger, static []*model.GatewayInstanceDescriptor) GatewayRegistry {
|
||||
if len(static) == 0 {
|
||||
return nil
|
||||
}
|
||||
if logger != nil {
|
||||
@@ -29,7 +25,6 @@ func NewGatewayRegistry(logger mlogger.Logger, mntxClient mntxclient.Client, sta
|
||||
}
|
||||
return &gatewayRegistry{
|
||||
logger: logger,
|
||||
mntx: mntxClient,
|
||||
static: cloneGatewayDescriptors(static),
|
||||
}
|
||||
}
|
||||
@@ -47,27 +42,6 @@ func (r *gatewayRegistry) List(ctx context.Context) ([]*model.GatewayInstanceDes
|
||||
items[id] = cloneGatewayDescriptor(gw)
|
||||
}
|
||||
|
||||
if r.mntx != nil {
|
||||
resp, err := r.mntx.ListGatewayInstances(ctx, &mntxv1.ListGatewayInstancesRequest{})
|
||||
if err != nil {
|
||||
if r.logger != nil {
|
||||
r.logger.Warn("Failed to list Monetix gateway instances", zap.Error(err))
|
||||
}
|
||||
} else {
|
||||
for _, gw := range resp.GetItems() {
|
||||
modelGw := modelGatewayFromProto(gw)
|
||||
if modelGw == nil {
|
||||
continue
|
||||
}
|
||||
id := strings.TrimSpace(modelGw.ID)
|
||||
if id == "" {
|
||||
continue
|
||||
}
|
||||
items[id] = modelGw
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result := make([]*model.GatewayInstanceDescriptor, 0, len(items))
|
||||
for _, gw := range items {
|
||||
result = append(result, gw)
|
||||
|
||||
@@ -48,14 +48,15 @@ func (g gatewayDependency) available() bool {
|
||||
}
|
||||
|
||||
type railGatewayDependency struct {
|
||||
byID map[string]rail.RailGateway
|
||||
byRail map[model.Rail][]rail.RailGateway
|
||||
registry GatewayRegistry
|
||||
chainClient chainclient.Client
|
||||
byID map[string]rail.RailGateway
|
||||
byRail map[model.Rail][]rail.RailGateway
|
||||
registry GatewayRegistry
|
||||
chainClient chainclient.Client
|
||||
providerClient chainclient.Client
|
||||
}
|
||||
|
||||
func (g railGatewayDependency) available() bool {
|
||||
return len(g.byID) > 0 || len(g.byRail) > 0 || (g.registry != nil && g.chainClient != nil)
|
||||
return len(g.byID) > 0 || len(g.byRail) > 0 || (g.registry != nil && (g.chainClient != nil || g.providerClient != nil))
|
||||
}
|
||||
|
||||
func (g railGatewayDependency) resolve(ctx context.Context, step *model.PaymentStep) (rail.RailGateway, error) {
|
||||
@@ -80,7 +81,7 @@ func (g railGatewayDependency) resolve(ctx context.Context, step *model.PaymentS
|
||||
}
|
||||
|
||||
func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.PaymentStep) (rail.RailGateway, error) {
|
||||
if g.registry == nil || g.chainClient == nil {
|
||||
if g.registry == nil || (g.chainClient == nil && g.providerClient == nil) {
|
||||
return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail")
|
||||
}
|
||||
items, err := g.registry.List(ctx)
|
||||
@@ -108,7 +109,18 @@ func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.P
|
||||
RequiresObserveConfirm: entry.Capabilities.RequiresObserveConfirm,
|
||||
},
|
||||
}
|
||||
return chainclient.NewRailGateway(g.chainClient, cfg), nil
|
||||
switch entry.Rail {
|
||||
case model.RailProviderSettlement:
|
||||
if g.providerClient == nil {
|
||||
return nil, merrors.InvalidArgument("rail gateway: missing provider settlement client")
|
||||
}
|
||||
return NewProviderSettlementGateway(g.providerClient, cfg), nil
|
||||
default:
|
||||
if g.chainClient == nil {
|
||||
return nil, merrors.InvalidArgument("rail gateway: missing gateway client")
|
||||
}
|
||||
return chainclient.NewRailGateway(g.chainClient, cfg), nil
|
||||
}
|
||||
}
|
||||
return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail")
|
||||
}
|
||||
@@ -137,6 +149,14 @@ func (g gatewayRegistryDependency) available() bool {
|
||||
return g.registry != nil
|
||||
}
|
||||
|
||||
type providerGatewayDependency struct {
|
||||
client chainclient.Client
|
||||
}
|
||||
|
||||
func (p providerGatewayDependency) available() bool {
|
||||
return p.client != nil
|
||||
}
|
||||
|
||||
// CardGatewayRoute maps a gateway to its funding and fee destinations.
|
||||
type CardGatewayRoute struct {
|
||||
FundingAddress string
|
||||
@@ -179,13 +199,20 @@ func WithChainGatewayClient(client chainclient.Client) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// WithProviderSettlementGatewayClient wires the provider settlement gateway client.
|
||||
func WithProviderSettlementGatewayClient(client chainclient.Client) Option {
|
||||
return func(s *Service) {
|
||||
s.deps.providerGateway = providerGatewayDependency{client: client}
|
||||
}
|
||||
}
|
||||
|
||||
// WithRailGateways wires rail gateway adapters by instance ID.
|
||||
func WithRailGateways(gateways map[string]rail.RailGateway) Option {
|
||||
return func(s *Service) {
|
||||
if len(gateways) == 0 {
|
||||
return
|
||||
}
|
||||
s.deps.railGateways = buildRailGatewayDependency(gateways, s.deps.gatewayRegistry, s.deps.gateway.client)
|
||||
s.deps.railGateways = buildRailGatewayDependency(gateways, s.deps.gatewayRegistry, s.deps.gateway.client, s.deps.providerGateway.client)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -250,6 +277,7 @@ func WithGatewayRegistry(registry GatewayRegistry) Option {
|
||||
s.deps.gatewayRegistry = registry
|
||||
s.deps.railGateways.registry = registry
|
||||
s.deps.railGateways.chainClient = s.deps.gateway.client
|
||||
s.deps.railGateways.providerClient = s.deps.providerGateway.client
|
||||
if s.deps.planBuilder == nil {
|
||||
s.deps.planBuilder = &defaultPlanBuilder{}
|
||||
}
|
||||
@@ -266,12 +294,13 @@ func WithClock(clock clockpkg.Clock) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func buildRailGatewayDependency(gateways map[string]rail.RailGateway, registry GatewayRegistry, chainClient chainclient.Client) railGatewayDependency {
|
||||
func buildRailGatewayDependency(gateways map[string]rail.RailGateway, registry GatewayRegistry, chainClient chainclient.Client, providerClient chainclient.Client) railGatewayDependency {
|
||||
result := railGatewayDependency{
|
||||
byID: map[string]rail.RailGateway{},
|
||||
byRail: map[model.Rail][]rail.RailGateway{},
|
||||
registry: registry,
|
||||
chainClient: chainClient,
|
||||
byID: map[string]rail.RailGateway{},
|
||||
byRail: map[model.Rail][]rail.RailGateway{},
|
||||
registry: registry,
|
||||
chainClient: chainClient,
|
||||
providerClient: providerClient,
|
||||
}
|
||||
if len(gateways) == 0 {
|
||||
return result
|
||||
|
||||
@@ -64,7 +64,7 @@ func TestExecutePaymentPlan_SourceBeforeDestination(t *testing.T) {
|
||||
deps: serviceDependencies{
|
||||
railGateways: buildRailGatewayDependency(map[string]rail.RailGateway{
|
||||
"crypto-default": railGateway,
|
||||
}, nil, nil),
|
||||
}, nil, nil, nil),
|
||||
ledger: ledgerDependency{
|
||||
client: ledgerFake,
|
||||
internal: ledgerFake,
|
||||
|
||||
@@ -100,6 +100,33 @@ func (p *paymentExecutor) executeSendStep(ctx context.Context, payment *model.Pa
|
||||
ensureExecutionRefs(payment).CardPayoutRef = ref
|
||||
setExecutionStepStatus(execStep, executionStepStatusSubmitted)
|
||||
return true, nil
|
||||
case model.RailProviderSettlement:
|
||||
amount, err := requireMoney(cloneMoney(step.Amount), "provider settlement amount")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !p.deps.railGateways.available() {
|
||||
return false, merrors.Internal("rail gateway unavailable")
|
||||
}
|
||||
req, err := p.buildProviderSettlementTransferRequest(payment, step, amount, quote, idx)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
gw, err := p.deps.railGateways.resolve(ctx, step)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
result, err := gw.Send(ctx, req)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
execStep.TransferRef = strings.TrimSpace(result.ReferenceID)
|
||||
if execStep.TransferRef == "" {
|
||||
execStep.TransferRef = strings.TrimSpace(req.IdempotencyKey)
|
||||
}
|
||||
linkProviderSettlementObservation(payment, execStep.TransferRef)
|
||||
setExecutionStepStatus(execStep, executionStepStatusSubmitted)
|
||||
return true, nil
|
||||
case model.RailFiatOnRamp:
|
||||
return false, merrors.InvalidArgument("payment plan: fiat on-ramp execution not implemented")
|
||||
default:
|
||||
|
||||
@@ -0,0 +1,113 @@
|
||||
package orchestrator
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/tech/sendico/payments/orchestrator/storage/model"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"github.com/tech/sendico/pkg/payments/rail"
|
||||
paymenttypes "github.com/tech/sendico/pkg/payments/types"
|
||||
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
|
||||
)
|
||||
|
||||
const (
|
||||
providerSettlementMetaPaymentIntentID = "payment_intent_id"
|
||||
providerSettlementMetaQuoteRef = "quote_ref"
|
||||
providerSettlementMetaTargetChatID = "target_chat_id"
|
||||
providerSettlementMetaOutgoingLeg = "outgoing_leg"
|
||||
)
|
||||
|
||||
func (p *paymentExecutor) buildProviderSettlementTransferRequest(payment *model.Payment, step *model.PaymentStep, amount *paymenttypes.Money, quote *orchestratorv1.PaymentQuote, idx int) (rail.TransferRequest, error) {
|
||||
if payment == nil || step == nil {
|
||||
return rail.TransferRequest{}, merrors.InvalidArgument("provider settlement: payment and step are required")
|
||||
}
|
||||
if amount == nil {
|
||||
return rail.TransferRequest{}, merrors.InvalidArgument("provider settlement: amount is required")
|
||||
}
|
||||
requestID := planStepIdempotencyKey(payment, idx, step)
|
||||
if requestID == "" {
|
||||
return rail.TransferRequest{}, merrors.InvalidArgument("provider settlement: idempotency key is required")
|
||||
}
|
||||
paymentRef := strings.TrimSpace(payment.PaymentRef)
|
||||
if paymentRef == "" {
|
||||
return rail.TransferRequest{}, merrors.InvalidArgument("provider settlement: payment_ref is required")
|
||||
}
|
||||
metadata := cloneMetadata(payment.Metadata)
|
||||
if metadata == nil {
|
||||
metadata = map[string]string{}
|
||||
}
|
||||
metadata[providerSettlementMetaPaymentIntentID] = paymentRef
|
||||
if quoteRef := paymentGatewayQuoteRef(payment, quote); quoteRef != "" {
|
||||
metadata[providerSettlementMetaQuoteRef] = quoteRef
|
||||
}
|
||||
if chatID := paymentGatewayTargetChatID(payment); chatID != "" {
|
||||
metadata[providerSettlementMetaTargetChatID] = chatID
|
||||
}
|
||||
if strings.TrimSpace(metadata[providerSettlementMetaOutgoingLeg]) == "" {
|
||||
metadata[providerSettlementMetaOutgoingLeg] = strings.ToLower(strings.TrimSpace(string(step.Rail)))
|
||||
}
|
||||
return rail.TransferRequest{
|
||||
OrganizationRef: payment.OrganizationRef.Hex(),
|
||||
Currency: strings.TrimSpace(amount.GetCurrency()),
|
||||
Amount: strings.TrimSpace(amount.GetAmount()),
|
||||
IdempotencyKey: requestID,
|
||||
Metadata: metadata,
|
||||
ClientReference: paymentRef,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func paymentGatewayQuoteRef(payment *model.Payment, quote *orchestratorv1.PaymentQuote) string {
|
||||
if quote != nil {
|
||||
if ref := strings.TrimSpace(quote.GetQuoteRef()); ref != "" {
|
||||
return ref
|
||||
}
|
||||
}
|
||||
if payment != nil && payment.LastQuote != nil {
|
||||
return strings.TrimSpace(payment.LastQuote.QuoteRef)
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func paymentGatewayTargetChatID(payment *model.Payment) string {
|
||||
if payment == nil {
|
||||
return ""
|
||||
}
|
||||
if payment.Intent.Attributes != nil {
|
||||
if chatID := strings.TrimSpace(payment.Intent.Attributes["target_chat_id"]); chatID != "" {
|
||||
return chatID
|
||||
}
|
||||
}
|
||||
if payment.Metadata != nil {
|
||||
return strings.TrimSpace(payment.Metadata["target_chat_id"])
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func linkProviderSettlementObservation(payment *model.Payment, requestID string) {
|
||||
if payment == nil || payment.PaymentPlan == nil || payment.ExecutionPlan == nil {
|
||||
return
|
||||
}
|
||||
requestID = strings.TrimSpace(requestID)
|
||||
if requestID == "" {
|
||||
return
|
||||
}
|
||||
for idx, planStep := range payment.PaymentPlan.Steps {
|
||||
if planStep == nil {
|
||||
continue
|
||||
}
|
||||
if planStep.Rail != model.RailProviderSettlement || planStep.Action != model.RailOperationObserveConfirm {
|
||||
continue
|
||||
}
|
||||
if idx >= len(payment.ExecutionPlan.Steps) {
|
||||
continue
|
||||
}
|
||||
execStep := payment.ExecutionPlan.Steps[idx]
|
||||
if execStep == nil {
|
||||
execStep = &model.ExecutionStep{Code: planStepID(planStep, idx), Description: describePlanStep(planStep)}
|
||||
payment.ExecutionPlan.Steps[idx] = execStep
|
||||
}
|
||||
if execStep.TransferRef == "" {
|
||||
execStep.TransferRef = requestID
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,164 @@
|
||||
package orchestrator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
chainclient "github.com/tech/sendico/gateway/chain/client"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"github.com/tech/sendico/pkg/payments/rail"
|
||||
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
|
||||
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
|
||||
)
|
||||
|
||||
type providerSettlementGateway struct {
|
||||
client chainclient.Client
|
||||
rail string
|
||||
network string
|
||||
capabilities rail.RailCapabilities
|
||||
}
|
||||
|
||||
func NewProviderSettlementGateway(client chainclient.Client, cfg chainclient.RailGatewayConfig) rail.RailGateway {
|
||||
railName := strings.ToUpper(strings.TrimSpace(cfg.Rail))
|
||||
if railName == "" {
|
||||
railName = "PROVIDER_SETTLEMENT"
|
||||
}
|
||||
return &providerSettlementGateway{
|
||||
client: client,
|
||||
rail: railName,
|
||||
network: strings.ToUpper(strings.TrimSpace(cfg.Network)),
|
||||
capabilities: cfg.Capabilities,
|
||||
}
|
||||
}
|
||||
|
||||
func (g *providerSettlementGateway) Rail() string {
|
||||
return g.rail
|
||||
}
|
||||
|
||||
func (g *providerSettlementGateway) Network() string {
|
||||
return g.network
|
||||
}
|
||||
|
||||
func (g *providerSettlementGateway) Capabilities() rail.RailCapabilities {
|
||||
return g.capabilities
|
||||
}
|
||||
|
||||
func (g *providerSettlementGateway) Send(ctx context.Context, req rail.TransferRequest) (rail.RailResult, error) {
|
||||
if g.client == nil {
|
||||
return rail.RailResult{}, merrors.Internal("provider settlement gateway: client is required")
|
||||
}
|
||||
idempotencyKey := strings.TrimSpace(req.IdempotencyKey)
|
||||
if idempotencyKey == "" {
|
||||
return rail.RailResult{}, merrors.InvalidArgument("provider settlement gateway: idempotency_key is required")
|
||||
}
|
||||
currency := strings.TrimSpace(req.Currency)
|
||||
amount := strings.TrimSpace(req.Amount)
|
||||
if currency == "" || amount == "" {
|
||||
return rail.RailResult{}, merrors.InvalidArgument("provider settlement gateway: amount is required")
|
||||
}
|
||||
metadata := cloneMetadata(req.Metadata)
|
||||
if metadata == nil {
|
||||
metadata = map[string]string{}
|
||||
}
|
||||
if strings.TrimSpace(metadata[providerSettlementMetaPaymentIntentID]) == "" {
|
||||
if ref := strings.TrimSpace(req.ClientReference); ref != "" {
|
||||
metadata[providerSettlementMetaPaymentIntentID] = ref
|
||||
}
|
||||
}
|
||||
if strings.TrimSpace(metadata[providerSettlementMetaPaymentIntentID]) == "" {
|
||||
return rail.RailResult{}, merrors.InvalidArgument("provider settlement gateway: payment_intent_id is required")
|
||||
}
|
||||
if strings.TrimSpace(metadata[providerSettlementMetaOutgoingLeg]) == "" && g.rail != "" {
|
||||
metadata[providerSettlementMetaOutgoingLeg] = strings.ToLower(strings.TrimSpace(g.rail))
|
||||
}
|
||||
submitReq := &chainv1.SubmitTransferRequest{
|
||||
IdempotencyKey: idempotencyKey,
|
||||
OrganizationRef: strings.TrimSpace(req.OrganizationRef),
|
||||
SourceWalletRef: strings.TrimSpace(req.FromAccountID),
|
||||
Amount: &moneyv1.Money{
|
||||
Currency: currency,
|
||||
Amount: amount,
|
||||
},
|
||||
Metadata: metadata,
|
||||
ClientReference: strings.TrimSpace(req.ClientReference),
|
||||
}
|
||||
if dest := buildProviderSettlementDestination(req); dest != nil {
|
||||
submitReq.Destination = dest
|
||||
}
|
||||
resp, err := g.client.SubmitTransfer(ctx, submitReq)
|
||||
if err != nil {
|
||||
return rail.RailResult{}, err
|
||||
}
|
||||
if resp == nil || resp.GetTransfer() == nil {
|
||||
return rail.RailResult{}, merrors.Internal("provider settlement gateway: missing transfer response")
|
||||
}
|
||||
transfer := resp.GetTransfer()
|
||||
return rail.RailResult{
|
||||
ReferenceID: strings.TrimSpace(transfer.GetTransferRef()),
|
||||
Status: providerSettlementStatusFromTransfer(transfer.GetStatus()),
|
||||
FinalAmount: railMoneyFromProto(transfer.GetNetAmount()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (g *providerSettlementGateway) Observe(ctx context.Context, referenceID string) (rail.ObserveResult, error) {
|
||||
if g.client == nil {
|
||||
return rail.ObserveResult{}, merrors.Internal("provider settlement gateway: client is required")
|
||||
}
|
||||
ref := strings.TrimSpace(referenceID)
|
||||
if ref == "" {
|
||||
return rail.ObserveResult{}, merrors.InvalidArgument("provider settlement gateway: reference_id is required")
|
||||
}
|
||||
resp, err := g.client.GetTransfer(ctx, &chainv1.GetTransferRequest{TransferRef: ref})
|
||||
if err != nil {
|
||||
return rail.ObserveResult{}, err
|
||||
}
|
||||
if resp == nil || resp.GetTransfer() == nil {
|
||||
return rail.ObserveResult{}, merrors.Internal("provider settlement gateway: missing transfer response")
|
||||
}
|
||||
transfer := resp.GetTransfer()
|
||||
return rail.ObserveResult{
|
||||
ReferenceID: ref,
|
||||
Status: providerSettlementStatusFromTransfer(transfer.GetStatus()),
|
||||
FinalAmount: railMoneyFromProto(transfer.GetNetAmount()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func buildProviderSettlementDestination(req rail.TransferRequest) *chainv1.TransferDestination {
|
||||
destRef := strings.TrimSpace(req.ToAccountID)
|
||||
memo := strings.TrimSpace(req.DestinationMemo)
|
||||
if destRef == "" && memo == "" {
|
||||
return nil
|
||||
}
|
||||
return &chainv1.TransferDestination{
|
||||
Destination: &chainv1.TransferDestination_ExternalAddress{ExternalAddress: destRef},
|
||||
Memo: memo,
|
||||
}
|
||||
}
|
||||
|
||||
func providerSettlementStatusFromTransfer(status chainv1.TransferStatus) string {
|
||||
switch status {
|
||||
case chainv1.TransferStatus_TRANSFER_CONFIRMED:
|
||||
return rail.TransferStatusSuccess
|
||||
case chainv1.TransferStatus_TRANSFER_FAILED:
|
||||
return rail.TransferStatusFailed
|
||||
case chainv1.TransferStatus_TRANSFER_CANCELLED:
|
||||
return rail.TransferStatusRejected
|
||||
default:
|
||||
return rail.TransferStatusPending
|
||||
}
|
||||
}
|
||||
|
||||
func railMoneyFromProto(src *moneyv1.Money) *rail.Money {
|
||||
if src == nil {
|
||||
return nil
|
||||
}
|
||||
currency := strings.TrimSpace(src.GetCurrency())
|
||||
amount := strings.TrimSpace(src.GetAmount())
|
||||
if currency == "" || amount == "" {
|
||||
return nil
|
||||
}
|
||||
return &rail.Money{
|
||||
Amount: amount,
|
||||
Currency: currency,
|
||||
}
|
||||
}
|
||||
@@ -50,6 +50,7 @@ type serviceDependencies struct {
|
||||
ledger ledgerDependency
|
||||
gateway gatewayDependency
|
||||
railGateways railGatewayDependency
|
||||
providerGateway providerGatewayDependency
|
||||
oracle oracleDependency
|
||||
mntx mntxDependency
|
||||
gatewayRegistry GatewayRegistry
|
||||
|
||||
@@ -125,7 +125,7 @@ func TestExecutePayment_ChainFailure(t *testing.T) {
|
||||
return rail.RailResult{}, errors.New("chain failure")
|
||||
},
|
||||
},
|
||||
}, nil, nil),
|
||||
}, nil, nil, nil),
|
||||
gatewayRegistry: &stubGatewayRegistry{
|
||||
items: []*model.GatewayInstanceDescriptor{
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user