From d92be5eedc92fa791e634684e2c42a2085c3c44d Mon Sep 17 00:00:00 2001 From: Stephan D Date: Wed, 4 Mar 2026 09:18:15 +0100 Subject: [PATCH] fixed rescheduling supporting callback error code processing --- .../service/gateway/card_payout_store_test.go | 14 + .../service/gateway/card_processor.go | 329 ++++++++++++++++-- .../service/gateway/card_processor_test.go | 94 +++++ 3 files changed, 405 insertions(+), 32 deletions(-) diff --git a/api/gateway/mntx/internal/service/gateway/card_payout_store_test.go b/api/gateway/mntx/internal/service/gateway/card_payout_store_test.go index 4e5c9234..3af5a946 100644 --- a/api/gateway/mntx/internal/service/gateway/card_payout_store_test.go +++ b/api/gateway/mntx/internal/service/gateway/card_payout_store_test.go @@ -2,6 +2,7 @@ package gateway import ( "context" + "sync" "github.com/tech/sendico/gateway/mntx/storage" "github.com/tech/sendico/gateway/mntx/storage/model" @@ -24,6 +25,7 @@ func (r *mockRepository) Payouts() storage.PayoutsStore { // cardPayoutStore implements storage.PayoutsStore for tests. type cardPayoutStore struct { + mu sync.RWMutex data map[string]*model.CardPayout } @@ -44,6 +46,8 @@ func newCardPayoutStore() *cardPayoutStore { } func (s *cardPayoutStore) FindByIdempotencyKey(_ context.Context, key string) (*model.CardPayout, error) { + s.mu.RLock() + defer s.mu.RUnlock() for _, v := range s.data { if v.IdempotencyKey == key { return v, nil @@ -53,6 +57,8 @@ func (s *cardPayoutStore) FindByIdempotencyKey(_ context.Context, key string) (* } func (s *cardPayoutStore) FindByOperationRef(_ context.Context, ref string) (*model.CardPayout, error) { + s.mu.RLock() + defer s.mu.RUnlock() for _, v := range s.data { if v.OperationRef == ref { return v, nil @@ -62,6 +68,8 @@ func (s *cardPayoutStore) FindByOperationRef(_ context.Context, ref string) (*mo } func (s *cardPayoutStore) FindByPaymentID(_ context.Context, id string) (*model.CardPayout, error) { + s.mu.RLock() + defer s.mu.RUnlock() for _, v := range s.data { if v.PaymentRef == id { return v, nil @@ -71,17 +79,23 @@ func (s *cardPayoutStore) FindByPaymentID(_ context.Context, id string) (*model. } func (s *cardPayoutStore) Upsert(_ context.Context, record *model.CardPayout) error { + s.mu.Lock() + defer s.mu.Unlock() s.data[payoutStoreKey(record)] = record return nil } // Save is a helper for tests to pre-populate data. func (s *cardPayoutStore) Save(state *model.CardPayout) { + s.mu.Lock() + defer s.mu.Unlock() s.data[payoutStoreKey(state)] = state } // Get is a helper for tests to retrieve data. func (s *cardPayoutStore) Get(id string) (*model.CardPayout, bool) { + s.mu.RLock() + defer s.mu.RUnlock() if v, ok := s.data[id]; ok { return v, true } diff --git a/api/gateway/mntx/internal/service/gateway/card_processor.go b/api/gateway/mntx/internal/service/gateway/card_processor.go index b20b06ba..fe7beae7 100644 --- a/api/gateway/mntx/internal/service/gateway/card_processor.go +++ b/api/gateway/mntx/internal/service/gateway/card_processor.go @@ -46,6 +46,7 @@ type cardPayoutProcessor struct { perTxMinAmountMinor int64 perTxMinAmountMinorByCurrency map[string]int64 dispatchThrottleInterval time.Duration + dispatchMaxAttempts uint32 dispatchMu sync.Mutex nextDispatchAllowed time.Time @@ -57,6 +58,13 @@ type cardPayoutProcessor struct { retryTimers map[string]*time.Timer retryCtx context.Context retryStop context.CancelFunc + + retryReqMu sync.RWMutex + cardRetryRequests map[string]*mntxv1.CardPayoutRequest + cardTokenRetryRequest map[string]*mntxv1.CardTokenPayoutRequest + + attemptMu sync.Mutex + dispatchAttempts map[string]uint32 } func mergePayoutStateWithExisting(state, existing *model.CardPayout) { @@ -180,11 +188,15 @@ func newCardPayoutProcessor( httpClient: client, producer: producer, dispatchThrottleInterval: defaultDispatchThrottleInterval, + dispatchMaxAttempts: defaultMaxDispatchAttempts, retryPolicy: defaultPayoutFailurePolicy(), retryDelayFn: retryDelayDuration, retryTimers: map[string]*time.Timer{}, retryCtx: retryCtx, retryStop: retryStop, + cardRetryRequests: map[string]*mntxv1.CardPayoutRequest{}, + cardTokenRetryRequest: map[string]*mntxv1.CardTokenPayoutRequest{}, + dispatchAttempts: map[string]uint32{}, } } @@ -362,6 +374,14 @@ func (p *cardPayoutProcessor) stopRetries() { } delete(p.retryTimers, key) } + p.retryReqMu.Lock() + p.cardRetryRequests = map[string]*mntxv1.CardPayoutRequest{} + p.cardTokenRetryRequest = map[string]*mntxv1.CardTokenPayoutRequest{} + p.retryReqMu.Unlock() + + p.attemptMu.Lock() + p.dispatchAttempts = map[string]uint32{} + p.attemptMu.Unlock() } func (p *cardPayoutProcessor) clearRetryTimer(operationRef string) { @@ -381,6 +401,147 @@ func (p *cardPayoutProcessor) clearRetryTimer(operationRef string) { delete(p.retryTimers, key) } +func (p *cardPayoutProcessor) maxDispatchAttempts() uint32 { + if p == nil { + return defaultMaxDispatchAttempts + } + return maxDispatchAttempts(p.dispatchMaxAttempts) +} + +func (p *cardPayoutProcessor) rememberCardRetryRequest(req *mntxv1.CardPayoutRequest) { + if p == nil || req == nil { + return + } + key := findOperationRef(req.GetOperationRef(), req.GetPayoutId()) + if key == "" { + return + } + cloned, ok := proto.Clone(req).(*mntxv1.CardPayoutRequest) + if !ok { + return + } + p.retryReqMu.Lock() + defer p.retryReqMu.Unlock() + p.cardRetryRequests[key] = cloned +} + +func (p *cardPayoutProcessor) rememberCardTokenRetryRequest(req *mntxv1.CardTokenPayoutRequest) { + if p == nil || req == nil { + return + } + key := findOperationRef(req.GetOperationRef(), req.GetPayoutId()) + if key == "" { + return + } + cloned, ok := proto.Clone(req).(*mntxv1.CardTokenPayoutRequest) + if !ok { + return + } + p.retryReqMu.Lock() + defer p.retryReqMu.Unlock() + p.cardTokenRetryRequest[key] = cloned +} + +func (p *cardPayoutProcessor) loadCardRetryRequest(operationRef string) *mntxv1.CardPayoutRequest { + if p == nil { + return nil + } + key := strings.TrimSpace(operationRef) + if key == "" { + return nil + } + p.retryReqMu.RLock() + defer p.retryReqMu.RUnlock() + req := p.cardRetryRequests[key] + if req == nil { + return nil + } + cloned, ok := proto.Clone(req).(*mntxv1.CardPayoutRequest) + if !ok { + return nil + } + return cloned +} + +func (p *cardPayoutProcessor) loadCardTokenRetryRequest(operationRef string) *mntxv1.CardTokenPayoutRequest { + if p == nil { + return nil + } + key := strings.TrimSpace(operationRef) + if key == "" { + return nil + } + p.retryReqMu.RLock() + defer p.retryReqMu.RUnlock() + req := p.cardTokenRetryRequest[key] + if req == nil { + return nil + } + cloned, ok := proto.Clone(req).(*mntxv1.CardTokenPayoutRequest) + if !ok { + return nil + } + return cloned +} + +func (p *cardPayoutProcessor) incrementDispatchAttempt(operationRef string) uint32 { + if p == nil { + return 0 + } + key := strings.TrimSpace(operationRef) + if key == "" { + return 0 + } + p.attemptMu.Lock() + defer p.attemptMu.Unlock() + next := p.dispatchAttempts[key] + 1 + p.dispatchAttempts[key] = next + return next +} + +func (p *cardPayoutProcessor) currentDispatchAttempt(operationRef string) uint32 { + if p == nil { + return 0 + } + key := strings.TrimSpace(operationRef) + if key == "" { + return 0 + } + p.attemptMu.Lock() + defer p.attemptMu.Unlock() + return p.dispatchAttempts[key] +} + +func (p *cardPayoutProcessor) clearDispatchAttempt(operationRef string) { + if p == nil { + return + } + key := strings.TrimSpace(operationRef) + if key == "" { + return + } + p.attemptMu.Lock() + defer p.attemptMu.Unlock() + delete(p.dispatchAttempts, key) +} + +func (p *cardPayoutProcessor) clearRetryState(operationRef string) { + if p == nil { + return + } + key := strings.TrimSpace(operationRef) + if key == "" { + return + } + p.clearRetryTimer(key) + p.clearDispatchAttempt(key) + + p.retryReqMu.Lock() + defer p.retryReqMu.Unlock() + delete(p.cardRetryRequests, key) + delete(p.cardTokenRetryRequest, key) +} + func payoutAcceptedForState(state *model.CardPayout) bool { if state == nil { return false @@ -433,6 +594,12 @@ func (p *cardPayoutProcessor) dispatchCardPayout(ctx context.Context, req *mntxv if err := p.waitDispatchSlot(ctx); err != nil { return nil, err } + opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId()) + attempt := p.incrementDispatchAttempt(opRef) + p.logger.Info("Dispatching card payout attempt", + zap.String("operation_ref", opRef), + zap.Uint32("attempt", attempt), + ) client := monetix.NewClient(p.config, p.httpClient, p.logger) apiReq := buildCardPayoutRequest(req.GetProjectId(), req) return client.CreateCardPayout(ctx, apiReq) @@ -448,6 +615,12 @@ func (p *cardPayoutProcessor) dispatchCardTokenPayout(ctx context.Context, req * if err := p.waitDispatchSlot(ctx); err != nil { return nil, err } + opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId()) + attempt := p.incrementDispatchAttempt(opRef) + p.logger.Info("Dispatching card token payout attempt", + zap.String("operation_ref", opRef), + zap.Uint32("attempt", attempt), + ) client := monetix.NewClient(p.config, p.httpClient, p.logger) apiReq := buildCardTokenPayoutRequest(req.GetProjectId(), req) return client.CreateCardTokenPayout(ctx, apiReq) @@ -516,6 +689,13 @@ func (p *cardPayoutProcessor) scheduleCardPayoutRetry(req *mntxv1.CardPayoutRequ if p.retryDelayFn != nil { delay = p.retryDelayFn(failedAttempt) } + p.logger.Info("Scheduling card payout retry", + zap.String("operation_ref", operationRef), + zap.Uint32("failed_attempt", failedAttempt), + zap.Uint32("next_attempt", nextAttempt), + zap.Uint32("max_attempts", maxAttempts), + zap.Duration("delay", delay), + ) p.scheduleRetryTimer(operationRef, delay, func() { p.runCardPayoutRetry(cloned, nextAttempt, maxAttempts) }) @@ -539,6 +719,13 @@ func (p *cardPayoutProcessor) scheduleCardTokenPayoutRetry(req *mntxv1.CardToken if p.retryDelayFn != nil { delay = p.retryDelayFn(failedAttempt) } + p.logger.Info("Scheduling card token payout retry", + zap.String("operation_ref", operationRef), + zap.Uint32("failed_attempt", failedAttempt), + zap.Uint32("next_attempt", nextAttempt), + zap.Uint32("max_attempts", maxAttempts), + zap.Duration("delay", delay), + ) p.scheduleRetryTimer(operationRef, delay, func() { p.runCardTokenPayoutRetry(cloned, nextAttempt, maxAttempts) }) @@ -567,6 +754,11 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest, if operationRef == "" { return } + p.logger.Info("Executing scheduled card payout retry", + zap.String("operation_ref", operationRef), + zap.Uint32("attempt", attempt), + zap.Uint32("max_attempts", maxAttempts), + ) ctx, cancel := p.retryContext() defer cancel() @@ -580,7 +772,7 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest, return } if isFinalStatus(state) { - p.clearRetryTimer(operationRef) + p.clearRetryState(operationRef) return } @@ -608,7 +800,7 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest, if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { p.logger.Warn("Failed to persist terminal payout transport failure", zap.Error(upErr)) } - p.clearRetryTimer(operationRef) + p.clearRetryState(operationRef) return } @@ -640,7 +832,7 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest, if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { p.logger.Warn("Failed to persist terminal payout provider failure", zap.Error(upErr)) } - p.clearRetryTimer(operationRef) + p.clearRetryState(operationRef) } func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayoutRequest, attempt uint32, maxAttempts uint32) { @@ -651,6 +843,11 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou if operationRef == "" { return } + p.logger.Info("Executing scheduled card token payout retry", + zap.String("operation_ref", operationRef), + zap.Uint32("attempt", attempt), + zap.Uint32("max_attempts", maxAttempts), + ) ctx, cancel := p.retryContext() defer cancel() @@ -664,7 +861,7 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou return } if isFinalStatus(state) { - p.clearRetryTimer(operationRef) + p.clearRetryState(operationRef) return } @@ -692,7 +889,7 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { p.logger.Warn("Failed to persist terminal token payout transport failure", zap.Error(upErr)) } - p.clearRetryTimer(operationRef) + p.clearRetryState(operationRef) return } @@ -724,7 +921,7 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { p.logger.Warn("Failed to persist terminal token payout provider failure", zap.Error(upErr)) } - p.clearRetryTimer(operationRef) + p.clearRetryState(operationRef) } func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) { @@ -798,20 +995,24 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout } // Keep CreatedAt/refs if record already exists. - existing, _ := p.findAndMergePayoutState(ctx, state) + existing, err := p.findAndMergePayoutState(ctx, state) + if err != nil { + return nil, err + } if existing != nil { switch existing.Status { case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled: return cardPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil } } + p.rememberCardRetryRequest(req) result, err := p.dispatchCardPayout(ctx, req) if err != nil { decision := p.retryPolicy.decideTransportFailure() state.ProviderMessage = err.Error() state.UpdatedAt = p.clock.Now() - maxAttempts := maxDispatchAttempts(0) + maxAttempts := p.maxDispatchAttempts() if decision.Action == payoutFailureActionRetry && maxAttempts > 1 { state.Status = model.PayoutStatusProcessing state.FailureReason = "" @@ -833,7 +1034,7 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout } fields := append([]zap.Field{zap.Error(err)}, payoutStateLogFields(state)...) p.logger.Warn("Monetix payout submission failed", fields...) - p.clearRetryTimer(state.OperationRef) + p.clearRetryState(state.OperationRef) return nil, err } @@ -842,21 +1043,24 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout accepted := result.Accepted errorCode := strings.TrimSpace(result.ErrorCode) errorMessage := strings.TrimSpace(result.ErrorMessage) + scheduleRetry := false + retryMaxAttempts := uint32(0) if !result.Accepted { decision := p.retryPolicy.decideProviderFailure(result.ErrorCode) - maxAttempts := maxDispatchAttempts(0) + maxAttempts := p.maxDispatchAttempts() if decision.Action == payoutFailureActionRetry && maxAttempts > 1 { state.Status = model.PayoutStatusProcessing state.FailureReason = "" accepted = true errorCode = "" errorMessage = "" - p.scheduleCardPayoutRetry(req, 1, maxAttempts) + scheduleRetry = true + retryMaxAttempts = maxAttempts } else { state.Status = model.PayoutStatusFailed state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage) - p.clearRetryTimer(state.OperationRef) + p.clearRetryState(state.OperationRef) } } else { p.clearRetryTimer(state.OperationRef) @@ -872,6 +1076,9 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout ) return nil, err } + if scheduleRetry { + p.scheduleCardPayoutRetry(req, 1, retryMaxAttempts) + } resp := cardPayoutResponseFromState(state, accepted, errorCode, errorMessage) @@ -951,20 +1158,24 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT UpdatedAt: now, } - existing, _ := p.findAndMergePayoutState(ctx, state) + existing, err := p.findAndMergePayoutState(ctx, state) + if err != nil { + return nil, err + } if existing != nil { switch existing.Status { case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled: return cardTokenPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil } } + p.rememberCardTokenRetryRequest(req) result, err := p.dispatchCardTokenPayout(ctx, req) if err != nil { decision := p.retryPolicy.decideTransportFailure() state.ProviderMessage = err.Error() state.UpdatedAt = p.clock.Now() - maxAttempts := maxDispatchAttempts(0) + maxAttempts := p.maxDispatchAttempts() if decision.Action == payoutFailureActionRetry && maxAttempts > 1 { state.Status = model.PayoutStatusProcessing state.FailureReason = "" @@ -980,7 +1191,7 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT if e := p.updatePayoutStatus(ctx, state); e != nil { return nil, e } - p.clearRetryTimer(state.OperationRef) + p.clearRetryState(state.OperationRef) p.logger.Warn("Monetix token payout submission failed", zap.String("payment_ref", state.PaymentRef), zap.String("customer_id", state.CustomerID), @@ -993,21 +1204,24 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT accepted := result.Accepted errorCode := strings.TrimSpace(result.ErrorCode) errorMessage := strings.TrimSpace(result.ErrorMessage) + scheduleRetry := false + retryMaxAttempts := uint32(0) if !result.Accepted { decision := p.retryPolicy.decideProviderFailure(result.ErrorCode) - maxAttempts := maxDispatchAttempts(0) + maxAttempts := p.maxDispatchAttempts() if decision.Action == payoutFailureActionRetry && maxAttempts > 1 { state.Status = model.PayoutStatusProcessing state.FailureReason = "" accepted = true errorCode = "" errorMessage = "" - p.scheduleCardTokenPayoutRetry(req, 1, maxAttempts) + scheduleRetry = true + retryMaxAttempts = maxAttempts } else { state.Status = model.PayoutStatusFailed state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage) - p.clearRetryTimer(state.OperationRef) + p.clearRetryState(state.OperationRef) } } else { p.clearRetryTimer(state.OperationRef) @@ -1018,6 +1232,9 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT p.logger.Warn("Failed to update payout status", zap.Error(err)) return nil, err } + if scheduleRetry { + p.scheduleCardTokenPayoutRetry(req, 1, retryMaxAttempts) + } resp := cardTokenPayoutResponseFromState(state, accepted, errorCode, errorMessage) @@ -1179,23 +1396,70 @@ func (p *cardPayoutProcessor) ProcessCallback(ctx context.Context, payload []byt ) return http.StatusInternalServerError, err } - if existing != nil { - // keep failure reason if you want, or override depending on callback semantics - if state.FailureReason == "" { - state.FailureReason = existing.FailureReason - } - } - if state.Status == model.PayoutStatusFailed || state.Status == model.PayoutStatusCancelled { - if strings.TrimSpace(state.FailureReason) == "" { - state.FailureReason = payoutFailureReason(state.ProviderCode, state.ProviderMessage) - } + operationRef := strings.TrimSpace(state.OperationRef) + if existing != nil && strings.TrimSpace(state.FailureReason) == "" { + state.FailureReason = strings.TrimSpace(existing.FailureReason) } - if err := p.updatePayoutStatus(ctx, state); err != nil { - p.logger.Warn("Failed to update payout state while processing callback", zap.Error(err)) + retryScheduled := false + if state.Status == model.PayoutStatusFailed || state.Status == model.PayoutStatusCancelled { + decision := p.retryPolicy.decideProviderFailure(state.ProviderCode) + attemptsUsed := p.currentDispatchAttempt(operationRef) + maxAttempts := p.maxDispatchAttempts() + if decision.Action == payoutFailureActionRetry && attemptsUsed > 0 && attemptsUsed < maxAttempts { + if req := p.loadCardRetryRequest(operationRef); req != nil { + state.Status = model.PayoutStatusProcessing + state.FailureReason = "" + p.logger.Info("Callback decline is retryable; scheduling card payout retry", + zap.String("operation_ref", operationRef), + zap.String("provider_code", strings.TrimSpace(state.ProviderCode)), + zap.Uint32("attempts_used", attemptsUsed), + zap.Uint32("max_attempts", maxAttempts), + ) + if err := p.updatePayoutStatus(ctx, state); err != nil { + p.logger.Warn("Failed to persist callback retry scheduling state", zap.Error(err)) + return http.StatusInternalServerError, err + } + p.scheduleCardPayoutRetry(req, attemptsUsed, maxAttempts) + retryScheduled = true + } else if req := p.loadCardTokenRetryRequest(operationRef); req != nil { + state.Status = model.PayoutStatusProcessing + state.FailureReason = "" + p.logger.Info("Callback decline is retryable; scheduling card token payout retry", + zap.String("operation_ref", operationRef), + zap.String("provider_code", strings.TrimSpace(state.ProviderCode)), + zap.Uint32("attempts_used", attemptsUsed), + zap.Uint32("max_attempts", maxAttempts), + ) + if err := p.updatePayoutStatus(ctx, state); err != nil { + p.logger.Warn("Failed to persist callback token retry scheduling state", zap.Error(err)) + return http.StatusInternalServerError, err + } + p.scheduleCardTokenPayoutRetry(req, attemptsUsed, maxAttempts) + retryScheduled = true + } else { + p.logger.Warn("Retryable callback decline received but no retry request snapshot found", + zap.String("operation_ref", operationRef), + zap.String("provider_code", strings.TrimSpace(state.ProviderCode)), + zap.Uint32("attempts_used", attemptsUsed), + zap.Uint32("max_attempts", maxAttempts), + ) + } + } + if !retryScheduled && strings.TrimSpace(state.FailureReason) == "" { + state.FailureReason = payoutFailureReason(state.ProviderCode, state.ProviderMessage) + } + } else if state.Status == model.PayoutStatusSuccess { + state.FailureReason = "" } - if isFinalStatus(state) { - p.clearRetryTimer(state.OperationRef) + + if !retryScheduled { + if err := p.updatePayoutStatus(ctx, state); err != nil { + p.logger.Warn("Failed to update payout state while processing callback", zap.Error(err)) + } + if isFinalStatus(state) { + p.clearRetryState(operationRef) + } } monetix.ObserveCallback(statusLabel) @@ -1204,6 +1468,7 @@ func (p *cardPayoutProcessor) ProcessCallback(ctx context.Context, payload []byt zap.String("status", statusLabel), zap.String("provider_code", state.ProviderCode), zap.String("provider_message", state.ProviderMessage), + zap.Bool("retry_scheduled", retryScheduled), zap.String("masked_account", cb.Account.Number), ) diff --git a/api/gateway/mntx/internal/service/gateway/card_processor_test.go b/api/gateway/mntx/internal/service/gateway/card_processor_test.go index ca230bc5..35d9a443 100644 --- a/api/gateway/mntx/internal/service/gateway/card_processor_test.go +++ b/api/gateway/mntx/internal/service/gateway/card_processor_test.go @@ -538,3 +538,97 @@ func TestCardPayoutProcessor_Submit_RetriesProviderLimitDeclineThenFails(t *test t.Fatalf("unexpected provider call count: got=%d want=%d", got, want) } } + +func TestCardPayoutProcessor_ProcessCallback_RetryableDeclineSchedulesRetry(t *testing.T) { + cfg := monetix.Config{ + BaseURL: "https://monetix.test", + SecretKey: "secret", + ProjectID: 99, + StatusSuccess: "success", + StatusProcessing: "processing", + AllowedCurrencies: []string{"RUB"}, + } + + repo := newMockRepository() + var calls atomic.Int32 + httpClient := &http.Client{ + Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) { + n := calls.Add(1) + resp := monetix.APIResponse{} + if n == 1 { + resp.Operation.RequestID = "req-initial" + } else { + resp.Operation.RequestID = "req-after-callback-retry" + } + body, _ := json.Marshal(resp) + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + }, nil + }), + } + + processor := newCardPayoutProcessor( + zap.NewNop(), + cfg, + staticClock{now: time.Date(2026, 3, 4, 2, 0, 0, 0, time.UTC)}, + repo, + httpClient, + nil, + ) + defer processor.stopRetries() + processor.dispatchThrottleInterval = 0 + processor.retryDelayFn = func(uint32) time.Duration { return 5 * time.Millisecond } + + req := validCardPayoutRequest() + resp, err := processor.Submit(context.Background(), req) + if err != nil { + t.Fatalf("submit returned error: %v", err) + } + if !resp.GetAccepted() { + t.Fatalf("expected accepted submit response") + } + + cb := baseCallback() + cb.Payment.ID = req.GetPayoutId() + cb.Payment.Status = "failed" + cb.Operation.Status = "failed" + cb.Operation.Code = providerCodeDeclineAmountOrFrequencyLimit + cb.Operation.Message = "Decline due to amount or frequency limit" + cb.Payment.Sum.Currency = "RUB" + + sig, err := monetix.SignPayload(cb, cfg.SecretKey) + if err != nil { + t.Fatalf("failed to sign callback: %v", err) + } + cb.Signature = sig + payload, err := json.Marshal(cb) + if err != nil { + t.Fatalf("failed to marshal callback: %v", err) + } + + status, err := processor.ProcessCallback(context.Background(), payload) + if err != nil { + t.Fatalf("process callback returned error: %v", err) + } + if status != http.StatusOK { + t.Fatalf("unexpected callback status: %d", status) + } + + deadline := time.Now().Add(2 * time.Second) + for { + state, ok := repo.payouts.Get(req.GetPayoutId()) + if ok && state != nil && state.Status == model.PayoutStatusWaiting && state.ProviderPaymentID == "req-after-callback-retry" { + break + } + if time.Now().After(deadline) { + t.Fatalf("timeout waiting for callback-scheduled retry result") + } + time.Sleep(10 * time.Millisecond) + } + + if got, want := calls.Load(), int32(2); got != want { + t.Fatalf("unexpected provider call count: got=%d want=%d", got, want) + } +}