package orchestrator import ( "context" "github.com/tech/sendico/payments/storage" "github.com/tech/sendico/payments/storage/model" "github.com/tech/sendico/pkg/api/routers" clockpkg "github.com/tech/sendico/pkg/clock" msg "github.com/tech/sendico/pkg/messaging" mb "github.com/tech/sendico/pkg/messaging/broker" "github.com/tech/sendico/pkg/mlogger" orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1" "google.golang.org/grpc" ) type serviceError string func (e serviceError) Error() string { return string(e) } const ( defaultFeeQuoteTTLMillis int64 = 120000 defaultOracleTTLMillis int64 = 60000 ) var ( errStorageUnavailable = serviceError("payments.orchestrator: storage not initialised") ) // Service orchestrates payments across ledger, billing, FX, and chain domains. type Service struct { logger mlogger.Logger storage storage.Repository clock clockpkg.Clock deps serviceDependencies h handlerSet comp componentSet gatewayBroker mb.Broker gatewayConsumers []msg.Consumer orchestratorv1.UnimplementedPaymentOrchestratorServer } type serviceDependencies struct { fees feesDependency ledger ledgerDependency gateway gatewayDependency railGateways railGatewayDependency providerGateway providerGatewayDependency oracle oracleDependency mntx mntxDependency gatewayRegistry GatewayRegistry gatewayInvokeResolver GatewayInvokeResolver cardRoutes map[string]CardGatewayRoute feeLedgerAccounts map[string]string planBuilder PlanBuilder } type handlerSet struct { commands *paymentCommandFactory queries *paymentQueryHandler events *paymentEventHandler } type componentSet struct { executor *paymentExecutor } // NewService constructs a payment orchestrator service. func NewService(logger mlogger.Logger, repo storage.Repository, opts ...Option) *Service { svc := &Service{ logger: logger.Named("payment_orchestrator"), storage: repo, clock: clockpkg.NewSystem(), } initMetrics() for _, opt := range opts { if opt != nil { opt(svc) } } if svc.clock == nil { svc.clock = clockpkg.NewSystem() } engine := defaultPaymentEngine{svc: svc} svc.h.commands = newPaymentCommandFactory(engine, svc.logger) svc.h.queries = newPaymentQueryHandler(svc.storage, svc.ensureRepository, svc.logger.Named("queries")) svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger.Named("events"), svc.submitCardPayout, svc.resumePaymentPlan, svc.releasePaymentHold) svc.comp.executor = newPaymentExecutor(&svc.deps, svc.logger.Named("payment_executor"), svc) svc.startGatewayConsumers() return svc } func (s *Service) ensureHandlers() { if s.h.commands == nil { s.h.commands = newPaymentCommandFactory(defaultPaymentEngine{svc: s}, s.logger) } if s.h.queries == nil { s.h.queries = newPaymentQueryHandler(s.storage, s.ensureRepository, s.logger.Named("queries")) } if s.h.events == nil { s.h.events = newPaymentEventHandler(s.storage, s.ensureRepository, s.logger.Named("events"), s.submitCardPayout, s.resumePaymentPlan, s.releasePaymentHold) } if s.comp.executor == nil { s.comp.executor = newPaymentExecutor(&s.deps, s.logger.Named("payment_executor"), s) } } // Register attaches the service to the supplied gRPC router. func (s *Service) Register(router routers.GRPC) error { return router.Register(func(reg grpc.ServiceRegistrar) { orchestratorv1.RegisterPaymentOrchestratorServer(reg, s) }) } // QuotePayment aggregates downstream quotes. func (s *Service) QuotePayment(ctx context.Context, req *orchestratorv1.QuotePaymentRequest) (*orchestratorv1.QuotePaymentResponse, error) { s.ensureHandlers() return executeUnary(ctx, s, "QuotePayment", s.h.commands.QuotePayment().Execute, req) } // QuotePayments aggregates downstream quotes for multiple intents. func (s *Service) QuotePayments(ctx context.Context, req *orchestratorv1.QuotePaymentsRequest) (*orchestratorv1.QuotePaymentsResponse, error) { s.ensureHandlers() return executeUnary(ctx, s, "QuotePayments", s.h.commands.QuotePayments().Execute, req) } // InitiatePayment captures a payment intent and reserves funds orchestration. func (s *Service) InitiatePayment(ctx context.Context, req *orchestratorv1.InitiatePaymentRequest) (*orchestratorv1.InitiatePaymentResponse, error) { s.ensureHandlers() return executeUnary(ctx, s, "InitiatePayment", s.h.commands.InitiatePayment().Execute, req) } // InitiatePayments executes multiple payments using a stored quote reference. func (s *Service) InitiatePayments(ctx context.Context, req *orchestratorv1.InitiatePaymentsRequest) (*orchestratorv1.InitiatePaymentsResponse, error) { s.ensureHandlers() return executeUnary(ctx, s, "InitiatePayments", s.h.commands.InitiatePayments().Execute, req) } // CancelPayment attempts to cancel an in-flight payment. func (s *Service) CancelPayment(ctx context.Context, req *orchestratorv1.CancelPaymentRequest) (*orchestratorv1.CancelPaymentResponse, error) { s.ensureHandlers() return executeUnary(ctx, s, "CancelPayment", s.h.commands.CancelPayment().Execute, req) } // GetPayment returns a stored payment record. func (s *Service) GetPayment(ctx context.Context, req *orchestratorv1.GetPaymentRequest) (*orchestratorv1.GetPaymentResponse, error) { s.ensureHandlers() return executeUnary(ctx, s, "GetPayment", s.h.queries.getPayment, req) } // ListPayments lists stored payment records. func (s *Service) ListPayments(ctx context.Context, req *orchestratorv1.ListPaymentsRequest) (*orchestratorv1.ListPaymentsResponse, error) { s.ensureHandlers() return executeUnary(ctx, s, "ListPayments", s.h.queries.listPayments, req) } // InitiateConversion orchestrates standalone FX conversions. func (s *Service) InitiateConversion(ctx context.Context, req *orchestratorv1.InitiateConversionRequest) (*orchestratorv1.InitiateConversionResponse, error) { s.ensureHandlers() return executeUnary(ctx, s, "InitiateConversion", s.h.commands.InitiateConversion().Execute, req) } // ProcessTransferUpdate reconciles chain events back into payment state. func (s *Service) ProcessTransferUpdate(ctx context.Context, req *orchestratorv1.ProcessTransferUpdateRequest) (*orchestratorv1.ProcessTransferUpdateResponse, error) { s.ensureHandlers() return executeUnary(ctx, s, "ProcessTransferUpdate", s.h.events.processTransferUpdate, req) } // ProcessDepositObserved reconciles deposit events to ledger. func (s *Service) ProcessDepositObserved(ctx context.Context, req *orchestratorv1.ProcessDepositObservedRequest) (*orchestratorv1.ProcessDepositObservedResponse, error) { s.ensureHandlers() return executeUnary(ctx, s, "ProcessDepositObserved", s.h.events.processDepositObserved, req) } // ProcessCardPayoutUpdate reconciles card payout events back into payment state. func (s *Service) ProcessCardPayoutUpdate(ctx context.Context, req *orchestratorv1.ProcessCardPayoutUpdateRequest) (*orchestratorv1.ProcessCardPayoutUpdateResponse, error) { s.ensureHandlers() return executeUnary(ctx, s, "ProcessCardPayoutUpdate", s.h.events.processCardPayoutUpdate, req) } func (s *Service) executePayment(ctx context.Context, store storage.PaymentsStore, payment *model.Payment, quote *orchestratorv1.PaymentQuote) error { s.ensureHandlers() return s.comp.executor.executePayment(ctx, store, payment, quote) } func (s *Service) resumePaymentPlan(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error { if payment == nil || payment.PaymentPlan == nil || len(payment.PaymentPlan.Steps) == 0 { return nil } s.ensureHandlers() return s.comp.executor.executePaymentPlan(ctx, store, payment, nil) } func (s *Service) releasePaymentHold(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error { if payment == nil || payment.PaymentPlan == nil || len(payment.PaymentPlan.Steps) == 0 { return nil } s.ensureHandlers() return s.comp.executor.releasePaymentHold(ctx, store, payment) }