separated quotation and payments
This commit is contained in:
210
api/payments/quotation/internal/service/orchestrator/service.go
Normal file
210
api/payments/quotation/internal/service/orchestrator/service.go
Normal file
@@ -0,0 +1,210 @@
|
||||
package orchestrator
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tech/sendico/payments/storage"
|
||||
"github.com/tech/sendico/payments/storage/model"
|
||||
"github.com/tech/sendico/pkg/api/routers"
|
||||
clockpkg "github.com/tech/sendico/pkg/clock"
|
||||
msg "github.com/tech/sendico/pkg/messaging"
|
||||
mb "github.com/tech/sendico/pkg/messaging/broker"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type serviceError string
|
||||
|
||||
func (e serviceError) Error() string {
|
||||
return string(e)
|
||||
}
|
||||
|
||||
const (
|
||||
defaultFeeQuoteTTLMillis int64 = 120000
|
||||
defaultOracleTTLMillis int64 = 60000
|
||||
)
|
||||
|
||||
var (
|
||||
errStorageUnavailable = serviceError("payments.orchestrator: storage not initialised")
|
||||
)
|
||||
|
||||
// Service orchestrates payments across ledger, billing, FX, and chain domains.
|
||||
type Service struct {
|
||||
logger mlogger.Logger
|
||||
storage storage.Repository
|
||||
clock clockpkg.Clock
|
||||
|
||||
deps serviceDependencies
|
||||
h handlerSet
|
||||
comp componentSet
|
||||
|
||||
gatewayBroker mb.Broker
|
||||
gatewayConsumers []msg.Consumer
|
||||
|
||||
orchestratorv1.UnimplementedPaymentOrchestratorServer
|
||||
}
|
||||
|
||||
type serviceDependencies struct {
|
||||
fees feesDependency
|
||||
ledger ledgerDependency
|
||||
gateway gatewayDependency
|
||||
railGateways railGatewayDependency
|
||||
providerGateway providerGatewayDependency
|
||||
oracle oracleDependency
|
||||
mntx mntxDependency
|
||||
gatewayRegistry GatewayRegistry
|
||||
gatewayInvokeResolver GatewayInvokeResolver
|
||||
cardRoutes map[string]CardGatewayRoute
|
||||
feeLedgerAccounts map[string]string
|
||||
planBuilder PlanBuilder
|
||||
}
|
||||
|
||||
type handlerSet struct {
|
||||
commands *paymentCommandFactory
|
||||
queries *paymentQueryHandler
|
||||
events *paymentEventHandler
|
||||
}
|
||||
|
||||
type componentSet struct {
|
||||
executor *paymentExecutor
|
||||
}
|
||||
|
||||
// NewService constructs a payment orchestrator service.
|
||||
func NewService(logger mlogger.Logger, repo storage.Repository, opts ...Option) *Service {
|
||||
svc := &Service{
|
||||
logger: logger.Named("payment_orchestrator"),
|
||||
storage: repo,
|
||||
clock: clockpkg.NewSystem(),
|
||||
}
|
||||
|
||||
initMetrics()
|
||||
|
||||
for _, opt := range opts {
|
||||
if opt != nil {
|
||||
opt(svc)
|
||||
}
|
||||
}
|
||||
|
||||
if svc.clock == nil {
|
||||
svc.clock = clockpkg.NewSystem()
|
||||
}
|
||||
|
||||
engine := defaultPaymentEngine{svc: svc}
|
||||
svc.h.commands = newPaymentCommandFactory(engine, svc.logger)
|
||||
svc.h.queries = newPaymentQueryHandler(svc.storage, svc.ensureRepository, svc.logger.Named("queries"))
|
||||
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger.Named("events"), svc.submitCardPayout, svc.resumePaymentPlan, svc.releasePaymentHold)
|
||||
svc.comp.executor = newPaymentExecutor(&svc.deps, svc.logger.Named("payment_executor"), svc)
|
||||
svc.startGatewayConsumers()
|
||||
|
||||
return svc
|
||||
}
|
||||
|
||||
func (s *Service) ensureHandlers() {
|
||||
if s.h.commands == nil {
|
||||
s.h.commands = newPaymentCommandFactory(defaultPaymentEngine{svc: s}, s.logger)
|
||||
}
|
||||
if s.h.queries == nil {
|
||||
s.h.queries = newPaymentQueryHandler(s.storage, s.ensureRepository, s.logger.Named("queries"))
|
||||
}
|
||||
if s.h.events == nil {
|
||||
s.h.events = newPaymentEventHandler(s.storage, s.ensureRepository, s.logger.Named("events"), s.submitCardPayout, s.resumePaymentPlan, s.releasePaymentHold)
|
||||
}
|
||||
if s.comp.executor == nil {
|
||||
s.comp.executor = newPaymentExecutor(&s.deps, s.logger.Named("payment_executor"), s)
|
||||
}
|
||||
}
|
||||
|
||||
// Register attaches the service to the supplied gRPC router.
|
||||
func (s *Service) Register(router routers.GRPC) error {
|
||||
return router.Register(func(reg grpc.ServiceRegistrar) {
|
||||
orchestratorv1.RegisterPaymentOrchestratorServer(reg, s)
|
||||
})
|
||||
}
|
||||
|
||||
// QuotePayment aggregates downstream quotes.
|
||||
func (s *Service) QuotePayment(ctx context.Context, req *orchestratorv1.QuotePaymentRequest) (*orchestratorv1.QuotePaymentResponse, error) {
|
||||
s.ensureHandlers()
|
||||
return executeUnary(ctx, s, "QuotePayment", s.h.commands.QuotePayment().Execute, req)
|
||||
}
|
||||
|
||||
// QuotePayments aggregates downstream quotes for multiple intents.
|
||||
func (s *Service) QuotePayments(ctx context.Context, req *orchestratorv1.QuotePaymentsRequest) (*orchestratorv1.QuotePaymentsResponse, error) {
|
||||
s.ensureHandlers()
|
||||
return executeUnary(ctx, s, "QuotePayments", s.h.commands.QuotePayments().Execute, req)
|
||||
}
|
||||
|
||||
// InitiatePayment captures a payment intent and reserves funds orchestration.
|
||||
func (s *Service) InitiatePayment(ctx context.Context, req *orchestratorv1.InitiatePaymentRequest) (*orchestratorv1.InitiatePaymentResponse, error) {
|
||||
s.ensureHandlers()
|
||||
return executeUnary(ctx, s, "InitiatePayment", s.h.commands.InitiatePayment().Execute, req)
|
||||
}
|
||||
|
||||
// InitiatePayments executes multiple payments using a stored quote reference.
|
||||
func (s *Service) InitiatePayments(ctx context.Context, req *orchestratorv1.InitiatePaymentsRequest) (*orchestratorv1.InitiatePaymentsResponse, error) {
|
||||
s.ensureHandlers()
|
||||
return executeUnary(ctx, s, "InitiatePayments", s.h.commands.InitiatePayments().Execute, req)
|
||||
}
|
||||
|
||||
// CancelPayment attempts to cancel an in-flight payment.
|
||||
func (s *Service) CancelPayment(ctx context.Context, req *orchestratorv1.CancelPaymentRequest) (*orchestratorv1.CancelPaymentResponse, error) {
|
||||
s.ensureHandlers()
|
||||
return executeUnary(ctx, s, "CancelPayment", s.h.commands.CancelPayment().Execute, req)
|
||||
}
|
||||
|
||||
// GetPayment returns a stored payment record.
|
||||
func (s *Service) GetPayment(ctx context.Context, req *orchestratorv1.GetPaymentRequest) (*orchestratorv1.GetPaymentResponse, error) {
|
||||
s.ensureHandlers()
|
||||
return executeUnary(ctx, s, "GetPayment", s.h.queries.getPayment, req)
|
||||
}
|
||||
|
||||
// ListPayments lists stored payment records.
|
||||
func (s *Service) ListPayments(ctx context.Context, req *orchestratorv1.ListPaymentsRequest) (*orchestratorv1.ListPaymentsResponse, error) {
|
||||
s.ensureHandlers()
|
||||
return executeUnary(ctx, s, "ListPayments", s.h.queries.listPayments, req)
|
||||
}
|
||||
|
||||
// InitiateConversion orchestrates standalone FX conversions.
|
||||
func (s *Service) InitiateConversion(ctx context.Context, req *orchestratorv1.InitiateConversionRequest) (*orchestratorv1.InitiateConversionResponse, error) {
|
||||
s.ensureHandlers()
|
||||
return executeUnary(ctx, s, "InitiateConversion", s.h.commands.InitiateConversion().Execute, req)
|
||||
}
|
||||
|
||||
// ProcessTransferUpdate reconciles chain events back into payment state.
|
||||
func (s *Service) ProcessTransferUpdate(ctx context.Context, req *orchestratorv1.ProcessTransferUpdateRequest) (*orchestratorv1.ProcessTransferUpdateResponse, error) {
|
||||
s.ensureHandlers()
|
||||
return executeUnary(ctx, s, "ProcessTransferUpdate", s.h.events.processTransferUpdate, req)
|
||||
}
|
||||
|
||||
// ProcessDepositObserved reconciles deposit events to ledger.
|
||||
func (s *Service) ProcessDepositObserved(ctx context.Context, req *orchestratorv1.ProcessDepositObservedRequest) (*orchestratorv1.ProcessDepositObservedResponse, error) {
|
||||
s.ensureHandlers()
|
||||
return executeUnary(ctx, s, "ProcessDepositObserved", s.h.events.processDepositObserved, req)
|
||||
}
|
||||
|
||||
// ProcessCardPayoutUpdate reconciles card payout events back into payment state.
|
||||
func (s *Service) ProcessCardPayoutUpdate(ctx context.Context, req *orchestratorv1.ProcessCardPayoutUpdateRequest) (*orchestratorv1.ProcessCardPayoutUpdateResponse, error) {
|
||||
s.ensureHandlers()
|
||||
return executeUnary(ctx, s, "ProcessCardPayoutUpdate", s.h.events.processCardPayoutUpdate, req)
|
||||
}
|
||||
|
||||
func (s *Service) executePayment(ctx context.Context, store storage.PaymentsStore, payment *model.Payment, quote *orchestratorv1.PaymentQuote) error {
|
||||
s.ensureHandlers()
|
||||
return s.comp.executor.executePayment(ctx, store, payment, quote)
|
||||
}
|
||||
|
||||
func (s *Service) resumePaymentPlan(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error {
|
||||
if payment == nil || payment.PaymentPlan == nil || len(payment.PaymentPlan.Steps) == 0 {
|
||||
return nil
|
||||
}
|
||||
s.ensureHandlers()
|
||||
return s.comp.executor.executePaymentPlan(ctx, store, payment, nil)
|
||||
}
|
||||
|
||||
func (s *Service) releasePaymentHold(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error {
|
||||
if payment == nil || payment.PaymentPlan == nil || len(payment.PaymentPlan.Steps) == 0 {
|
||||
return nil
|
||||
}
|
||||
s.ensureHandlers()
|
||||
return s.comp.executor.releasePaymentHold(ctx, store, payment)
|
||||
}
|
||||
Reference in New Issue
Block a user