payment rails

This commit is contained in:
Stephan D
2025-12-10 18:40:55 +01:00
parent 81d2db394b
commit 47899e25d4
39 changed files with 3043 additions and 909 deletions

View File

@@ -7,8 +7,8 @@ import (
"strings"
"time"
chainclient "github.com/tech/sendico/gateway/chain/client"
oracleclient "github.com/tech/sendico/fx/oracle/client"
chainclient "github.com/tech/sendico/gateway/chain/client"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator"
"github.com/tech/sendico/payments/orchestrator/storage"
@@ -41,10 +41,11 @@ type Imp struct {
type config struct {
*grpcapp.Config `yaml:",inline"`
Fees clientConfig `yaml:"fees"`
Ledger clientConfig `yaml:"ledger"`
Gateway clientConfig `yaml:"gateway"`
Oracle clientConfig `yaml:"oracle"`
Fees clientConfig `yaml:"fees"`
Ledger clientConfig `yaml:"ledger"`
Gateway clientConfig `yaml:"gateway"`
Oracle clientConfig `yaml:"oracle"`
CardGateways map[string]cardGatewayRouteConfig `yaml:"card_gateways"`
}
type clientConfig struct {
@@ -54,6 +55,11 @@ type clientConfig struct {
InsecureTransport bool `yaml:"insecure"`
}
type cardGatewayRouteConfig struct {
FundingAddress string `yaml:"funding_address"`
FeeAddress string `yaml:"fee_address"`
}
func (c clientConfig) address() string {
return strings.TrimSpace(c.Address)
}
@@ -107,7 +113,7 @@ func (i *Imp) Shutdown() {
func (i *Imp) Start() error {
cfg, err := i.loadConfig()
if err != nil {
if err != nil {
return err
}
i.config = cfg
@@ -150,6 +156,9 @@ func (i *Imp) Start() error {
if oracleClient != nil {
opts = append(opts, orchestrator.WithOracleClient(oracleClient))
}
if routes := buildCardGatewayRoutes(cfg.CardGateways); len(routes) > 0 {
opts = append(opts, orchestrator.WithCardGatewayRoutes(routes))
}
return orchestrator.NewService(logger, repo, opts...), nil
}
@@ -296,3 +305,21 @@ func (i *Imp) loadConfig() (*config, error) {
return cfg, nil
}
func buildCardGatewayRoutes(src map[string]cardGatewayRouteConfig) map[string]orchestrator.CardGatewayRoute {
if len(src) == 0 {
return nil
}
result := make(map[string]orchestrator.CardGatewayRoute, len(src))
for key, route := range src {
trimmedKey := strings.TrimSpace(key)
if trimmedKey == "" {
continue
}
result[trimmedKey] = orchestrator.CardGatewayRoute{
FundingAddress: strings.TrimSpace(route.FundingAddress),
FeeAddress: strings.TrimSpace(route.FeeAddress),
}
}
return result
}

View File

@@ -0,0 +1,252 @@
package orchestrator
import (
"context"
"strings"
"github.com/shopspring/decimal"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/merrors"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.uber.org/zap"
)
const defaultCardGateway = "monetix"
func (s *Service) cardRoute(gateway string) (CardGatewayRoute, error) {
if len(s.deps.cardRoutes) == 0 {
s.logger.Warn("card routing not configured", zap.String("gateway", gateway))
return CardGatewayRoute{}, merrors.InvalidArgument("card routing not configured")
}
key := strings.ToLower(strings.TrimSpace(gateway))
if key == "" {
key = defaultCardGateway
}
route, ok := s.deps.cardRoutes[key]
if !ok {
s.logger.Warn("card routing missing for gateway", zap.String("gateway", key))
return CardGatewayRoute{}, merrors.InvalidArgument("card routing missing for gateway " + key)
}
if strings.TrimSpace(route.FundingAddress) == "" {
s.logger.Warn("card routing missing funding address", zap.String("gateway", key))
return CardGatewayRoute{}, merrors.InvalidArgument("card funding address is required for gateway " + key)
}
return route, nil
}
func (s *Service) submitCardFundingTransfers(ctx context.Context, payment *model.Payment, quote *orchestratorv1.PaymentQuote) error {
if payment == nil {
return merrors.InvalidArgument("payment is required")
}
intent := payment.Intent
source := intent.Source.ManagedWallet
if source == nil || strings.TrimSpace(source.ManagedWalletRef) == "" {
return merrors.InvalidArgument("card funding: source managed wallet is required")
}
if !s.deps.gateway.available() {
s.logger.Warn("card funding aborted: chain gateway unavailable")
return merrors.InvalidArgument("card funding: chain gateway unavailable")
}
route, err := s.cardRoute(defaultCardGateway)
if err != nil {
return err
}
amount := cloneMoney(intent.Amount)
if amount == nil {
return merrors.InvalidArgument("card funding: amount is required")
}
exec := payment.Execution
if exec == nil {
exec = &model.ExecutionRefs{}
}
// Transfer payout amount to funding wallet.
fundReq := &chainv1.SubmitTransferRequest{
IdempotencyKey: payment.IdempotencyKey + ":card:fund",
OrganizationRef: payment.OrganizationRef.Hex(),
SourceWalletRef: strings.TrimSpace(source.ManagedWalletRef),
Destination: &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ExternalAddress{ExternalAddress: strings.TrimSpace(route.FundingAddress)},
},
Amount: amount,
Metadata: cloneMetadata(payment.Metadata),
ClientReference: payment.PaymentRef,
}
fundResp, err := s.deps.gateway.client.SubmitTransfer(ctx, fundReq)
if err != nil {
s.logger.Warn("card funding transfer failed", zap.Error(err), zap.String("payment_ref", payment.PaymentRef))
return err
}
if fundResp != nil && fundResp.GetTransfer() != nil {
exec.ChainTransferRef = strings.TrimSpace(fundResp.GetTransfer().GetTransferRef())
}
s.logger.Info("card funding transfer submitted", zap.String("payment_ref", payment.PaymentRef), zap.String("transfer_ref", exec.ChainTransferRef))
feeMoney := quote.GetExpectedFeeTotal()
if feeMoney != nil && strings.TrimSpace(feeMoney.GetAmount()) != "" {
if strings.TrimSpace(route.FeeAddress) == "" {
return merrors.InvalidArgument("card funding: fee address is required when fee exists")
}
feeDecimal, err := decimalFromMoney(feeMoney)
if err != nil {
return err
}
if feeDecimal.IsPositive() {
feeReq := &chainv1.SubmitTransferRequest{
IdempotencyKey: payment.IdempotencyKey + ":card:fee",
OrganizationRef: payment.OrganizationRef.Hex(),
SourceWalletRef: strings.TrimSpace(source.ManagedWalletRef),
Destination: &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ExternalAddress{ExternalAddress: strings.TrimSpace(route.FeeAddress)},
},
Amount: feeMoney,
Metadata: cloneMetadata(payment.Metadata),
ClientReference: payment.PaymentRef,
}
feeResp, feeErr := s.deps.gateway.client.SubmitTransfer(ctx, feeReq)
if feeErr != nil {
s.logger.Warn("card fee transfer failed", zap.Error(feeErr), zap.String("payment_ref", payment.PaymentRef))
return feeErr
}
if feeResp != nil && feeResp.GetTransfer() != nil {
exec.FeeTransferRef = strings.TrimSpace(feeResp.GetTransfer().GetTransferRef())
}
s.logger.Info("card fee transfer submitted", zap.String("payment_ref", payment.PaymentRef), zap.String("transfer_ref", exec.FeeTransferRef))
}
}
payment.Execution = exec
return nil
}
func (s *Service) submitCardPayout(ctx context.Context, payment *model.Payment) error {
if payment == nil {
return merrors.InvalidArgument("payment is required")
}
intent := payment.Intent
card := intent.Destination.Card
if card == nil {
return merrors.InvalidArgument("card payout: card endpoint is required")
}
amount := cloneMoney(intent.Amount)
if amount == nil || strings.TrimSpace(amount.GetAmount()) == "" || strings.TrimSpace(amount.GetCurrency()) == "" {
return merrors.InvalidArgument("card payout: amount is required")
}
amtDec, err := decimalFromMoney(amount)
if err != nil {
return err
}
minor := amtDec.Mul(decimal.NewFromInt(100)).IntPart()
payoutID := payment.PaymentRef
currency := strings.TrimSpace(amount.GetCurrency())
holder := strings.TrimSpace(card.Cardholder)
meta := cloneMetadata(payment.Metadata)
var (
state *mntxv1.CardPayoutState
)
if token := strings.TrimSpace(card.Token); token != "" {
req := &mntxv1.CardTokenPayoutRequest{
PayoutId: payoutID,
AmountMinor: minor,
Currency: currency,
CardToken: token,
CardHolder: holder,
MaskedPan: strings.TrimSpace(card.MaskedPan),
Metadata: meta,
}
resp, err := s.deps.mntx.client.CreateCardTokenPayout(ctx, req)
if err != nil {
s.logger.Warn("card token payout failed", zap.Error(err), zap.String("payment_ref", payment.PaymentRef))
return err
}
state = resp.GetPayout()
} else if pan := strings.TrimSpace(card.Pan); pan != "" {
req := &mntxv1.CardPayoutRequest{
PayoutId: payoutID,
AmountMinor: minor,
Currency: currency,
CardPan: pan,
CardExpYear: card.ExpYear,
CardExpMonth: card.ExpMonth,
CardHolder: holder,
Metadata: meta,
}
resp, err := s.deps.mntx.client.CreateCardPayout(ctx, req)
if err != nil {
s.logger.Warn("card payout failed", zap.Error(err), zap.String("payment_ref", payment.PaymentRef))
return err
}
state = resp.GetPayout()
} else {
return merrors.InvalidArgument("card payout: either token or pan must be provided")
}
if state == nil {
return merrors.Internal("card payout: missing payout state")
}
recordCardPayoutState(payment, state)
if payment.Execution == nil {
payment.Execution = &model.ExecutionRefs{}
}
if payment.Execution.CardPayoutRef == "" {
payment.Execution.CardPayoutRef = strings.TrimSpace(state.GetPayoutId())
}
s.logger.Info("card payout submitted", zap.String("payment_ref", payment.PaymentRef), zap.String("payout_id", payment.Execution.CardPayoutRef))
return nil
}
func recordCardPayoutState(payment *model.Payment, state *mntxv1.CardPayoutState) {
if payment == nil || state == nil {
return
}
if payment.CardPayout == nil {
payment.CardPayout = &model.CardPayout{}
}
payment.CardPayout.PayoutRef = strings.TrimSpace(state.GetPayoutId())
payment.CardPayout.ProviderPaymentID = strings.TrimSpace(state.GetProviderPaymentId())
payment.CardPayout.Status = state.GetStatus().String()
payment.CardPayout.FailureReason = strings.TrimSpace(state.GetProviderMessage())
payment.CardPayout.ProviderCode = strings.TrimSpace(state.GetProviderCode())
if payment.CardPayout.CardCountry == "" && payment.Intent.Destination.Card != nil {
payment.CardPayout.CardCountry = strings.TrimSpace(payment.Intent.Destination.Card.Country)
}
if payment.CardPayout.MaskedPan == "" && payment.Intent.Destination.Card != nil {
payment.CardPayout.MaskedPan = strings.TrimSpace(payment.Intent.Destination.Card.MaskedPan)
}
payment.CardPayout.GatewayReference = strings.TrimSpace(state.GetPayoutId())
}
func applyCardPayoutUpdate(payment *model.Payment, payout *mntxv1.CardPayoutState) {
if payment == nil || payout == nil {
return
}
recordCardPayoutState(payment, payout)
if payment.Execution == nil {
payment.Execution = &model.ExecutionRefs{}
}
if payment.Execution.CardPayoutRef == "" {
payment.Execution.CardPayoutRef = strings.TrimSpace(payout.GetPayoutId())
}
payment.State = mapMntxStatusToState(payout.GetStatus())
switch payout.GetStatus() {
case mntxv1.PayoutStatus_PAYOUT_STATUS_PROCESSED:
payment.FailureCode = model.PaymentFailureCodeUnspecified
payment.FailureReason = ""
case mntxv1.PayoutStatus_PAYOUT_STATUS_FAILED:
payment.FailureCode = model.PaymentFailureCodePolicy
payment.FailureReason = strings.TrimSpace(payout.GetProviderMessage())
default:
// leave as-is for pending/unspecified
}
}

View File

@@ -0,0 +1,83 @@
package orchestrator
import (
"context"
"time"
"github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/mlogger"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
)
type paymentEngine interface {
EnsureRepository(ctx context.Context) error
BuildPaymentQuote(ctx context.Context, orgRef string, req *orchestratorv1.QuotePaymentRequest) (*orchestratorv1.PaymentQuote, time.Time, error)
ResolvePaymentQuote(ctx context.Context, in quoteResolutionInput) (*orchestratorv1.PaymentQuote, error)
ExecutePayment(ctx context.Context, store storage.PaymentsStore, payment *model.Payment, quote *orchestratorv1.PaymentQuote) error
Repository() storage.Repository
}
type defaultPaymentEngine struct {
svc *Service
}
func (e defaultPaymentEngine) EnsureRepository(ctx context.Context) error {
return e.svc.ensureRepository(ctx)
}
func (e defaultPaymentEngine) BuildPaymentQuote(ctx context.Context, orgRef string, req *orchestratorv1.QuotePaymentRequest) (*orchestratorv1.PaymentQuote, time.Time, error) {
return e.svc.buildPaymentQuote(ctx, orgRef, req)
}
func (e defaultPaymentEngine) ResolvePaymentQuote(ctx context.Context, in quoteResolutionInput) (*orchestratorv1.PaymentQuote, error) {
return e.svc.resolvePaymentQuote(ctx, in)
}
func (e defaultPaymentEngine) ExecutePayment(ctx context.Context, store storage.PaymentsStore, payment *model.Payment, quote *orchestratorv1.PaymentQuote) error {
return e.svc.executePayment(ctx, store, payment, quote)
}
func (e defaultPaymentEngine) Repository() storage.Repository {
return e.svc.storage
}
type paymentCommandFactory struct {
engine paymentEngine
logger mlogger.Logger
}
func newPaymentCommandFactory(engine paymentEngine, logger mlogger.Logger) *paymentCommandFactory {
return &paymentCommandFactory{
engine: engine,
logger: logger.Named("commands"),
}
}
func (f *paymentCommandFactory) QuotePayment() *quotePaymentCommand {
return &quotePaymentCommand{
engine: f.engine,
logger: f.logger.Named("quote_payment"),
}
}
func (f *paymentCommandFactory) InitiatePayment() *initiatePaymentCommand {
return &initiatePaymentCommand{
engine: f.engine,
logger: f.logger.Named("initiate_payment"),
}
}
func (f *paymentCommandFactory) CancelPayment() *cancelPaymentCommand {
return &cancelPaymentCommand{
engine: f.engine,
logger: f.logger.Named("cancel_payment"),
}
}
func (f *paymentCommandFactory) InitiateConversion() *initiateConversionCommand {
return &initiateConversionCommand{
engine: f.engine,
logger: f.logger.Named("initiate_conversion"),
}
}

View File

@@ -67,6 +67,19 @@ func endpointFromProto(src *orchestratorv1.PaymentEndpoint) model.PaymentEndpoin
}
return result
}
if card := src.GetCard(); card != nil {
result.Type = model.EndpointTypeCard
result.Card = &model.CardEndpoint{
Pan: strings.TrimSpace(card.GetPan()),
Token: strings.TrimSpace(card.GetToken()),
Cardholder: strings.TrimSpace(card.GetCardholderName()),
ExpMonth: card.GetExpMonth(),
ExpYear: card.GetExpYear(),
Country: strings.TrimSpace(card.GetCountry()),
MaskedPan: strings.TrimSpace(card.GetMaskedPan()),
}
return result
}
return result
}
@@ -116,6 +129,18 @@ func toProtoPayment(src *model.Payment) *orchestratorv1.Payment {
Execution: protoExecutionFromModel(src.Execution),
Metadata: cloneMetadata(src.Metadata),
}
if src.CardPayout != nil {
payment.CardPayout = &orchestratorv1.CardPayout{
PayoutRef: src.CardPayout.PayoutRef,
ProviderPaymentId: src.CardPayout.ProviderPaymentID,
Status: src.CardPayout.Status,
FailureReason: src.CardPayout.FailureReason,
CardCountry: src.CardPayout.CardCountry,
MaskedPan: src.CardPayout.MaskedPan,
ProviderCode: src.CardPayout.ProviderCode,
GatewayReference: src.CardPayout.GatewayReference,
}
}
if src.CreatedAt.IsZero() {
payment.CreatedAt = timestamppb.New(time.Now().UTC())
} else {
@@ -176,6 +201,23 @@ func protoEndpointFromModel(src model.PaymentEndpoint) *orchestratorv1.PaymentEn
},
}
}
case model.EndpointTypeCard:
if src.Card != nil {
card := &orchestratorv1.CardEndpoint{
CardholderName: src.Card.Cardholder,
ExpMonth: src.Card.ExpMonth,
ExpYear: src.Card.ExpYear,
Country: src.Card.Country,
MaskedPan: src.Card.MaskedPan,
}
if pan := strings.TrimSpace(src.Card.Pan); pan != "" {
card.Card = &orchestratorv1.CardEndpoint_Pan{Pan: pan}
}
if token := strings.TrimSpace(src.Card.Token); token != "" {
card.Card = &orchestratorv1.CardEndpoint_Token{Token: token}
}
endpoint.Endpoint = &orchestratorv1.PaymentEndpoint_Card{Card: card}
}
default:
// leave unspecified
}
@@ -205,6 +247,8 @@ func protoExecutionFromModel(src *model.ExecutionRefs) *orchestratorv1.Execution
CreditEntryRef: src.CreditEntryRef,
FxEntryRef: src.FXEntryRef,
ChainTransferRef: src.ChainTransferRef,
CardPayoutRef: src.CardPayoutRef,
FeeTransferRef: src.FeeTransferRef,
}
}
@@ -402,6 +446,18 @@ func applyProtoPaymentToModel(src *orchestratorv1.Payment, dst *model.Payment) e
dst.Metadata = cloneMetadata(src.GetMetadata())
dst.LastQuote = quoteSnapshotToModel(src.GetLastQuote())
dst.Execution = executionFromProto(src.GetExecution())
if src.GetCardPayout() != nil {
dst.CardPayout = &model.CardPayout{
PayoutRef: strings.TrimSpace(src.GetCardPayout().GetPayoutRef()),
ProviderPaymentID: strings.TrimSpace(src.GetCardPayout().GetProviderPaymentId()),
Status: strings.TrimSpace(src.GetCardPayout().GetStatus()),
FailureReason: strings.TrimSpace(src.GetCardPayout().GetFailureReason()),
CardCountry: strings.TrimSpace(src.GetCardPayout().GetCardCountry()),
MaskedPan: strings.TrimSpace(src.GetCardPayout().GetMaskedPan()),
ProviderCode: strings.TrimSpace(src.GetCardPayout().GetProviderCode()),
GatewayReference: strings.TrimSpace(src.GetCardPayout().GetGatewayReference()),
}
}
return nil
}
@@ -414,6 +470,8 @@ func executionFromProto(src *orchestratorv1.ExecutionRefs) *model.ExecutionRefs
CreditEntryRef: strings.TrimSpace(src.GetCreditEntryRef()),
FXEntryRef: strings.TrimSpace(src.GetFxEntryRef()),
ChainTransferRef: strings.TrimSpace(src.GetChainTransferRef()),
CardPayoutRef: strings.TrimSpace(src.GetCardPayoutRef()),
FeeTransferRef: strings.TrimSpace(src.GetFeeTransferRef()),
}
}

View File

@@ -0,0 +1,69 @@
package orchestrator
import (
"testing"
"github.com/tech/sendico/payments/orchestrator/storage/model"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
)
func TestEndpointFromProtoCard(t *testing.T) {
protoEndpoint := &orchestratorv1.PaymentEndpoint{
Endpoint: &orchestratorv1.PaymentEndpoint_Card{
Card: &orchestratorv1.CardEndpoint{
Card: &orchestratorv1.CardEndpoint_Pan{Pan: " 411111 "},
CardholderName: " Jane Doe ",
ExpMonth: 12,
ExpYear: 2030,
Country: " US ",
MaskedPan: " ****1111 ",
},
},
Metadata: map[string]string{"k": "v"},
}
modelEndpoint := endpointFromProto(protoEndpoint)
if modelEndpoint.Type != model.EndpointTypeCard {
t.Fatalf("expected card type, got %s", modelEndpoint.Type)
}
if modelEndpoint.Card == nil {
t.Fatalf("card payload missing")
}
if modelEndpoint.Card.Pan != "411111" || modelEndpoint.Card.Cardholder != "Jane Doe" || modelEndpoint.Card.Country != "US" || modelEndpoint.Card.MaskedPan != "****1111" {
t.Fatalf("card payload not trimmed as expected: %#v", modelEndpoint.Card)
}
if modelEndpoint.Metadata["k"] != "v" {
t.Fatalf("metadata not preserved")
}
}
func TestProtoEndpointFromModelCard(t *testing.T) {
modelEndpoint := model.PaymentEndpoint{
Type: model.EndpointTypeCard,
Card: &model.CardEndpoint{
Token: "tok_123",
Cardholder: "Jane",
ExpMonth: 1,
ExpYear: 2028,
Country: "GB",
MaskedPan: "****1234",
},
Metadata: map[string]string{"k": "v"},
}
protoEndpoint := protoEndpointFromModel(modelEndpoint)
card := protoEndpoint.GetCard()
if card == nil {
t.Fatalf("card payload missing in proto")
}
token, ok := card.Card.(*orchestratorv1.CardEndpoint_Token)
if !ok || token.Token != "tok_123" {
t.Fatalf("expected token payload, got %T %#v", card.Card, card.Card)
}
if card.GetCardholderName() != "Jane" || card.GetCountry() != "GB" || card.GetMaskedPan() != "****1234" {
t.Fatalf("card details mismatch: %#v", card)
}
if protoEndpoint.GetMetadata()["k"] != "v" {
t.Fatalf("metadata not preserved in proto endpoint")
}
}

View File

@@ -0,0 +1,275 @@
package orchestrator
import (
"context"
"errors"
"strings"
"github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/api/routers/gsresponse"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
)
type quotePaymentCommand struct {
engine paymentEngine
logger mlogger.Logger
}
func (h *quotePaymentCommand) Execute(ctx context.Context, req *orchestratorv1.QuotePaymentRequest) gsresponse.Responder[orchestratorv1.QuotePaymentResponse] {
if err := h.engine.EnsureRepository(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if req == nil {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("nil request"))
}
orgRef, orgID, err := validateMetaAndOrgRef(req.GetMeta())
if err != nil {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if err := requireNonNilIntent(req.GetIntent()); err != nil {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
intent := req.GetIntent()
quote, expiresAt, err := h.engine.BuildPaymentQuote(ctx, orgRef, req)
if err != nil {
return gsresponse.Auto[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if !req.GetPreviewOnly() {
quotesStore, err := ensureQuotesStore(h.engine.Repository())
if err != nil {
return gsresponse.Unavailable[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
quoteRef := primitive.NewObjectID().Hex()
quote.QuoteRef = quoteRef
record := &model.PaymentQuoteRecord{
QuoteRef: quoteRef,
Intent: intentFromProto(intent),
Quote: quoteSnapshotToModel(quote),
ExpiresAt: expiresAt,
}
record.SetID(primitive.NewObjectID())
record.SetOrganizationRef(orgID)
if err := quotesStore.Create(ctx, record); err != nil {
return gsresponse.Auto[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
h.logger.Info("stored payment quote", zap.String("quote_ref", quoteRef), zap.String("org_ref", orgID.Hex()))
}
return gsresponse.Success(&orchestratorv1.QuotePaymentResponse{Quote: quote})
}
type initiatePaymentCommand struct {
engine paymentEngine
logger mlogger.Logger
}
func (h *initiatePaymentCommand) Execute(ctx context.Context, req *orchestratorv1.InitiatePaymentRequest) gsresponse.Responder[orchestratorv1.InitiatePaymentResponse] {
if err := h.engine.EnsureRepository(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if req == nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("nil request"))
}
orgRef, orgID, err := validateMetaAndOrgRef(req.GetMeta())
if err != nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
intent := req.GetIntent()
if err := requireNonNilIntent(intent); err != nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
idempotencyKey, err := requireIdempotencyKey(req.GetIdempotencyKey())
if err != nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
store, err := ensurePaymentsStore(h.engine.Repository())
if err != nil {
return gsresponse.Unavailable[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if existing, err := getPaymentByIdempotencyKey(ctx, store, orgID, idempotencyKey); err == nil && existing != nil {
h.logger.Debug("idempotent payment request reused", zap.String("payment_ref", existing.PaymentRef), zap.String("org_ref", orgID.Hex()))
return gsresponse.Success(&orchestratorv1.InitiatePaymentResponse{Payment: toProtoPayment(existing)})
} else if err != nil && !errors.Is(err, storage.ErrPaymentNotFound) {
return gsresponse.Auto[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
quoteSnapshot, err := h.engine.ResolvePaymentQuote(ctx, quoteResolutionInput{
OrgRef: orgRef,
OrgID: orgID,
Meta: req.GetMeta(),
Intent: intent,
QuoteRef: req.GetQuoteRef(),
FeeQuoteToken: req.GetFeeQuoteToken(),
IdempotencyKey: req.GetIdempotencyKey(),
})
if err != nil {
if qerr, ok := err.(quoteResolutionError); ok {
switch qerr.code {
case "quote_not_found":
return gsresponse.FailedPrecondition[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, qerr.code, qerr.err)
case "quote_expired":
return gsresponse.FailedPrecondition[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, qerr.code, qerr.err)
case "quote_intent_mismatch":
return gsresponse.InvalidArgument[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, qerr.err)
default:
return gsresponse.Auto[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, qerr.err)
}
}
return gsresponse.Auto[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if quoteSnapshot == nil {
quoteSnapshot = &orchestratorv1.PaymentQuote{}
}
entity := newPayment(orgID, intent, idempotencyKey, req.GetMetadata(), quoteSnapshot)
if err = store.Create(ctx, entity); err != nil {
if errors.Is(err, storage.ErrDuplicatePayment) {
return gsresponse.Auto[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, merrors.DataConflict("payment already exists"))
}
return gsresponse.Auto[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if err := h.engine.ExecutePayment(ctx, store, entity, quoteSnapshot); err != nil {
return gsresponse.Auto[orchestratorv1.InitiatePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
h.logger.Info("payment initiated", zap.String("payment_ref", entity.PaymentRef), zap.String("org_ref", orgID.Hex()), zap.String("kind", intent.GetKind().String()))
return gsresponse.Success(&orchestratorv1.InitiatePaymentResponse{
Payment: toProtoPayment(entity),
})
}
type cancelPaymentCommand struct {
engine paymentEngine
logger mlogger.Logger
}
func (h *cancelPaymentCommand) Execute(ctx context.Context, req *orchestratorv1.CancelPaymentRequest) gsresponse.Responder[orchestratorv1.CancelPaymentResponse] {
if err := h.engine.EnsureRepository(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.CancelPaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if req == nil {
return gsresponse.InvalidArgument[orchestratorv1.CancelPaymentResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("nil request"))
}
paymentRef, err := requirePaymentRef(req.GetPaymentRef())
if err != nil {
return gsresponse.InvalidArgument[orchestratorv1.CancelPaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
store, err := ensurePaymentsStore(h.engine.Repository())
if err != nil {
return gsresponse.Unavailable[orchestratorv1.CancelPaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
payment, err := store.GetByPaymentRef(ctx, paymentRef)
if err != nil {
return paymentNotFoundResponder[orchestratorv1.CancelPaymentResponse](mservice.PaymentOrchestrator, h.logger, err)
}
if payment.State != model.PaymentStateAccepted {
reason := merrors.InvalidArgument("payment cannot be cancelled in current state")
return gsresponse.FailedPrecondition[orchestratorv1.CancelPaymentResponse](h.logger, mservice.PaymentOrchestrator, "payment_not_cancellable", reason)
}
payment.State = model.PaymentStateCancelled
payment.FailureCode = model.PaymentFailureCodePolicy
payment.FailureReason = strings.TrimSpace(req.GetReason())
if err := store.Update(ctx, payment); err != nil {
return gsresponse.Auto[orchestratorv1.CancelPaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
h.logger.Info("payment cancelled", zap.String("payment_ref", payment.PaymentRef), zap.String("org_ref", payment.OrganizationRef.Hex()))
return gsresponse.Success(&orchestratorv1.CancelPaymentResponse{Payment: toProtoPayment(payment)})
}
type initiateConversionCommand struct {
engine paymentEngine
logger mlogger.Logger
}
func (h *initiateConversionCommand) Execute(ctx context.Context, req *orchestratorv1.InitiateConversionRequest) gsresponse.Responder[orchestratorv1.InitiateConversionResponse] {
if err := h.engine.EnsureRepository(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if req == nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("nil request"))
}
orgRef, orgID, err := validateMetaAndOrgRef(req.GetMeta())
if err != nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, err)
}
idempotencyKey, err := requireIdempotencyKey(req.GetIdempotencyKey())
if err != nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if req.GetSource() == nil || req.GetSource().GetLedger() == nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("source ledger endpoint is required"))
}
if req.GetDestination() == nil || req.GetDestination().GetLedger() == nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("destination ledger endpoint is required"))
}
fxIntent := req.GetFx()
if fxIntent == nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("fx intent is required"))
}
store, err := ensurePaymentsStore(h.engine.Repository())
if err != nil {
return gsresponse.Unavailable[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if existing, err := getPaymentByIdempotencyKey(ctx, store, orgID, idempotencyKey); err == nil && existing != nil {
h.logger.Debug("idempotent conversion request reused", zap.String("payment_ref", existing.PaymentRef), zap.String("org_ref", orgID.Hex()))
return gsresponse.Success(&orchestratorv1.InitiateConversionResponse{Conversion: toProtoPayment(existing)})
} else if err != nil && !errors.Is(err, storage.ErrPaymentNotFound) {
return gsresponse.Auto[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, err)
}
amount, err := conversionAmountFromMetadata(req.GetMetadata(), fxIntent)
if err != nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, err)
}
intentProto := &orchestratorv1.PaymentIntent{
Kind: orchestratorv1.PaymentKind_PAYMENT_KIND_FX_CONVERSION,
Source: req.GetSource(),
Destination: req.GetDestination(),
Amount: amount,
RequiresFx: true,
Fx: fxIntent,
FeePolicy: req.GetFeePolicy(),
}
quote, _, err := h.engine.BuildPaymentQuote(ctx, orgRef, &orchestratorv1.QuotePaymentRequest{
Meta: req.GetMeta(),
IdempotencyKey: req.GetIdempotencyKey(),
Intent: intentProto,
})
if err != nil {
return gsresponse.Auto[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, err)
}
entity := newPayment(orgID, intentProto, idempotencyKey, req.GetMetadata(), quote)
if err = store.Create(ctx, entity); err != nil {
if errors.Is(err, storage.ErrDuplicatePayment) {
return gsresponse.Auto[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, merrors.DataConflict("payment already exists"))
}
return gsresponse.Auto[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if err := h.engine.ExecutePayment(ctx, store, entity, quote); err != nil {
return gsresponse.Auto[orchestratorv1.InitiateConversionResponse](h.logger, mservice.PaymentOrchestrator, err)
}
h.logger.Info("conversion initiated", zap.String("payment_ref", entity.PaymentRef), zap.String("org_ref", orgID.Hex()))
return gsresponse.Success(&orchestratorv1.InitiateConversionResponse{
Conversion: toProtoPayment(entity),
})
}

View File

@@ -0,0 +1,139 @@
package orchestrator
import (
"context"
"strings"
"github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/api/routers/gsresponse"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.uber.org/zap"
)
type paymentEventHandler struct {
repo storage.Repository
ensureRepo func(ctx context.Context) error
logger mlogger.Logger
}
func newPaymentEventHandler(repo storage.Repository, ensure func(ctx context.Context) error, logger mlogger.Logger) *paymentEventHandler {
return &paymentEventHandler{
repo: repo,
ensureRepo: ensure,
logger: logger,
}
}
func (h *paymentEventHandler) processTransferUpdate(ctx context.Context, req *orchestratorv1.ProcessTransferUpdateRequest) gsresponse.Responder[orchestratorv1.ProcessTransferUpdateResponse] {
if err := h.ensureRepo(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.ProcessTransferUpdateResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if req == nil || req.GetEvent() == nil || req.GetEvent().GetTransfer() == nil {
return gsresponse.InvalidArgument[orchestratorv1.ProcessTransferUpdateResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("transfer event is required"))
}
transfer := req.GetEvent().GetTransfer()
transferRef := strings.TrimSpace(transfer.GetTransferRef())
if transferRef == "" {
return gsresponse.InvalidArgument[orchestratorv1.ProcessTransferUpdateResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("transfer_ref is required"))
}
store := h.repo.Payments()
if store == nil {
return gsresponse.Unavailable[orchestratorv1.ProcessTransferUpdateResponse](h.logger, mservice.PaymentOrchestrator, errStorageUnavailable)
}
payment, err := store.GetByChainTransferRef(ctx, transferRef)
if err != nil {
return paymentNotFoundResponder[orchestratorv1.ProcessTransferUpdateResponse](mservice.PaymentOrchestrator, h.logger, err)
}
applyTransferStatus(req.GetEvent(), payment)
if err := store.Update(ctx, payment); err != nil {
return gsresponse.Auto[orchestratorv1.ProcessTransferUpdateResponse](h.logger, mservice.PaymentOrchestrator, err)
}
h.logger.Info("transfer update applied", zap.String("payment_ref", payment.PaymentRef), zap.String("transfer_ref", transferRef), zap.Any("state", payment.State))
return gsresponse.Success(&orchestratorv1.ProcessTransferUpdateResponse{Payment: toProtoPayment(payment)})
}
func (h *paymentEventHandler) processDepositObserved(ctx context.Context, req *orchestratorv1.ProcessDepositObservedRequest) gsresponse.Responder[orchestratorv1.ProcessDepositObservedResponse] {
if err := h.ensureRepo(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.ProcessDepositObservedResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if req == nil || req.GetEvent() == nil {
return gsresponse.InvalidArgument[orchestratorv1.ProcessDepositObservedResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("deposit event is required"))
}
event := req.GetEvent()
walletRef := strings.TrimSpace(event.GetWalletRef())
if walletRef == "" {
return gsresponse.InvalidArgument[orchestratorv1.ProcessDepositObservedResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("wallet_ref is required"))
}
store := h.repo.Payments()
if store == nil {
return gsresponse.Unavailable[orchestratorv1.ProcessDepositObservedResponse](h.logger, mservice.PaymentOrchestrator, errStorageUnavailable)
}
filter := &model.PaymentFilter{
States: []model.PaymentState{model.PaymentStateSubmitted, model.PaymentStateFundsReserved},
DestinationRef: walletRef,
}
result, err := store.List(ctx, filter)
if err != nil {
return gsresponse.Auto[orchestratorv1.ProcessDepositObservedResponse](h.logger, mservice.PaymentOrchestrator, err)
}
for _, payment := range result.Items {
if payment.Intent.Destination.Type != model.EndpointTypeManagedWallet {
continue
}
if !moneyEquals(payment.Intent.Amount, event.GetAmount()) {
continue
}
payment.State = model.PaymentStateSettled
payment.FailureCode = model.PaymentFailureCodeUnspecified
payment.FailureReason = ""
if payment.Execution == nil {
payment.Execution = &model.ExecutionRefs{}
}
if payment.Execution.ChainTransferRef == "" {
payment.Execution.ChainTransferRef = strings.TrimSpace(event.GetTransactionHash())
}
if err := store.Update(ctx, payment); err != nil {
return gsresponse.Auto[orchestratorv1.ProcessDepositObservedResponse](h.logger, mservice.PaymentOrchestrator, err)
}
h.logger.Info("deposit observed matched payment", zap.String("payment_ref", payment.PaymentRef), zap.String("wallet_ref", walletRef))
return gsresponse.Success(&orchestratorv1.ProcessDepositObservedResponse{Payment: toProtoPayment(payment)})
}
return gsresponse.Success(&orchestratorv1.ProcessDepositObservedResponse{})
}
func (h *paymentEventHandler) processCardPayoutUpdate(ctx context.Context, req *orchestratorv1.ProcessCardPayoutUpdateRequest) gsresponse.Responder[orchestratorv1.ProcessCardPayoutUpdateResponse] {
if err := h.ensureRepo(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.ProcessCardPayoutUpdateResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if req == nil || req.GetEvent() == nil || req.GetEvent().GetPayout() == nil {
return gsresponse.InvalidArgument[orchestratorv1.ProcessCardPayoutUpdateResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("event is required"))
}
payout := req.GetEvent().GetPayout()
paymentRef := strings.TrimSpace(payout.GetPayoutId())
if paymentRef == "" {
return gsresponse.InvalidArgument[orchestratorv1.ProcessCardPayoutUpdateResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("payout_id is required"))
}
store := h.repo.Payments()
if store == nil {
return gsresponse.Unavailable[orchestratorv1.ProcessCardPayoutUpdateResponse](h.logger, mservice.PaymentOrchestrator, errStorageUnavailable)
}
payment, err := store.GetByPaymentRef(ctx, paymentRef)
if err != nil {
return paymentNotFoundResponder[orchestratorv1.ProcessCardPayoutUpdateResponse](mservice.PaymentOrchestrator, h.logger, err)
}
applyCardPayoutUpdate(payment, payout)
if err := store.Update(ctx, payment); err != nil {
return gsresponse.Auto[orchestratorv1.ProcessCardPayoutUpdateResponse](h.logger, mservice.PaymentOrchestrator, err)
}
h.logger.Info("card payout update applied", zap.String("payment_ref", payment.PaymentRef), zap.String("payout_id", paymentRef), zap.Any("state", payment.State))
return gsresponse.Success(&orchestratorv1.ProcessCardPayoutUpdateResponse{
Payment: toProtoPayment(payment),
})
}

View File

@@ -0,0 +1,80 @@
package orchestrator
import (
"context"
"github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/pkg/api/routers/gsresponse"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
paginationv1 "github.com/tech/sendico/pkg/proto/common/pagination/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.uber.org/zap"
)
type paymentQueryHandler struct {
repo storage.Repository
ensureRepo func(ctx context.Context) error
logger mlogger.Logger
}
func newPaymentQueryHandler(repo storage.Repository, ensure func(ctx context.Context) error, logger mlogger.Logger) *paymentQueryHandler {
return &paymentQueryHandler{
repo: repo,
ensureRepo: ensure,
logger: logger,
}
}
func (h *paymentQueryHandler) getPayment(ctx context.Context, req *orchestratorv1.GetPaymentRequest) gsresponse.Responder[orchestratorv1.GetPaymentResponse] {
if err := h.ensureRepo(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.GetPaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if req == nil {
return gsresponse.InvalidArgument[orchestratorv1.GetPaymentResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("nil request"))
}
paymentRef, err := requirePaymentRef(req.GetPaymentRef())
if err != nil {
return gsresponse.InvalidArgument[orchestratorv1.GetPaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
store, err := ensurePaymentsStore(h.repo)
if err != nil {
return gsresponse.Unavailable[orchestratorv1.GetPaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
entity, err := store.GetByPaymentRef(ctx, paymentRef)
if err != nil {
return paymentNotFoundResponder[orchestratorv1.GetPaymentResponse](mservice.PaymentOrchestrator, h.logger, err)
}
h.logger.Debug("payment fetched", zap.String("payment_ref", paymentRef))
return gsresponse.Success(&orchestratorv1.GetPaymentResponse{Payment: toProtoPayment(entity)})
}
func (h *paymentQueryHandler) listPayments(ctx context.Context, req *orchestratorv1.ListPaymentsRequest) gsresponse.Responder[orchestratorv1.ListPaymentsResponse] {
if err := h.ensureRepo(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.ListPaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if req == nil {
return gsresponse.InvalidArgument[orchestratorv1.ListPaymentsResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("nil request"))
}
store, err := ensurePaymentsStore(h.repo)
if err != nil {
return gsresponse.Unavailable[orchestratorv1.ListPaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
}
filter := filterFromProto(req)
result, err := store.List(ctx, filter)
if err != nil {
return gsresponse.Auto[orchestratorv1.ListPaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
}
resp := &orchestratorv1.ListPaymentsResponse{
Page: &paginationv1.CursorPageResponse{
NextCursor: result.NextCursor,
},
}
resp.Payments = make([]*orchestratorv1.Payment, 0, len(result.Items))
for _, item := range result.Items {
resp.Payments = append(resp.Payments, toProtoPayment(item))
}
h.logger.Debug("payments listed", zap.Int("count", len(resp.Payments)), zap.String("next_cursor", resp.GetPage().GetNextCursor()))
return gsresponse.Success(resp)
}

View File

@@ -4,9 +4,11 @@ import (
"context"
"time"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/api/routers/gsresponse"
"github.com/tech/sendico/pkg/mservice"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
)
@@ -51,10 +53,17 @@ func shouldEstimateNetworkFee(intent *orchestratorv1.PaymentIntent) bool {
if intent == nil {
return false
}
dest := intent.GetDestination()
if dest == nil {
return false
}
if dest.GetCard() != nil {
return false
}
if intent.GetKind() == orchestratorv1.PaymentKind_PAYMENT_KIND_PAYOUT {
return true
}
if intent.GetDestination().GetManagedWallet() != nil || intent.GetDestination().GetExternalChain() != nil {
if dest.GetManagedWallet() != nil || dest.GetExternalChain() != nil {
return true
}
return false
@@ -69,3 +78,16 @@ func shouldRequestFX(intent *orchestratorv1.PaymentIntent) bool {
}
return intent.GetFx() != nil && intent.GetFx().GetPair() != nil
}
func mapMntxStatusToState(status mntxv1.PayoutStatus) model.PaymentState {
switch status {
case mntxv1.PayoutStatus_PAYOUT_STATUS_PROCESSED:
return model.PaymentStateSettled
case mntxv1.PayoutStatus_PAYOUT_STATUS_FAILED:
return model.PaymentStateFailed
case mntxv1.PayoutStatus_PAYOUT_STATUS_PENDING:
return model.PaymentStateSubmitted
default:
return model.PaymentStateUnspecified
}
}

View File

@@ -0,0 +1,51 @@
package orchestrator
import (
"testing"
"github.com/tech/sendico/payments/orchestrator/storage/model"
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
)
func TestShouldEstimateNetworkFeeSkipsCard(t *testing.T) {
intent := &orchestratorv1.PaymentIntent{
Kind: orchestratorv1.PaymentKind_PAYMENT_KIND_PAYOUT,
Destination: &orchestratorv1.PaymentEndpoint{
Endpoint: &orchestratorv1.PaymentEndpoint_Card{
Card: &orchestratorv1.CardEndpoint{},
},
},
}
if shouldEstimateNetworkFee(intent) {
t.Fatalf("expected network fee estimation to be skipped for card payouts")
}
}
func TestShouldEstimateNetworkFeeManagedWallet(t *testing.T) {
intent := &orchestratorv1.PaymentIntent{
Destination: &orchestratorv1.PaymentEndpoint{
Endpoint: &orchestratorv1.PaymentEndpoint_ManagedWallet{
ManagedWallet: &orchestratorv1.ManagedWalletEndpoint{ManagedWalletRef: "mw"},
},
},
}
if !shouldEstimateNetworkFee(intent) {
t.Fatalf("expected network fee estimation when destination is managed wallet")
}
}
func TestMapMntxStatusToState(t *testing.T) {
if mapMntxStatusToState(mntxv1.PayoutStatus_PAYOUT_STATUS_PROCESSED) != model.PaymentStateSettled {
t.Fatalf("processed should map to settled")
}
if mapMntxStatusToState(mntxv1.PayoutStatus_PAYOUT_STATUS_FAILED) != model.PaymentStateFailed {
t.Fatalf("failed should map to failed")
}
if mapMntxStatusToState(mntxv1.PayoutStatus_PAYOUT_STATUS_PENDING) != model.PaymentStateSubmitted {
t.Fatalf("pending should map to submitted")
}
if mapMntxStatusToState(mntxv1.PayoutStatus_PAYOUT_STATUS_UNSPECIFIED) != model.PaymentStateUnspecified {
t.Fatalf("unspecified should map to unspecified")
}
}

View File

@@ -1,10 +1,12 @@
package orchestrator
import (
"strings"
"time"
oracleclient "github.com/tech/sendico/fx/oracle/client"
chainclient "github.com/tech/sendico/gateway/chain/client"
mntxclient "github.com/tech/sendico/gateway/mntx/client"
ledgerclient "github.com/tech/sendico/ledger/client"
clockpkg "github.com/tech/sendico/pkg/clock"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
@@ -46,10 +48,24 @@ func (o oracleDependency) available() bool {
return o.client != nil
}
type mntxDependency struct {
client mntxclient.Client
}
func (m mntxDependency) available() bool {
return m.client != nil
}
// CardGatewayRoute maps a gateway to its funding and fee destinations (addresses).
type CardGatewayRoute struct {
FundingAddress string
FeeAddress string
}
// WithFeeEngine wires the fee engine client.
func WithFeeEngine(client feesv1.FeeEngineClient, timeout time.Duration) Option {
return func(s *Service) {
s.fees = feesDependency{
s.deps.fees = feesDependency{
client: client,
timeout: timeout,
}
@@ -59,21 +75,41 @@ func WithFeeEngine(client feesv1.FeeEngineClient, timeout time.Duration) Option
// WithLedgerClient wires the ledger client.
func WithLedgerClient(client ledgerclient.Client) Option {
return func(s *Service) {
s.ledger = ledgerDependency{client: client}
s.deps.ledger = ledgerDependency{client: client}
}
}
// WithChainGatewayClient wires the chain gateway client.
func WithChainGatewayClient(client chainclient.Client) Option {
return func(s *Service) {
s.gateway = gatewayDependency{client: client}
s.deps.gateway = gatewayDependency{client: client}
}
}
// WithOracleClient wires the FX oracle client.
func WithOracleClient(client oracleclient.Client) Option {
return func(s *Service) {
s.oracle = oracleDependency{client: client}
s.deps.oracle = oracleDependency{client: client}
}
}
// WithMntxGateway wires the Monetix gateway client.
func WithMntxGateway(client mntxclient.Client) Option {
return func(s *Service) {
s.deps.mntx = mntxDependency{client: client}
}
}
// WithCardGatewayRoutes configures funding/fee wallet routing per gateway.
func WithCardGatewayRoutes(routes map[string]CardGatewayRoute) Option {
return func(s *Service) {
if len(routes) == 0 {
return
}
s.deps.cardRoutes = make(map[string]CardGatewayRoute, len(routes))
for k, v := range routes {
s.deps.cardRoutes[strings.ToLower(strings.TrimSpace(k))] = v
}
}
}

View File

@@ -3,215 +3,30 @@ package orchestrator
import (
"context"
"strings"
"time"
oracleclient "github.com/tech/sendico/fx/oracle/client"
"github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/merrors"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
"github.com/tech/sendico/pkg/mlogger"
fxv1 "github.com/tech/sendico/pkg/proto/common/fx/v1"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
oraclev1 "github.com/tech/sendico/pkg/proto/oracle/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
)
func (s *Service) buildPaymentQuote(ctx context.Context, orgRef string, req *orchestratorv1.QuotePaymentRequest) (*orchestratorv1.PaymentQuote, time.Time, error) {
intent := req.GetIntent()
amount := intent.GetAmount()
fxSide := fxv1.Side_SIDE_UNSPECIFIED
if intent.GetFx() != nil {
fxSide = intent.GetFx().GetSide()
}
var fxQuote *oraclev1.Quote
var err error
if shouldRequestFX(intent) {
fxQuote, err = s.requestFXQuote(ctx, orgRef, req)
if err != nil {
return nil, time.Time{}, err
}
}
payAmount, settlementAmountBeforeFees := resolveTradeAmounts(amount, fxQuote, fxSide)
feeBaseAmount := payAmount
if feeBaseAmount == nil {
feeBaseAmount = cloneMoney(amount)
}
feeQuote, err := s.quoteFees(ctx, orgRef, req, feeBaseAmount)
if err != nil {
return nil, time.Time{}, err
}
feeCurrency := ""
if feeBaseAmount != nil {
feeCurrency = feeBaseAmount.GetCurrency()
} else if amount != nil {
feeCurrency = amount.GetCurrency()
}
feeTotal := extractFeeTotal(feeQuote.GetLines(), feeCurrency)
var networkFee *chainv1.EstimateTransferFeeResponse
if shouldEstimateNetworkFee(intent) {
networkFee, err = s.estimateNetworkFee(ctx, intent)
if err != nil {
return nil, time.Time{}, err
}
}
debitAmount, settlementAmount := computeAggregates(payAmount, settlementAmountBeforeFees, feeTotal, networkFee, fxQuote)
quote := &orchestratorv1.PaymentQuote{
DebitAmount: debitAmount,
ExpectedSettlementAmount: settlementAmount,
ExpectedFeeTotal: feeTotal,
FeeLines: cloneFeeLines(feeQuote.GetLines()),
FeeRules: cloneFeeRules(feeQuote.GetApplied()),
FxQuote: fxQuote,
NetworkFee: networkFee,
FeeQuoteToken: feeQuote.GetFeeQuoteToken(),
}
expiresAt := quoteExpiry(s.clock.Now(), feeQuote, fxQuote)
return quote, expiresAt, nil
type paymentExecutor struct {
deps *serviceDependencies
logger mlogger.Logger
svc *Service
}
func (s *Service) quoteFees(ctx context.Context, orgRef string, req *orchestratorv1.QuotePaymentRequest, baseAmount *moneyv1.Money) (*feesv1.PrecomputeFeesResponse, error) {
if !s.fees.available() {
return &feesv1.PrecomputeFeesResponse{}, nil
}
intent := req.GetIntent()
amount := cloneMoney(baseAmount)
if amount == nil {
amount = cloneMoney(intent.GetAmount())
}
feeIntent := &feesv1.Intent{
Trigger: triggerFromKind(intent.GetKind(), intent.GetRequiresFx()),
BaseAmount: amount,
BookedAt: timestamppb.New(s.clock.Now()),
OriginType: "payments.orchestrator.quote",
OriginRef: strings.TrimSpace(req.GetIdempotencyKey()),
Attributes: cloneMetadata(intent.GetAttributes()),
}
timeout := req.GetMeta().GetTrace()
ctxTimeout, cancel := s.withTimeout(ctx, s.fees.timeout)
defer cancel()
resp, err := s.fees.client.PrecomputeFees(ctxTimeout, &feesv1.PrecomputeFeesRequest{
Meta: &feesv1.RequestMeta{
OrganizationRef: orgRef,
Trace: timeout,
},
Intent: feeIntent,
TtlMs: defaultFeeQuoteTTLMillis,
})
if err != nil {
s.logger.Error("fees precompute failed", zap.Error(err))
return nil, merrors.Internal("fees_precompute_failed")
}
return resp, nil
func newPaymentExecutor(deps *serviceDependencies, logger mlogger.Logger, svc *Service) *paymentExecutor {
return &paymentExecutor{deps: deps, logger: logger, svc: svc}
}
func (s *Service) estimateNetworkFee(ctx context.Context, intent *orchestratorv1.PaymentIntent) (*chainv1.EstimateTransferFeeResponse, error) {
if !s.gateway.available() {
return nil, nil
}
req := &chainv1.EstimateTransferFeeRequest{
Amount: cloneMoney(intent.GetAmount()),
}
if src := intent.GetSource().GetManagedWallet(); src != nil {
req.SourceWalletRef = strings.TrimSpace(src.GetManagedWalletRef())
}
if dst := intent.GetDestination().GetManagedWallet(); dst != nil {
req.Destination = &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: strings.TrimSpace(dst.GetManagedWalletRef())},
}
}
if dst := intent.GetDestination().GetExternalChain(); dst != nil {
req.Destination = &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ExternalAddress{ExternalAddress: strings.TrimSpace(dst.GetAddress())},
Memo: strings.TrimSpace(dst.GetMemo()),
}
req.Asset = dst.GetAsset()
}
if req.Asset == nil {
if src := intent.GetSource().GetManagedWallet(); src != nil {
req.Asset = src.GetAsset()
}
}
resp, err := s.gateway.client.EstimateTransferFee(ctx, req)
if err != nil {
s.logger.Error("chain gateway fee estimation failed", zap.Error(err))
return nil, merrors.Internal("chain_gateway_fee_estimation_failed")
}
return resp, nil
}
func (s *Service) requestFXQuote(ctx context.Context, orgRef string, req *orchestratorv1.QuotePaymentRequest) (*oraclev1.Quote, error) {
if !s.oracle.available() {
return nil, nil
}
intent := req.GetIntent()
meta := req.GetMeta()
fxIntent := intent.GetFx()
if fxIntent == nil {
return nil, nil
}
ttl := fxIntent.GetTtlMs()
if ttl <= 0 {
ttl = defaultOracleTTLMillis
}
params := oracleclient.GetQuoteParams{
Meta: oracleclient.RequestMeta{
OrganizationRef: orgRef,
Trace: meta.GetTrace(),
},
Pair: fxIntent.GetPair(),
Side: fxIntent.GetSide(),
Firm: fxIntent.GetFirm(),
TTL: time.Duration(ttl) * time.Millisecond,
PreferredProvider: strings.TrimSpace(fxIntent.GetPreferredProvider()),
}
if fxIntent.GetMaxAgeMs() > 0 {
params.MaxAge = time.Duration(fxIntent.GetMaxAgeMs()) * time.Millisecond
}
if amount := intent.GetAmount(); amount != nil {
pair := fxIntent.GetPair()
if pair != nil {
switch {
case strings.EqualFold(amount.GetCurrency(), pair.GetBase()):
params.BaseAmount = cloneMoney(amount)
case strings.EqualFold(amount.GetCurrency(), pair.GetQuote()):
params.QuoteAmount = cloneMoney(amount)
default:
params.BaseAmount = cloneMoney(amount)
}
} else {
params.BaseAmount = cloneMoney(amount)
}
}
quote, err := s.oracle.client.GetQuote(ctx, params)
if err != nil {
s.logger.Error("fx oracle quote failed", zap.Error(err))
return nil, merrors.Internal("fx_quote_failed")
}
return quoteToProto(quote), nil
}
func (s *Service) executePayment(ctx context.Context, store storage.PaymentsStore, payment *model.Payment, quote *orchestratorv1.PaymentQuote) error {
func (p *paymentExecutor) executePayment(ctx context.Context, store storage.PaymentsStore, payment *model.Payment, quote *orchestratorv1.PaymentQuote) error {
if store == nil {
return errStorageUnavailable
}
@@ -219,6 +34,7 @@ func (s *Service) executePayment(ctx context.Context, store storage.PaymentsStor
charges := ledgerChargesFromFeeLines(quote.GetFeeLines())
ledgerNeeded := requiresLedger(payment)
chainNeeded := requiresChain(payment)
cardNeeded := payment.Intent.Destination.Type == model.EndpointTypeCard
exec := payment.Execution
if exec == nil {
@@ -226,25 +42,26 @@ func (s *Service) executePayment(ctx context.Context, store storage.PaymentsStor
}
if ledgerNeeded {
if !s.ledger.available() {
return s.failPayment(ctx, store, payment, model.PaymentFailureCodeLedger, "ledger_client_unavailable", merrors.Internal("ledger_client_unavailable"))
if !p.deps.ledger.available() {
return p.failPayment(ctx, store, payment, model.PaymentFailureCodeLedger, "ledger_client_unavailable", merrors.Internal("ledger_client_unavailable"))
}
if err := s.performLedgerOperation(ctx, payment, quote, charges); err != nil {
return s.failPayment(ctx, store, payment, model.PaymentFailureCodeLedger, strings.TrimSpace(err.Error()), err)
if err := p.performLedgerOperation(ctx, payment, quote, charges); err != nil {
return p.failPayment(ctx, store, payment, model.PaymentFailureCodeLedger, strings.TrimSpace(err.Error()), err)
}
payment.State = model.PaymentStateFundsReserved
if err := s.persistPayment(ctx, store, payment); err != nil {
if err := p.persistPayment(ctx, store, payment); err != nil {
return err
}
p.logger.Info("ledger reservation completed", zap.String("payment_ref", payment.PaymentRef))
}
if chainNeeded {
if !s.gateway.available() {
return s.failPayment(ctx, store, payment, model.PaymentFailureCodeChain, "chain_client_unavailable", merrors.Internal("chain_client_unavailable"))
if !p.deps.gateway.available() {
return p.failPayment(ctx, store, payment, model.PaymentFailureCodeChain, "chain_client_unavailable", merrors.Internal("chain_client_unavailable"))
}
resp, err := s.submitChainTransfer(ctx, payment, quote)
resp, err := p.submitChainTransfer(ctx, payment, quote)
if err != nil {
return s.failPayment(ctx, store, payment, model.PaymentFailureCodeChain, strings.TrimSpace(err.Error()), err)
return p.failPayment(ctx, store, payment, model.PaymentFailureCodeChain, strings.TrimSpace(err.Error()), err)
}
exec = payment.Execution
if exec == nil {
@@ -255,17 +72,42 @@ func (s *Service) executePayment(ctx context.Context, store storage.PaymentsStor
}
payment.Execution = exec
payment.State = model.PaymentStateSubmitted
if err := s.persistPayment(ctx, store, payment); err != nil {
if err := p.persistPayment(ctx, store, payment); err != nil {
return err
}
p.logger.Info("chain transfer submitted", zap.String("payment_ref", payment.PaymentRef), zap.String("transfer_ref", exec.ChainTransferRef))
if !cardNeeded {
return nil
}
}
if cardNeeded {
if !p.deps.mntx.available() {
return p.failPayment(ctx, store, payment, model.PaymentFailureCodePolicy, "card_gateway_unavailable", merrors.Internal("card_gateway_unavailable"))
}
if err := p.svc.submitCardFundingTransfers(ctx, payment, quote); err != nil {
return p.failPayment(ctx, store, payment, model.PaymentFailureCodeChain, strings.TrimSpace(err.Error()), err)
}
if err := p.svc.submitCardPayout(ctx, payment); err != nil {
return p.failPayment(ctx, store, payment, model.PaymentFailureCodePolicy, strings.TrimSpace(err.Error()), err)
}
payment.State = model.PaymentStateSubmitted
if err := p.persistPayment(ctx, store, payment); err != nil {
return err
}
p.logger.Info("card payout submitted", zap.String("payment_ref", payment.PaymentRef), zap.String("card_payout_ref", payment.Execution.CardPayoutRef))
return nil
}
payment.State = model.PaymentStateSettled
return s.persistPayment(ctx, store, payment)
if err := p.persistPayment(ctx, store, payment); err != nil {
return err
}
p.logger.Info("payment settled without chain", zap.String("payment_ref", payment.PaymentRef))
return nil
}
func (s *Service) performLedgerOperation(ctx context.Context, payment *model.Payment, quote *orchestratorv1.PaymentQuote, charges []*ledgerv1.PostingLine) error {
func (p *paymentExecutor) performLedgerOperation(ctx context.Context, payment *model.Payment, quote *orchestratorv1.PaymentQuote, charges []*ledgerv1.PostingLine) error {
intent := payment.Intent
if payment.OrganizationRef == primitive.NilObjectID {
return merrors.InvalidArgument("ledger: organization_ref is required")
@@ -285,7 +127,7 @@ func (s *Service) performLedgerOperation(ctx context.Context, payment *model.Pay
switch intent.Kind {
case model.PaymentKindFXConversion:
if err := s.applyFX(ctx, payment, quote, charges, description, metadata, exec); err != nil {
if err := p.applyFX(ctx, payment, quote, charges, description, metadata, exec); err != nil {
return err
}
case model.PaymentKindInternalTransfer, model.PaymentKindPayout, model.PaymentKindUnspecified:
@@ -303,7 +145,7 @@ func (s *Service) performLedgerOperation(ctx context.Context, payment *model.Pay
Charges: charges,
Metadata: metadata,
}
resp, err := s.ledger.client.TransferInternal(ctx, req)
resp, err := p.deps.ledger.client.TransferInternal(ctx, req)
if err != nil {
return err
}
@@ -316,7 +158,7 @@ func (s *Service) performLedgerOperation(ctx context.Context, payment *model.Pay
return nil
}
func (s *Service) applyFX(ctx context.Context, payment *model.Payment, quote *orchestratorv1.PaymentQuote, charges []*ledgerv1.PostingLine, description string, metadata map[string]string, exec *model.ExecutionRefs) error {
func (p *paymentExecutor) applyFX(ctx context.Context, payment *model.Payment, quote *orchestratorv1.PaymentQuote, charges []*ledgerv1.PostingLine, description string, metadata map[string]string, exec *model.ExecutionRefs) error {
intent := payment.Intent
source := intent.Source.Ledger
destination := intent.Destination.Ledger
@@ -354,7 +196,7 @@ func (s *Service) applyFX(ctx context.Context, payment *model.Payment, quote *or
Charges: charges,
Metadata: metadata,
}
resp, err := s.ledger.client.ApplyFXWithCharges(ctx, req)
resp, err := p.deps.ledger.client.ApplyFXWithCharges(ctx, req)
if err != nil {
return err
}
@@ -363,7 +205,7 @@ func (s *Service) applyFX(ctx context.Context, payment *model.Payment, quote *or
return nil
}
func (s *Service) submitChainTransfer(ctx context.Context, payment *model.Payment, quote *orchestratorv1.PaymentQuote) (*chainv1.SubmitTransferResponse, error) {
func (p *paymentExecutor) submitChainTransfer(ctx context.Context, payment *model.Payment, quote *orchestratorv1.PaymentQuote) (*chainv1.SubmitTransferResponse, error) {
intent := payment.Intent
source := intent.Source.ManagedWallet
destination := intent.Destination
@@ -389,23 +231,23 @@ func (s *Service) submitChainTransfer(ctx context.Context, payment *model.Paymen
Metadata: cloneMetadata(payment.Metadata),
ClientReference: payment.PaymentRef,
}
return s.gateway.client.SubmitTransfer(ctx, req)
return p.deps.gateway.client.SubmitTransfer(ctx, req)
}
func (s *Service) persistPayment(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error {
func (p *paymentExecutor) persistPayment(ctx context.Context, store storage.PaymentsStore, payment *model.Payment) error {
if store == nil {
return errStorageUnavailable
}
return store.Update(ctx, payment)
}
func (s *Service) failPayment(ctx context.Context, store storage.PaymentsStore, payment *model.Payment, code model.PaymentFailureCode, reason string, err error) error {
func (p *paymentExecutor) failPayment(ctx context.Context, store storage.PaymentsStore, payment *model.Payment, code model.PaymentFailureCode, reason string, err error) error {
payment.State = model.PaymentStateFailed
payment.FailureCode = code
payment.FailureReason = strings.TrimSpace(reason)
if store != nil {
if updateErr := store.Update(ctx, payment); updateErr != nil {
s.logger.Error("failed to persist payment failure", zap.Error(updateErr), zap.String("payment_ref", payment.PaymentRef))
p.logger.Error("failed to persist payment failure", zap.Error(updateErr), zap.String("payment_ref", payment.PaymentRef))
}
}
if err != nil {
@@ -414,6 +256,21 @@ func (s *Service) failPayment(ctx context.Context, store storage.PaymentsStore,
return merrors.Internal(reason)
}
func paymentDescription(payment *model.Payment) string {
if payment == nil {
return ""
}
if val := strings.TrimSpace(payment.Intent.Attributes["description"]); val != "" {
return val
}
if payment.Metadata != nil {
if val := strings.TrimSpace(payment.Metadata["description"]); val != "" {
return val
}
}
return payment.PaymentRef
}
func resolveLedgerAccounts(intent model.PaymentIntent) (string, string, error) {
source := intent.Source.Ledger
destination := intent.Destination.Ledger
@@ -432,21 +289,6 @@ func resolveLedgerAccounts(intent model.PaymentIntent) (string, string, error) {
return strings.TrimSpace(source.LedgerAccountRef), to, nil
}
func paymentDescription(payment *model.Payment) string {
if payment == nil {
return ""
}
if val := strings.TrimSpace(payment.Intent.Attributes["description"]); val != "" {
return val
}
if payment.Metadata != nil {
if val := strings.TrimSpace(payment.Metadata["description"]); val != "" {
return val
}
}
return payment.PaymentRef
}
func requiresLedger(payment *model.Payment) bool {
if payment == nil {
return false

View File

@@ -0,0 +1,210 @@
package orchestrator
import (
"context"
"strings"
"time"
oracleclient "github.com/tech/sendico/fx/oracle/client"
"github.com/tech/sendico/pkg/merrors"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
fxv1 "github.com/tech/sendico/pkg/proto/common/fx/v1"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
oraclev1 "github.com/tech/sendico/pkg/proto/oracle/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
)
func (s *Service) buildPaymentQuote(ctx context.Context, orgRef string, req *orchestratorv1.QuotePaymentRequest) (*orchestratorv1.PaymentQuote, time.Time, error) {
intent := req.GetIntent()
amount := intent.GetAmount()
fxSide := fxv1.Side_SIDE_UNSPECIFIED
if intent.GetFx() != nil {
fxSide = intent.GetFx().GetSide()
}
var fxQuote *oraclev1.Quote
var err error
if shouldRequestFX(intent) {
fxQuote, err = s.requestFXQuote(ctx, orgRef, req)
if err != nil {
return nil, time.Time{}, err
}
s.logger.Debug("fx quote attached to payment quote", zap.String("org_ref", orgRef))
}
payAmount, settlementAmountBeforeFees := resolveTradeAmounts(amount, fxQuote, fxSide)
feeBaseAmount := payAmount
if feeBaseAmount == nil {
feeBaseAmount = cloneMoney(amount)
}
feeQuote, err := s.quoteFees(ctx, orgRef, req, feeBaseAmount)
if err != nil {
return nil, time.Time{}, err
}
feeCurrency := ""
if feeBaseAmount != nil {
feeCurrency = feeBaseAmount.GetCurrency()
} else if amount != nil {
feeCurrency = amount.GetCurrency()
}
feeTotal := extractFeeTotal(feeQuote.GetLines(), feeCurrency)
var networkFee *chainv1.EstimateTransferFeeResponse
if shouldEstimateNetworkFee(intent) {
networkFee, err = s.estimateNetworkFee(ctx, intent)
if err != nil {
return nil, time.Time{}, err
}
s.logger.Debug("network fee estimated", zap.String("org_ref", orgRef))
}
debitAmount, settlementAmount := computeAggregates(payAmount, settlementAmountBeforeFees, feeTotal, networkFee, fxQuote)
quote := &orchestratorv1.PaymentQuote{
DebitAmount: debitAmount,
ExpectedSettlementAmount: settlementAmount,
ExpectedFeeTotal: feeTotal,
FeeLines: cloneFeeLines(feeQuote.GetLines()),
FeeRules: cloneFeeRules(feeQuote.GetApplied()),
FxQuote: fxQuote,
NetworkFee: networkFee,
FeeQuoteToken: feeQuote.GetFeeQuoteToken(),
}
expiresAt := quoteExpiry(s.clock.Now(), feeQuote, fxQuote)
return quote, expiresAt, nil
}
func (s *Service) quoteFees(ctx context.Context, orgRef string, req *orchestratorv1.QuotePaymentRequest, baseAmount *moneyv1.Money) (*feesv1.PrecomputeFeesResponse, error) {
if !s.deps.fees.available() {
return &feesv1.PrecomputeFeesResponse{}, nil
}
intent := req.GetIntent()
amount := cloneMoney(baseAmount)
if amount == nil {
amount = cloneMoney(intent.GetAmount())
}
feeIntent := &feesv1.Intent{
Trigger: triggerFromKind(intent.GetKind(), intent.GetRequiresFx()),
BaseAmount: amount,
BookedAt: timestamppb.New(s.clock.Now()),
OriginType: "payments.orchestrator.quote",
OriginRef: strings.TrimSpace(req.GetIdempotencyKey()),
Attributes: cloneMetadata(intent.GetAttributes()),
}
timeout := req.GetMeta().GetTrace()
ctxTimeout, cancel := s.withTimeout(ctx, s.deps.fees.timeout)
defer cancel()
resp, err := s.deps.fees.client.PrecomputeFees(ctxTimeout, &feesv1.PrecomputeFeesRequest{
Meta: &feesv1.RequestMeta{
OrganizationRef: orgRef,
Trace: timeout,
},
Intent: feeIntent,
TtlMs: defaultFeeQuoteTTLMillis,
})
if err != nil {
s.logger.Warn("fees precompute failed", zap.Error(err))
return nil, merrors.Internal("fees_precompute_failed")
}
return resp, nil
}
func (s *Service) estimateNetworkFee(ctx context.Context, intent *orchestratorv1.PaymentIntent) (*chainv1.EstimateTransferFeeResponse, error) {
if !s.deps.gateway.available() {
return nil, nil
}
req := &chainv1.EstimateTransferFeeRequest{
Amount: cloneMoney(intent.GetAmount()),
}
if src := intent.GetSource().GetManagedWallet(); src != nil {
req.SourceWalletRef = strings.TrimSpace(src.GetManagedWalletRef())
}
if dst := intent.GetDestination().GetManagedWallet(); dst != nil {
req.Destination = &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ManagedWalletRef{ManagedWalletRef: strings.TrimSpace(dst.GetManagedWalletRef())},
}
}
if dst := intent.GetDestination().GetExternalChain(); dst != nil {
req.Destination = &chainv1.TransferDestination{
Destination: &chainv1.TransferDestination_ExternalAddress{ExternalAddress: strings.TrimSpace(dst.GetAddress())},
Memo: strings.TrimSpace(dst.GetMemo()),
}
req.Asset = dst.GetAsset()
}
if req.Asset == nil {
if src := intent.GetSource().GetManagedWallet(); src != nil {
req.Asset = src.GetAsset()
}
}
resp, err := s.deps.gateway.client.EstimateTransferFee(ctx, req)
if err != nil {
s.logger.Warn("chain gateway fee estimation failed", zap.Error(err))
return nil, merrors.Internal("chain_gateway_fee_estimation_failed")
}
return resp, nil
}
func (s *Service) requestFXQuote(ctx context.Context, orgRef string, req *orchestratorv1.QuotePaymentRequest) (*oraclev1.Quote, error) {
if !s.deps.oracle.available() {
return nil, nil
}
intent := req.GetIntent()
meta := req.GetMeta()
fxIntent := intent.GetFx()
if fxIntent == nil {
return nil, nil
}
ttl := fxIntent.GetTtlMs()
if ttl <= 0 {
ttl = defaultOracleTTLMillis
}
params := oracleclient.GetQuoteParams{
Meta: oracleclient.RequestMeta{
OrganizationRef: orgRef,
Trace: meta.GetTrace(),
},
Pair: fxIntent.GetPair(),
Side: fxIntent.GetSide(),
Firm: fxIntent.GetFirm(),
TTL: time.Duration(ttl) * time.Millisecond,
PreferredProvider: strings.TrimSpace(fxIntent.GetPreferredProvider()),
}
if fxIntent.GetMaxAgeMs() > 0 {
params.MaxAge = time.Duration(fxIntent.GetMaxAgeMs()) * time.Millisecond
}
if amount := intent.GetAmount(); amount != nil {
pair := fxIntent.GetPair()
if pair != nil {
switch {
case strings.EqualFold(amount.GetCurrency(), pair.GetBase()):
params.BaseAmount = cloneMoney(amount)
case strings.EqualFold(amount.GetCurrency(), pair.GetQuote()):
params.QuoteAmount = cloneMoney(amount)
default:
params.BaseAmount = cloneMoney(amount)
}
} else {
params.BaseAmount = cloneMoney(amount)
}
}
quote, err := s.deps.oracle.client.GetQuote(ctx, params)
if err != nil {
s.logger.Warn("fx oracle quote failed", zap.Error(err))
return nil, merrors.Internal("fx_quote_failed")
}
return quoteToProto(quote), nil
}

View File

@@ -19,19 +19,21 @@ func TestRequestFXQuoteUsesQuoteAmountWhenCurrencyMatchesQuote(t *testing.T) {
svc := &Service{
logger: zap.NewNop(),
clock: testClock{now: time.Now()},
oracle: oracleDependency{
client: &oracleclient.Fake{
GetQuoteFn: func(ctx context.Context, params oracleclient.GetQuoteParams) (*oracleclient.Quote, error) {
captured = params
return &oracleclient.Quote{
QuoteRef: "q",
Pair: params.Pair,
Side: params.Side,
Price: "1.1",
BaseAmount: params.BaseAmount,
QuoteAmount: params.QuoteAmount,
ExpiresAt: time.Now(),
}, nil
deps: serviceDependencies{
oracle: oracleDependency{
client: &oracleclient.Fake{
GetQuoteFn: func(ctx context.Context, params oracleclient.GetQuoteParams) (*oracleclient.Quote, error) {
captured = params
return &oracleclient.Quote{
QuoteRef: "q",
Pair: params.Pair,
Side: params.Side,
Price: "1.1",
BaseAmount: params.BaseAmount,
QuoteAmount: params.QuoteAmount,
ExpiresAt: time.Now(),
}, nil
},
},
},
},

View File

@@ -2,21 +2,14 @@ package orchestrator
import (
"context"
"strings"
"github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/api/routers"
"github.com/tech/sendico/pkg/api/routers/gsresponse"
clockpkg "github.com/tech/sendico/pkg/clock"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
paginationv1 "github.com/tech/sendico/pkg/proto/common/pagination/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)
type serviceError string
@@ -40,14 +33,32 @@ type Service struct {
storage storage.Repository
clock clockpkg.Clock
fees feesDependency
ledger ledgerDependency
gateway gatewayDependency
oracle oracleDependency
deps serviceDependencies
h handlerSet
comp componentSet
orchestratorv1.UnimplementedPaymentOrchestratorServer
}
type serviceDependencies struct {
fees feesDependency
ledger ledgerDependency
gateway gatewayDependency
oracle oracleDependency
mntx mntxDependency
cardRoutes map[string]CardGatewayRoute
}
type handlerSet struct {
commands *paymentCommandFactory
queries *paymentQueryHandler
events *paymentEventHandler
}
type componentSet struct {
executor *paymentExecutor
}
// NewService constructs a payment orchestrator service.
func NewService(logger mlogger.Logger, repo storage.Repository, opts ...Option) *Service {
svc := &Service{
@@ -68,9 +79,30 @@ func NewService(logger mlogger.Logger, repo storage.Repository, opts ...Option)
svc.clock = clockpkg.NewSystem()
}
engine := defaultPaymentEngine{svc: svc}
svc.h.commands = newPaymentCommandFactory(engine, svc.logger)
svc.h.queries = newPaymentQueryHandler(svc.storage, svc.ensureRepository, svc.logger.Named("queries"))
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger.Named("events"))
svc.comp.executor = newPaymentExecutor(&svc.deps, svc.logger.Named("payment_executor"), svc)
return svc
}
func (s *Service) ensureHandlers() {
if s.h.commands == nil {
s.h.commands = newPaymentCommandFactory(defaultPaymentEngine{svc: s}, s.logger)
}
if s.h.queries == nil {
s.h.queries = newPaymentQueryHandler(s.storage, s.ensureRepository, s.logger.Named("queries"))
}
if s.h.events == nil {
s.h.events = newPaymentEventHandler(s.storage, s.ensureRepository, s.logger.Named("events"))
}
if s.comp.executor == nil {
s.comp.executor = newPaymentExecutor(&s.deps, s.logger.Named("payment_executor"), s)
}
}
// Register attaches the service to the supplied gRPC router.
func (s *Service) Register(router routers.GRPC) error {
return router.Register(func(reg grpc.ServiceRegistrar) {
@@ -80,474 +112,59 @@ func (s *Service) Register(router routers.GRPC) error {
// QuotePayment aggregates downstream quotes.
func (s *Service) QuotePayment(ctx context.Context, req *orchestratorv1.QuotePaymentRequest) (*orchestratorv1.QuotePaymentResponse, error) {
return executeUnary(ctx, s, "QuotePayment", s.quotePaymentHandler, req)
s.ensureHandlers()
return executeUnary(ctx, s, "QuotePayment", s.h.commands.QuotePayment().Execute, req)
}
// InitiatePayment captures a payment intent and reserves funds orchestration.
func (s *Service) InitiatePayment(ctx context.Context, req *orchestratorv1.InitiatePaymentRequest) (*orchestratorv1.InitiatePaymentResponse, error) {
return executeUnary(ctx, s, "InitiatePayment", s.initiatePaymentHandler, req)
s.ensureHandlers()
return executeUnary(ctx, s, "InitiatePayment", s.h.commands.InitiatePayment().Execute, req)
}
// CancelPayment attempts to cancel an in-flight payment.
func (s *Service) CancelPayment(ctx context.Context, req *orchestratorv1.CancelPaymentRequest) (*orchestratorv1.CancelPaymentResponse, error) {
return executeUnary(ctx, s, "CancelPayment", s.cancelPaymentHandler, req)
s.ensureHandlers()
return executeUnary(ctx, s, "CancelPayment", s.h.commands.CancelPayment().Execute, req)
}
// GetPayment returns a stored payment record.
func (s *Service) GetPayment(ctx context.Context, req *orchestratorv1.GetPaymentRequest) (*orchestratorv1.GetPaymentResponse, error) {
return executeUnary(ctx, s, "GetPayment", s.getPaymentHandler, req)
s.ensureHandlers()
return executeUnary(ctx, s, "GetPayment", s.h.queries.getPayment, req)
}
// ListPayments lists stored payment records.
func (s *Service) ListPayments(ctx context.Context, req *orchestratorv1.ListPaymentsRequest) (*orchestratorv1.ListPaymentsResponse, error) {
return executeUnary(ctx, s, "ListPayments", s.listPaymentsHandler, req)
s.ensureHandlers()
return executeUnary(ctx, s, "ListPayments", s.h.queries.listPayments, req)
}
// InitiateConversion orchestrates standalone FX conversions.
func (s *Service) InitiateConversion(ctx context.Context, req *orchestratorv1.InitiateConversionRequest) (*orchestratorv1.InitiateConversionResponse, error) {
return executeUnary(ctx, s, "InitiateConversion", s.initiateConversionHandler, req)
s.ensureHandlers()
return executeUnary(ctx, s, "InitiateConversion", s.h.commands.InitiateConversion().Execute, req)
}
// ProcessTransferUpdate reconciles chain events back into payment state.
func (s *Service) ProcessTransferUpdate(ctx context.Context, req *orchestratorv1.ProcessTransferUpdateRequest) (*orchestratorv1.ProcessTransferUpdateResponse, error) {
return executeUnary(ctx, s, "ProcessTransferUpdate", s.processTransferUpdateHandler, req)
s.ensureHandlers()
return executeUnary(ctx, s, "ProcessTransferUpdate", s.h.events.processTransferUpdate, req)
}
// ProcessDepositObserved reconciles deposit events to ledger.
func (s *Service) ProcessDepositObserved(ctx context.Context, req *orchestratorv1.ProcessDepositObservedRequest) (*orchestratorv1.ProcessDepositObservedResponse, error) {
return executeUnary(ctx, s, "ProcessDepositObserved", s.processDepositObservedHandler, req)
s.ensureHandlers()
return executeUnary(ctx, s, "ProcessDepositObserved", s.h.events.processDepositObserved, req)
}
func (s *Service) quotePaymentHandler(ctx context.Context, req *orchestratorv1.QuotePaymentRequest) gsresponse.Responder[orchestratorv1.QuotePaymentResponse] {
if err := s.ensureRepository(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.QuotePaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
if req == nil {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("nil request"))
}
meta := req.GetMeta()
if meta == nil {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("meta is required"))
}
orgRef := strings.TrimSpace(meta.GetOrganizationRef())
if orgRef == "" {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("organization_ref is required"))
}
orgObjectID, parseErr := primitive.ObjectIDFromHex(orgRef)
if parseErr != nil {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("organization_ref must be a valid objectID"))
}
intent := req.GetIntent()
if intent == nil {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("intent is required"))
}
if intent.GetAmount() == nil {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("intent.amount is required"))
}
quote, expiresAt, err := s.buildPaymentQuote(ctx, orgRef, req)
if err != nil {
return gsresponse.Auto[orchestratorv1.QuotePaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
if !req.GetPreviewOnly() {
quotesStore := s.storage.Quotes()
if quotesStore == nil {
return gsresponse.Unavailable[orchestratorv1.QuotePaymentResponse](s.logger, mservice.PaymentOrchestrator, errStorageUnavailable)
}
quoteRef := primitive.NewObjectID().Hex()
quote.QuoteRef = quoteRef
record := &model.PaymentQuoteRecord{
QuoteRef: quoteRef,
Intent: intentFromProto(intent),
Quote: quoteSnapshotToModel(quote),
ExpiresAt: expiresAt,
}
record.SetID(primitive.NewObjectID())
record.SetOrganizationRef(orgObjectID)
if err := quotesStore.Create(ctx, record); err != nil {
return gsresponse.Auto[orchestratorv1.QuotePaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
}
return gsresponse.Success(&orchestratorv1.QuotePaymentResponse{Quote: quote})
// ProcessCardPayoutUpdate reconciles card payout events back into payment state.
func (s *Service) ProcessCardPayoutUpdate(ctx context.Context, req *orchestratorv1.ProcessCardPayoutUpdateRequest) (*orchestratorv1.ProcessCardPayoutUpdateResponse, error) {
s.ensureHandlers()
return executeUnary(ctx, s, "ProcessCardPayoutUpdate", s.h.events.processCardPayoutUpdate, req)
}
func (s *Service) initiatePaymentHandler(ctx context.Context, req *orchestratorv1.InitiatePaymentRequest) gsresponse.Responder[orchestratorv1.InitiatePaymentResponse] {
if err := s.ensureRepository(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
if req == nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("nil request"))
}
meta := req.GetMeta()
if meta == nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("meta is required"))
}
orgRef := strings.TrimSpace(meta.GetOrganizationRef())
if orgRef == "" {
return gsresponse.InvalidArgument[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("organization_ref is required"))
}
orgObjectID, parseErr := primitive.ObjectIDFromHex(orgRef)
if parseErr != nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("organization_ref must be a valid objectID"))
}
intent := req.GetIntent()
if intent == nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("intent is required"))
}
if intent.GetAmount() == nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("intent.amount is required"))
}
idempotencyKey := strings.TrimSpace(req.GetIdempotencyKey())
if idempotencyKey == "" {
return gsresponse.InvalidArgument[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("idempotency_key is required"))
}
store := s.storage.Payments()
if store == nil {
return gsresponse.Unavailable[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, errStorageUnavailable)
}
existing, err := store.GetByIdempotencyKey(ctx, orgObjectID, idempotencyKey)
if err == nil && existing != nil {
return gsresponse.Success(&orchestratorv1.InitiatePaymentResponse{
Payment: toProtoPayment(existing),
})
}
if err != nil && err != storage.ErrPaymentNotFound {
return gsresponse.Auto[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
quoteRef := strings.TrimSpace(req.GetQuoteRef())
quote := strings.TrimSpace(req.GetFeeQuoteToken())
var quoteSnapshot *orchestratorv1.PaymentQuote
if quoteRef != "" {
quotesStore := s.storage.Quotes()
if quotesStore == nil {
return gsresponse.Unavailable[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, errStorageUnavailable)
}
record, err := quotesStore.GetByRef(ctx, orgObjectID, quoteRef)
if err != nil {
if err == storage.ErrQuoteNotFound {
return gsresponse.FailedPrecondition[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, "quote_not_found", merrors.InvalidArgument("quote_ref not found or expired"))
}
return gsresponse.Auto[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
if !record.ExpiresAt.IsZero() && s.clock.Now().After(record.ExpiresAt) {
return gsresponse.FailedPrecondition[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, "quote_expired", merrors.InvalidArgument("quote_ref expired"))
}
if !proto.Equal(protoIntentFromModel(record.Intent), intent) {
return gsresponse.InvalidArgument[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("quote_ref does not match intent"))
}
quoteSnapshot = modelQuoteToProto(record.Quote)
if quoteSnapshot == nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("stored quote is empty"))
}
quoteSnapshot.QuoteRef = quoteRef
} else if quote == "" {
quoteSnapshot, _, err = s.buildPaymentQuote(ctx, orgRef, &orchestratorv1.QuotePaymentRequest{
Meta: req.GetMeta(),
IdempotencyKey: req.GetIdempotencyKey(),
Intent: req.GetIntent(),
PreviewOnly: false,
})
if err != nil {
return gsresponse.Auto[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
} else {
quoteSnapshot = &orchestratorv1.PaymentQuote{FeeQuoteToken: quote}
}
entity := &model.Payment{}
entity.SetID(primitive.NewObjectID())
entity.SetOrganizationRef(orgObjectID)
entity.PaymentRef = entity.GetID().Hex()
entity.IdempotencyKey = idempotencyKey
entity.State = model.PaymentStateAccepted
entity.Intent = intentFromProto(intent)
entity.Metadata = cloneMetadata(req.GetMetadata())
entity.LastQuote = quoteSnapshotToModel(quoteSnapshot)
entity.Normalize()
if err = store.Create(ctx, entity); err != nil {
if err == storage.ErrDuplicatePayment {
return gsresponse.Auto[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.DataConflict("payment already exists"))
}
return gsresponse.Auto[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
if quoteSnapshot == nil {
quoteSnapshot = &orchestratorv1.PaymentQuote{}
}
if err := s.executePayment(ctx, store, entity, quoteSnapshot); err != nil {
return gsresponse.Auto[orchestratorv1.InitiatePaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
return gsresponse.Success(&orchestratorv1.InitiatePaymentResponse{
Payment: toProtoPayment(entity),
})
}
func (s *Service) cancelPaymentHandler(ctx context.Context, req *orchestratorv1.CancelPaymentRequest) gsresponse.Responder[orchestratorv1.CancelPaymentResponse] {
if err := s.ensureRepository(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.CancelPaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
if req == nil {
return gsresponse.InvalidArgument[orchestratorv1.CancelPaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("nil request"))
}
paymentRef := strings.TrimSpace(req.GetPaymentRef())
if paymentRef == "" {
return gsresponse.InvalidArgument[orchestratorv1.CancelPaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("payment_ref is required"))
}
store := s.storage.Payments()
if store == nil {
return gsresponse.Unavailable[orchestratorv1.CancelPaymentResponse](s.logger, mservice.PaymentOrchestrator, errStorageUnavailable)
}
payment, err := store.GetByPaymentRef(ctx, paymentRef)
if err != nil {
if err == storage.ErrPaymentNotFound {
return gsresponse.NotFound[orchestratorv1.CancelPaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
return gsresponse.Auto[orchestratorv1.CancelPaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
if payment.State != model.PaymentStateAccepted {
reason := merrors.InvalidArgument("payment cannot be cancelled in current state")
return gsresponse.FailedPrecondition[orchestratorv1.CancelPaymentResponse](s.logger, mservice.PaymentOrchestrator, "payment_not_cancellable", reason)
}
payment.State = model.PaymentStateCancelled
payment.FailureCode = model.PaymentFailureCodePolicy
payment.FailureReason = strings.TrimSpace(req.GetReason())
if err := store.Update(ctx, payment); err != nil {
return gsresponse.Auto[orchestratorv1.CancelPaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
return gsresponse.Success(&orchestratorv1.CancelPaymentResponse{Payment: toProtoPayment(payment)})
}
func (s *Service) getPaymentHandler(ctx context.Context, req *orchestratorv1.GetPaymentRequest) gsresponse.Responder[orchestratorv1.GetPaymentResponse] {
if err := s.ensureRepository(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.GetPaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
if req == nil {
return gsresponse.InvalidArgument[orchestratorv1.GetPaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("nil request"))
}
paymentRef := strings.TrimSpace(req.GetPaymentRef())
if paymentRef == "" {
return gsresponse.InvalidArgument[orchestratorv1.GetPaymentResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("payment_ref is required"))
}
store := s.storage.Payments()
if store == nil {
return gsresponse.Unavailable[orchestratorv1.GetPaymentResponse](s.logger, mservice.PaymentOrchestrator, errStorageUnavailable)
}
entity, err := store.GetByPaymentRef(ctx, paymentRef)
if err != nil {
if err == storage.ErrPaymentNotFound {
return gsresponse.NotFound[orchestratorv1.GetPaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
return gsresponse.Auto[orchestratorv1.GetPaymentResponse](s.logger, mservice.PaymentOrchestrator, err)
}
return gsresponse.Success(&orchestratorv1.GetPaymentResponse{Payment: toProtoPayment(entity)})
}
func (s *Service) listPaymentsHandler(ctx context.Context, req *orchestratorv1.ListPaymentsRequest) gsresponse.Responder[orchestratorv1.ListPaymentsResponse] {
if err := s.ensureRepository(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.ListPaymentsResponse](s.logger, mservice.PaymentOrchestrator, err)
}
if req == nil {
return gsresponse.InvalidArgument[orchestratorv1.ListPaymentsResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("nil request"))
}
store := s.storage.Payments()
if store == nil {
return gsresponse.Unavailable[orchestratorv1.ListPaymentsResponse](s.logger, mservice.PaymentOrchestrator, errStorageUnavailable)
}
filter := filterFromProto(req)
result, err := store.List(ctx, filter)
if err != nil {
return gsresponse.Auto[orchestratorv1.ListPaymentsResponse](s.logger, mservice.PaymentOrchestrator, err)
}
resp := &orchestratorv1.ListPaymentsResponse{
Page: &paginationv1.CursorPageResponse{
NextCursor: result.NextCursor,
},
}
resp.Payments = make([]*orchestratorv1.Payment, 0, len(result.Items))
for _, item := range result.Items {
resp.Payments = append(resp.Payments, toProtoPayment(item))
}
return gsresponse.Success(resp)
}
func (s *Service) initiateConversionHandler(ctx context.Context, req *orchestratorv1.InitiateConversionRequest) gsresponse.Responder[orchestratorv1.InitiateConversionResponse] {
if err := s.ensureRepository(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, err)
}
if req == nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("nil request"))
}
meta := req.GetMeta()
if meta == nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("meta is required"))
}
orgRef := strings.TrimSpace(meta.GetOrganizationRef())
if orgRef == "" {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("organization_ref is required"))
}
orgObjectID, parseErr := primitive.ObjectIDFromHex(orgRef)
if parseErr != nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("organization_ref must be a valid objectID"))
}
idempotencyKey := strings.TrimSpace(req.GetIdempotencyKey())
if idempotencyKey == "" {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("idempotency_key is required"))
}
if req.GetSource() == nil || req.GetSource().GetLedger() == nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("source ledger endpoint is required"))
}
if req.GetDestination() == nil || req.GetDestination().GetLedger() == nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("destination ledger endpoint is required"))
}
fxIntent := req.GetFx()
if fxIntent == nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("fx intent is required"))
}
store := s.storage.Payments()
if store == nil {
return gsresponse.Unavailable[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, errStorageUnavailable)
}
if existing, err := store.GetByIdempotencyKey(ctx, orgObjectID, idempotencyKey); err == nil && existing != nil {
return gsresponse.Success(&orchestratorv1.InitiateConversionResponse{Conversion: toProtoPayment(existing)})
} else if err != nil && err != storage.ErrPaymentNotFound {
return gsresponse.Auto[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, err)
}
amount, err := conversionAmountFromMetadata(req.GetMetadata(), fxIntent)
if err != nil {
return gsresponse.InvalidArgument[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, err)
}
intentProto := &orchestratorv1.PaymentIntent{
Kind: orchestratorv1.PaymentKind_PAYMENT_KIND_FX_CONVERSION,
Source: req.GetSource(),
Destination: req.GetDestination(),
Amount: amount,
RequiresFx: true,
Fx: fxIntent,
FeePolicy: req.GetFeePolicy(),
}
quote, _, err := s.buildPaymentQuote(ctx, orgRef, &orchestratorv1.QuotePaymentRequest{
Meta: req.GetMeta(),
IdempotencyKey: req.GetIdempotencyKey(),
Intent: intentProto,
})
if err != nil {
return gsresponse.Auto[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, err)
}
entity := &model.Payment{}
entity.SetID(primitive.NewObjectID())
entity.SetOrganizationRef(orgObjectID)
entity.PaymentRef = entity.GetID().Hex()
entity.IdempotencyKey = idempotencyKey
entity.State = model.PaymentStateAccepted
entity.Intent = intentFromProto(intentProto)
entity.Metadata = cloneMetadata(req.GetMetadata())
entity.LastQuote = quoteSnapshotToModel(quote)
entity.Normalize()
if err = store.Create(ctx, entity); err != nil {
if err == storage.ErrDuplicatePayment {
return gsresponse.Auto[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, merrors.DataConflict("payment already exists"))
}
return gsresponse.Auto[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, err)
}
if err := s.executePayment(ctx, store, entity, quote); err != nil {
return gsresponse.Auto[orchestratorv1.InitiateConversionResponse](s.logger, mservice.PaymentOrchestrator, err)
}
return gsresponse.Success(&orchestratorv1.InitiateConversionResponse{
Conversion: toProtoPayment(entity),
})
}
func (s *Service) processTransferUpdateHandler(ctx context.Context, req *orchestratorv1.ProcessTransferUpdateRequest) gsresponse.Responder[orchestratorv1.ProcessTransferUpdateResponse] {
if err := s.ensureRepository(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.ProcessTransferUpdateResponse](s.logger, mservice.PaymentOrchestrator, err)
}
if req == nil || req.GetEvent() == nil || req.GetEvent().GetTransfer() == nil {
return gsresponse.InvalidArgument[orchestratorv1.ProcessTransferUpdateResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("transfer event is required"))
}
transfer := req.GetEvent().GetTransfer()
transferRef := strings.TrimSpace(transfer.GetTransferRef())
if transferRef == "" {
return gsresponse.InvalidArgument[orchestratorv1.ProcessTransferUpdateResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("transfer_ref is required"))
}
store := s.storage.Payments()
if store == nil {
return gsresponse.Unavailable[orchestratorv1.ProcessTransferUpdateResponse](s.logger, mservice.PaymentOrchestrator, errStorageUnavailable)
}
payment, err := store.GetByChainTransferRef(ctx, transferRef)
if err != nil {
if err == storage.ErrPaymentNotFound {
return gsresponse.NotFound[orchestratorv1.ProcessTransferUpdateResponse](s.logger, mservice.PaymentOrchestrator, err)
}
return gsresponse.Auto[orchestratorv1.ProcessTransferUpdateResponse](s.logger, mservice.PaymentOrchestrator, err)
}
applyTransferStatus(req.GetEvent(), payment)
if err := store.Update(ctx, payment); err != nil {
return gsresponse.Auto[orchestratorv1.ProcessTransferUpdateResponse](s.logger, mservice.PaymentOrchestrator, err)
}
return gsresponse.Success(&orchestratorv1.ProcessTransferUpdateResponse{Payment: toProtoPayment(payment)})
}
func (s *Service) processDepositObservedHandler(ctx context.Context, req *orchestratorv1.ProcessDepositObservedRequest) gsresponse.Responder[orchestratorv1.ProcessDepositObservedResponse] {
if err := s.ensureRepository(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.ProcessDepositObservedResponse](s.logger, mservice.PaymentOrchestrator, err)
}
if req == nil || req.GetEvent() == nil {
return gsresponse.InvalidArgument[orchestratorv1.ProcessDepositObservedResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("deposit event is required"))
}
event := req.GetEvent()
walletRef := strings.TrimSpace(event.GetWalletRef())
if walletRef == "" {
return gsresponse.InvalidArgument[orchestratorv1.ProcessDepositObservedResponse](s.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("wallet_ref is required"))
}
store := s.storage.Payments()
if store == nil {
return gsresponse.Unavailable[orchestratorv1.ProcessDepositObservedResponse](s.logger, mservice.PaymentOrchestrator, errStorageUnavailable)
}
filter := &model.PaymentFilter{
States: []model.PaymentState{model.PaymentStateSubmitted, model.PaymentStateFundsReserved},
DestinationRef: walletRef,
}
result, err := store.List(ctx, filter)
if err != nil {
return gsresponse.Auto[orchestratorv1.ProcessDepositObservedResponse](s.logger, mservice.PaymentOrchestrator, err)
}
for _, payment := range result.Items {
if payment.Intent.Destination.Type != model.EndpointTypeManagedWallet {
continue
}
if !moneyEquals(payment.Intent.Amount, event.GetAmount()) {
continue
}
payment.State = model.PaymentStateSettled
payment.FailureCode = model.PaymentFailureCodeUnspecified
payment.FailureReason = ""
if payment.Execution == nil {
payment.Execution = &model.ExecutionRefs{}
}
if payment.Execution.ChainTransferRef == "" {
payment.Execution.ChainTransferRef = strings.TrimSpace(event.GetTransactionHash())
}
if err := store.Update(ctx, payment); err != nil {
return gsresponse.Auto[orchestratorv1.ProcessDepositObservedResponse](s.logger, mservice.PaymentOrchestrator, err)
}
return gsresponse.Success(&orchestratorv1.ProcessDepositObservedResponse{Payment: toProtoPayment(payment)})
}
return gsresponse.Success(&orchestratorv1.ProcessDepositObservedResponse{})
func (s *Service) executePayment(ctx context.Context, store storage.PaymentsStore, payment *model.Payment, quote *orchestratorv1.PaymentQuote) error {
s.ensureHandlers()
return s.comp.executor.executePayment(ctx, store, payment, quote)
}

View File

@@ -0,0 +1,170 @@
package orchestrator
import (
"context"
"errors"
"strings"
"github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/api/routers/gsresponse"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"google.golang.org/protobuf/proto"
)
func validateMetaAndOrgRef(meta *orchestratorv1.RequestMeta) (string, primitive.ObjectID, error) {
if meta == nil {
return "", primitive.NilObjectID, merrors.InvalidArgument("meta is required")
}
orgRef := strings.TrimSpace(meta.GetOrganizationRef())
if orgRef == "" {
return "", primitive.NilObjectID, merrors.InvalidArgument("organization_ref is required")
}
orgID, err := primitive.ObjectIDFromHex(orgRef)
if err != nil {
return "", primitive.NilObjectID, merrors.InvalidArgument("organization_ref must be a valid objectID")
}
return orgRef, orgID, nil
}
func requireIdempotencyKey(k string) (string, error) {
key := strings.TrimSpace(k)
if key == "" {
return "", merrors.InvalidArgument("idempotency_key is required")
}
return key, nil
}
func requirePaymentRef(ref string) (string, error) {
val := strings.TrimSpace(ref)
if val == "" {
return "", merrors.InvalidArgument("payment_ref is required")
}
return val, nil
}
func requireNonNilIntent(intent *orchestratorv1.PaymentIntent) error {
if intent == nil {
return merrors.InvalidArgument("intent is required")
}
if intent.GetAmount() == nil {
return merrors.InvalidArgument("intent.amount is required")
}
return nil
}
func ensurePaymentsStore(repo storage.Repository) (storage.PaymentsStore, error) {
if repo == nil {
return nil, errStorageUnavailable
}
store := repo.Payments()
if store == nil {
return nil, errStorageUnavailable
}
return store, nil
}
func ensureQuotesStore(repo storage.Repository) (storage.QuotesStore, error) {
if repo == nil {
return nil, errStorageUnavailable
}
store := repo.Quotes()
if store == nil {
return nil, errStorageUnavailable
}
return store, nil
}
func getPaymentByIdempotencyKey(ctx context.Context, store storage.PaymentsStore, orgID primitive.ObjectID, key string) (*model.Payment, error) {
payment, err := store.GetByIdempotencyKey(ctx, orgID, key)
if err != nil {
return nil, err
}
return payment, nil
}
type quoteResolutionInput struct {
OrgRef string
OrgID primitive.ObjectID
Meta *orchestratorv1.RequestMeta
Intent *orchestratorv1.PaymentIntent
QuoteRef string
FeeQuoteToken string
IdempotencyKey string
}
type quoteResolutionError struct {
code string
err error
}
func (e quoteResolutionError) Error() string { return e.err.Error() }
func (s *Service) resolvePaymentQuote(ctx context.Context, in quoteResolutionInput) (*orchestratorv1.PaymentQuote, error) {
if ref := strings.TrimSpace(in.QuoteRef); ref != "" {
quotesStore, err := ensureQuotesStore(s.storage)
if err != nil {
return nil, err
}
record, err := quotesStore.GetByRef(ctx, in.OrgID, ref)
if err != nil {
if errors.Is(err, storage.ErrQuoteNotFound) {
return nil, quoteResolutionError{code: "quote_not_found", err: merrors.InvalidArgument("quote_ref not found or expired")}
}
return nil, err
}
if !record.ExpiresAt.IsZero() && s.clock.Now().After(record.ExpiresAt) {
return nil, quoteResolutionError{code: "quote_expired", err: merrors.InvalidArgument("quote_ref expired")}
}
if !proto.Equal(protoIntentFromModel(record.Intent), in.Intent) {
return nil, quoteResolutionError{code: "quote_intent_mismatch", err: merrors.InvalidArgument("quote_ref does not match intent")}
}
quote := modelQuoteToProto(record.Quote)
if quote == nil {
return nil, merrors.InvalidArgument("stored quote is empty")
}
quote.QuoteRef = ref
return quote, nil
}
if token := strings.TrimSpace(in.FeeQuoteToken); token != "" {
return &orchestratorv1.PaymentQuote{FeeQuoteToken: token}, nil
}
req := &orchestratorv1.QuotePaymentRequest{
Meta: in.Meta,
IdempotencyKey: in.IdempotencyKey,
Intent: in.Intent,
PreviewOnly: false,
}
quote, _, err := s.buildPaymentQuote(ctx, in.OrgRef, req)
if err != nil {
return nil, err
}
return quote, nil
}
func newPayment(orgID primitive.ObjectID, intent *orchestratorv1.PaymentIntent, idempotencyKey string, metadata map[string]string, quote *orchestratorv1.PaymentQuote) *model.Payment {
entity := &model.Payment{}
entity.SetID(primitive.NewObjectID())
entity.SetOrganizationRef(orgID)
entity.PaymentRef = entity.GetID().Hex()
entity.IdempotencyKey = idempotencyKey
entity.State = model.PaymentStateAccepted
entity.Intent = intentFromProto(intent)
entity.Metadata = cloneMetadata(metadata)
entity.LastQuote = quoteSnapshotToModel(quote)
entity.Normalize()
return entity
}
func paymentNotFoundResponder[T any](svc mservice.Type, logger mlogger.Logger, err error) gsresponse.Responder[T] {
if errors.Is(err, storage.ErrPaymentNotFound) {
return gsresponse.NotFound[T](logger, svc, err)
}
return gsresponse.Auto[T](logger, svc, err)
}

View File

@@ -0,0 +1,252 @@
package orchestrator
import (
"context"
"testing"
"time"
"github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/payments/orchestrator/storage/model"
clockpkg "github.com/tech/sendico/pkg/clock"
mloggerfactory "github.com/tech/sendico/pkg/mlogger/factory"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
)
func TestValidateMetaAndOrgRef(t *testing.T) {
org := primitive.NewObjectID()
meta := &orchestratorv1.RequestMeta{OrganizationRef: org.Hex()}
ref, id, err := validateMetaAndOrgRef(meta)
if err != nil {
t.Fatalf("expected nil error: %v", err)
}
if ref != org.Hex() || id != org {
t.Fatalf("unexpected org parsing: %s %s", ref, id.Hex())
}
if _, _, err := validateMetaAndOrgRef(nil); err == nil {
t.Fatalf("expected error on nil meta")
}
if _, _, err := validateMetaAndOrgRef(&orchestratorv1.RequestMeta{OrganizationRef: ""}); err == nil {
t.Fatalf("expected error on empty orgRef")
}
if _, _, err := validateMetaAndOrgRef(&orchestratorv1.RequestMeta{OrganizationRef: "bad"}); err == nil {
t.Fatalf("expected error on invalid orgRef")
}
}
func TestRequireIdempotencyKey(t *testing.T) {
if _, err := requireIdempotencyKey(" "); err == nil {
t.Fatalf("expected error for empty key")
}
val, err := requireIdempotencyKey(" key ")
if err != nil || val != "key" {
t.Fatalf("unexpected result %s err %v", val, err)
}
}
func TestNewPayment(t *testing.T) {
org := primitive.NewObjectID()
intent := &orchestratorv1.PaymentIntent{
Amount: &moneyv1.Money{Currency: "USD", Amount: "10"},
}
quote := &orchestratorv1.PaymentQuote{QuoteRef: "q1"}
p := newPayment(org, intent, "idem", map[string]string{"k": "v"}, quote)
if p.PaymentRef == "" || p.IdempotencyKey != "idem" || p.State != model.PaymentStateAccepted {
t.Fatalf("unexpected payment fields: %+v", p)
}
if p.Intent.Amount == nil || p.Intent.Amount.GetAmount() != "10" {
t.Fatalf("intent not copied")
}
if p.LastQuote == nil || p.LastQuote.QuoteRef != "q1" {
t.Fatalf("quote not copied")
}
}
func TestResolvePaymentQuote_NotFound(t *testing.T) {
org := primitive.NewObjectID()
svc := &Service{
storage: stubRepo{quotes: &helperQuotesStore{}},
clock: clockpkg.NewSystem(),
}
_, err := svc.resolvePaymentQuote(context.Background(), quoteResolutionInput{
OrgRef: org.Hex(),
OrgID: org,
Meta: &orchestratorv1.RequestMeta{OrganizationRef: org.Hex()},
Intent: &orchestratorv1.PaymentIntent{Amount: &moneyv1.Money{Currency: "USD", Amount: "1"}},
QuoteRef: "missing",
})
if qerr, ok := err.(quoteResolutionError); !ok || qerr.code != "quote_not_found" {
t.Fatalf("expected quote_not_found, got %v", err)
}
}
func TestResolvePaymentQuote_Expired(t *testing.T) {
org := primitive.NewObjectID()
intent := &orchestratorv1.PaymentIntent{Amount: &moneyv1.Money{Currency: "USD", Amount: "1"}}
record := &model.PaymentQuoteRecord{
QuoteRef: "q1",
Intent: intentFromProto(intent),
Quote: &model.PaymentQuoteSnapshot{},
ExpiresAt: time.Now().Add(-time.Minute),
}
svc := &Service{
storage: stubRepo{quotes: &helperQuotesStore{records: map[string]*model.PaymentQuoteRecord{"q1": record}}},
clock: clockpkg.NewSystem(),
}
_, err := svc.resolvePaymentQuote(context.Background(), quoteResolutionInput{
OrgRef: org.Hex(),
OrgID: org,
Meta: &orchestratorv1.RequestMeta{OrganizationRef: org.Hex()},
Intent: intent,
QuoteRef: "q1",
})
if qerr, ok := err.(quoteResolutionError); !ok || qerr.code != "quote_expired" {
t.Fatalf("expected quote_expired, got %v", err)
}
}
func TestResolvePaymentQuote_FeeToken(t *testing.T) {
org := primitive.NewObjectID()
intent := &orchestratorv1.PaymentIntent{Amount: &moneyv1.Money{Currency: "USD", Amount: "1"}}
svc := &Service{clock: clockpkg.NewSystem()}
quote, err := svc.resolvePaymentQuote(context.Background(), quoteResolutionInput{
OrgRef: org.Hex(),
OrgID: org,
Meta: &orchestratorv1.RequestMeta{OrganizationRef: org.Hex()},
Intent: intent,
FeeQuoteToken: "token",
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if quote.GetFeeQuoteToken() != "token" {
t.Fatalf("expected fee token preserved")
}
}
func TestInitiatePaymentIdempotency(t *testing.T) {
logger := mloggerfactory.NewLogger(false)
org := primitive.NewObjectID()
store := newHelperPaymentStore()
svc := NewService(logger, stubRepo{
payments: store,
}, WithClock(clockpkg.NewSystem()))
svc.ensureHandlers()
intent := &orchestratorv1.PaymentIntent{
Amount: &moneyv1.Money{Currency: "USD", Amount: "1"},
}
req := &orchestratorv1.InitiatePaymentRequest{
Meta: &orchestratorv1.RequestMeta{OrganizationRef: org.Hex()},
Intent: intent,
IdempotencyKey: "k1",
FeeQuoteToken: "fq",
}
resp, err := svc.h.commands.InitiatePayment().Execute(context.Background(), req)(context.Background())
if err != nil {
t.Fatalf("first call failed: %v", err)
}
resp2, err := svc.h.commands.InitiatePayment().Execute(context.Background(), req)(context.Background())
if err != nil {
t.Fatalf("second call failed: %v", err)
}
if resp == nil || resp2 == nil || resp.Payment.GetPaymentRef() != resp2.Payment.GetPaymentRef() {
t.Fatalf("idempotent call returned different payments")
}
}
// --- test doubles ---
type stubRepo struct {
payments storage.PaymentsStore
quotes storage.QuotesStore
pingErr error
}
func (s stubRepo) Ping(context.Context) error { return s.pingErr }
func (s stubRepo) Payments() storage.PaymentsStore { return s.payments }
func (s stubRepo) Quotes() storage.QuotesStore { return s.quotes }
type helperPaymentStore struct {
byRef map[string]*model.Payment
byIdem map[string]*model.Payment
byChain map[string]*model.Payment
}
func newHelperPaymentStore() *helperPaymentStore {
return &helperPaymentStore{
byRef: make(map[string]*model.Payment),
byIdem: make(map[string]*model.Payment),
byChain: make(map[string]*model.Payment),
}
}
func (s *helperPaymentStore) Create(_ context.Context, p *model.Payment) error {
if _, ok := s.byRef[p.PaymentRef]; ok {
return storage.ErrDuplicatePayment
}
s.byRef[p.PaymentRef] = p
if p.IdempotencyKey != "" {
s.byIdem[p.IdempotencyKey] = p
}
if p.Execution != nil && p.Execution.ChainTransferRef != "" {
s.byChain[p.Execution.ChainTransferRef] = p
}
return nil
}
func (s *helperPaymentStore) Update(_ context.Context, p *model.Payment) error {
if p == nil {
return storage.ErrPaymentNotFound
}
if _, ok := s.byRef[p.PaymentRef]; !ok {
return storage.ErrPaymentNotFound
}
s.byRef[p.PaymentRef] = p
if p.IdempotencyKey != "" {
s.byIdem[p.IdempotencyKey] = p
}
return nil
}
func (s *helperPaymentStore) GetByPaymentRef(_ context.Context, ref string) (*model.Payment, error) {
if p, ok := s.byRef[ref]; ok {
return p, nil
}
return nil, storage.ErrPaymentNotFound
}
func (s *helperPaymentStore) GetByIdempotencyKey(_ context.Context, _ primitive.ObjectID, key string) (*model.Payment, error) {
if p, ok := s.byIdem[key]; ok {
return p, nil
}
return nil, storage.ErrPaymentNotFound
}
func (s *helperPaymentStore) GetByChainTransferRef(_ context.Context, ref string) (*model.Payment, error) {
if p, ok := s.byChain[ref]; ok {
return p, nil
}
return nil, storage.ErrPaymentNotFound
}
func (s *helperPaymentStore) List(_ context.Context, _ *model.PaymentFilter) (*model.PaymentList, error) {
return &model.PaymentList{}, nil
}
type helperQuotesStore struct {
records map[string]*model.PaymentQuoteRecord
}
func (s *helperQuotesStore) Create(_ context.Context, _ *model.PaymentQuoteRecord) error { return nil }
func (s *helperQuotesStore) GetByRef(_ context.Context, _ primitive.ObjectID, ref string) (*model.PaymentQuoteRecord, error) {
if s.records == nil {
return nil, storage.ErrQuoteNotFound
}
if rec, ok := s.records[ref]; ok {
return rec, nil
}
return nil, storage.ErrQuoteNotFound
}

View File

@@ -32,11 +32,13 @@ func TestExecutePayment_FXConversionSettled(t *testing.T) {
logger: zap.NewNop(),
clock: testClock{now: time.Now()},
storage: repo,
ledger: ledgerDependency{client: &ledgerclient.Fake{
ApplyFXWithChargesFn: func(ctx context.Context, req *ledgerv1.FXRequest) (*ledgerv1.PostResponse, error) {
return &ledgerv1.PostResponse{JournalEntryRef: "fx-entry"}, nil
},
}},
deps: serviceDependencies{
ledger: ledgerDependency{client: &ledgerclient.Fake{
ApplyFXWithChargesFn: func(ctx context.Context, req *ledgerv1.FXRequest) (*ledgerv1.PostResponse, error) {
return &ledgerv1.PostResponse{JournalEntryRef: "fx-entry"}, nil
},
}},
},
}
payment := &model.Payment{
@@ -88,11 +90,13 @@ func TestExecutePayment_ChainFailure(t *testing.T) {
logger: zap.NewNop(),
clock: testClock{now: time.Now()},
storage: repo,
gateway: gatewayDependency{client: &chainclient.Fake{
SubmitTransferFn: func(ctx context.Context, req *chainv1.SubmitTransferRequest) (*chainv1.SubmitTransferResponse, error) {
return nil, errors.New("chain failure")
},
}},
deps: serviceDependencies{
gateway: gatewayDependency{client: &chainclient.Fake{
SubmitTransferFn: func(ctx context.Context, req *chainv1.SubmitTransferRequest) (*chainv1.SubmitTransferResponse, error) {
return nil, errors.New("chain failure")
},
}},
},
}
payment := &model.Payment{
@@ -146,6 +150,7 @@ func TestProcessTransferUpdateHandler_Settled(t *testing.T) {
clock: testClock{now: time.Now()},
storage: &stubRepository{store: store},
}
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger)
req := &orchestratorv1.ProcessTransferUpdateRequest{
Event: &chainv1.TransferStatusChangedEvent{
@@ -156,7 +161,7 @@ func TestProcessTransferUpdateHandler_Settled(t *testing.T) {
},
}
reSP, err := gsresponse.Execute(ctx, svc.processTransferUpdateHandler(ctx, req))
reSP, err := gsresponse.Execute(ctx, svc.h.events.processTransferUpdate(ctx, req))
if err != nil {
t.Fatalf("handler returned error: %v", err)
}
@@ -189,6 +194,7 @@ func TestProcessDepositObservedHandler_MatchesPayment(t *testing.T) {
clock: testClock{now: time.Now()},
storage: &stubRepository{store: store},
}
svc.h.events = newPaymentEventHandler(svc.storage, svc.ensureRepository, svc.logger)
req := &orchestratorv1.ProcessDepositObservedRequest{
Event: &chainv1.WalletDepositObservedEvent{
@@ -197,7 +203,7 @@ func TestProcessDepositObservedHandler_MatchesPayment(t *testing.T) {
},
}
reSP, err := gsresponse.Execute(ctx, svc.processDepositObservedHandler(ctx, req))
reSP, err := gsresponse.Execute(ctx, svc.h.events.processDepositObserved(ctx, req))
if err != nil {
t.Fatalf("handler returned error: %v", err)
}