package orchestrator import ( "context" "sort" "strings" "time" "github.com/shopspring/decimal" oracleclient "github.com/tech/sendico/fx/oracle/client" chainclient "github.com/tech/sendico/gateway/chain/client" mntxclient "github.com/tech/sendico/gateway/mntx/client" ledgerclient "github.com/tech/sendico/ledger/client" "github.com/tech/sendico/payments/storage/model" clockpkg "github.com/tech/sendico/pkg/clock" "github.com/tech/sendico/pkg/merrors" mb "github.com/tech/sendico/pkg/messaging/broker" "github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/payments/rail" feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1" "go.uber.org/zap" ) // Option configures service dependencies. type Option func(*Service) // GatewayInvokeResolver resolves gateway invoke URIs into chain gateway clients. type GatewayInvokeResolver interface { Resolve(ctx context.Context, invokeURI string) (chainclient.Client, error) } // ChainGatewayResolver resolves chain gateway clients by network. type ChainGatewayResolver interface { Resolve(ctx context.Context, network string) (chainclient.Client, error) } type feesDependency struct { client feesv1.FeeEngineClient timeout time.Duration } func (f feesDependency) available() bool { if f.client == nil { return false } if checker, ok := f.client.(interface{ Available() bool }); ok { return checker.Available() } return true } type ledgerDependency struct { client ledgerclient.Client internal rail.InternalLedger } type gatewayDependency struct { resolver ChainGatewayResolver } type railGatewayDependency struct { byID map[string]rail.RailGateway byRail map[model.Rail][]rail.RailGateway registry GatewayRegistry chainResolver GatewayInvokeResolver providerResolver GatewayInvokeResolver logger mlogger.Logger } func (g railGatewayDependency) available() bool { return len(g.byID) > 0 || len(g.byRail) > 0 || (g.registry != nil && (g.chainResolver != nil || g.providerResolver != nil)) } func (g railGatewayDependency) resolve(ctx context.Context, step *model.PaymentStep) (rail.RailGateway, error) { if step == nil { return nil, merrors.InvalidArgument("rail gateway: step is required") } if id := strings.TrimSpace(step.GatewayID); id != "" { if gw, ok := g.byID[id]; ok { return gw, nil } return g.resolveDynamic(ctx, step) } if len(g.byRail) == 0 { return g.resolveDynamic(ctx, step) } list := g.byRail[step.Rail] if len(list) == 0 { return g.resolveDynamic(ctx, step) } return list[0], nil } func (g railGatewayDependency) resolveDynamic(ctx context.Context, step *model.PaymentStep) (rail.RailGateway, error) { if g.registry == nil { return nil, merrors.InvalidArgument("rail gateway: registry is required") } if g.chainResolver == nil && g.providerResolver == nil { return nil, merrors.InvalidArgument("rail gateway: gateway resolver is required") } items, err := g.registry.List(ctx) if err != nil { return nil, err } if len(items) == 0 { return nil, merrors.InvalidArgument("rail gateway: no gateway instances available") } currency := "" amount := decimal.Zero if step.Amount != nil && strings.TrimSpace(step.Amount.GetAmount()) != "" { value, err := decimalFromMoney(step.Amount) if err != nil { return nil, err } amount = value currency = strings.ToUpper(strings.TrimSpace(step.Amount.GetCurrency())) } candidates := make([]*model.GatewayInstanceDescriptor, 0) var lastErr error for _, entry := range items { if entry == nil || !entry.IsEnabled { continue } if entry.Rail != step.Rail { continue } if step.GatewayID != "" && entry.ID != step.GatewayID { continue } if step.InstanceID != "" && !strings.EqualFold(strings.TrimSpace(entry.InstanceID), strings.TrimSpace(step.InstanceID)) { continue } if step.Action != model.RailOperationUnspecified { if err := isGatewayEligible(entry, step.Rail, "", currency, step.Action, sendDirectionForRail(step.Rail), amount); err != nil { lastErr = err continue } } candidates = append(candidates, entry) } if len(candidates) == 0 { if lastErr != nil { return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail: " + lastErr.Error()) } return nil, merrors.InvalidArgument("rail gateway: missing gateway for rail") } sort.Slice(candidates, func(i, j int) bool { return candidates[i].ID < candidates[j].ID }) entry := candidates[0] invokeURI := strings.TrimSpace(entry.InvokeURI) if invokeURI == "" { return nil, merrors.InvalidArgument("rail gateway: invoke uri is required") } cfg := chainclient.RailGatewayConfig{ Rail: string(entry.Rail), Network: entry.Network, Capabilities: rail.RailCapabilities{ CanPayIn: entry.Capabilities.CanPayIn, CanPayOut: entry.Capabilities.CanPayOut, CanReadBalance: entry.Capabilities.CanReadBalance, CanSendFee: entry.Capabilities.CanSendFee, RequiresObserveConfirm: entry.Capabilities.RequiresObserveConfirm, CanBlock: entry.Capabilities.CanBlock, CanRelease: entry.Capabilities.CanRelease, }, } g.logger.Info("Rail gateway resolved", zap.String("step_id", strings.TrimSpace(step.StepID)), zap.String("action", string(step.Action)), zap.String("gateway_id", entry.ID), zap.String("instance_id", entry.InstanceID), zap.String("rail", string(entry.Rail)), zap.String("network", entry.Network), zap.String("invoke_uri", invokeURI)) switch entry.Rail { case model.RailProviderSettlement: if g.providerResolver == nil { return nil, merrors.InvalidArgument("rail gateway: provider settlement resolver required") } client, err := g.providerResolver.Resolve(ctx, invokeURI) if err != nil { return nil, err } return NewProviderSettlementGateway(client, cfg), nil default: if g.chainResolver == nil { return nil, merrors.InvalidArgument("rail gateway: chain gateway resolver required") } client, err := g.chainResolver.Resolve(ctx, invokeURI) if err != nil { return nil, err } return chainclient.NewRailGateway(client, cfg), nil } } type oracleDependency struct { client oracleclient.Client } func (o oracleDependency) available() bool { if o.client == nil { return false } if checker, ok := o.client.(interface{ Available() bool }); ok { return checker.Available() } return true } type mntxDependency struct { client mntxclient.Client } func (m mntxDependency) available() bool { if m.client == nil { return false } if checker, ok := m.client.(interface{ Available() bool }); ok { return checker.Available() } return true } type providerGatewayDependency struct { resolver ChainGatewayResolver } type staticChainGatewayResolver struct { client chainclient.Client } func (r staticChainGatewayResolver) Resolve(ctx context.Context, _ string) (chainclient.Client, error) { if r.client == nil { return nil, merrors.InvalidArgument("chain gateway client is required") } return r.client, nil } // CardGatewayRoute maps a gateway to its funding and fee destinations. type CardGatewayRoute struct { FundingAddress string FeeAddress string FeeWalletRef string } // WithFeeEngine wires the fee engine client. func WithFeeEngine(client feesv1.FeeEngineClient, timeout time.Duration) Option { return func(s *Service) { s.deps.fees = feesDependency{ client: client, timeout: timeout, } } } func WithPaymentGatewayBroker(broker mb.Broker) Option { return func(s *Service) { if broker != nil { s.gatewayBroker = broker } } } // WithLedgerClient wires the ledger client. func WithLedgerClient(client ledgerclient.Client) Option { return func(s *Service) { s.deps.ledger = ledgerDependency{ client: client, internal: client, } } } // WithChainGatewayClient wires the chain gateway client. func WithChainGatewayClient(client chainclient.Client) Option { return func(s *Service) { s.deps.gateway = gatewayDependency{resolver: staticChainGatewayResolver{client: client}} } } // WithChainGatewayResolver wires a resolver for chain gateway clients. func WithChainGatewayResolver(resolver ChainGatewayResolver) Option { return func(s *Service) { if resolver != nil { s.deps.gateway = gatewayDependency{resolver: resolver} } } } // WithProviderSettlementGatewayClient wires the provider settlement gateway client. func WithProviderSettlementGatewayClient(client chainclient.Client) Option { return func(s *Service) { s.deps.providerGateway = providerGatewayDependency{resolver: staticChainGatewayResolver{client: client}} } } // WithProviderSettlementGatewayResolver wires a resolver for provider settlement gateway clients. func WithProviderSettlementGatewayResolver(resolver ChainGatewayResolver) Option { return func(s *Service) { if resolver != nil { s.deps.providerGateway = providerGatewayDependency{resolver: resolver} } } } // WithGatewayInvokeResolver wires a resolver for gateway invoke URIs. func WithGatewayInvokeResolver(resolver GatewayInvokeResolver) Option { return func(s *Service) { if resolver == nil { return } s.deps.gatewayInvokeResolver = resolver s.deps.railGateways.chainResolver = resolver s.deps.railGateways.providerResolver = resolver } } // WithRailGateways wires rail gateway adapters by instance ID. func WithRailGateways(gateways map[string]rail.RailGateway) Option { return func(s *Service) { if len(gateways) == 0 { return } s.deps.railGateways = buildRailGatewayDependency(gateways, s.deps.gatewayRegistry, s.deps.gatewayInvokeResolver, s.deps.gatewayInvokeResolver, s.logger) } } // WithOracleClient wires the FX oracle client. func WithOracleClient(client oracleclient.Client) Option { return func(s *Service) { s.deps.oracle = oracleDependency{client: client} } } // WithMntxGateway wires the Monetix gateway client. func WithMntxGateway(client mntxclient.Client) Option { return func(s *Service) { s.deps.mntx = mntxDependency{client: client} } } // WithCardGatewayRoutes configures funding/fee wallet routing per gateway. func WithCardGatewayRoutes(routes map[string]CardGatewayRoute) Option { return func(s *Service) { if len(routes) == 0 { return } s.deps.cardRoutes = make(map[string]CardGatewayRoute, len(routes)) for k, v := range routes { s.deps.cardRoutes[strings.ToLower(strings.TrimSpace(k))] = v } } } // WithFeeLedgerAccounts maps gateway identifiers to ledger accounts used for fees. func WithFeeLedgerAccounts(routes map[string]string) Option { return func(s *Service) { if len(routes) == 0 { return } s.deps.feeLedgerAccounts = make(map[string]string, len(routes)) for k, v := range routes { key := strings.ToLower(strings.TrimSpace(k)) val := strings.TrimSpace(v) if key == "" || val == "" { continue } s.deps.feeLedgerAccounts[key] = val } } } // WithPlanBuilder wires a payment plan builder implementation. func WithPlanBuilder(builder PlanBuilder) Option { return func(s *Service) { if builder != nil { s.deps.planBuilder = builder } } } // WithGatewayRegistry wires a registry of gateway instances for routing. func WithGatewayRegistry(registry GatewayRegistry) Option { return func(s *Service) { if registry != nil { s.deps.gatewayRegistry = registry s.deps.railGateways.registry = registry s.deps.railGateways.chainResolver = s.deps.gatewayInvokeResolver s.deps.railGateways.providerResolver = s.deps.gatewayInvokeResolver s.deps.railGateways.logger = s.logger.Named("rail_gateways") if s.deps.planBuilder == nil { s.deps.planBuilder = newDefaultPlanBuilder(s.logger) } } } } // WithClock overrides the default clock. func WithClock(clock clockpkg.Clock) Option { return func(s *Service) { if clock != nil { s.clock = clock } } } func buildRailGatewayDependency(gateways map[string]rail.RailGateway, registry GatewayRegistry, chainResolver GatewayInvokeResolver, providerResolver GatewayInvokeResolver, logger mlogger.Logger) railGatewayDependency { result := railGatewayDependency{ byID: map[string]rail.RailGateway{}, byRail: map[model.Rail][]rail.RailGateway{}, registry: registry, chainResolver: chainResolver, providerResolver: providerResolver, logger: logger, } if len(gateways) == 0 { return result } type item struct { id string gw rail.RailGateway } itemsByRail := map[model.Rail][]item{} for id, gw := range gateways { cleanID := strings.TrimSpace(id) if cleanID == "" || gw == nil { continue } result.byID[cleanID] = gw railID := parseRailValue(gw.Rail()) if railID == model.RailUnspecified { continue } itemsByRail[railID] = append(itemsByRail[railID], item{id: cleanID, gw: gw}) } for railID, items := range itemsByRail { sort.Slice(items, func(i, j int) bool { return items[i].id < items[j].id }) for _, entry := range items { result.byRail[railID] = append(result.byRail[railID], entry.gw) } } return result }