457 lines
13 KiB
Go
457 lines
13 KiB
Go
package orchestrator
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/shopspring/decimal"
|
|
oracleclient "github.com/tech/sendico/fx/oracle/client"
|
|
chainclient "github.com/tech/sendico/gateway/chain/client"
|
|
mntxclient "github.com/tech/sendico/gateway/mntx/client"
|
|
ledgerclient "github.com/tech/sendico/ledger/client"
|
|
"github.com/tech/sendico/payments/storage/model"
|
|
clockpkg "github.com/tech/sendico/pkg/clock"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
mb "github.com/tech/sendico/pkg/messaging/broker"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"github.com/tech/sendico/pkg/payments/rail"
|
|
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Option configures service dependencies.
|
|
type Option func(*Service)
|
|
|
|
// GatewayInvokeResolver resolves gateway invoke URIs into chain gateway clients.
|
|
type GatewayInvokeResolver interface {
|
|
Resolve(ctx context.Context, invokeURI string) (chainclient.Client, error)
|
|
}
|
|
|
|
// ChainGatewayResolver resolves chain gateway clients by network.
|
|
type ChainGatewayResolver interface {
|
|
Resolve(ctx context.Context, network string) (chainclient.Client, error)
|
|
}
|
|
|
|
type feesDependency struct {
|
|
client feesv1.FeeEngineClient
|
|
timeout time.Duration
|
|
}
|
|
|
|
func (f feesDependency) available() bool {
|
|
if f.client == nil {
|
|
return false
|
|
}
|
|
if checker, ok := f.client.(interface{ Available() bool }); ok {
|
|
return checker.Available()
|
|
}
|
|
return true
|
|
}
|
|
|
|
type ledgerDependency struct {
|
|
client ledgerclient.Client
|
|
internal rail.InternalLedger
|
|
}
|
|
|
|
type gatewayDependency struct {
|
|
resolver ChainGatewayResolver
|
|
}
|
|
|
|
type railGatewayDependency struct {
|
|
byID map[string]rail.RailGateway
|
|
byRail map[model.Rail][]rail.RailGateway
|
|
registry GatewayRegistry
|
|
chainResolver GatewayInvokeResolver
|
|
providerResolver GatewayInvokeResolver
|
|
logger mlogger.Logger
|
|
}
|
|
|
|
func (g railGatewayDependency) available() bool {
|
|
return len(g.byID) > 0 || len(g.byRail) > 0 || (g.registry != nil && (g.chainResolver != nil || g.providerResolver != nil))
|
|
}
|
|
|
|
func (g railGatewayDependency) resolve(ctx context.Context, step *model.PaymentStep) (rail.RailGateway, error) {
|
|
if step == nil {
|
|
return nil, merrors.InvalidArgument("rail gateway: step is required")
|
|
}
|
|
if id := strings.TrimSpace(step.GatewayID); id != "" {
|
|
if gw, ok := g.byID[id]; ok {
|
|
return gw, nil
|
|
}
|
|
return g.resolveDynamic(ctx, step)
|
|
}
|
|
if len(g.byRail) == 0 {
|
|
return g.resolveDynamic(ctx, step)
|
|
}
|
|
list := g.byRail[step.Rail]
|
|
if len(list) == 0 {
|
|
return g.resolveDynamic(ctx, step)
|
|
}
|
|
return list[0], nil
|
|
}
|
|
|
|
func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.PaymentStep) (rail.RailGateway, error) {
|
|
if g.registry == nil {
|
|
return nil, merrors.InvalidArgument("rail gateway: registry is required")
|
|
}
|
|
if g.chainResolver == nil && g.providerResolver == nil {
|
|
return nil, merrors.InvalidArgument("rail gateway: gateway resolver is required")
|
|
}
|
|
items, err := g.registry.List(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(items) == 0 {
|
|
return nil, merrors.InvalidArgument("rail gateway: no gateway instances available")
|
|
}
|
|
|
|
currency := ""
|
|
amount := decimal.Zero
|
|
if step.Amount != nil && strings.TrimSpace(step.Amount.GetAmount()) != "" {
|
|
value, err := decimalFromMoney(step.Amount)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
amount = value
|
|
currency = strings.ToUpper(strings.TrimSpace(step.Amount.GetCurrency()))
|
|
}
|
|
|
|
candidates := make([]*model.GatewayInstanceDescriptor, 0)
|
|
var lastErr error
|
|
for _, entry := range items {
|
|
if entry == nil || !entry.IsEnabled {
|
|
continue
|
|
}
|
|
if entry.Rail != step.Rail {
|
|
continue
|
|
}
|
|
if step.GatewayID != "" && entry.ID != step.GatewayID {
|
|
continue
|
|
}
|
|
if step.InstanceID != "" && !strings.EqualFold(strings.TrimSpace(entry.InstanceID), strings.TrimSpace(step.InstanceID)) {
|
|
continue
|
|
}
|
|
if step.Action != model.RailOperationUnspecified {
|
|
if err := isGatewayEligible(entry, step.Rail, "", currency, step.Action, sendDirectionForRail(step.Rail), amount); err != nil {
|
|
lastErr = err
|
|
continue
|
|
}
|
|
}
|
|
candidates = append(candidates, entry)
|
|
}
|
|
if len(candidates) == 0 {
|
|
if lastErr != nil {
|
|
return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail: " + lastErr.Error())
|
|
}
|
|
return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail")
|
|
}
|
|
sort.Slice(candidates, func(i, j int) bool {
|
|
return candidates[i].ID < candidates[j].ID
|
|
})
|
|
entry := candidates[0]
|
|
invokeURI := strings.TrimSpace(entry.InvokeURI)
|
|
if invokeURI == "" {
|
|
return nil, merrors.InvalidArgument("rail gateway: invoke uri is required")
|
|
}
|
|
|
|
cfg := chainclient.RailGatewayConfig{
|
|
Rail: string(entry.Rail),
|
|
Network: entry.Network,
|
|
Capabilities: rail.RailCapabilities{
|
|
CanPayIn: entry.Capabilities.CanPayIn,
|
|
CanPayOut: entry.Capabilities.CanPayOut,
|
|
CanReadBalance: entry.Capabilities.CanReadBalance,
|
|
CanSendFee: entry.Capabilities.CanSendFee,
|
|
RequiresObserveConfirm: entry.Capabilities.RequiresObserveConfirm,
|
|
CanBlock: entry.Capabilities.CanBlock,
|
|
CanRelease: entry.Capabilities.CanRelease,
|
|
},
|
|
}
|
|
|
|
g.logger.Info("Rail gateway resolved",
|
|
zap.String("step_id", strings.TrimSpace(step.StepID)),
|
|
zap.String("action", string(step.Action)),
|
|
zap.String("gateway_id", entry.ID),
|
|
zap.String("instance_id", entry.InstanceID),
|
|
zap.String("rail", string(entry.Rail)),
|
|
zap.String("network", entry.Network),
|
|
zap.String("invoke_uri", invokeURI))
|
|
|
|
switch entry.Rail {
|
|
case model.RailProviderSettlement:
|
|
if g.providerResolver == nil {
|
|
return nil, merrors.InvalidArgument("rail gateway: provider settlement resolver required")
|
|
}
|
|
client, err := g.providerResolver.Resolve(ctx, invokeURI)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return NewProviderSettlementGateway(client, cfg), nil
|
|
default:
|
|
if g.chainResolver == nil {
|
|
return nil, merrors.InvalidArgument("rail gateway: chain gateway resolver required")
|
|
}
|
|
client, err := g.chainResolver.Resolve(ctx, invokeURI)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return chainclient.NewRailGateway(client, cfg), nil
|
|
}
|
|
}
|
|
|
|
type oracleDependency struct {
|
|
client oracleclient.Client
|
|
}
|
|
|
|
func (o oracleDependency) available() bool {
|
|
if o.client == nil {
|
|
return false
|
|
}
|
|
if checker, ok := o.client.(interface{ Available() bool }); ok {
|
|
return checker.Available()
|
|
}
|
|
return true
|
|
}
|
|
|
|
type mntxDependency struct {
|
|
client mntxclient.Client
|
|
}
|
|
|
|
func (m mntxDependency) available() bool {
|
|
if m.client == nil {
|
|
return false
|
|
}
|
|
if checker, ok := m.client.(interface{ Available() bool }); ok {
|
|
return checker.Available()
|
|
}
|
|
return true
|
|
}
|
|
|
|
type providerGatewayDependency struct {
|
|
resolver ChainGatewayResolver
|
|
}
|
|
|
|
type staticChainGatewayResolver struct {
|
|
client chainclient.Client
|
|
}
|
|
|
|
func (r staticChainGatewayResolver) Resolve(ctx context.Context, _ string) (chainclient.Client, error) {
|
|
if r.client == nil {
|
|
return nil, merrors.InvalidArgument("chain gateway client is required")
|
|
}
|
|
return r.client, nil
|
|
}
|
|
|
|
// CardGatewayRoute describes where funds and fees for a card gateway are sent.
type CardGatewayRoute struct {
	// FundingAddress is the funding destination for the gateway.
	FundingAddress string
	// FeeAddress is the fee destination for the gateway.
	FeeAddress string
	// FeeWalletRef presumably references the wallet that receives fees;
	// confirm against the code that consumes it.
	FeeWalletRef string
}
|
|
|
|
// WithFeeEngine wires the fee engine client.
|
|
func WithFeeEngine(client feesv1.FeeEngineClient, timeout time.Duration) Option {
|
|
return func(s *Service) {
|
|
s.deps.fees = feesDependency{
|
|
client: client,
|
|
timeout: timeout,
|
|
}
|
|
}
|
|
}
|
|
|
|
func WithPaymentGatewayBroker(broker mb.Broker) Option {
|
|
return func(s *Service) {
|
|
if broker != nil {
|
|
s.gatewayBroker = broker
|
|
}
|
|
}
|
|
}
|
|
|
|
// WithLedgerClient wires the ledger client.
|
|
func WithLedgerClient(client ledgerclient.Client) Option {
|
|
return func(s *Service) {
|
|
s.deps.ledger = ledgerDependency{
|
|
client: client,
|
|
internal: client,
|
|
}
|
|
}
|
|
}
|
|
|
|
// WithChainGatewayClient wires the chain gateway client.
|
|
func WithChainGatewayClient(client chainclient.Client) Option {
|
|
return func(s *Service) {
|
|
s.deps.gateway = gatewayDependency{resolver: staticChainGatewayResolver{client: client}}
|
|
}
|
|
}
|
|
|
|
// WithChainGatewayResolver wires a resolver for chain gateway clients.
|
|
func WithChainGatewayResolver(resolver ChainGatewayResolver) Option {
|
|
return func(s *Service) {
|
|
if resolver != nil {
|
|
s.deps.gateway = gatewayDependency{resolver: resolver}
|
|
}
|
|
}
|
|
}
|
|
|
|
// WithProviderSettlementGatewayClient wires the provider settlement gateway client.
|
|
func WithProviderSettlementGatewayClient(client chainclient.Client) Option {
|
|
return func(s *Service) {
|
|
s.deps.providerGateway = providerGatewayDependency{resolver: staticChainGatewayResolver{client: client}}
|
|
}
|
|
}
|
|
|
|
// WithProviderSettlementGatewayResolver wires a resolver for provider settlement gateway clients.
|
|
func WithProviderSettlementGatewayResolver(resolver ChainGatewayResolver) Option {
|
|
return func(s *Service) {
|
|
if resolver != nil {
|
|
s.deps.providerGateway = providerGatewayDependency{resolver: resolver}
|
|
}
|
|
}
|
|
}
|
|
|
|
// WithGatewayInvokeResolver wires a resolver for gateway invoke URIs.
|
|
func WithGatewayInvokeResolver(resolver GatewayInvokeResolver) Option {
|
|
return func(s *Service) {
|
|
if resolver == nil {
|
|
return
|
|
}
|
|
s.deps.gatewayInvokeResolver = resolver
|
|
s.deps.railGateways.chainResolver = resolver
|
|
s.deps.railGateways.providerResolver = resolver
|
|
}
|
|
}
|
|
|
|
// WithRailGateways wires rail gateway adapters by instance ID.
|
|
func WithRailGateways(gateways map[string]rail.RailGateway) Option {
|
|
return func(s *Service) {
|
|
if len(gateways) == 0 {
|
|
return
|
|
}
|
|
s.deps.railGateways = buildRailGatewayDependency(gateways, s.deps.gatewayRegistry, s.deps.gatewayInvokeResolver, s.deps.gatewayInvokeResolver, s.logger)
|
|
}
|
|
}
|
|
|
|
// WithOracleClient wires the FX oracle client.
|
|
func WithOracleClient(client oracleclient.Client) Option {
|
|
return func(s *Service) {
|
|
s.deps.oracle = oracleDependency{client: client}
|
|
}
|
|
}
|
|
|
|
// WithMntxGateway wires the Monetix gateway client.
|
|
func WithMntxGateway(client mntxclient.Client) Option {
|
|
return func(s *Service) {
|
|
s.deps.mntx = mntxDependency{client: client}
|
|
}
|
|
}
|
|
|
|
// WithCardGatewayRoutes configures funding/fee wallet routing per gateway.
|
|
func WithCardGatewayRoutes(routes map[string]CardGatewayRoute) Option {
|
|
return func(s *Service) {
|
|
if len(routes) == 0 {
|
|
return
|
|
}
|
|
s.deps.cardRoutes = make(map[string]CardGatewayRoute, len(routes))
|
|
for k, v := range routes {
|
|
s.deps.cardRoutes[strings.ToLower(strings.TrimSpace(k))] = v
|
|
}
|
|
}
|
|
}
|
|
|
|
// WithFeeLedgerAccounts maps gateway identifiers to ledger accounts used for fees.
|
|
func WithFeeLedgerAccounts(routes map[string]string) Option {
|
|
return func(s *Service) {
|
|
if len(routes) == 0 {
|
|
return
|
|
}
|
|
s.deps.feeLedgerAccounts = make(map[string]string, len(routes))
|
|
for k, v := range routes {
|
|
key := strings.ToLower(strings.TrimSpace(k))
|
|
val := strings.TrimSpace(v)
|
|
if key == "" || val == "" {
|
|
continue
|
|
}
|
|
s.deps.feeLedgerAccounts[key] = val
|
|
}
|
|
}
|
|
}
|
|
|
|
// WithPlanBuilder wires a payment plan builder implementation.
|
|
func WithPlanBuilder(builder PlanBuilder) Option {
|
|
return func(s *Service) {
|
|
if builder != nil {
|
|
s.deps.planBuilder = builder
|
|
}
|
|
}
|
|
}
|
|
|
|
// WithGatewayRegistry wires a registry of gateway instances for routing.
|
|
func WithGatewayRegistry(registry GatewayRegistry) Option {
|
|
return func(s *Service) {
|
|
if registry != nil {
|
|
s.deps.gatewayRegistry = registry
|
|
s.deps.railGateways.registry = registry
|
|
s.deps.railGateways.chainResolver = s.deps.gatewayInvokeResolver
|
|
s.deps.railGateways.providerResolver = s.deps.gatewayInvokeResolver
|
|
s.deps.railGateways.logger = s.logger.Named("rail_gateways")
|
|
if s.deps.planBuilder == nil {
|
|
s.deps.planBuilder = newDefaultPlanBuilder(s.logger)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// WithClock overrides the default clock.
|
|
func WithClock(clock clockpkg.Clock) Option {
|
|
return func(s *Service) {
|
|
if clock != nil {
|
|
s.clock = clock
|
|
}
|
|
}
|
|
}
|
|
|
|
func buildRailGatewayDependency(gateways map[string]rail.RailGateway, registry GatewayRegistry, chainResolver GatewayInvokeResolver, providerResolver GatewayInvokeResolver, logger mlogger.Logger) railGatewayDependency {
|
|
result := railGatewayDependency{
|
|
byID: map[string]rail.RailGateway{},
|
|
byRail: map[model.Rail][]rail.RailGateway{},
|
|
registry: registry,
|
|
chainResolver: chainResolver,
|
|
providerResolver: providerResolver,
|
|
logger: logger,
|
|
}
|
|
if len(gateways) == 0 {
|
|
return result
|
|
}
|
|
|
|
type item struct {
|
|
id string
|
|
gw rail.RailGateway
|
|
}
|
|
itemsByRail := map[model.Rail][]item{}
|
|
|
|
for id, gw := range gateways {
|
|
cleanID := strings.TrimSpace(id)
|
|
if cleanID == "" || gw == nil {
|
|
continue
|
|
}
|
|
result.byID[cleanID] = gw
|
|
railID := parseRailValue(gw.Rail())
|
|
if railID == model.RailUnspecified {
|
|
continue
|
|
}
|
|
itemsByRail[railID] = append(itemsByRail[railID], item{id: cleanID, gw: gw})
|
|
}
|
|
|
|
for railID, items := range itemsByRail {
|
|
sort.Slice(items, func(i, j int) bool {
|
|
return items[i].id < items[j].id
|
|
})
|
|
for _, entry := range items {
|
|
result.byRail[railID] = append(result.byRail[railID], entry.gw)
|
|
}
|
|
}
|
|
|
|
return result
|
|
}
|