Merge pull request 'fixed rescheduling supporting callback error code processing' (#631) from mntx-627 into main
All checks were successful
ci/woodpecker/push/gateway_mntx Pipeline was successful
All checks were successful
ci/woodpecker/push/gateway_mntx Pipeline was successful
Reviewed-on: #631
This commit was merged in pull request #631.
This commit is contained in:
@@ -2,6 +2,7 @@ package gateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/tech/sendico/gateway/mntx/storage"
|
||||
"github.com/tech/sendico/gateway/mntx/storage/model"
|
||||
@@ -24,6 +25,7 @@ func (r *mockRepository) Payouts() storage.PayoutsStore {
|
||||
|
||||
// cardPayoutStore implements storage.PayoutsStore for tests.
|
||||
type cardPayoutStore struct {
|
||||
mu sync.RWMutex
|
||||
data map[string]*model.CardPayout
|
||||
}
|
||||
|
||||
@@ -44,6 +46,8 @@ func newCardPayoutStore() *cardPayoutStore {
|
||||
}
|
||||
|
||||
func (s *cardPayoutStore) FindByIdempotencyKey(_ context.Context, key string) (*model.CardPayout, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
for _, v := range s.data {
|
||||
if v.IdempotencyKey == key {
|
||||
return v, nil
|
||||
@@ -53,6 +57,8 @@ func (s *cardPayoutStore) FindByIdempotencyKey(_ context.Context, key string) (*
|
||||
}
|
||||
|
||||
func (s *cardPayoutStore) FindByOperationRef(_ context.Context, ref string) (*model.CardPayout, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
for _, v := range s.data {
|
||||
if v.OperationRef == ref {
|
||||
return v, nil
|
||||
@@ -62,6 +68,8 @@ func (s *cardPayoutStore) FindByOperationRef(_ context.Context, ref string) (*mo
|
||||
}
|
||||
|
||||
func (s *cardPayoutStore) FindByPaymentID(_ context.Context, id string) (*model.CardPayout, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
for _, v := range s.data {
|
||||
if v.PaymentRef == id {
|
||||
return v, nil
|
||||
@@ -71,17 +79,23 @@ func (s *cardPayoutStore) FindByPaymentID(_ context.Context, id string) (*model.
|
||||
}
|
||||
|
||||
func (s *cardPayoutStore) Upsert(_ context.Context, record *model.CardPayout) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.data[payoutStoreKey(record)] = record
|
||||
return nil
|
||||
}
|
||||
|
||||
// Save is a helper for tests to pre-populate data.
|
||||
func (s *cardPayoutStore) Save(state *model.CardPayout) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.data[payoutStoreKey(state)] = state
|
||||
}
|
||||
|
||||
// Get is a helper for tests to retrieve data.
|
||||
func (s *cardPayoutStore) Get(id string) (*model.CardPayout, bool) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
if v, ok := s.data[id]; ok {
|
||||
return v, true
|
||||
}
|
||||
|
||||
@@ -46,6 +46,7 @@ type cardPayoutProcessor struct {
|
||||
perTxMinAmountMinor int64
|
||||
perTxMinAmountMinorByCurrency map[string]int64
|
||||
dispatchThrottleInterval time.Duration
|
||||
dispatchMaxAttempts uint32
|
||||
|
||||
dispatchMu sync.Mutex
|
||||
nextDispatchAllowed time.Time
|
||||
@@ -57,6 +58,13 @@ type cardPayoutProcessor struct {
|
||||
retryTimers map[string]*time.Timer
|
||||
retryCtx context.Context
|
||||
retryStop context.CancelFunc
|
||||
|
||||
retryReqMu sync.RWMutex
|
||||
cardRetryRequests map[string]*mntxv1.CardPayoutRequest
|
||||
cardTokenRetryRequest map[string]*mntxv1.CardTokenPayoutRequest
|
||||
|
||||
attemptMu sync.Mutex
|
||||
dispatchAttempts map[string]uint32
|
||||
}
|
||||
|
||||
func mergePayoutStateWithExisting(state, existing *model.CardPayout) {
|
||||
@@ -180,11 +188,15 @@ func newCardPayoutProcessor(
|
||||
httpClient: client,
|
||||
producer: producer,
|
||||
dispatchThrottleInterval: defaultDispatchThrottleInterval,
|
||||
dispatchMaxAttempts: defaultMaxDispatchAttempts,
|
||||
retryPolicy: defaultPayoutFailurePolicy(),
|
||||
retryDelayFn: retryDelayDuration,
|
||||
retryTimers: map[string]*time.Timer{},
|
||||
retryCtx: retryCtx,
|
||||
retryStop: retryStop,
|
||||
cardRetryRequests: map[string]*mntxv1.CardPayoutRequest{},
|
||||
cardTokenRetryRequest: map[string]*mntxv1.CardTokenPayoutRequest{},
|
||||
dispatchAttempts: map[string]uint32{},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -362,6 +374,14 @@ func (p *cardPayoutProcessor) stopRetries() {
|
||||
}
|
||||
delete(p.retryTimers, key)
|
||||
}
|
||||
p.retryReqMu.Lock()
|
||||
p.cardRetryRequests = map[string]*mntxv1.CardPayoutRequest{}
|
||||
p.cardTokenRetryRequest = map[string]*mntxv1.CardTokenPayoutRequest{}
|
||||
p.retryReqMu.Unlock()
|
||||
|
||||
p.attemptMu.Lock()
|
||||
p.dispatchAttempts = map[string]uint32{}
|
||||
p.attemptMu.Unlock()
|
||||
}
|
||||
|
||||
func (p *cardPayoutProcessor) clearRetryTimer(operationRef string) {
|
||||
@@ -381,6 +401,147 @@ func (p *cardPayoutProcessor) clearRetryTimer(operationRef string) {
|
||||
delete(p.retryTimers, key)
|
||||
}
|
||||
|
||||
func (p *cardPayoutProcessor) maxDispatchAttempts() uint32 {
|
||||
if p == nil {
|
||||
return defaultMaxDispatchAttempts
|
||||
}
|
||||
return maxDispatchAttempts(p.dispatchMaxAttempts)
|
||||
}
|
||||
|
||||
func (p *cardPayoutProcessor) rememberCardRetryRequest(req *mntxv1.CardPayoutRequest) {
|
||||
if p == nil || req == nil {
|
||||
return
|
||||
}
|
||||
key := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
||||
if key == "" {
|
||||
return
|
||||
}
|
||||
cloned, ok := proto.Clone(req).(*mntxv1.CardPayoutRequest)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
p.retryReqMu.Lock()
|
||||
defer p.retryReqMu.Unlock()
|
||||
p.cardRetryRequests[key] = cloned
|
||||
}
|
||||
|
||||
func (p *cardPayoutProcessor) rememberCardTokenRetryRequest(req *mntxv1.CardTokenPayoutRequest) {
|
||||
if p == nil || req == nil {
|
||||
return
|
||||
}
|
||||
key := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
||||
if key == "" {
|
||||
return
|
||||
}
|
||||
cloned, ok := proto.Clone(req).(*mntxv1.CardTokenPayoutRequest)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
p.retryReqMu.Lock()
|
||||
defer p.retryReqMu.Unlock()
|
||||
p.cardTokenRetryRequest[key] = cloned
|
||||
}
|
||||
|
||||
func (p *cardPayoutProcessor) loadCardRetryRequest(operationRef string) *mntxv1.CardPayoutRequest {
|
||||
if p == nil {
|
||||
return nil
|
||||
}
|
||||
key := strings.TrimSpace(operationRef)
|
||||
if key == "" {
|
||||
return nil
|
||||
}
|
||||
p.retryReqMu.RLock()
|
||||
defer p.retryReqMu.RUnlock()
|
||||
req := p.cardRetryRequests[key]
|
||||
if req == nil {
|
||||
return nil
|
||||
}
|
||||
cloned, ok := proto.Clone(req).(*mntxv1.CardPayoutRequest)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return cloned
|
||||
}
|
||||
|
||||
func (p *cardPayoutProcessor) loadCardTokenRetryRequest(operationRef string) *mntxv1.CardTokenPayoutRequest {
|
||||
if p == nil {
|
||||
return nil
|
||||
}
|
||||
key := strings.TrimSpace(operationRef)
|
||||
if key == "" {
|
||||
return nil
|
||||
}
|
||||
p.retryReqMu.RLock()
|
||||
defer p.retryReqMu.RUnlock()
|
||||
req := p.cardTokenRetryRequest[key]
|
||||
if req == nil {
|
||||
return nil
|
||||
}
|
||||
cloned, ok := proto.Clone(req).(*mntxv1.CardTokenPayoutRequest)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return cloned
|
||||
}
|
||||
|
||||
func (p *cardPayoutProcessor) incrementDispatchAttempt(operationRef string) uint32 {
|
||||
if p == nil {
|
||||
return 0
|
||||
}
|
||||
key := strings.TrimSpace(operationRef)
|
||||
if key == "" {
|
||||
return 0
|
||||
}
|
||||
p.attemptMu.Lock()
|
||||
defer p.attemptMu.Unlock()
|
||||
next := p.dispatchAttempts[key] + 1
|
||||
p.dispatchAttempts[key] = next
|
||||
return next
|
||||
}
|
||||
|
||||
func (p *cardPayoutProcessor) currentDispatchAttempt(operationRef string) uint32 {
|
||||
if p == nil {
|
||||
return 0
|
||||
}
|
||||
key := strings.TrimSpace(operationRef)
|
||||
if key == "" {
|
||||
return 0
|
||||
}
|
||||
p.attemptMu.Lock()
|
||||
defer p.attemptMu.Unlock()
|
||||
return p.dispatchAttempts[key]
|
||||
}
|
||||
|
||||
func (p *cardPayoutProcessor) clearDispatchAttempt(operationRef string) {
|
||||
if p == nil {
|
||||
return
|
||||
}
|
||||
key := strings.TrimSpace(operationRef)
|
||||
if key == "" {
|
||||
return
|
||||
}
|
||||
p.attemptMu.Lock()
|
||||
defer p.attemptMu.Unlock()
|
||||
delete(p.dispatchAttempts, key)
|
||||
}
|
||||
|
||||
func (p *cardPayoutProcessor) clearRetryState(operationRef string) {
|
||||
if p == nil {
|
||||
return
|
||||
}
|
||||
key := strings.TrimSpace(operationRef)
|
||||
if key == "" {
|
||||
return
|
||||
}
|
||||
p.clearRetryTimer(key)
|
||||
p.clearDispatchAttempt(key)
|
||||
|
||||
p.retryReqMu.Lock()
|
||||
defer p.retryReqMu.Unlock()
|
||||
delete(p.cardRetryRequests, key)
|
||||
delete(p.cardTokenRetryRequest, key)
|
||||
}
|
||||
|
||||
func payoutAcceptedForState(state *model.CardPayout) bool {
|
||||
if state == nil {
|
||||
return false
|
||||
@@ -433,6 +594,12 @@ func (p *cardPayoutProcessor) dispatchCardPayout(ctx context.Context, req *mntxv
|
||||
if err := p.waitDispatchSlot(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
||||
attempt := p.incrementDispatchAttempt(opRef)
|
||||
p.logger.Info("Dispatching card payout attempt",
|
||||
zap.String("operation_ref", opRef),
|
||||
zap.Uint32("attempt", attempt),
|
||||
)
|
||||
client := monetix.NewClient(p.config, p.httpClient, p.logger)
|
||||
apiReq := buildCardPayoutRequest(req.GetProjectId(), req)
|
||||
return client.CreateCardPayout(ctx, apiReq)
|
||||
@@ -448,6 +615,12 @@ func (p *cardPayoutProcessor) dispatchCardTokenPayout(ctx context.Context, req *
|
||||
if err := p.waitDispatchSlot(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
||||
attempt := p.incrementDispatchAttempt(opRef)
|
||||
p.logger.Info("Dispatching card token payout attempt",
|
||||
zap.String("operation_ref", opRef),
|
||||
zap.Uint32("attempt", attempt),
|
||||
)
|
||||
client := monetix.NewClient(p.config, p.httpClient, p.logger)
|
||||
apiReq := buildCardTokenPayoutRequest(req.GetProjectId(), req)
|
||||
return client.CreateCardTokenPayout(ctx, apiReq)
|
||||
@@ -516,6 +689,13 @@ func (p *cardPayoutProcessor) scheduleCardPayoutRetry(req *mntxv1.CardPayoutRequ
|
||||
if p.retryDelayFn != nil {
|
||||
delay = p.retryDelayFn(failedAttempt)
|
||||
}
|
||||
p.logger.Info("Scheduling card payout retry",
|
||||
zap.String("operation_ref", operationRef),
|
||||
zap.Uint32("failed_attempt", failedAttempt),
|
||||
zap.Uint32("next_attempt", nextAttempt),
|
||||
zap.Uint32("max_attempts", maxAttempts),
|
||||
zap.Duration("delay", delay),
|
||||
)
|
||||
p.scheduleRetryTimer(operationRef, delay, func() {
|
||||
p.runCardPayoutRetry(cloned, nextAttempt, maxAttempts)
|
||||
})
|
||||
@@ -539,6 +719,13 @@ func (p *cardPayoutProcessor) scheduleCardTokenPayoutRetry(req *mntxv1.CardToken
|
||||
if p.retryDelayFn != nil {
|
||||
delay = p.retryDelayFn(failedAttempt)
|
||||
}
|
||||
p.logger.Info("Scheduling card token payout retry",
|
||||
zap.String("operation_ref", operationRef),
|
||||
zap.Uint32("failed_attempt", failedAttempt),
|
||||
zap.Uint32("next_attempt", nextAttempt),
|
||||
zap.Uint32("max_attempts", maxAttempts),
|
||||
zap.Duration("delay", delay),
|
||||
)
|
||||
p.scheduleRetryTimer(operationRef, delay, func() {
|
||||
p.runCardTokenPayoutRetry(cloned, nextAttempt, maxAttempts)
|
||||
})
|
||||
@@ -567,6 +754,11 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest,
|
||||
if operationRef == "" {
|
||||
return
|
||||
}
|
||||
p.logger.Info("Executing scheduled card payout retry",
|
||||
zap.String("operation_ref", operationRef),
|
||||
zap.Uint32("attempt", attempt),
|
||||
zap.Uint32("max_attempts", maxAttempts),
|
||||
)
|
||||
ctx, cancel := p.retryContext()
|
||||
defer cancel()
|
||||
|
||||
@@ -580,7 +772,7 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest,
|
||||
return
|
||||
}
|
||||
if isFinalStatus(state) {
|
||||
p.clearRetryTimer(operationRef)
|
||||
p.clearRetryState(operationRef)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -608,7 +800,7 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest,
|
||||
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
||||
p.logger.Warn("Failed to persist terminal payout transport failure", zap.Error(upErr))
|
||||
}
|
||||
p.clearRetryTimer(operationRef)
|
||||
p.clearRetryState(operationRef)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -640,7 +832,7 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest,
|
||||
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
||||
p.logger.Warn("Failed to persist terminal payout provider failure", zap.Error(upErr))
|
||||
}
|
||||
p.clearRetryTimer(operationRef)
|
||||
p.clearRetryState(operationRef)
|
||||
}
|
||||
|
||||
func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayoutRequest, attempt uint32, maxAttempts uint32) {
|
||||
@@ -651,6 +843,11 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou
|
||||
if operationRef == "" {
|
||||
return
|
||||
}
|
||||
p.logger.Info("Executing scheduled card token payout retry",
|
||||
zap.String("operation_ref", operationRef),
|
||||
zap.Uint32("attempt", attempt),
|
||||
zap.Uint32("max_attempts", maxAttempts),
|
||||
)
|
||||
ctx, cancel := p.retryContext()
|
||||
defer cancel()
|
||||
|
||||
@@ -664,7 +861,7 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou
|
||||
return
|
||||
}
|
||||
if isFinalStatus(state) {
|
||||
p.clearRetryTimer(operationRef)
|
||||
p.clearRetryState(operationRef)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -692,7 +889,7 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou
|
||||
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
||||
p.logger.Warn("Failed to persist terminal token payout transport failure", zap.Error(upErr))
|
||||
}
|
||||
p.clearRetryTimer(operationRef)
|
||||
p.clearRetryState(operationRef)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -724,7 +921,7 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou
|
||||
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
||||
p.logger.Warn("Failed to persist terminal token payout provider failure", zap.Error(upErr))
|
||||
}
|
||||
p.clearRetryTimer(operationRef)
|
||||
p.clearRetryState(operationRef)
|
||||
}
|
||||
|
||||
func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) {
|
||||
@@ -798,20 +995,24 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout
|
||||
}
|
||||
|
||||
// Keep CreatedAt/refs if record already exists.
|
||||
existing, _ := p.findAndMergePayoutState(ctx, state)
|
||||
existing, err := p.findAndMergePayoutState(ctx, state)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if existing != nil {
|
||||
switch existing.Status {
|
||||
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
|
||||
return cardPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
|
||||
}
|
||||
}
|
||||
p.rememberCardRetryRequest(req)
|
||||
|
||||
result, err := p.dispatchCardPayout(ctx, req)
|
||||
if err != nil {
|
||||
decision := p.retryPolicy.decideTransportFailure()
|
||||
state.ProviderMessage = err.Error()
|
||||
state.UpdatedAt = p.clock.Now()
|
||||
maxAttempts := maxDispatchAttempts(0)
|
||||
maxAttempts := p.maxDispatchAttempts()
|
||||
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
||||
state.Status = model.PayoutStatusProcessing
|
||||
state.FailureReason = ""
|
||||
@@ -833,7 +1034,7 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout
|
||||
}
|
||||
fields := append([]zap.Field{zap.Error(err)}, payoutStateLogFields(state)...)
|
||||
p.logger.Warn("Monetix payout submission failed", fields...)
|
||||
p.clearRetryTimer(state.OperationRef)
|
||||
p.clearRetryState(state.OperationRef)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -842,21 +1043,24 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout
|
||||
accepted := result.Accepted
|
||||
errorCode := strings.TrimSpace(result.ErrorCode)
|
||||
errorMessage := strings.TrimSpace(result.ErrorMessage)
|
||||
scheduleRetry := false
|
||||
retryMaxAttempts := uint32(0)
|
||||
|
||||
if !result.Accepted {
|
||||
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
|
||||
maxAttempts := maxDispatchAttempts(0)
|
||||
maxAttempts := p.maxDispatchAttempts()
|
||||
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
||||
state.Status = model.PayoutStatusProcessing
|
||||
state.FailureReason = ""
|
||||
accepted = true
|
||||
errorCode = ""
|
||||
errorMessage = ""
|
||||
p.scheduleCardPayoutRetry(req, 1, maxAttempts)
|
||||
scheduleRetry = true
|
||||
retryMaxAttempts = maxAttempts
|
||||
} else {
|
||||
state.Status = model.PayoutStatusFailed
|
||||
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
|
||||
p.clearRetryTimer(state.OperationRef)
|
||||
p.clearRetryState(state.OperationRef)
|
||||
}
|
||||
} else {
|
||||
p.clearRetryTimer(state.OperationRef)
|
||||
@@ -872,6 +1076,9 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout
|
||||
)
|
||||
return nil, err
|
||||
}
|
||||
if scheduleRetry {
|
||||
p.scheduleCardPayoutRetry(req, 1, retryMaxAttempts)
|
||||
}
|
||||
|
||||
resp := cardPayoutResponseFromState(state, accepted, errorCode, errorMessage)
|
||||
|
||||
@@ -951,20 +1158,24 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT
|
||||
UpdatedAt: now,
|
||||
}
|
||||
|
||||
existing, _ := p.findAndMergePayoutState(ctx, state)
|
||||
existing, err := p.findAndMergePayoutState(ctx, state)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if existing != nil {
|
||||
switch existing.Status {
|
||||
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
|
||||
return cardTokenPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
|
||||
}
|
||||
}
|
||||
p.rememberCardTokenRetryRequest(req)
|
||||
|
||||
result, err := p.dispatchCardTokenPayout(ctx, req)
|
||||
if err != nil {
|
||||
decision := p.retryPolicy.decideTransportFailure()
|
||||
state.ProviderMessage = err.Error()
|
||||
state.UpdatedAt = p.clock.Now()
|
||||
maxAttempts := maxDispatchAttempts(0)
|
||||
maxAttempts := p.maxDispatchAttempts()
|
||||
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
||||
state.Status = model.PayoutStatusProcessing
|
||||
state.FailureReason = ""
|
||||
@@ -980,7 +1191,7 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT
|
||||
if e := p.updatePayoutStatus(ctx, state); e != nil {
|
||||
return nil, e
|
||||
}
|
||||
p.clearRetryTimer(state.OperationRef)
|
||||
p.clearRetryState(state.OperationRef)
|
||||
p.logger.Warn("Monetix token payout submission failed",
|
||||
zap.String("payment_ref", state.PaymentRef),
|
||||
zap.String("customer_id", state.CustomerID),
|
||||
@@ -993,21 +1204,24 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT
|
||||
accepted := result.Accepted
|
||||
errorCode := strings.TrimSpace(result.ErrorCode)
|
||||
errorMessage := strings.TrimSpace(result.ErrorMessage)
|
||||
scheduleRetry := false
|
||||
retryMaxAttempts := uint32(0)
|
||||
|
||||
if !result.Accepted {
|
||||
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
|
||||
maxAttempts := maxDispatchAttempts(0)
|
||||
maxAttempts := p.maxDispatchAttempts()
|
||||
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
||||
state.Status = model.PayoutStatusProcessing
|
||||
state.FailureReason = ""
|
||||
accepted = true
|
||||
errorCode = ""
|
||||
errorMessage = ""
|
||||
p.scheduleCardTokenPayoutRetry(req, 1, maxAttempts)
|
||||
scheduleRetry = true
|
||||
retryMaxAttempts = maxAttempts
|
||||
} else {
|
||||
state.Status = model.PayoutStatusFailed
|
||||
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
|
||||
p.clearRetryTimer(state.OperationRef)
|
||||
p.clearRetryState(state.OperationRef)
|
||||
}
|
||||
} else {
|
||||
p.clearRetryTimer(state.OperationRef)
|
||||
@@ -1018,6 +1232,9 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT
|
||||
p.logger.Warn("Failed to update payout status", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
if scheduleRetry {
|
||||
p.scheduleCardTokenPayoutRetry(req, 1, retryMaxAttempts)
|
||||
}
|
||||
|
||||
resp := cardTokenPayoutResponseFromState(state, accepted, errorCode, errorMessage)
|
||||
|
||||
@@ -1179,23 +1396,70 @@ func (p *cardPayoutProcessor) ProcessCallback(ctx context.Context, payload []byt
|
||||
)
|
||||
return http.StatusInternalServerError, err
|
||||
}
|
||||
if existing != nil {
|
||||
// keep failure reason if you want, or override depending on callback semantics
|
||||
if state.FailureReason == "" {
|
||||
state.FailureReason = existing.FailureReason
|
||||
}
|
||||
}
|
||||
if state.Status == model.PayoutStatusFailed || state.Status == model.PayoutStatusCancelled {
|
||||
if strings.TrimSpace(state.FailureReason) == "" {
|
||||
state.FailureReason = payoutFailureReason(state.ProviderCode, state.ProviderMessage)
|
||||
}
|
||||
operationRef := strings.TrimSpace(state.OperationRef)
|
||||
if existing != nil && strings.TrimSpace(state.FailureReason) == "" {
|
||||
state.FailureReason = strings.TrimSpace(existing.FailureReason)
|
||||
}
|
||||
|
||||
if err := p.updatePayoutStatus(ctx, state); err != nil {
|
||||
p.logger.Warn("Failed to update payout state while processing callback", zap.Error(err))
|
||||
retryScheduled := false
|
||||
if state.Status == model.PayoutStatusFailed || state.Status == model.PayoutStatusCancelled {
|
||||
decision := p.retryPolicy.decideProviderFailure(state.ProviderCode)
|
||||
attemptsUsed := p.currentDispatchAttempt(operationRef)
|
||||
maxAttempts := p.maxDispatchAttempts()
|
||||
if decision.Action == payoutFailureActionRetry && attemptsUsed > 0 && attemptsUsed < maxAttempts {
|
||||
if req := p.loadCardRetryRequest(operationRef); req != nil {
|
||||
state.Status = model.PayoutStatusProcessing
|
||||
state.FailureReason = ""
|
||||
p.logger.Info("Callback decline is retryable; scheduling card payout retry",
|
||||
zap.String("operation_ref", operationRef),
|
||||
zap.String("provider_code", strings.TrimSpace(state.ProviderCode)),
|
||||
zap.Uint32("attempts_used", attemptsUsed),
|
||||
zap.Uint32("max_attempts", maxAttempts),
|
||||
)
|
||||
if err := p.updatePayoutStatus(ctx, state); err != nil {
|
||||
p.logger.Warn("Failed to persist callback retry scheduling state", zap.Error(err))
|
||||
return http.StatusInternalServerError, err
|
||||
}
|
||||
p.scheduleCardPayoutRetry(req, attemptsUsed, maxAttempts)
|
||||
retryScheduled = true
|
||||
} else if req := p.loadCardTokenRetryRequest(operationRef); req != nil {
|
||||
state.Status = model.PayoutStatusProcessing
|
||||
state.FailureReason = ""
|
||||
p.logger.Info("Callback decline is retryable; scheduling card token payout retry",
|
||||
zap.String("operation_ref", operationRef),
|
||||
zap.String("provider_code", strings.TrimSpace(state.ProviderCode)),
|
||||
zap.Uint32("attempts_used", attemptsUsed),
|
||||
zap.Uint32("max_attempts", maxAttempts),
|
||||
)
|
||||
if err := p.updatePayoutStatus(ctx, state); err != nil {
|
||||
p.logger.Warn("Failed to persist callback token retry scheduling state", zap.Error(err))
|
||||
return http.StatusInternalServerError, err
|
||||
}
|
||||
p.scheduleCardTokenPayoutRetry(req, attemptsUsed, maxAttempts)
|
||||
retryScheduled = true
|
||||
} else {
|
||||
p.logger.Warn("Retryable callback decline received but no retry request snapshot found",
|
||||
zap.String("operation_ref", operationRef),
|
||||
zap.String("provider_code", strings.TrimSpace(state.ProviderCode)),
|
||||
zap.Uint32("attempts_used", attemptsUsed),
|
||||
zap.Uint32("max_attempts", maxAttempts),
|
||||
)
|
||||
}
|
||||
}
|
||||
if !retryScheduled && strings.TrimSpace(state.FailureReason) == "" {
|
||||
state.FailureReason = payoutFailureReason(state.ProviderCode, state.ProviderMessage)
|
||||
}
|
||||
} else if state.Status == model.PayoutStatusSuccess {
|
||||
state.FailureReason = ""
|
||||
}
|
||||
if isFinalStatus(state) {
|
||||
p.clearRetryTimer(state.OperationRef)
|
||||
|
||||
if !retryScheduled {
|
||||
if err := p.updatePayoutStatus(ctx, state); err != nil {
|
||||
p.logger.Warn("Failed to update payout state while processing callback", zap.Error(err))
|
||||
}
|
||||
if isFinalStatus(state) {
|
||||
p.clearRetryState(operationRef)
|
||||
}
|
||||
}
|
||||
monetix.ObserveCallback(statusLabel)
|
||||
|
||||
@@ -1204,6 +1468,7 @@ func (p *cardPayoutProcessor) ProcessCallback(ctx context.Context, payload []byt
|
||||
zap.String("status", statusLabel),
|
||||
zap.String("provider_code", state.ProviderCode),
|
||||
zap.String("provider_message", state.ProviderMessage),
|
||||
zap.Bool("retry_scheduled", retryScheduled),
|
||||
zap.String("masked_account", cb.Account.Number),
|
||||
)
|
||||
|
||||
|
||||
@@ -538,3 +538,97 @@ func TestCardPayoutProcessor_Submit_RetriesProviderLimitDeclineThenFails(t *test
|
||||
t.Fatalf("unexpected provider call count: got=%d want=%d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCardPayoutProcessor_ProcessCallback_RetryableDeclineSchedulesRetry(t *testing.T) {
|
||||
cfg := monetix.Config{
|
||||
BaseURL: "https://monetix.test",
|
||||
SecretKey: "secret",
|
||||
ProjectID: 99,
|
||||
StatusSuccess: "success",
|
||||
StatusProcessing: "processing",
|
||||
AllowedCurrencies: []string{"RUB"},
|
||||
}
|
||||
|
||||
repo := newMockRepository()
|
||||
var calls atomic.Int32
|
||||
httpClient := &http.Client{
|
||||
Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) {
|
||||
n := calls.Add(1)
|
||||
resp := monetix.APIResponse{}
|
||||
if n == 1 {
|
||||
resp.Operation.RequestID = "req-initial"
|
||||
} else {
|
||||
resp.Operation.RequestID = "req-after-callback-retry"
|
||||
}
|
||||
body, _ := json.Marshal(resp)
|
||||
return &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Body: io.NopCloser(bytes.NewReader(body)),
|
||||
Header: http.Header{"Content-Type": []string{"application/json"}},
|
||||
}, nil
|
||||
}),
|
||||
}
|
||||
|
||||
processor := newCardPayoutProcessor(
|
||||
zap.NewNop(),
|
||||
cfg,
|
||||
staticClock{now: time.Date(2026, 3, 4, 2, 0, 0, 0, time.UTC)},
|
||||
repo,
|
||||
httpClient,
|
||||
nil,
|
||||
)
|
||||
defer processor.stopRetries()
|
||||
processor.dispatchThrottleInterval = 0
|
||||
processor.retryDelayFn = func(uint32) time.Duration { return 5 * time.Millisecond }
|
||||
|
||||
req := validCardPayoutRequest()
|
||||
resp, err := processor.Submit(context.Background(), req)
|
||||
if err != nil {
|
||||
t.Fatalf("submit returned error: %v", err)
|
||||
}
|
||||
if !resp.GetAccepted() {
|
||||
t.Fatalf("expected accepted submit response")
|
||||
}
|
||||
|
||||
cb := baseCallback()
|
||||
cb.Payment.ID = req.GetPayoutId()
|
||||
cb.Payment.Status = "failed"
|
||||
cb.Operation.Status = "failed"
|
||||
cb.Operation.Code = providerCodeDeclineAmountOrFrequencyLimit
|
||||
cb.Operation.Message = "Decline due to amount or frequency limit"
|
||||
cb.Payment.Sum.Currency = "RUB"
|
||||
|
||||
sig, err := monetix.SignPayload(cb, cfg.SecretKey)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to sign callback: %v", err)
|
||||
}
|
||||
cb.Signature = sig
|
||||
payload, err := json.Marshal(cb)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to marshal callback: %v", err)
|
||||
}
|
||||
|
||||
status, err := processor.ProcessCallback(context.Background(), payload)
|
||||
if err != nil {
|
||||
t.Fatalf("process callback returned error: %v", err)
|
||||
}
|
||||
if status != http.StatusOK {
|
||||
t.Fatalf("unexpected callback status: %d", status)
|
||||
}
|
||||
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for {
|
||||
state, ok := repo.payouts.Get(req.GetPayoutId())
|
||||
if ok && state != nil && state.Status == model.PayoutStatusWaiting && state.ProviderPaymentID == "req-after-callback-retry" {
|
||||
break
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
t.Fatalf("timeout waiting for callback-scheduled retry result")
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
if got, want := calls.Load(), int32(2); got != want {
|
||||
t.Fatalf("unexpected provider call count: got=%d want=%d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user