fixed rescheduling supporting callback error code processing #631

Merged
tech merged 1 commits from mntx-627 into main 2026-03-04 08:19:13 +00:00
3 changed files with 405 additions and 32 deletions

View File

@@ -2,6 +2,7 @@ package gateway
import (
"context"
"sync"
"github.com/tech/sendico/gateway/mntx/storage"
"github.com/tech/sendico/gateway/mntx/storage/model"
@@ -24,6 +25,7 @@ func (r *mockRepository) Payouts() storage.PayoutsStore {
// cardPayoutStore implements storage.PayoutsStore for tests.
type cardPayoutStore struct {
mu sync.RWMutex
data map[string]*model.CardPayout
}
@@ -44,6 +46,8 @@ func newCardPayoutStore() *cardPayoutStore {
}
func (s *cardPayoutStore) FindByIdempotencyKey(_ context.Context, key string) (*model.CardPayout, error) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, v := range s.data {
if v.IdempotencyKey == key {
return v, nil
@@ -53,6 +57,8 @@ func (s *cardPayoutStore) FindByIdempotencyKey(_ context.Context, key string) (*
}
func (s *cardPayoutStore) FindByOperationRef(_ context.Context, ref string) (*model.CardPayout, error) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, v := range s.data {
if v.OperationRef == ref {
return v, nil
@@ -62,6 +68,8 @@ func (s *cardPayoutStore) FindByOperationRef(_ context.Context, ref string) (*mo
}
func (s *cardPayoutStore) FindByPaymentID(_ context.Context, id string) (*model.CardPayout, error) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, v := range s.data {
if v.PaymentRef == id {
return v, nil
@@ -71,17 +79,23 @@ func (s *cardPayoutStore) FindByPaymentID(_ context.Context, id string) (*model.
}
func (s *cardPayoutStore) Upsert(_ context.Context, record *model.CardPayout) error {
s.mu.Lock()
defer s.mu.Unlock()
s.data[payoutStoreKey(record)] = record
return nil
}
// Save is a helper for tests to pre-populate data.
func (s *cardPayoutStore) Save(state *model.CardPayout) {
s.mu.Lock()
defer s.mu.Unlock()
s.data[payoutStoreKey(state)] = state
}
// Get is a helper for tests to retrieve data.
func (s *cardPayoutStore) Get(id string) (*model.CardPayout, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
if v, ok := s.data[id]; ok {
return v, true
}

View File

@@ -46,6 +46,7 @@ type cardPayoutProcessor struct {
perTxMinAmountMinor int64
perTxMinAmountMinorByCurrency map[string]int64
dispatchThrottleInterval time.Duration
dispatchMaxAttempts uint32
dispatchMu sync.Mutex
nextDispatchAllowed time.Time
@@ -57,6 +58,13 @@ type cardPayoutProcessor struct {
retryTimers map[string]*time.Timer
retryCtx context.Context
retryStop context.CancelFunc
retryReqMu sync.RWMutex
cardRetryRequests map[string]*mntxv1.CardPayoutRequest
cardTokenRetryRequest map[string]*mntxv1.CardTokenPayoutRequest
attemptMu sync.Mutex
dispatchAttempts map[string]uint32
}
func mergePayoutStateWithExisting(state, existing *model.CardPayout) {
@@ -180,11 +188,15 @@ func newCardPayoutProcessor(
httpClient: client,
producer: producer,
dispatchThrottleInterval: defaultDispatchThrottleInterval,
dispatchMaxAttempts: defaultMaxDispatchAttempts,
retryPolicy: defaultPayoutFailurePolicy(),
retryDelayFn: retryDelayDuration,
retryTimers: map[string]*time.Timer{},
retryCtx: retryCtx,
retryStop: retryStop,
cardRetryRequests: map[string]*mntxv1.CardPayoutRequest{},
cardTokenRetryRequest: map[string]*mntxv1.CardTokenPayoutRequest{},
dispatchAttempts: map[string]uint32{},
}
}
@@ -362,6 +374,14 @@ func (p *cardPayoutProcessor) stopRetries() {
}
delete(p.retryTimers, key)
}
p.retryReqMu.Lock()
p.cardRetryRequests = map[string]*mntxv1.CardPayoutRequest{}
p.cardTokenRetryRequest = map[string]*mntxv1.CardTokenPayoutRequest{}
p.retryReqMu.Unlock()
p.attemptMu.Lock()
p.dispatchAttempts = map[string]uint32{}
p.attemptMu.Unlock()
}
func (p *cardPayoutProcessor) clearRetryTimer(operationRef string) {
@@ -381,6 +401,147 @@ func (p *cardPayoutProcessor) clearRetryTimer(operationRef string) {
delete(p.retryTimers, key)
}
func (p *cardPayoutProcessor) maxDispatchAttempts() uint32 {
if p == nil {
return defaultMaxDispatchAttempts
}
return maxDispatchAttempts(p.dispatchMaxAttempts)
}
func (p *cardPayoutProcessor) rememberCardRetryRequest(req *mntxv1.CardPayoutRequest) {
if p == nil || req == nil {
return
}
key := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
if key == "" {
return
}
cloned, ok := proto.Clone(req).(*mntxv1.CardPayoutRequest)
if !ok {
return
}
p.retryReqMu.Lock()
defer p.retryReqMu.Unlock()
p.cardRetryRequests[key] = cloned
}
func (p *cardPayoutProcessor) rememberCardTokenRetryRequest(req *mntxv1.CardTokenPayoutRequest) {
if p == nil || req == nil {
return
}
key := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
if key == "" {
return
}
cloned, ok := proto.Clone(req).(*mntxv1.CardTokenPayoutRequest)
if !ok {
return
}
p.retryReqMu.Lock()
defer p.retryReqMu.Unlock()
p.cardTokenRetryRequest[key] = cloned
}
func (p *cardPayoutProcessor) loadCardRetryRequest(operationRef string) *mntxv1.CardPayoutRequest {
if p == nil {
return nil
}
key := strings.TrimSpace(operationRef)
if key == "" {
return nil
}
p.retryReqMu.RLock()
defer p.retryReqMu.RUnlock()
req := p.cardRetryRequests[key]
if req == nil {
return nil
}
cloned, ok := proto.Clone(req).(*mntxv1.CardPayoutRequest)
if !ok {
return nil
}
return cloned
}
func (p *cardPayoutProcessor) loadCardTokenRetryRequest(operationRef string) *mntxv1.CardTokenPayoutRequest {
if p == nil {
return nil
}
key := strings.TrimSpace(operationRef)
if key == "" {
return nil
}
p.retryReqMu.RLock()
defer p.retryReqMu.RUnlock()
req := p.cardTokenRetryRequest[key]
if req == nil {
return nil
}
cloned, ok := proto.Clone(req).(*mntxv1.CardTokenPayoutRequest)
if !ok {
return nil
}
return cloned
}
func (p *cardPayoutProcessor) incrementDispatchAttempt(operationRef string) uint32 {
if p == nil {
return 0
}
key := strings.TrimSpace(operationRef)
if key == "" {
return 0
}
p.attemptMu.Lock()
defer p.attemptMu.Unlock()
next := p.dispatchAttempts[key] + 1
p.dispatchAttempts[key] = next
return next
}
func (p *cardPayoutProcessor) currentDispatchAttempt(operationRef string) uint32 {
if p == nil {
return 0
}
key := strings.TrimSpace(operationRef)
if key == "" {
return 0
}
p.attemptMu.Lock()
defer p.attemptMu.Unlock()
return p.dispatchAttempts[key]
}
func (p *cardPayoutProcessor) clearDispatchAttempt(operationRef string) {
if p == nil {
return
}
key := strings.TrimSpace(operationRef)
if key == "" {
return
}
p.attemptMu.Lock()
defer p.attemptMu.Unlock()
delete(p.dispatchAttempts, key)
}
func (p *cardPayoutProcessor) clearRetryState(operationRef string) {
if p == nil {
return
}
key := strings.TrimSpace(operationRef)
if key == "" {
return
}
p.clearRetryTimer(key)
p.clearDispatchAttempt(key)
p.retryReqMu.Lock()
defer p.retryReqMu.Unlock()
delete(p.cardRetryRequests, key)
delete(p.cardTokenRetryRequest, key)
}
func payoutAcceptedForState(state *model.CardPayout) bool {
if state == nil {
return false
@@ -433,6 +594,12 @@ func (p *cardPayoutProcessor) dispatchCardPayout(ctx context.Context, req *mntxv
if err := p.waitDispatchSlot(ctx); err != nil {
return nil, err
}
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
attempt := p.incrementDispatchAttempt(opRef)
p.logger.Info("Dispatching card payout attempt",
zap.String("operation_ref", opRef),
zap.Uint32("attempt", attempt),
)
client := monetix.NewClient(p.config, p.httpClient, p.logger)
apiReq := buildCardPayoutRequest(req.GetProjectId(), req)
return client.CreateCardPayout(ctx, apiReq)
@@ -448,6 +615,12 @@ func (p *cardPayoutProcessor) dispatchCardTokenPayout(ctx context.Context, req *
if err := p.waitDispatchSlot(ctx); err != nil {
return nil, err
}
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
attempt := p.incrementDispatchAttempt(opRef)
p.logger.Info("Dispatching card token payout attempt",
zap.String("operation_ref", opRef),
zap.Uint32("attempt", attempt),
)
client := monetix.NewClient(p.config, p.httpClient, p.logger)
apiReq := buildCardTokenPayoutRequest(req.GetProjectId(), req)
return client.CreateCardTokenPayout(ctx, apiReq)
@@ -516,6 +689,13 @@ func (p *cardPayoutProcessor) scheduleCardPayoutRetry(req *mntxv1.CardPayoutRequ
if p.retryDelayFn != nil {
delay = p.retryDelayFn(failedAttempt)
}
p.logger.Info("Scheduling card payout retry",
zap.String("operation_ref", operationRef),
zap.Uint32("failed_attempt", failedAttempt),
zap.Uint32("next_attempt", nextAttempt),
zap.Uint32("max_attempts", maxAttempts),
zap.Duration("delay", delay),
)
p.scheduleRetryTimer(operationRef, delay, func() {
p.runCardPayoutRetry(cloned, nextAttempt, maxAttempts)
})
@@ -539,6 +719,13 @@ func (p *cardPayoutProcessor) scheduleCardTokenPayoutRetry(req *mntxv1.CardToken
if p.retryDelayFn != nil {
delay = p.retryDelayFn(failedAttempt)
}
p.logger.Info("Scheduling card token payout retry",
zap.String("operation_ref", operationRef),
zap.Uint32("failed_attempt", failedAttempt),
zap.Uint32("next_attempt", nextAttempt),
zap.Uint32("max_attempts", maxAttempts),
zap.Duration("delay", delay),
)
p.scheduleRetryTimer(operationRef, delay, func() {
p.runCardTokenPayoutRetry(cloned, nextAttempt, maxAttempts)
})
@@ -567,6 +754,11 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest,
if operationRef == "" {
return
}
p.logger.Info("Executing scheduled card payout retry",
zap.String("operation_ref", operationRef),
zap.Uint32("attempt", attempt),
zap.Uint32("max_attempts", maxAttempts),
)
ctx, cancel := p.retryContext()
defer cancel()
@@ -580,7 +772,7 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest,
return
}
if isFinalStatus(state) {
p.clearRetryTimer(operationRef)
p.clearRetryState(operationRef)
return
}
@@ -608,7 +800,7 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest,
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
p.logger.Warn("Failed to persist terminal payout transport failure", zap.Error(upErr))
}
p.clearRetryTimer(operationRef)
p.clearRetryState(operationRef)
return
}
@@ -640,7 +832,7 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest,
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
p.logger.Warn("Failed to persist terminal payout provider failure", zap.Error(upErr))
}
p.clearRetryTimer(operationRef)
p.clearRetryState(operationRef)
}
func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayoutRequest, attempt uint32, maxAttempts uint32) {
@@ -651,6 +843,11 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou
if operationRef == "" {
return
}
p.logger.Info("Executing scheduled card token payout retry",
zap.String("operation_ref", operationRef),
zap.Uint32("attempt", attempt),
zap.Uint32("max_attempts", maxAttempts),
)
ctx, cancel := p.retryContext()
defer cancel()
@@ -664,7 +861,7 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou
return
}
if isFinalStatus(state) {
p.clearRetryTimer(operationRef)
p.clearRetryState(operationRef)
return
}
@@ -692,7 +889,7 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
p.logger.Warn("Failed to persist terminal token payout transport failure", zap.Error(upErr))
}
p.clearRetryTimer(operationRef)
p.clearRetryState(operationRef)
return
}
@@ -724,7 +921,7 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
p.logger.Warn("Failed to persist terminal token payout provider failure", zap.Error(upErr))
}
p.clearRetryTimer(operationRef)
p.clearRetryState(operationRef)
}
func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) {
@@ -798,20 +995,24 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout
}
// Keep CreatedAt/refs if record already exists.
existing, _ := p.findAndMergePayoutState(ctx, state)
existing, err := p.findAndMergePayoutState(ctx, state)
if err != nil {
return nil, err
}
if existing != nil {
switch existing.Status {
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
return cardPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
}
}
p.rememberCardRetryRequest(req)
result, err := p.dispatchCardPayout(ctx, req)
if err != nil {
decision := p.retryPolicy.decideTransportFailure()
state.ProviderMessage = err.Error()
state.UpdatedAt = p.clock.Now()
maxAttempts := maxDispatchAttempts(0)
maxAttempts := p.maxDispatchAttempts()
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
@@ -833,7 +1034,7 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout
}
fields := append([]zap.Field{zap.Error(err)}, payoutStateLogFields(state)...)
p.logger.Warn("Monetix payout submission failed", fields...)
p.clearRetryTimer(state.OperationRef)
p.clearRetryState(state.OperationRef)
return nil, err
}
@@ -842,21 +1043,24 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout
accepted := result.Accepted
errorCode := strings.TrimSpace(result.ErrorCode)
errorMessage := strings.TrimSpace(result.ErrorMessage)
scheduleRetry := false
retryMaxAttempts := uint32(0)
if !result.Accepted {
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
maxAttempts := maxDispatchAttempts(0)
maxAttempts := p.maxDispatchAttempts()
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
accepted = true
errorCode = ""
errorMessage = ""
p.scheduleCardPayoutRetry(req, 1, maxAttempts)
scheduleRetry = true
retryMaxAttempts = maxAttempts
} else {
state.Status = model.PayoutStatusFailed
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
p.clearRetryTimer(state.OperationRef)
p.clearRetryState(state.OperationRef)
}
} else {
p.clearRetryTimer(state.OperationRef)
@@ -872,6 +1076,9 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout
)
return nil, err
}
if scheduleRetry {
p.scheduleCardPayoutRetry(req, 1, retryMaxAttempts)
}
resp := cardPayoutResponseFromState(state, accepted, errorCode, errorMessage)
@@ -951,20 +1158,24 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT
UpdatedAt: now,
}
existing, _ := p.findAndMergePayoutState(ctx, state)
existing, err := p.findAndMergePayoutState(ctx, state)
if err != nil {
return nil, err
}
if existing != nil {
switch existing.Status {
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
return cardTokenPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
}
}
p.rememberCardTokenRetryRequest(req)
result, err := p.dispatchCardTokenPayout(ctx, req)
if err != nil {
decision := p.retryPolicy.decideTransportFailure()
state.ProviderMessage = err.Error()
state.UpdatedAt = p.clock.Now()
maxAttempts := maxDispatchAttempts(0)
maxAttempts := p.maxDispatchAttempts()
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
@@ -980,7 +1191,7 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT
if e := p.updatePayoutStatus(ctx, state); e != nil {
return nil, e
}
p.clearRetryTimer(state.OperationRef)
p.clearRetryState(state.OperationRef)
p.logger.Warn("Monetix token payout submission failed",
zap.String("payment_ref", state.PaymentRef),
zap.String("customer_id", state.CustomerID),
@@ -993,21 +1204,24 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT
accepted := result.Accepted
errorCode := strings.TrimSpace(result.ErrorCode)
errorMessage := strings.TrimSpace(result.ErrorMessage)
scheduleRetry := false
retryMaxAttempts := uint32(0)
if !result.Accepted {
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
maxAttempts := maxDispatchAttempts(0)
maxAttempts := p.maxDispatchAttempts()
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
accepted = true
errorCode = ""
errorMessage = ""
p.scheduleCardTokenPayoutRetry(req, 1, maxAttempts)
scheduleRetry = true
retryMaxAttempts = maxAttempts
} else {
state.Status = model.PayoutStatusFailed
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
p.clearRetryTimer(state.OperationRef)
p.clearRetryState(state.OperationRef)
}
} else {
p.clearRetryTimer(state.OperationRef)
@@ -1018,6 +1232,9 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT
p.logger.Warn("Failed to update payout status", zap.Error(err))
return nil, err
}
if scheduleRetry {
p.scheduleCardTokenPayoutRetry(req, 1, retryMaxAttempts)
}
resp := cardTokenPayoutResponseFromState(state, accepted, errorCode, errorMessage)
@@ -1179,23 +1396,70 @@ func (p *cardPayoutProcessor) ProcessCallback(ctx context.Context, payload []byt
)
return http.StatusInternalServerError, err
}
if existing != nil {
// keep failure reason if you want, or override depending on callback semantics
if state.FailureReason == "" {
state.FailureReason = existing.FailureReason
}
}
if state.Status == model.PayoutStatusFailed || state.Status == model.PayoutStatusCancelled {
if strings.TrimSpace(state.FailureReason) == "" {
state.FailureReason = payoutFailureReason(state.ProviderCode, state.ProviderMessage)
}
operationRef := strings.TrimSpace(state.OperationRef)
if existing != nil && strings.TrimSpace(state.FailureReason) == "" {
state.FailureReason = strings.TrimSpace(existing.FailureReason)
}
retryScheduled := false
if state.Status == model.PayoutStatusFailed || state.Status == model.PayoutStatusCancelled {
decision := p.retryPolicy.decideProviderFailure(state.ProviderCode)
attemptsUsed := p.currentDispatchAttempt(operationRef)
maxAttempts := p.maxDispatchAttempts()
if decision.Action == payoutFailureActionRetry && attemptsUsed > 0 && attemptsUsed < maxAttempts {
if req := p.loadCardRetryRequest(operationRef); req != nil {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
p.logger.Info("Callback decline is retryable; scheduling card payout retry",
zap.String("operation_ref", operationRef),
zap.String("provider_code", strings.TrimSpace(state.ProviderCode)),
zap.Uint32("attempts_used", attemptsUsed),
zap.Uint32("max_attempts", maxAttempts),
)
if err := p.updatePayoutStatus(ctx, state); err != nil {
p.logger.Warn("Failed to persist callback retry scheduling state", zap.Error(err))
return http.StatusInternalServerError, err
}
p.scheduleCardPayoutRetry(req, attemptsUsed, maxAttempts)
retryScheduled = true
} else if req := p.loadCardTokenRetryRequest(operationRef); req != nil {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
p.logger.Info("Callback decline is retryable; scheduling card token payout retry",
zap.String("operation_ref", operationRef),
zap.String("provider_code", strings.TrimSpace(state.ProviderCode)),
zap.Uint32("attempts_used", attemptsUsed),
zap.Uint32("max_attempts", maxAttempts),
)
if err := p.updatePayoutStatus(ctx, state); err != nil {
p.logger.Warn("Failed to persist callback token retry scheduling state", zap.Error(err))
return http.StatusInternalServerError, err
}
p.scheduleCardTokenPayoutRetry(req, attemptsUsed, maxAttempts)
retryScheduled = true
} else {
p.logger.Warn("Retryable callback decline received but no retry request snapshot found",
zap.String("operation_ref", operationRef),
zap.String("provider_code", strings.TrimSpace(state.ProviderCode)),
zap.Uint32("attempts_used", attemptsUsed),
zap.Uint32("max_attempts", maxAttempts),
)
}
}
if !retryScheduled && strings.TrimSpace(state.FailureReason) == "" {
state.FailureReason = payoutFailureReason(state.ProviderCode, state.ProviderMessage)
}
} else if state.Status == model.PayoutStatusSuccess {
state.FailureReason = ""
}
if !retryScheduled {
if err := p.updatePayoutStatus(ctx, state); err != nil {
p.logger.Warn("Failed to update payout state while processing callback", zap.Error(err))
}
if isFinalStatus(state) {
p.clearRetryTimer(state.OperationRef)
p.clearRetryState(operationRef)
}
}
monetix.ObserveCallback(statusLabel)
@@ -1204,6 +1468,7 @@ func (p *cardPayoutProcessor) ProcessCallback(ctx context.Context, payload []byt
zap.String("status", statusLabel),
zap.String("provider_code", state.ProviderCode),
zap.String("provider_message", state.ProviderMessage),
zap.Bool("retry_scheduled", retryScheduled),
zap.String("masked_account", cb.Account.Number),
)

View File

@@ -538,3 +538,97 @@ func TestCardPayoutProcessor_Submit_RetriesProviderLimitDeclineThenFails(t *test
t.Fatalf("unexpected provider call count: got=%d want=%d", got, want)
}
}
func TestCardPayoutProcessor_ProcessCallback_RetryableDeclineSchedulesRetry(t *testing.T) {
cfg := monetix.Config{
BaseURL: "https://monetix.test",
SecretKey: "secret",
ProjectID: 99,
StatusSuccess: "success",
StatusProcessing: "processing",
AllowedCurrencies: []string{"RUB"},
}
repo := newMockRepository()
var calls atomic.Int32
httpClient := &http.Client{
Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) {
n := calls.Add(1)
resp := monetix.APIResponse{}
if n == 1 {
resp.Operation.RequestID = "req-initial"
} else {
resp.Operation.RequestID = "req-after-callback-retry"
}
body, _ := json.Marshal(resp)
return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader(body)),
Header: http.Header{"Content-Type": []string{"application/json"}},
}, nil
}),
}
processor := newCardPayoutProcessor(
zap.NewNop(),
cfg,
staticClock{now: time.Date(2026, 3, 4, 2, 0, 0, 0, time.UTC)},
repo,
httpClient,
nil,
)
defer processor.stopRetries()
processor.dispatchThrottleInterval = 0
processor.retryDelayFn = func(uint32) time.Duration { return 5 * time.Millisecond }
req := validCardPayoutRequest()
resp, err := processor.Submit(context.Background(), req)
if err != nil {
t.Fatalf("submit returned error: %v", err)
}
if !resp.GetAccepted() {
t.Fatalf("expected accepted submit response")
}
cb := baseCallback()
cb.Payment.ID = req.GetPayoutId()
cb.Payment.Status = "failed"
cb.Operation.Status = "failed"
cb.Operation.Code = providerCodeDeclineAmountOrFrequencyLimit
cb.Operation.Message = "Decline due to amount or frequency limit"
cb.Payment.Sum.Currency = "RUB"
sig, err := monetix.SignPayload(cb, cfg.SecretKey)
if err != nil {
t.Fatalf("failed to sign callback: %v", err)
}
cb.Signature = sig
payload, err := json.Marshal(cb)
if err != nil {
t.Fatalf("failed to marshal callback: %v", err)
}
status, err := processor.ProcessCallback(context.Background(), payload)
if err != nil {
t.Fatalf("process callback returned error: %v", err)
}
if status != http.StatusOK {
t.Fatalf("unexpected callback status: %d", status)
}
deadline := time.Now().Add(2 * time.Second)
for {
state, ok := repo.payouts.Get(req.GetPayoutId())
if ok && state != nil && state.Status == model.PayoutStatusWaiting && state.ProviderPaymentID == "req-after-callback-retry" {
break
}
if time.Now().After(deadline) {
t.Fatalf("timeout waiting for callback-scheduled retry result")
}
time.Sleep(10 * time.Millisecond)
}
if got, want := calls.Load(), int32(2); got != want {
t.Fatalf("unexpected provider call count: got=%d want=%d", got, want)
}
}