From 4e70873a949137770c7a014d3125b42b1605d468 Mon Sep 17 00:00:00 2001 From: Stephan D Date: Wed, 4 Mar 2026 05:02:52 +0100 Subject: [PATCH] mntx gateway throttling --- .../service/gateway/card_processor.go | 650 ++++++++++++++++-- .../service/gateway/card_processor_test.go | 139 ++++ .../service/gateway/payout_failure_policy.go | 87 +++ .../gateway/payout_failure_policy_test.go | 53 ++ .../mntx/internal/service/gateway/service.go | 3 + 5 files changed, 886 insertions(+), 46 deletions(-) create mode 100644 api/gateway/mntx/internal/service/gateway/payout_failure_policy.go create mode 100644 api/gateway/mntx/internal/service/gateway/payout_failure_policy_test.go diff --git a/api/gateway/mntx/internal/service/gateway/card_processor.go b/api/gateway/mntx/internal/service/gateway/card_processor.go index d0a9630a..17e68001 100644 --- a/api/gateway/mntx/internal/service/gateway/card_processor.go +++ b/api/gateway/mntx/internal/service/gateway/card_processor.go @@ -7,6 +7,8 @@ import ( "fmt" "net/http" "strings" + "sync" + "time" "github.com/shopspring/decimal" gatewayoutbox "github.com/tech/sendico/gateway/common/outbox" @@ -23,6 +25,12 @@ import ( mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1" "go.mongodb.org/mongo-driver/v2/bson" "go.uber.org/zap" + "google.golang.org/protobuf/proto" +) + +const ( + defaultDispatchThrottleInterval = 150 * time.Millisecond + defaultMaxDispatchAttempts = uint32(5) ) type cardPayoutProcessor struct { @@ -37,6 +45,18 @@ type cardPayoutProcessor struct { perTxMinAmountMinor int64 perTxMinAmountMinorByCurrency map[string]int64 + dispatchThrottleInterval time.Duration + + dispatchMu sync.Mutex + nextDispatchAllowed time.Time + + retryPolicy payoutFailurePolicy + retryDelayFn func(attempt uint32) time.Duration + + retryMu sync.Mutex + retryTimers map[string]*time.Timer + retryCtx context.Context + retryStop context.CancelFunc } func mergePayoutStateWithExisting(state, existing *model.CardPayout) { @@ -121,6 +141,8 @@ func applyCardPayoutSendResult(state *model.CardPayout, result *monetix.CardPayo state.ProviderPaymentID = strings.TrimSpace(result.ProviderRequestID) if result.Accepted { state.Status = model.PayoutStatusWaiting + state.ProviderCode = "" + state.ProviderMessage = "" return } state.Status = model.PayoutStatusFailed @@ -149,13 +171,20 @@ func newCardPayoutProcessor( client *http.Client, producer msg.Producer, ) *cardPayoutProcessor { + retryCtx, retryStop := context.WithCancel(context.Background()) return &cardPayoutProcessor{ - logger: logger.Named("card_payout_processor"), - config: cfg, - clock: clock, - store: store, - httpClient: client, - producer: producer, + logger: logger.Named("card_payout_processor"), + config: cfg, + clock: clock, + store: store, + httpClient: client, + producer: producer, + dispatchThrottleInterval: defaultDispatchThrottleInterval, + retryPolicy: defaultPayoutFailurePolicy(), + retryDelayFn: retryDelayDuration, + retryTimers: map[string]*time.Timer{}, + retryCtx: retryCtx, + retryStop: retryStop, } } @@ -166,6 +195,10 @@ func (p *cardPayoutProcessor) applyGatewayDescriptor(descriptor *gatewayv1.Gatew minAmountMinor, perCurrency := perTxMinAmountPolicy(descriptor) p.perTxMinAmountMinor = minAmountMinor p.perTxMinAmountMinorByCurrency = perCurrency + p.dispatchThrottleInterval = dispatchThrottleIntervalFromDescriptor(descriptor, defaultDispatchThrottleInterval) + p.logger.Info("Configured payout dispatch throttle", + zap.Duration("dispatch_interval", p.dispatchThrottleInterval), + ) } func perTxMinAmountPolicy(descriptor *gatewayv1.GatewayInstanceDescriptor) (int64, map[string]int64) { @@ -243,6 +276,457 @@ func (p *cardPayoutProcessor) perTxMinimum(currency string) int64 { return minAmountMinor } +func dispatchThrottleIntervalFromDescriptor( + descriptor *gatewayv1.GatewayInstanceDescriptor, + fallback time.Duration, +) time.Duration { + if fallback < 0 { + fallback = 0 + } + if descriptor == nil || descriptor.GetLimits() == nil { + return fallback + } + velocity := descriptor.GetLimits().GetVelocityLimit() + if len(velocity) == 0 { + return fallback + } + + interval := time.Duration(0) + for bucket, maxOps := range velocity { + cleanBucket := strings.TrimSpace(bucket) + if cleanBucket == "" || maxOps <= 0 { + continue + } + window, err := time.ParseDuration(cleanBucket) + if err != nil || window <= 0 { + continue + } + candidate := window / time.Duration(maxOps) + if candidate <= 0 { + continue + } + if candidate > interval { + interval = candidate + } + } + if interval <= 0 { + return fallback + } + return interval +} + +func (p *cardPayoutProcessor) waitDispatchSlot(ctx context.Context) error { + if p == nil { + return merrors.Internal("card payout processor not initialised") + } + if ctx == nil { + ctx = context.Background() + } + if p.dispatchThrottleInterval <= 0 { + return nil + } + + for { + p.dispatchMu.Lock() + now := time.Now().UTC() + if p.nextDispatchAllowed.IsZero() || !p.nextDispatchAllowed.After(now) { + p.nextDispatchAllowed = now.Add(p.dispatchThrottleInterval) + p.dispatchMu.Unlock() + return nil + } + wait := p.nextDispatchAllowed.Sub(now) + p.dispatchMu.Unlock() + + timer := time.NewTimer(wait) + select { + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + case <-timer.C: + } + } +} + +func (p *cardPayoutProcessor) stopRetries() { + if p == nil { + return + } + if p.retryStop != nil { + p.retryStop() + } + p.retryMu.Lock() + defer p.retryMu.Unlock() + for key, timer := range p.retryTimers { + if timer != nil { + timer.Stop() + } + delete(p.retryTimers, key) + } +} + +func (p *cardPayoutProcessor) clearRetryTimer(operationRef string) { + if p == nil { + return + } + key := strings.TrimSpace(operationRef) + if key == "" { + return + } + p.retryMu.Lock() + defer p.retryMu.Unlock() + timer := p.retryTimers[key] + if timer != nil { + timer.Stop() + } + delete(p.retryTimers, key) +} + +func payoutAcceptedForState(state *model.CardPayout) bool { + if state == nil { + return false + } + switch state.Status { + case model.PayoutStatusFailed, model.PayoutStatusCancelled: + return false + default: + return true + } +} + +func cardPayoutResponseFromState( + state *model.CardPayout, + accepted bool, + errorCode string, + errorMessage string, +) *mntxv1.CardPayoutResponse { + return &mntxv1.CardPayoutResponse{ + Payout: StateToProto(state), + Accepted: accepted, + ProviderRequestId: strings.TrimSpace(firstNonEmpty(state.ProviderPaymentID)), + ErrorCode: strings.TrimSpace(errorCode), + ErrorMessage: strings.TrimSpace(errorMessage), + } +} + +func cardTokenPayoutResponseFromState( + state *model.CardPayout, + accepted bool, + errorCode string, + errorMessage string, +) *mntxv1.CardTokenPayoutResponse { + return &mntxv1.CardTokenPayoutResponse{ + Payout: StateToProto(state), + Accepted: accepted, + ProviderRequestId: strings.TrimSpace(firstNonEmpty(state.ProviderPaymentID)), + ErrorCode: strings.TrimSpace(errorCode), + ErrorMessage: strings.TrimSpace(errorMessage), + } +} + +func (p *cardPayoutProcessor) dispatchCardPayout(ctx context.Context, req *mntxv1.CardPayoutRequest) (*monetix.CardPayoutSendResult, error) { + if p == nil { + return nil, merrors.Internal("card payout processor not initialised") + } + if req == nil { + return nil, merrors.InvalidArgument("card payout request is required") + } + if err := p.waitDispatchSlot(ctx); err != nil { + return nil, err + } + client := monetix.NewClient(p.config, p.httpClient, p.logger) + apiReq := buildCardPayoutRequest(req.GetProjectId(), req) + return client.CreateCardPayout(ctx, apiReq) +} + +func (p *cardPayoutProcessor) dispatchCardTokenPayout(ctx context.Context, req *mntxv1.CardTokenPayoutRequest) (*monetix.CardPayoutSendResult, error) { + if p == nil { + return nil, merrors.Internal("card payout processor not initialised") + } + if req == nil { + return nil, merrors.InvalidArgument("card token payout request is required") + } + if err := p.waitDispatchSlot(ctx); err != nil { + return nil, err + } + client := monetix.NewClient(p.config, p.httpClient, p.logger) + apiReq := buildCardTokenPayoutRequest(req.GetProjectId(), req) + return client.CreateCardTokenPayout(ctx, apiReq) +} + +func maxDispatchAttempts(v uint32) uint32 { + if v == 0 { + return defaultMaxDispatchAttempts + } + return v +} + +func (p *cardPayoutProcessor) scheduleRetryTimer(operationRef string, delay time.Duration, run func()) { + if p == nil || run == nil { + return + } + key := strings.TrimSpace(operationRef) + if key == "" { + return + } + if delay < 0 { + delay = 0 + } + p.retryMu.Lock() + defer p.retryMu.Unlock() + if old := p.retryTimers[key]; old != nil { + old.Stop() + } + + var timer *time.Timer + timer = time.AfterFunc(delay, func() { + select { + case <-p.retryCtx.Done(): + return + default: + } + p.retryMu.Lock() + if p.retryTimers[key] == timer { + delete(p.retryTimers, key) + } + p.retryMu.Unlock() + run() + }) + p.retryTimers[key] = timer +} + +func retryDelayDuration(attempt uint32) time.Duration { + return time.Duration(retryDelayForAttempt(attempt)) * time.Second +} + +func (p *cardPayoutProcessor) scheduleCardPayoutRetry(req *mntxv1.CardPayoutRequest, failedAttempt uint32, maxAttempts uint32) { + if p == nil || req == nil { + return + } + maxAttempts = maxDispatchAttempts(maxAttempts) + nextAttempt := failedAttempt + 1 + if nextAttempt > maxAttempts { + return + } + cloned, ok := proto.Clone(req).(*mntxv1.CardPayoutRequest) + if !ok { + return + } + operationRef := findOperationRef(cloned.GetOperationRef(), cloned.GetPayoutId()) + delay := retryDelayDuration(failedAttempt) + if p.retryDelayFn != nil { + delay = p.retryDelayFn(failedAttempt) + } + p.scheduleRetryTimer(operationRef, delay, func() { + p.runCardPayoutRetry(cloned, nextAttempt, maxAttempts) + }) +} + +func (p *cardPayoutProcessor) scheduleCardTokenPayoutRetry(req *mntxv1.CardTokenPayoutRequest, failedAttempt uint32, maxAttempts uint32) { + if p == nil || req == nil { + return + } + maxAttempts = maxDispatchAttempts(maxAttempts) + nextAttempt := failedAttempt + 1 + if nextAttempt > maxAttempts { + return + } + cloned, ok := proto.Clone(req).(*mntxv1.CardTokenPayoutRequest) + if !ok { + return + } + operationRef := findOperationRef(cloned.GetOperationRef(), cloned.GetPayoutId()) + delay := retryDelayDuration(failedAttempt) + if p.retryDelayFn != nil { + delay = p.retryDelayFn(failedAttempt) + } + p.scheduleRetryTimer(operationRef, delay, func() { + p.runCardTokenPayoutRetry(cloned, nextAttempt, maxAttempts) + }) +} + +func (p *cardPayoutProcessor) retryContext() (context.Context, context.CancelFunc) { + if p == nil { + return context.Background(), func() {} + } + ctx := p.retryCtx + if ctx == nil { + ctx = context.Background() + } + timeout := p.config.Timeout() + if timeout <= 0 { + return ctx, func() {} + } + return context.WithTimeout(ctx, timeout) +} + +func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest, attempt uint32, maxAttempts uint32) { + if p == nil || req == nil { + return + } + operationRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId()) + if operationRef == "" { + return + } + ctx, cancel := p.retryContext() + defer cancel() + + state, err := p.store.Payouts().FindByOperationRef(ctx, operationRef) + if err != nil || state == nil { + p.logger.Warn("Retry payout state lookup failed", + zap.String("operation_ref", operationRef), + zap.Uint32("attempt", attempt), + zap.Error(err), + ) + return + } + if isFinalStatus(state) { + p.clearRetryTimer(operationRef) + return + } + + result, err := p.dispatchCardPayout(ctx, req) + now := p.clock.Now() + maxAttempts = maxDispatchAttempts(maxAttempts) + if err != nil { + decision := p.retryPolicy.decideTransportFailure() + state.ProviderCode = "" + state.ProviderMessage = err.Error() + state.UpdatedAt = now + if decision.Action == payoutFailureActionRetry && attempt < maxAttempts { + state.Status = model.PayoutStatusProcessing + state.FailureReason = "" + if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { + p.logger.Warn("Failed to persist retryable payout transport failure", zap.Error(upErr)) + return + } + p.scheduleCardPayoutRetry(req, attempt, maxAttempts) + return + } + + state.Status = model.PayoutStatusFailed + state.FailureReason = payoutFailureReason("", err.Error()) + if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { + p.logger.Warn("Failed to persist terminal payout transport failure", zap.Error(upErr)) + } + p.clearRetryTimer(operationRef) + return + } + + applyCardPayoutSendResult(state, result) + state.UpdatedAt = now + if result.Accepted { + state.FailureReason = "" + if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { + p.logger.Warn("Failed to persist accepted payout retry result", zap.Error(upErr)) + } + p.clearRetryTimer(operationRef) + return + } + + decision := p.retryPolicy.decideProviderFailure(result.ErrorCode) + if decision.Action == payoutFailureActionRetry && attempt < maxAttempts { + state.Status = model.PayoutStatusProcessing + state.FailureReason = "" + if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { + p.logger.Warn("Failed to persist retryable payout provider failure", zap.Error(upErr)) + return + } + p.scheduleCardPayoutRetry(req, attempt, maxAttempts) + return + } + + state.Status = model.PayoutStatusFailed + state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage) + if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { + p.logger.Warn("Failed to persist terminal payout provider failure", zap.Error(upErr)) + } + p.clearRetryTimer(operationRef) +} + +func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayoutRequest, attempt uint32, maxAttempts uint32) { + if p == nil || req == nil { + return + } + operationRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId()) + if operationRef == "" { + return + } + ctx, cancel := p.retryContext() + defer cancel() + + state, err := p.store.Payouts().FindByOperationRef(ctx, operationRef) + if err != nil || state == nil { + p.logger.Warn("Retry token payout state lookup failed", + zap.String("operation_ref", operationRef), + zap.Uint32("attempt", attempt), + zap.Error(err), + ) + return + } + if isFinalStatus(state) { + p.clearRetryTimer(operationRef) + return + } + + result, err := p.dispatchCardTokenPayout(ctx, req) + now := p.clock.Now() + maxAttempts = maxDispatchAttempts(maxAttempts) + if err != nil { + decision := p.retryPolicy.decideTransportFailure() + state.ProviderCode = "" + state.ProviderMessage = err.Error() + state.UpdatedAt = now + if decision.Action == payoutFailureActionRetry && attempt < maxAttempts { + state.Status = model.PayoutStatusProcessing + state.FailureReason = "" + if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { + p.logger.Warn("Failed to persist retryable token payout transport failure", zap.Error(upErr)) + return + } + p.scheduleCardTokenPayoutRetry(req, attempt, maxAttempts) + return + } + + state.Status = model.PayoutStatusFailed + state.FailureReason = payoutFailureReason("", err.Error()) + if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { + p.logger.Warn("Failed to persist terminal token payout transport failure", zap.Error(upErr)) + } + p.clearRetryTimer(operationRef) + return + } + + applyCardPayoutSendResult(state, result) + state.UpdatedAt = now + if result.Accepted { + state.FailureReason = "" + if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { + p.logger.Warn("Failed to persist accepted token payout retry result", zap.Error(upErr)) + } + p.clearRetryTimer(operationRef) + return + } + + decision := p.retryPolicy.decideProviderFailure(result.ErrorCode) + if decision.Action == payoutFailureActionRetry && attempt < maxAttempts { + state.Status = model.PayoutStatusProcessing + state.FailureReason = "" + if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { + p.logger.Warn("Failed to persist retryable token payout provider failure", zap.Error(upErr)) + return + } + p.scheduleCardTokenPayoutRetry(req, attempt, maxAttempts) + return + } + + state.Status = model.PayoutStatusFailed + state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage) + if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { + p.logger.Warn("Failed to persist terminal token payout provider failure", zap.Error(upErr)) + } + p.clearRetryTimer(operationRef) +} + func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) { if p == nil { return nil, merrors.Internal("card payout processor not initialised") @@ -292,6 +776,7 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout if err != nil { return nil, err } + req.ProjectId = projectID now := p.clock.Now() @@ -307,38 +792,76 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout CustomerID: strings.TrimSpace(req.GetCustomerId()), AmountMinor: req.GetAmountMinor(), Currency: strings.ToUpper(strings.TrimSpace(req.GetCurrency())), - Status: model.PayoutStatusWaiting, + Status: model.PayoutStatusProcessing, CreatedAt: now, UpdatedAt: now, } // Keep CreatedAt/refs if record already exists. - _, _ = p.findAndMergePayoutState(ctx, state) + existing, _ := p.findAndMergePayoutState(ctx, state) + if existing != nil { + switch existing.Status { + case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled: + return cardPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil + } + } - client := monetix.NewClient(p.config, p.httpClient, p.logger) - apiReq := buildCardPayoutRequest(projectID, req) - - result, err := client.CreateCardPayout(ctx, apiReq) + result, err := p.dispatchCardPayout(ctx, req) if err != nil { - state.Status = model.PayoutStatusFailed + decision := p.retryPolicy.decideTransportFailure() state.ProviderMessage = err.Error() state.UpdatedAt = p.clock.Now() + maxAttempts := maxDispatchAttempts(0) + if decision.Action == payoutFailureActionRetry && maxAttempts > 1 { + state.Status = model.PayoutStatusProcessing + state.FailureReason = "" + if e := p.updatePayoutStatus(ctx, state); e != nil { + fields := append([]zap.Field{zap.Error(e)}, payoutStateLogFields(state)...) + p.logger.Warn("Failed to update payout status", fields...) + return nil, e + } + p.scheduleCardPayoutRetry(req, 1, maxAttempts) + return cardPayoutResponseFromState(state, true, "", ""), nil + } + state.Status = model.PayoutStatusFailed + state.FailureReason = payoutFailureReason("", err.Error()) if e := p.updatePayoutStatus(ctx, state); e != nil { fields := append([]zap.Field{zap.Error(e)}, payoutStateLogFields(state)...) p.logger.Warn("Failed to update payout status", fields...) + return nil, e } - fields := append([]zap.Field{zap.Error(err)}, payoutStateLogFields(state)...) p.logger.Warn("Monetix payout submission failed", fields...) - + p.clearRetryTimer(state.OperationRef) return nil, err } - // Provider request id is the provider-side payment id in your model. applyCardPayoutSendResult(state, result) - state.UpdatedAt = p.clock.Now() + accepted := result.Accepted + errorCode := strings.TrimSpace(result.ErrorCode) + errorMessage := strings.TrimSpace(result.ErrorMessage) + + if !result.Accepted { + decision := p.retryPolicy.decideProviderFailure(result.ErrorCode) + maxAttempts := maxDispatchAttempts(0) + if decision.Action == payoutFailureActionRetry && maxAttempts > 1 { + state.Status = model.PayoutStatusProcessing + state.FailureReason = "" + accepted = true + errorCode = "" + errorMessage = "" + p.scheduleCardPayoutRetry(req, 1, maxAttempts) + } else { + state.Status = model.PayoutStatusFailed + state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage) + p.clearRetryTimer(state.OperationRef) + } + } else { + p.clearRetryTimer(state.OperationRef) + } + if err := p.updatePayoutStatus(ctx, state); err != nil { p.logger.Warn("Failed to store payout", zap.Error(err), @@ -347,22 +870,16 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout zap.String("operation_ref", state.OperationRef), zap.String("idempotency_key", state.IdempotencyKey), ) - // do not fail request here: provider already answered and client expects response + return nil, err } - resp := &mntxv1.CardPayoutResponse{ - Payout: StateToProto(state), - Accepted: result.Accepted, - ProviderRequestId: result.ProviderRequestID, - ErrorCode: result.ErrorCode, - ErrorMessage: result.ErrorMessage, - } + resp := cardPayoutResponseFromState(state, accepted, errorCode, errorMessage) p.logger.Info("Card payout submission stored", zap.String("payment_ref", state.PaymentRef), zap.String("status", string(state.Status)), - zap.Bool("accepted", result.Accepted), - zap.String("provider_request_id", result.ProviderRequestID), + zap.Bool("accepted", accepted), + zap.String("provider_request_id", resp.GetProviderRequestId()), ) return resp, nil @@ -417,6 +934,7 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT if err != nil { return nil, err } + req.ProjectId = projectID now := p.clock.Now() state := &model.CardPayout{ @@ -428,34 +946,72 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT CustomerID: strings.TrimSpace(req.GetCustomerId()), AmountMinor: req.GetAmountMinor(), Currency: strings.ToUpper(strings.TrimSpace(req.GetCurrency())), - Status: model.PayoutStatusWaiting, + Status: model.PayoutStatusProcessing, CreatedAt: now, UpdatedAt: now, } - _, _ = p.findAndMergePayoutState(ctx, state) + existing, _ := p.findAndMergePayoutState(ctx, state) + if existing != nil { + switch existing.Status { + case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled: + return cardTokenPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil + } + } - client := monetix.NewClient(p.config, p.httpClient, p.logger) - apiReq := buildCardTokenPayoutRequest(projectID, req) - - result, err := client.CreateCardTokenPayout(ctx, apiReq) + result, err := p.dispatchCardTokenPayout(ctx, req) if err != nil { - state.Status = model.PayoutStatusFailed + decision := p.retryPolicy.decideTransportFailure() state.ProviderMessage = err.Error() state.UpdatedAt = p.clock.Now() + maxAttempts := maxDispatchAttempts(0) + if decision.Action == payoutFailureActionRetry && maxAttempts > 1 { + state.Status = model.PayoutStatusProcessing + state.FailureReason = "" + if e := p.updatePayoutStatus(ctx, state); e != nil { + return nil, e + } + p.scheduleCardTokenPayoutRetry(req, 1, maxAttempts) + return cardTokenPayoutResponseFromState(state, true, "", ""), nil + } - _ = p.updatePayoutStatus(ctx, state) - + state.Status = model.PayoutStatusFailed + state.FailureReason = payoutFailureReason("", err.Error()) + if e := p.updatePayoutStatus(ctx, state); e != nil { + return nil, e + } + p.clearRetryTimer(state.OperationRef) p.logger.Warn("Monetix token payout submission failed", zap.String("payment_ref", state.PaymentRef), zap.String("customer_id", state.CustomerID), zap.Error(err), ) - return nil, err } applyCardPayoutSendResult(state, result) + accepted := result.Accepted + errorCode := strings.TrimSpace(result.ErrorCode) + errorMessage := strings.TrimSpace(result.ErrorMessage) + + if !result.Accepted { + decision := p.retryPolicy.decideProviderFailure(result.ErrorCode) + maxAttempts := maxDispatchAttempts(0) + if decision.Action == payoutFailureActionRetry && maxAttempts > 1 { + state.Status = model.PayoutStatusProcessing + state.FailureReason = "" + accepted = true + errorCode = "" + errorMessage = "" + p.scheduleCardTokenPayoutRetry(req, 1, maxAttempts) + } else { + state.Status = model.PayoutStatusFailed + state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage) + p.clearRetryTimer(state.OperationRef) + } + } else { + p.clearRetryTimer(state.OperationRef) + } state.UpdatedAt = p.clock.Now() if err := p.updatePayoutStatus(ctx, state); err != nil { @@ -463,19 +1019,13 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT return nil, err } - resp := &mntxv1.CardTokenPayoutResponse{ - Payout: StateToProto(state), - Accepted: result.Accepted, - ProviderRequestId: result.ProviderRequestID, - ErrorCode: result.ErrorCode, - ErrorMessage: result.ErrorMessage, - } + resp := cardTokenPayoutResponseFromState(state, accepted, errorCode, errorMessage) p.logger.Info("Card token payout submission stored", zap.String("payment_ref", state.PaymentRef), zap.String("status", string(state.Status)), - zap.Bool("accepted", result.Accepted), - zap.String("provider_request_id", result.ProviderRequestID), + zap.Bool("accepted", accepted), + zap.String("provider_request_id", resp.GetProviderRequestId()), ) return resp, nil @@ -635,10 +1185,18 @@ func (p *cardPayoutProcessor) ProcessCallback(ctx context.Context, payload []byt state.FailureReason = existing.FailureReason } } + if state.Status == model.PayoutStatusFailed || state.Status == model.PayoutStatusCancelled { + if strings.TrimSpace(state.FailureReason) == "" { + state.FailureReason = payoutFailureReason(state.ProviderCode, state.ProviderMessage) + } + } if err := p.updatePayoutStatus(ctx, state); err != nil { p.logger.Warn("Failed to update payout state while processing callback", zap.Error(err)) } + if isFinalStatus(state) { + p.clearRetryTimer(state.OperationRef) + } monetix.ObserveCallback(statusLabel) p.logger.Info("Monetix payout callback processed", diff --git a/api/gateway/mntx/internal/service/gateway/card_processor_test.go b/api/gateway/mntx/internal/service/gateway/card_processor_test.go index 80f8b7c6..ca230bc5 100644 --- a/api/gateway/mntx/internal/service/gateway/card_processor_test.go +++ b/api/gateway/mntx/internal/service/gateway/card_processor_test.go @@ -8,6 +8,8 @@ import ( "fmt" "io" "net/http" + "strings" + "sync/atomic" "testing" "time" @@ -399,3 +401,140 @@ func TestCardPayoutProcessor_ProcessCallback_UpdatesMatchingOperationWithinSameP t.Fatalf("second parent payment ref mismatch: got=%q want=%q", got, want) } } + +func TestCardPayoutProcessor_Submit_RetriesProviderLimitDeclineUntilSuccess(t *testing.T) { + cfg := monetix.Config{ + BaseURL: "https://monetix.test", + SecretKey: "secret", + ProjectID: 99, + AllowedCurrencies: []string{"RUB"}, + } + + repo := newMockRepository() + var calls atomic.Int32 + httpClient := &http.Client{ + Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) { + n := calls.Add(1) + resp := monetix.APIResponse{} + if n == 1 { + resp.Code = providerCodeDeclineAmountOrFrequencyLimit + resp.Message = "Decline due to amount or frequency limit" + body, _ := json.Marshal(resp) + return &http.Response{ + StatusCode: http.StatusTooManyRequests, + Body: io.NopCloser(bytes.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + }, nil + } + resp.Operation.RequestID = "req-retry-success" + body, _ := json.Marshal(resp) + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + }, nil + }), + } + + processor := newCardPayoutProcessor( + zap.NewNop(), + cfg, + staticClock{now: time.Date(2026, 3, 4, 1, 2, 3, 0, time.UTC)}, + repo, + httpClient, + nil, + ) + defer processor.stopRetries() + processor.dispatchThrottleInterval = 0 + processor.retryDelayFn = func(uint32) time.Duration { return 10 * time.Millisecond } + + req := validCardPayoutRequest() + resp, err := processor.Submit(context.Background(), req) + if err != nil { + t.Fatalf("submit returned error: %v", err) + } + if !resp.GetAccepted() { + t.Fatalf("expected accepted response when retry is scheduled") + } + + deadline := time.Now().Add(2 * time.Second) + for { + state, ok := repo.payouts.Get(req.GetPayoutId()) + if ok && state != nil && state.Status == model.PayoutStatusWaiting && state.ProviderPaymentID == "req-retry-success" { + break + } + if time.Now().After(deadline) { + t.Fatalf("timeout waiting for successful retry result") + } + time.Sleep(20 * time.Millisecond) + } + if got, want := calls.Load(), int32(2); got != want { + t.Fatalf("unexpected provider call count: got=%d want=%d", got, want) + } +} + +func TestCardPayoutProcessor_Submit_RetriesProviderLimitDeclineThenFails(t *testing.T) { + cfg := monetix.Config{ + BaseURL: "https://monetix.test", + SecretKey: "secret", + ProjectID: 99, + AllowedCurrencies: []string{"RUB"}, + } + + repo := newMockRepository() + var calls atomic.Int32 + httpClient := &http.Client{ + Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) { + _ = calls.Add(1) + resp := monetix.APIResponse{ + Code: providerCodeDeclineAmountOrFrequencyLimit, + Message: "Decline due to amount or frequency limit", + } + body, _ := json.Marshal(resp) + return &http.Response{ + StatusCode: http.StatusTooManyRequests, + Body: io.NopCloser(bytes.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + }, nil + }), + } + + processor := newCardPayoutProcessor( + zap.NewNop(), + cfg, + staticClock{now: time.Date(2026, 3, 4, 1, 2, 3, 0, time.UTC)}, + repo, + httpClient, + nil, + ) + defer processor.stopRetries() + processor.dispatchThrottleInterval = 0 + processor.retryDelayFn = func(uint32) time.Duration { return time.Millisecond } + + req := validCardPayoutRequest() + resp, err := processor.Submit(context.Background(), req) + if err != nil { + t.Fatalf("submit returned error: %v", err) + } + if !resp.GetAccepted() { + t.Fatalf("expected accepted response when retry is scheduled") + } + + deadline := time.Now().Add(2 * time.Second) + for { + state, ok := repo.payouts.Get(req.GetPayoutId()) + if ok && state != nil && state.Status == model.PayoutStatusFailed { + if !strings.Contains(state.FailureReason, providerCodeDeclineAmountOrFrequencyLimit) { + t.Fatalf("expected failure reason to include provider code, got=%q", state.FailureReason) + } + break + } + if time.Now().After(deadline) { + t.Fatalf("timeout waiting for terminal failed status") + } + time.Sleep(10 * time.Millisecond) + } + if got, want := calls.Load(), int32(defaultMaxDispatchAttempts); got != want { + t.Fatalf("unexpected provider call count: got=%d want=%d", got, want) + } +} diff --git a/api/gateway/mntx/internal/service/gateway/payout_failure_policy.go b/api/gateway/mntx/internal/service/gateway/payout_failure_policy.go new file mode 100644 index 00000000..83b84e9d --- /dev/null +++ b/api/gateway/mntx/internal/service/gateway/payout_failure_policy.go @@ -0,0 +1,87 @@ +package gateway + +import ( + "strings" +) + +const ( + providerCodeDeclineAmountOrFrequencyLimit = "10101" +) + +type payoutFailureAction int + +const ( + payoutFailureActionFail payoutFailureAction = iota + 1 + payoutFailureActionRetry +) + +type payoutFailureDecision struct { + Action payoutFailureAction + Reason string +} + +type payoutFailurePolicy struct { + providerCodeActions map[string]payoutFailureAction +} + +func defaultPayoutFailurePolicy() payoutFailurePolicy { + return payoutFailurePolicy{ + providerCodeActions: map[string]payoutFailureAction{ + providerCodeDeclineAmountOrFrequencyLimit: payoutFailureActionRetry, + }, + } +} + +func (p payoutFailurePolicy) decideProviderFailure(code string) payoutFailureDecision { + normalized := strings.TrimSpace(code) + if normalized == "" { + return payoutFailureDecision{ + Action: payoutFailureActionFail, + Reason: "provider_failure", + } + } + if action, ok := p.providerCodeActions[normalized]; ok { + return payoutFailureDecision{ + Action: action, + Reason: "provider_code_" + normalized, + } + } + return payoutFailureDecision{ + Action: payoutFailureActionFail, + Reason: "provider_code_" + normalized, + } +} + +func (p payoutFailurePolicy) decideTransportFailure() payoutFailureDecision { + return payoutFailureDecision{ + Action: payoutFailureActionRetry, + Reason: "transport_failure", + } +} + +func payoutFailureReason(code, message string) string { + cleanCode := strings.TrimSpace(code) + cleanMessage := strings.TrimSpace(message) + switch { + case cleanCode != "" && cleanMessage != "": + return cleanCode + ": " + cleanMessage + case cleanCode != "": + return cleanCode + default: + return cleanMessage + } +} + +func retryDelayForAttempt(attempt uint32) int { + // Backoff in seconds by attempt number (attempt starts at 1). + switch { + case attempt <= 1: + return 5 + case attempt == 2: + return 15 + case attempt == 3: + return 30 + default: + return 60 + } +} diff --git a/api/gateway/mntx/internal/service/gateway/payout_failure_policy_test.go b/api/gateway/mntx/internal/service/gateway/payout_failure_policy_test.go new file mode 100644 index 00000000..58b8b3df --- /dev/null +++ b/api/gateway/mntx/internal/service/gateway/payout_failure_policy_test.go @@ -0,0 +1,53 @@ +package gateway + +import "testing" + +func TestPayoutFailurePolicy_DecideProviderFailure(t *testing.T) { + policy := defaultPayoutFailurePolicy() + + cases := []struct { + name string + code string + action payoutFailureAction + }{ + { + name: "retryable provider limit code", + code: providerCodeDeclineAmountOrFrequencyLimit, + action: payoutFailureActionRetry, + }, + { + name: "unknown provider code", + code: "99999", + action: payoutFailureActionFail, + }, + { + name: "empty provider code", + code: "", + action: payoutFailureActionFail, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Helper() + got := policy.decideProviderFailure(tc.code) + if got.Action != tc.action { + t.Fatalf("action mismatch: got=%v want=%v", got.Action, tc.action) + } + }) + } +} + +func TestPayoutFailureReason(t *testing.T) { + if got, want := payoutFailureReason("10101", "Decline due to amount or frequency limit"), "10101: Decline due to amount or frequency limit"; got != want { + t.Fatalf("failure reason mismatch: got=%q want=%q", got, want) + } + if got, want := payoutFailureReason("", "network error"), "network error"; got != want { + t.Fatalf("failure reason mismatch: got=%q want=%q", got, want) + } + if got, want := payoutFailureReason("10101", ""), "10101"; got != want { + t.Fatalf("failure reason mismatch: got=%q want=%q", got, want) + } +} + diff --git a/api/gateway/mntx/internal/service/gateway/service.go b/api/gateway/mntx/internal/service/gateway/service.go index 7a2c158a..5a7e1b04 100644 --- a/api/gateway/mntx/internal/service/gateway/service.go +++ b/api/gateway/mntx/internal/service/gateway/service.go @@ -112,6 +112,9 @@ func (s *Service) Shutdown() { if s == nil { return } + if s.card != nil { + s.card.stopRetries() + } s.outbox.Stop() if s.announcer != nil { s.announcer.Stop()