idempotency key usage fix

This commit is contained in:
Stephan D
2026-01-21 15:23:50 +01:00
parent a15375f18e
commit d2e78356e6
48 changed files with 729 additions and 559 deletions

View File

@@ -62,5 +62,5 @@ require (
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/text v0.33.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120174246-409b4a993575 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect
)

View File

@@ -215,8 +215,8 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120174246-409b4a993575 h1:vzOYHDZEHIsPYYnaSYo60AqHkJronSu0rzTz/s4quL0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120174246-409b4a993575/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 h1:sNrWoksmOyF5bvJUcnmbeAmQi8baNhqg5IWaI3llQqU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=

View File

@@ -3,13 +3,9 @@ package serverimp
import (
"strings"
chainclient "github.com/tech/sendico/gateway/chain/client"
"github.com/tech/sendico/payments/orchestrator/internal/service/orchestrator"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/payments/rail"
"go.uber.org/zap"
)
func buildCardGatewayRoutes(src map[string]cardGatewayRouteConfig) map[string]orchestrator.CardGatewayRoute {
@@ -62,183 +58,3 @@ func buildGatewayRegistry(logger mlogger.Logger, src []gatewayInstanceConfig, re
}
return orchestrator.NewDiscoveryGatewayRegistry(logger, registry)
}
func buildRailGateways(chainClient chainclient.Client, paymentGatewayClient chainclient.Client, src []gatewayInstanceConfig) map[string]rail.RailGateway {
if len(src) == 0 || (chainClient == nil && paymentGatewayClient == nil) {
return nil
}
instances := buildGatewayInstances(nil, src)
if len(instances) == 0 {
return nil
}
result := map[string]rail.RailGateway{}
for _, inst := range instances {
if inst == nil || !inst.IsEnabled {
continue
}
cfg := chainclient.RailGatewayConfig{
Rail: string(inst.Rail),
Network: inst.Network,
Capabilities: rail.RailCapabilities{
CanPayIn: inst.Capabilities.CanPayIn,
CanPayOut: inst.Capabilities.CanPayOut,
CanReadBalance: inst.Capabilities.CanReadBalance,
CanSendFee: inst.Capabilities.CanSendFee,
RequiresObserveConfirm: inst.Capabilities.RequiresObserveConfirm,
CanBlock: inst.Capabilities.CanBlock,
CanRelease: inst.Capabilities.CanRelease,
},
}
switch inst.Rail {
case model.RailCrypto:
if chainClient == nil {
continue
}
result[inst.ID] = chainclient.NewRailGateway(chainClient, cfg)
case model.RailProviderSettlement:
if paymentGatewayClient == nil {
continue
}
result[inst.ID] = orchestrator.NewProviderSettlementGateway(paymentGatewayClient, cfg)
}
}
if len(result) == 0 {
return nil
}
return result
}
func buildGatewayInstances(logger mlogger.Logger, src []gatewayInstanceConfig) []*model.GatewayInstanceDescriptor {
if len(src) == 0 {
return nil
}
if logger != nil {
logger = logger.Named("gateway_instances")
}
result := make([]*model.GatewayInstanceDescriptor, 0, len(src))
for _, cfg := range src {
id := strings.TrimSpace(cfg.ID)
if id == "" {
if logger != nil {
logger.Warn("Gateway instance skipped: missing id")
}
continue
}
rail := parseRail(cfg.Rail)
if rail == model.RailUnspecified {
if logger != nil {
logger.Warn("Gateway instance skipped: invalid rail", zap.String("id", id), zap.String("rail", cfg.Rail))
}
continue
}
enabled := true
if cfg.IsEnabled != nil {
enabled = *cfg.IsEnabled
}
result = append(result, &model.GatewayInstanceDescriptor{
ID: id,
Rail: rail,
Network: strings.ToUpper(strings.TrimSpace(cfg.Network)),
Currencies: normalizeCurrencies(cfg.Currencies),
Capabilities: model.RailCapabilities{
CanPayIn: cfg.Capabilities.CanPayIn,
CanPayOut: cfg.Capabilities.CanPayOut,
CanReadBalance: cfg.Capabilities.CanReadBalance,
CanSendFee: cfg.Capabilities.CanSendFee,
RequiresObserveConfirm: cfg.Capabilities.RequiresObserveConfirm,
CanBlock: cfg.Capabilities.CanBlock,
CanRelease: cfg.Capabilities.CanRelease,
},
Limits: buildGatewayLimits(cfg.Limits),
Version: strings.TrimSpace(cfg.Version),
IsEnabled: enabled,
})
}
return result
}
func parseRail(value string) model.Rail {
switch strings.ToUpper(strings.TrimSpace(value)) {
case string(model.RailCrypto):
return model.RailCrypto
case string(model.RailProviderSettlement):
return model.RailProviderSettlement
case string(model.RailLedger):
return model.RailLedger
case string(model.RailCardPayout):
return model.RailCardPayout
case string(model.RailFiatOnRamp):
return model.RailFiatOnRamp
default:
return model.RailUnspecified
}
}
func normalizeCurrencies(values []string) []string {
if len(values) == 0 {
return nil
}
seen := map[string]bool{}
result := make([]string, 0, len(values))
for _, value := range values {
clean := strings.ToUpper(strings.TrimSpace(value))
if clean == "" || seen[clean] {
continue
}
seen[clean] = true
result = append(result, clean)
}
return result
}
func buildGatewayLimits(cfg limitsConfig) model.Limits {
limits := model.Limits{
MinAmount: strings.TrimSpace(cfg.MinAmount),
MaxAmount: strings.TrimSpace(cfg.MaxAmount),
PerTxMaxFee: strings.TrimSpace(cfg.PerTxMaxFee),
PerTxMinAmount: strings.TrimSpace(cfg.PerTxMinAmount),
PerTxMaxAmount: strings.TrimSpace(cfg.PerTxMaxAmount),
}
if len(cfg.VolumeLimit) > 0 {
limits.VolumeLimit = map[string]string{}
for key, value := range cfg.VolumeLimit {
bucket := strings.TrimSpace(key)
amount := strings.TrimSpace(value)
if bucket == "" || amount == "" {
continue
}
limits.VolumeLimit[bucket] = amount
}
}
if len(cfg.VelocityLimit) > 0 {
limits.VelocityLimit = map[string]int{}
for key, value := range cfg.VelocityLimit {
bucket := strings.TrimSpace(key)
if bucket == "" {
continue
}
limits.VelocityLimit[bucket] = value
}
}
if len(cfg.CurrencyLimits) > 0 {
limits.CurrencyLimits = map[string]model.LimitsOverride{}
for key, override := range cfg.CurrencyLimits {
currency := strings.ToUpper(strings.TrimSpace(key))
if currency == "" {
continue
}
limits.CurrencyLimits[currency] = model.LimitsOverride{
MaxVolume: strings.TrimSpace(override.MaxVolume),
MinAmount: strings.TrimSpace(override.MinAmount),
MaxAmount: strings.TrimSpace(override.MaxAmount),
MaxFee: strings.TrimSpace(override.MaxFee),
MaxOps: override.MaxOps,
}
}
}
return limits
}

View File

@@ -1,159 +1,5 @@
package serverimp
import (
"context"
"crypto/tls"
oracleclient "github.com/tech/sendico/fx/oracle/client"
chainclient "github.com/tech/sendico/gateway/chain/client"
mntxclient "github.com/tech/sendico/gateway/mntx/client"
ledgerclient "github.com/tech/sendico/ledger/client"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)
func (i *Imp) initFeesClient(cfg clientConfig) (feesv1.FeeEngineClient, *grpc.ClientConn) {
addr := cfg.address()
if addr == "" {
return nil, nil
}
dialCtx, cancel := context.WithTimeout(context.Background(), cfg.dialTimeout())
defer cancel()
creds := credentials.NewTLS(&tls.Config{})
if cfg.InsecureTransport {
creds = insecure.NewCredentials()
}
conn, err := grpc.DialContext(dialCtx, addr, grpc.WithTransportCredentials(creds))
if err != nil {
i.logger.Warn("Failed to connect to fees service", zap.String("address", addr), zap.Error(err))
return nil, nil
}
i.logger.Info("Connected to fees service", zap.String("address", addr))
return feesv1.NewFeeEngineClient(conn), conn
}
func (i *Imp) initLedgerClient(cfg clientConfig) ledgerclient.Client {
addr := cfg.address()
if addr == "" {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), cfg.dialTimeout())
defer cancel()
client, err := ledgerclient.New(ctx, ledgerclient.Config{
Address: addr,
DialTimeout: cfg.dialTimeout(),
CallTimeout: cfg.callTimeout(),
Insecure: cfg.InsecureTransport,
})
if err != nil {
i.logger.Warn("Failed to connect to ledger service", zap.String("address", addr), zap.Error(err))
return nil
}
i.logger.Info("Connected to ledger service", zap.String("address", addr))
return client
}
func (i *Imp) initGatewayClient(cfg clientConfig) chainclient.Client {
addr := cfg.address()
if addr == "" {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), cfg.dialTimeout())
defer cancel()
client, err := chainclient.New(ctx, chainclient.Config{
Address: addr,
DialTimeout: cfg.dialTimeout(),
CallTimeout: cfg.callTimeout(),
Insecure: cfg.InsecureTransport,
})
if err != nil {
i.logger.Warn("failed to connect to chain gateway service", zap.String("address", addr), zap.Error(err))
return nil
}
i.logger.Info("connected to chain gateway service", zap.String("address", addr))
return client
}
func (i *Imp) initPaymentGatewayClient(cfg clientConfig) chainclient.Client {
addr := cfg.address()
if addr == "" {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), cfg.dialTimeout())
defer cancel()
client, err := chainclient.New(ctx, chainclient.Config{
Address: addr,
DialTimeout: cfg.dialTimeout(),
CallTimeout: cfg.callTimeout(),
Insecure: cfg.InsecureTransport,
})
if err != nil {
i.logger.Warn("failed to connect to payment gateway service", zap.String("address", addr), zap.Error(err))
return nil
}
i.logger.Info("connected to payment gateway service", zap.String("address", addr))
return client
}
func (i *Imp) initMntxClient(cfg clientConfig) mntxclient.Client {
addr := cfg.address()
if addr == "" {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), cfg.dialTimeout())
defer cancel()
client, err := mntxclient.New(ctx, mntxclient.Config{
Address: addr,
DialTimeout: cfg.dialTimeout(),
CallTimeout: cfg.callTimeout(),
Logger: i.logger.Named("client.mntx"),
})
if err != nil {
i.logger.Warn("Failed to connect to mntx gateway service", zap.String("address", addr), zap.Error(err))
return nil
}
i.logger.Info("Connected to mntx gateway service", zap.String("address", addr))
return client
}
func (i *Imp) initOracleClient(cfg clientConfig) oracleclient.Client {
addr := cfg.address()
if addr == "" {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), cfg.dialTimeout())
defer cancel()
client, err := oracleclient.New(ctx, oracleclient.Config{
Address: addr,
DialTimeout: cfg.dialTimeout(),
CallTimeout: cfg.callTimeout(),
Insecure: cfg.InsecureTransport,
})
if err != nil {
i.logger.Warn("Failed to connect to oracle service", zap.String("address", addr), zap.Error(err))
return nil
}
i.logger.Info("Connected to oracle service", zap.String("address", addr))
return client
}
func (i *Imp) closeClients() {
if i.discoveryClients != nil {
i.discoveryClients.Close()

View File

@@ -77,17 +77,6 @@ type limitsOverrideCfg struct {
MaxOps int `yaml:"max_ops"`
}
func (c clientConfig) address() string {
return strings.TrimSpace(c.Address)
}
func (c clientConfig) dialTimeout() time.Duration {
if c.DialTimeoutSecs <= 0 {
return 5 * time.Second
}
return time.Duration(c.DialTimeoutSecs) * time.Second
}
func (c clientConfig) callTimeout() time.Duration {
if c.CallTimeoutSecs <= 0 {
return 3 * time.Second

View File

@@ -16,7 +16,7 @@ type orchestratorDeps struct {
gatewayInvokeResolver orchestrator.GatewayInvokeResolver
}
func (i *Imp) initDependencies(cfg *config) *orchestratorDeps {
func (i *Imp) initDependencies(_ *config) *orchestratorDeps {
deps := &orchestratorDeps{}
if i.discoveryReg == nil {
if i.logger != nil {

View File

@@ -2,7 +2,10 @@ package orchestrator
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"sort"
"strings"
"time"
@@ -16,6 +19,7 @@ import (
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
type quotePaymentCommand struct {
@@ -23,55 +27,194 @@ type quotePaymentCommand struct {
logger mlogger.Logger
}
func (h *quotePaymentCommand) Execute(ctx context.Context, req *orchestratorv1.QuotePaymentRequest) gsresponse.Responder[orchestratorv1.QuotePaymentResponse] {
var (
errIdempotencyRequired = errors.New("idempotency key is required")
errPreviewWithIdempotency = errors.New("preview requests must not use idempotency key")
errIdempotencyParamMismatch = errors.New("idempotency key reuse with different parameters")
)
type quoteCtx struct {
orgID string
orgRef primitive.ObjectID
intent *orchestratorv1.PaymentIntent
previewOnly bool
idempotencyKey string
hash string
}
func (h *quotePaymentCommand) Execute(
ctx context.Context,
req *orchestratorv1.QuotePaymentRequest,
) gsresponse.Responder[orchestratorv1.QuotePaymentResponse] {
if err := h.engine.EnsureRepository(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if req == nil {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("nil request"))
}
qc, err := h.prepareQuoteCtx(req)
if err != nil {
return h.mapQuoteErr(err)
}
quotesStore, err := ensureQuotesStore(h.engine.Repository())
if err != nil {
return gsresponse.Unavailable[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
quoteProto, err := h.quotePayment(ctx, quotesStore, qc, req)
if err != nil {
return h.mapQuoteErr(err)
}
return gsresponse.Success(&orchestratorv1.QuotePaymentResponse{Quote: quoteProto})
}
func (h *quotePaymentCommand) prepareQuoteCtx(req *orchestratorv1.QuotePaymentRequest) (*quoteCtx, error) {
orgRef, orgID, err := validateMetaAndOrgRef(req.GetMeta())
if err != nil {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
return nil, err
}
if err := requireNonNilIntent(req.GetIntent()); err != nil {
return nil, err
}
intent := req.GetIntent()
preview := req.GetPreviewOnly()
idem := strings.TrimSpace(req.GetIdempotencyKey())
if preview && idem != "" {
return nil, errPreviewWithIdempotency
}
if !preview && idem == "" {
return nil, errIdempotencyRequired
}
return &quoteCtx{
orgID: orgRef,
orgRef: orgID,
intent: intent,
previewOnly: preview,
idempotencyKey: idem,
hash: hashQuoteRequest(req),
}, nil
}
func (h *quotePaymentCommand) quotePayment(
ctx context.Context,
quotesStore storage.QuotesStore,
qc *quoteCtx,
req *orchestratorv1.QuotePaymentRequest,
) (*orchestratorv1.PaymentQuote, error) {
if qc.previewOnly {
quote, _, err := h.engine.BuildPaymentQuote(ctx, qc.orgID, req)
if err != nil {
h.logger.Warn("Failed to build preview payment quote", zap.Error(err), zap.String("org_ref", qc.orgID))
return nil, err
}
quote.QuoteRef = primitive.NewObjectID().Hex()
return quote, nil
}
existing, err := quotesStore.GetByIdempotencyKey(ctx, qc.idempotencyKey)
if err != nil && !errors.Is(err, storage.ErrQuoteNotFound) {
h.logger.Warn(
"Failed to lookup quote by idempotency key",
zap.Error(err),
mzap.ObjRef("org_ref", qc.orgRef),
zap.String("idempotency_key", qc.idempotencyKey),
)
return nil, err
}
if existing != nil {
if existing.Hash != qc.hash {
return nil, errIdempotencyParamMismatch
}
h.logger.Debug(
"Idempotent quote reused",
mzap.ObjRef("org_ref", qc.orgRef),
zap.String("idempotency_key", qc.idempotencyKey),
zap.String("quote_ref", existing.QuoteRef),
)
return modelQuoteToProto(existing.Quote), nil
}
quote, expiresAt, err := h.engine.BuildPaymentQuote(ctx, qc.orgID, req)
if err != nil {
h.logger.Warn(
"Failed to build payment quote",
zap.Error(err),
mzap.ObjRef("org_ref", qc.orgRef),
zap.String("idempotency_key", qc.idempotencyKey),
)
return nil, err
}
quoteRef := primitive.NewObjectID().Hex()
quote.QuoteRef = quoteRef
record := &model.PaymentQuoteRecord{
QuoteRef: quoteRef,
IdempotencyKey: qc.idempotencyKey,
Hash: qc.hash,
Intent: intentFromProto(qc.intent),
Quote: quoteSnapshotToModel(quote),
ExpiresAt: expiresAt,
}
record.SetID(primitive.NewObjectID())
record.SetOrganizationRef(qc.orgRef)
if err := quotesStore.Create(ctx, record); err != nil {
if errors.Is(err, storage.ErrDuplicateQuote) {
existing, getErr := quotesStore.GetByIdempotencyKey(ctx, qc.idempotencyKey)
if getErr == nil && existing != nil {
if existing.Hash != qc.hash {
return nil, errIdempotencyParamMismatch
}
return modelQuoteToProto(existing.Quote), nil
}
}
return nil, err
}
h.logger.Info(
"Stored payment quote",
zap.String("quote_ref", quoteRef),
mzap.ObjRef("org_ref", qc.orgRef),
zap.String("idempotency_key", qc.idempotencyKey),
zap.String("kind", qc.intent.GetKind().String()),
)
return quote, nil
}
func (h *quotePaymentCommand) mapQuoteErr(err error) gsresponse.Responder[orchestratorv1.QuotePaymentResponse] {
if errors.Is(err, errIdempotencyRequired) ||
errors.Is(err, errPreviewWithIdempotency) ||
errors.Is(err, errIdempotencyParamMismatch) {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
intent := req.GetIntent()
return gsresponse.Auto[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
quote, expiresAt, err := h.engine.BuildPaymentQuote(ctx, orgRef, req)
// TODO: temprorarary hashing function, replace with a proper solution later
func hashQuoteRequest(req *orchestratorv1.QuotePaymentRequest) string {
cloned := proto.Clone(req).(*orchestratorv1.QuotePaymentRequest)
cloned.Meta = nil
cloned.IdempotencyKey = ""
cloned.PreviewOnly = false
b, err := proto.MarshalOptions{Deterministic: true}.Marshal(cloned)
if err != nil {
return gsresponse.Auto[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
sum := sha256.Sum256([]byte("marshal_error"))
return hex.EncodeToString(sum[:])
}
if !req.GetPreviewOnly() {
quotesStore, err := ensureQuotesStore(h.engine.Repository())
if err != nil {
return gsresponse.Unavailable[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
quoteRef := primitive.NewObjectID().Hex()
quote.QuoteRef = quoteRef
record := &model.PaymentQuoteRecord{
QuoteRef: quoteRef,
Intent: intentFromProto(intent),
Quote: quoteSnapshotToModel(quote),
ExpiresAt: expiresAt,
}
record.SetID(primitive.NewObjectID())
record.SetOrganizationRef(orgID)
if err := quotesStore.Create(ctx, record); err != nil {
return gsresponse.Auto[orchestratorv1.QuotePaymentResponse](h.logger, mservice.PaymentOrchestrator, err)
}
h.logger.Info(
"Stored payment quote",
zap.String("quote_ref", quoteRef),
mzap.ObjRef("org_ref", orgID),
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
zap.String("kind", intent.GetKind().String()),
)
}
return gsresponse.Success(&orchestratorv1.QuotePaymentResponse{Quote: quote})
sum := sha256.Sum256(b)
return hex.EncodeToString(sum[:])
}
type quotePaymentsCommand struct {
@@ -79,76 +222,98 @@ type quotePaymentsCommand struct {
logger mlogger.Logger
}
func (h *quotePaymentsCommand) Execute(ctx context.Context, req *orchestratorv1.QuotePaymentsRequest) gsresponse.Responder[orchestratorv1.QuotePaymentsResponse] {
var (
errBatchIdempotencyRequired = errors.New("idempotency key is required")
errBatchPreviewWithIdempotency = errors.New("preview requests must not use idempotency key")
errBatchIdempotencyParamMismatch = errors.New("idempotency key reuse with different parameters")
errBatchIdempotencyShapeMismatch = errors.New("idempotency key already used for a different quote shape")
)
type quotePaymentsCtx struct {
orgID string
orgRef primitive.ObjectID
previewOnly bool
idempotencyKey string
hash string
intentCount int
}
func (h *quotePaymentsCommand) Execute(
ctx context.Context,
req *orchestratorv1.QuotePaymentsRequest,
) gsresponse.Responder[orchestratorv1.QuotePaymentsResponse] {
if err := h.engine.EnsureRepository(ctx); err != nil {
return gsresponse.Unavailable[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if req == nil {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("nil request"))
}
orgID, orgRef, err := validateMetaAndOrgRef(req.GetMeta())
qc, intents, err := h.prepare(req)
if err != nil {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
}
intents := req.GetIntents()
if len(intents) == 0 {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, merrors.InvalidArgument("intents are required"))
return h.mapErr(err)
}
baseKey := strings.TrimSpace(req.GetIdempotencyKey())
quotes := make([]*orchestratorv1.PaymentQuote, 0, len(intents))
expires := make([]time.Time, 0, len(intents))
for i, intent := range intents {
if err := requireNonNilIntent(intent); err != nil {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
}
quoteReq := &orchestratorv1.QuotePaymentRequest{
Meta: req.GetMeta(),
IdempotencyKey: perIntentIdempotencyKey(baseKey, i, len(intents)),
Intent: intent,
PreviewOnly: req.GetPreviewOnly(),
}
quote, expiresAt, err := h.engine.BuildPaymentQuote(ctx, orgID, quoteReq)
quotesStore, err := ensureQuotesStore(h.engine.Repository())
if err != nil {
return gsresponse.Unavailable[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if qc.previewOnly {
quotes, expires, err := h.buildQuotes(ctx, req.GetMeta(), qc.idempotencyKey, intents, true)
if err != nil {
return gsresponse.Auto[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
}
quotes = append(quotes, quote)
expires = append(expires, expiresAt)
}
aggregate, err := aggregatePaymentQuotes(quotes)
if err != nil {
return gsresponse.Auto[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, merrors.InternalWrap(err, "quote aggregation failed"))
}
expiresAt, ok := minQuoteExpiry(expires)
if !ok {
return gsresponse.Auto[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, merrors.Internal("quote expiry missing"))
}
quoteRef := ""
if !req.GetPreviewOnly() {
quotesStore, err := ensureQuotesStore(h.engine.Repository())
aggregate, expiresAt, err := h.aggregate(quotes, expires)
if err != nil {
return gsresponse.Unavailable[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
}
quoteRef = primitive.NewObjectID().Hex()
record := &model.PaymentQuoteRecord{
QuoteRef: quoteRef,
Intents: intentsFromProto(intents),
Quotes: quoteSnapshotsFromProto(quotes),
ExpiresAt: expiresAt,
}
record.SetID(primitive.NewObjectID())
record.SetOrganizationRef(orgRef)
if err := quotesStore.Create(ctx, record); err != nil {
return gsresponse.Auto[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
}
h.logger.Info("Stored payment quotes",
zap.String("quote_ref", quoteRef), mzap.ObjRef("org_ref", orgRef),
zap.String("idempotency_key", baseKey), zap.Int("quote_count", len(quotes)),
)
_ = expiresAt
return gsresponse.Success(&orchestratorv1.QuotePaymentsResponse{
QuoteRef: "",
Aggregate: aggregate,
Quotes: quotes,
})
}
if rec, ok, err := h.tryReuse(ctx, quotesStore, qc); err != nil {
return gsresponse.Auto[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
} else if ok {
return gsresponse.Success(h.responseFromRecord(rec))
}
quotes, expires, err := h.buildQuotes(ctx, req.GetMeta(), qc.idempotencyKey, intents, false)
if err != nil {
return gsresponse.Auto[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
}
aggregate, expiresAt, err := h.aggregate(quotes, expires)
if err != nil {
return gsresponse.Auto[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
}
quoteRef := primitive.NewObjectID().Hex()
for _, q := range quotes {
if q != nil {
q.QuoteRef = quoteRef
}
}
rec, err := h.storeBatch(ctx, quotesStore, qc, quoteRef, intents, quotes, expiresAt)
if err != nil {
return gsresponse.Auto[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
}
if rec != nil {
return gsresponse.Success(h.responseFromRecord(rec))
}
h.logger.Info(
"Stored payment quotes",
h.logFields(qc, quoteRef, expiresAt, len(quotes))...,
)
return gsresponse.Success(&orchestratorv1.QuotePaymentsResponse{
QuoteRef: quoteRef,
Aggregate: aggregate,
@@ -156,6 +321,256 @@ func (h *quotePaymentsCommand) Execute(ctx context.Context, req *orchestratorv1.
})
}
func (h *quotePaymentsCommand) prepare(req *orchestratorv1.QuotePaymentsRequest) (*quotePaymentsCtx, []*orchestratorv1.PaymentIntent, error) {
orgRefStr, orgID, err := validateMetaAndOrgRef(req.GetMeta())
if err != nil {
return nil, nil, err
}
intents := req.GetIntents()
if len(intents) == 0 {
return nil, nil, merrors.InvalidArgument("intents are required")
}
for _, intent := range intents {
if err := requireNonNilIntent(intent); err != nil {
return nil, nil, err
}
}
preview := req.GetPreviewOnly()
idem := strings.TrimSpace(req.GetIdempotencyKey())
if preview && idem != "" {
return nil, nil, errBatchPreviewWithIdempotency
}
if !preview && idem == "" {
return nil, nil, errBatchIdempotencyRequired
}
hash, err := hashQuotePaymentsIntents(intents)
if err != nil {
return nil, nil, err
}
return &quotePaymentsCtx{
orgID: orgRefStr,
orgRef: orgID,
previewOnly: preview,
idempotencyKey: idem,
hash: hash,
intentCount: len(intents),
}, intents, nil
}
func (h *quotePaymentsCommand) tryReuse(
ctx context.Context,
quotesStore storage.QuotesStore,
qc *quotePaymentsCtx,
) (*model.PaymentQuoteRecord, bool, error) {
rec, err := quotesStore.GetByIdempotencyKey(ctx, qc.idempotencyKey)
if err != nil {
if errors.Is(err, storage.ErrQuoteNotFound) {
return nil, false, nil
}
h.logger.Warn(
"Failed to lookup payment quotes by idempotency key",
h.logFields(qc, "", time.Time{}, 0)...,
)
return nil, false, err
}
if len(rec.Quotes) == 0 {
return nil, false, errBatchIdempotencyShapeMismatch
}
if rec.Hash != qc.hash {
return nil, false, errBatchIdempotencyParamMismatch
}
h.logger.Debug(
"Idempotent payment quotes reused",
h.logFields(qc, rec.QuoteRef, rec.ExpiresAt, len(rec.Quotes))...,
)
return rec, true, nil
}
func (h *quotePaymentsCommand) buildQuotes(
ctx context.Context,
meta *orchestratorv1.RequestMeta,
baseKey string,
intents []*orchestratorv1.PaymentIntent,
preview bool,
) ([]*orchestratorv1.PaymentQuote, []time.Time, error) {
quotes := make([]*orchestratorv1.PaymentQuote, 0, len(intents))
expires := make([]time.Time, 0, len(intents))
for i, intent := range intents {
req := &orchestratorv1.QuotePaymentRequest{
Meta: meta,
IdempotencyKey: perIntentIdempotencyKey(baseKey, i, len(intents)),
Intent: intent,
PreviewOnly: preview,
}
q, exp, err := h.engine.BuildPaymentQuote(ctx, meta.GetOrganizationRef(), req)
if err != nil {
h.logger.Warn(
"Failed to build payment quote (batch item)",
zap.Int("idx", i),
zap.Error(err),
)
return nil, nil, err
}
quotes = append(quotes, q)
expires = append(expires, exp)
}
return quotes, expires, nil
}
func (h *quotePaymentsCommand) aggregate(
quotes []*orchestratorv1.PaymentQuote,
expires []time.Time,
) (*orchestratorv1.PaymentQuoteAggregate, time.Time, error) {
agg, err := aggregatePaymentQuotes(quotes)
if err != nil {
return nil, time.Time{}, merrors.InternalWrap(err, "quote aggregation failed")
}
expiresAt, ok := minQuoteExpiry(expires)
if !ok {
return nil, time.Time{}, merrors.Internal("quote expiry missing")
}
return agg, expiresAt, nil
}
func (h *quotePaymentsCommand) storeBatch(
ctx context.Context,
quotesStore storage.QuotesStore,
qc *quotePaymentsCtx,
quoteRef string,
intents []*orchestratorv1.PaymentIntent,
quotes []*orchestratorv1.PaymentQuote,
expiresAt time.Time,
) (*model.PaymentQuoteRecord, error) {
record := &model.PaymentQuoteRecord{
QuoteRef: quoteRef,
IdempotencyKey: qc.idempotencyKey,
Hash: qc.hash,
Intents: intentsFromProto(intents),
Quotes: quoteSnapshotsFromProto(quotes),
ExpiresAt: expiresAt,
}
record.SetID(primitive.NewObjectID())
record.SetOrganizationRef(qc.orgRef)
if err := quotesStore.Create(ctx, record); err != nil {
if errors.Is(err, storage.ErrDuplicateQuote) {
rec, ok, reuseErr := h.tryReuse(ctx, quotesStore, qc)
if reuseErr != nil {
return nil, reuseErr
}
if ok {
return rec, nil
}
return nil, err
}
return nil, err
}
return nil, nil
}
func (h *quotePaymentsCommand) responseFromRecord(rec *model.PaymentQuoteRecord) *orchestratorv1.QuotePaymentsResponse {
quotes := modelQuotesToProto(rec.Quotes)
for _, q := range quotes {
if q != nil {
q.QuoteRef = rec.QuoteRef
}
}
aggregate, _ := aggregatePaymentQuotes(quotes)
return &orchestratorv1.QuotePaymentsResponse{
QuoteRef: rec.QuoteRef,
Aggregate: aggregate,
Quotes: quotes,
}
}
func (h *quotePaymentsCommand) logFields(qc *quotePaymentsCtx, quoteRef string, expiresAt time.Time, quoteCount int) []zap.Field {
fields := []zap.Field{
mzap.ObjRef("org_ref", qc.orgRef),
zap.String("org_ref_str", qc.orgID),
zap.String("idempotency_key", qc.idempotencyKey),
zap.String("hash", qc.hash),
zap.Bool("preview_only", qc.previewOnly),
zap.Int("intent_count", qc.intentCount),
}
if quoteRef != "" {
fields = append(fields, zap.String("quote_ref", quoteRef))
}
if !expiresAt.IsZero() {
fields = append(fields, zap.Time("expires_at", expiresAt))
}
if quoteCount > 0 {
fields = append(fields, zap.Int("quote_count", quoteCount))
}
return fields
}
func (h *quotePaymentsCommand) mapErr(err error) gsresponse.Responder[orchestratorv1.QuotePaymentsResponse] {
if errors.Is(err, errBatchIdempotencyRequired) ||
errors.Is(err, errBatchPreviewWithIdempotency) ||
errors.Is(err, errBatchIdempotencyParamMismatch) ||
errors.Is(err, errBatchIdempotencyShapeMismatch) {
return gsresponse.InvalidArgument[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
}
return gsresponse.Auto[orchestratorv1.QuotePaymentsResponse](h.logger, mservice.PaymentOrchestrator, err)
}
func modelQuotesToProto(snaps []*model.PaymentQuoteSnapshot) []*orchestratorv1.PaymentQuote {
if len(snaps) == 0 {
return nil
}
out := make([]*orchestratorv1.PaymentQuote, 0, len(snaps))
for _, s := range snaps {
out = append(out, modelQuoteToProto(s))
}
return out
}
func hashQuotePaymentsIntents(intents []*orchestratorv1.PaymentIntent) (string, error) {
type item struct {
Idx int
H [32]byte
}
items := make([]item, 0, len(intents))
for i, intent := range intents {
b, err := proto.MarshalOptions{Deterministic: true}.Marshal(intent)
if err != nil {
return "", err
}
items = append(items, item{Idx: i, H: sha256.Sum256(b)})
}
sort.Slice(items, func(i, j int) bool { return items[i].Idx < items[j].Idx })
h := sha256.New()
h.Write([]byte("quote-payments-fp/v1"))
h.Write([]byte{0})
for _, it := range items {
h.Write(it.H[:])
h.Write([]byte{0})
}
return hex.EncodeToString(h.Sum(nil)), nil
}
type initiatePaymentsCommand struct {
engine paymentEngine
logger mlogger.Logger

View File

@@ -429,3 +429,15 @@ func (s *helperQuotesStore) GetByRef(_ context.Context, _ primitive.ObjectID, re
}
return nil, storage.ErrQuoteNotFound
}
func (s *helperQuotesStore) GetByIdempotencyKey(_ context.Context, ref string) (*model.PaymentQuoteRecord, error) {
if s.records == nil {
return nil, storage.ErrQuoteNotFound
}
for _, rec := range s.records {
if rec.IdempotencyKey == ref {
return rec, nil
}
}
return nil, storage.ErrQuoteNotFound
}

View File

@@ -423,6 +423,18 @@ func (s *stubQuotesStore) GetByRef(ctx context.Context, orgRef primitive.ObjectI
return nil, storage.ErrQuoteNotFound
}
func (s *stubQuotesStore) GetByIdempotencyKey(ctx context.Context, idempotencyKey string) (*model.PaymentQuoteRecord, error) {
if s.quotes == nil {
return nil, storage.ErrQuoteNotFound
}
for _, q := range s.quotes {
if q.IdempotencyKey == idempotencyKey {
return q, nil
}
}
return nil, storage.ErrQuoteNotFound
}
type stubRoutesStore struct {
routes []*model.PaymentRoute
}

View File

@@ -12,12 +12,14 @@ type PaymentQuoteRecord struct {
storable.Base `bson:",inline" json:",inline"`
model.OrganizationBoundBase `bson:",inline" json:",inline"`
QuoteRef string `bson:"quoteRef" json:"quoteRef"`
Intent PaymentIntent `bson:"intent,omitempty" json:"intent,omitempty"`
Intents []PaymentIntent `bson:"intents,omitempty" json:"intents,omitempty"`
Quote *PaymentQuoteSnapshot `bson:"quote,omitempty" json:"quote,omitempty"`
Quotes []*PaymentQuoteSnapshot `bson:"quotes,omitempty" json:"quotes,omitempty"`
ExpiresAt time.Time `bson:"expiresAt" json:"expiresAt"`
QuoteRef string `bson:"quoteRef" json:"quoteRef"`
IdempotencyKey string `bson:"idempotencyKey" json:"idempotencyKey"`
Intent PaymentIntent `bson:"intent,omitempty" json:"intent,omitempty"`
Intents []PaymentIntent `bson:"intents,omitempty" json:"intents,omitempty"`
Quote *PaymentQuoteSnapshot `bson:"quote,omitempty" json:"quote,omitempty"`
Quotes []*PaymentQuoteSnapshot `bson:"quotes,omitempty" json:"quotes,omitempty"`
ExpiresAt time.Time `bson:"expiresAt" json:"expiresAt"`
Hash string `bson:"hash" json:"hash"`
}
// Collection implements storable.Storable.

View File

@@ -65,6 +65,9 @@ func (q *Quotes) Create(ctx context.Context, quote *model.PaymentQuoteRecord) er
if quote.OrganizationRef == primitive.NilObjectID {
return merrors.InvalidArgument("quotesStore: organization_ref is required")
}
if quote.IdempotencyKey == "" {
return merrors.InvalidArgument("quotesStore: idempotency key is required")
}
if quote.ExpiresAt.IsZero() {
return merrors.InvalidArgument("quotesStore: expires_at is required")
}
@@ -120,6 +123,25 @@ func (q *Quotes) GetByRef(ctx context.Context, orgRef primitive.ObjectID, quoteR
return entity, nil
}
func (q *Quotes) GetByIdempotencyKey(ctx context.Context, idempotencyKey string) (*model.PaymentQuoteRecord, error) {
idempotencyKey = strings.TrimSpace(idempotencyKey)
if idempotencyKey == "" {
return nil, merrors.InvalidArgument("quotesStore: empty idempotency key")
}
entity := &model.PaymentQuoteRecord{}
query := repository.Filter("idempotencyKey", idempotencyKey)
if err := q.repo.FindOneByFilter(ctx, query, entity); err != nil {
if errors.Is(err, merrors.ErrNoData) {
return nil, storage.ErrQuoteNotFound
}
return nil, err
}
if !entity.ExpiresAt.IsZero() && time.Now().After(entity.ExpiresAt) {
return nil, storage.ErrQuoteNotFound
}
return entity, nil
}
var _ storage.QuotesStore = (*Quotes)(nil)
func int32Ptr(v int32) *int32 {

View File

@@ -55,6 +55,7 @@ type PaymentsStore interface {
type QuotesStore interface {
Create(ctx context.Context, quote *model.PaymentQuoteRecord) error
GetByRef(ctx context.Context, orgRef primitive.ObjectID, quoteRef string) (*model.PaymentQuoteRecord, error)
GetByIdempotencyKey(ctx context.Context, idempotencyKey string) (*model.PaymentQuoteRecord, error)
}
// RoutesStore manages allowed routing transitions.