fixed rescheduling supporting callback error code processing
This commit is contained in:
@@ -2,6 +2,7 @@ package gateway
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/tech/sendico/gateway/mntx/storage"
|
"github.com/tech/sendico/gateway/mntx/storage"
|
||||||
"github.com/tech/sendico/gateway/mntx/storage/model"
|
"github.com/tech/sendico/gateway/mntx/storage/model"
|
||||||
@@ -24,6 +25,7 @@ func (r *mockRepository) Payouts() storage.PayoutsStore {
|
|||||||
|
|
||||||
// cardPayoutStore implements storage.PayoutsStore for tests.
|
// cardPayoutStore implements storage.PayoutsStore for tests.
|
||||||
type cardPayoutStore struct {
|
type cardPayoutStore struct {
|
||||||
|
mu sync.RWMutex
|
||||||
data map[string]*model.CardPayout
|
data map[string]*model.CardPayout
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -44,6 +46,8 @@ func newCardPayoutStore() *cardPayoutStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *cardPayoutStore) FindByIdempotencyKey(_ context.Context, key string) (*model.CardPayout, error) {
|
func (s *cardPayoutStore) FindByIdempotencyKey(_ context.Context, key string) (*model.CardPayout, error) {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
for _, v := range s.data {
|
for _, v := range s.data {
|
||||||
if v.IdempotencyKey == key {
|
if v.IdempotencyKey == key {
|
||||||
return v, nil
|
return v, nil
|
||||||
@@ -53,6 +57,8 @@ func (s *cardPayoutStore) FindByIdempotencyKey(_ context.Context, key string) (*
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *cardPayoutStore) FindByOperationRef(_ context.Context, ref string) (*model.CardPayout, error) {
|
func (s *cardPayoutStore) FindByOperationRef(_ context.Context, ref string) (*model.CardPayout, error) {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
for _, v := range s.data {
|
for _, v := range s.data {
|
||||||
if v.OperationRef == ref {
|
if v.OperationRef == ref {
|
||||||
return v, nil
|
return v, nil
|
||||||
@@ -62,6 +68,8 @@ func (s *cardPayoutStore) FindByOperationRef(_ context.Context, ref string) (*mo
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *cardPayoutStore) FindByPaymentID(_ context.Context, id string) (*model.CardPayout, error) {
|
func (s *cardPayoutStore) FindByPaymentID(_ context.Context, id string) (*model.CardPayout, error) {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
for _, v := range s.data {
|
for _, v := range s.data {
|
||||||
if v.PaymentRef == id {
|
if v.PaymentRef == id {
|
||||||
return v, nil
|
return v, nil
|
||||||
@@ -71,17 +79,23 @@ func (s *cardPayoutStore) FindByPaymentID(_ context.Context, id string) (*model.
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *cardPayoutStore) Upsert(_ context.Context, record *model.CardPayout) error {
|
func (s *cardPayoutStore) Upsert(_ context.Context, record *model.CardPayout) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
s.data[payoutStoreKey(record)] = record
|
s.data[payoutStoreKey(record)] = record
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save is a helper for tests to pre-populate data.
|
// Save is a helper for tests to pre-populate data.
|
||||||
func (s *cardPayoutStore) Save(state *model.CardPayout) {
|
func (s *cardPayoutStore) Save(state *model.CardPayout) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
s.data[payoutStoreKey(state)] = state
|
s.data[payoutStoreKey(state)] = state
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get is a helper for tests to retrieve data.
|
// Get is a helper for tests to retrieve data.
|
||||||
func (s *cardPayoutStore) Get(id string) (*model.CardPayout, bool) {
|
func (s *cardPayoutStore) Get(id string) (*model.CardPayout, bool) {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
if v, ok := s.data[id]; ok {
|
if v, ok := s.data[id]; ok {
|
||||||
return v, true
|
return v, true
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -46,6 +46,7 @@ type cardPayoutProcessor struct {
|
|||||||
perTxMinAmountMinor int64
|
perTxMinAmountMinor int64
|
||||||
perTxMinAmountMinorByCurrency map[string]int64
|
perTxMinAmountMinorByCurrency map[string]int64
|
||||||
dispatchThrottleInterval time.Duration
|
dispatchThrottleInterval time.Duration
|
||||||
|
dispatchMaxAttempts uint32
|
||||||
|
|
||||||
dispatchMu sync.Mutex
|
dispatchMu sync.Mutex
|
||||||
nextDispatchAllowed time.Time
|
nextDispatchAllowed time.Time
|
||||||
@@ -57,6 +58,13 @@ type cardPayoutProcessor struct {
|
|||||||
retryTimers map[string]*time.Timer
|
retryTimers map[string]*time.Timer
|
||||||
retryCtx context.Context
|
retryCtx context.Context
|
||||||
retryStop context.CancelFunc
|
retryStop context.CancelFunc
|
||||||
|
|
||||||
|
retryReqMu sync.RWMutex
|
||||||
|
cardRetryRequests map[string]*mntxv1.CardPayoutRequest
|
||||||
|
cardTokenRetryRequest map[string]*mntxv1.CardTokenPayoutRequest
|
||||||
|
|
||||||
|
attemptMu sync.Mutex
|
||||||
|
dispatchAttempts map[string]uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
func mergePayoutStateWithExisting(state, existing *model.CardPayout) {
|
func mergePayoutStateWithExisting(state, existing *model.CardPayout) {
|
||||||
@@ -180,11 +188,15 @@ func newCardPayoutProcessor(
|
|||||||
httpClient: client,
|
httpClient: client,
|
||||||
producer: producer,
|
producer: producer,
|
||||||
dispatchThrottleInterval: defaultDispatchThrottleInterval,
|
dispatchThrottleInterval: defaultDispatchThrottleInterval,
|
||||||
|
dispatchMaxAttempts: defaultMaxDispatchAttempts,
|
||||||
retryPolicy: defaultPayoutFailurePolicy(),
|
retryPolicy: defaultPayoutFailurePolicy(),
|
||||||
retryDelayFn: retryDelayDuration,
|
retryDelayFn: retryDelayDuration,
|
||||||
retryTimers: map[string]*time.Timer{},
|
retryTimers: map[string]*time.Timer{},
|
||||||
retryCtx: retryCtx,
|
retryCtx: retryCtx,
|
||||||
retryStop: retryStop,
|
retryStop: retryStop,
|
||||||
|
cardRetryRequests: map[string]*mntxv1.CardPayoutRequest{},
|
||||||
|
cardTokenRetryRequest: map[string]*mntxv1.CardTokenPayoutRequest{},
|
||||||
|
dispatchAttempts: map[string]uint32{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -362,6 +374,14 @@ func (p *cardPayoutProcessor) stopRetries() {
|
|||||||
}
|
}
|
||||||
delete(p.retryTimers, key)
|
delete(p.retryTimers, key)
|
||||||
}
|
}
|
||||||
|
p.retryReqMu.Lock()
|
||||||
|
p.cardRetryRequests = map[string]*mntxv1.CardPayoutRequest{}
|
||||||
|
p.cardTokenRetryRequest = map[string]*mntxv1.CardTokenPayoutRequest{}
|
||||||
|
p.retryReqMu.Unlock()
|
||||||
|
|
||||||
|
p.attemptMu.Lock()
|
||||||
|
p.dispatchAttempts = map[string]uint32{}
|
||||||
|
p.attemptMu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *cardPayoutProcessor) clearRetryTimer(operationRef string) {
|
func (p *cardPayoutProcessor) clearRetryTimer(operationRef string) {
|
||||||
@@ -381,6 +401,147 @@ func (p *cardPayoutProcessor) clearRetryTimer(operationRef string) {
|
|||||||
delete(p.retryTimers, key)
|
delete(p.retryTimers, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *cardPayoutProcessor) maxDispatchAttempts() uint32 {
|
||||||
|
if p == nil {
|
||||||
|
return defaultMaxDispatchAttempts
|
||||||
|
}
|
||||||
|
return maxDispatchAttempts(p.dispatchMaxAttempts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *cardPayoutProcessor) rememberCardRetryRequest(req *mntxv1.CardPayoutRequest) {
|
||||||
|
if p == nil || req == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
key := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
||||||
|
if key == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cloned, ok := proto.Clone(req).(*mntxv1.CardPayoutRequest)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.retryReqMu.Lock()
|
||||||
|
defer p.retryReqMu.Unlock()
|
||||||
|
p.cardRetryRequests[key] = cloned
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *cardPayoutProcessor) rememberCardTokenRetryRequest(req *mntxv1.CardTokenPayoutRequest) {
|
||||||
|
if p == nil || req == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
key := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
||||||
|
if key == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cloned, ok := proto.Clone(req).(*mntxv1.CardTokenPayoutRequest)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.retryReqMu.Lock()
|
||||||
|
defer p.retryReqMu.Unlock()
|
||||||
|
p.cardTokenRetryRequest[key] = cloned
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *cardPayoutProcessor) loadCardRetryRequest(operationRef string) *mntxv1.CardPayoutRequest {
|
||||||
|
if p == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
key := strings.TrimSpace(operationRef)
|
||||||
|
if key == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
p.retryReqMu.RLock()
|
||||||
|
defer p.retryReqMu.RUnlock()
|
||||||
|
req := p.cardRetryRequests[key]
|
||||||
|
if req == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
cloned, ok := proto.Clone(req).(*mntxv1.CardPayoutRequest)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return cloned
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *cardPayoutProcessor) loadCardTokenRetryRequest(operationRef string) *mntxv1.CardTokenPayoutRequest {
|
||||||
|
if p == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
key := strings.TrimSpace(operationRef)
|
||||||
|
if key == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
p.retryReqMu.RLock()
|
||||||
|
defer p.retryReqMu.RUnlock()
|
||||||
|
req := p.cardTokenRetryRequest[key]
|
||||||
|
if req == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
cloned, ok := proto.Clone(req).(*mntxv1.CardTokenPayoutRequest)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return cloned
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *cardPayoutProcessor) incrementDispatchAttempt(operationRef string) uint32 {
|
||||||
|
if p == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
key := strings.TrimSpace(operationRef)
|
||||||
|
if key == "" {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
p.attemptMu.Lock()
|
||||||
|
defer p.attemptMu.Unlock()
|
||||||
|
next := p.dispatchAttempts[key] + 1
|
||||||
|
p.dispatchAttempts[key] = next
|
||||||
|
return next
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *cardPayoutProcessor) currentDispatchAttempt(operationRef string) uint32 {
|
||||||
|
if p == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
key := strings.TrimSpace(operationRef)
|
||||||
|
if key == "" {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
p.attemptMu.Lock()
|
||||||
|
defer p.attemptMu.Unlock()
|
||||||
|
return p.dispatchAttempts[key]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *cardPayoutProcessor) clearDispatchAttempt(operationRef string) {
|
||||||
|
if p == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
key := strings.TrimSpace(operationRef)
|
||||||
|
if key == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.attemptMu.Lock()
|
||||||
|
defer p.attemptMu.Unlock()
|
||||||
|
delete(p.dispatchAttempts, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *cardPayoutProcessor) clearRetryState(operationRef string) {
|
||||||
|
if p == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
key := strings.TrimSpace(operationRef)
|
||||||
|
if key == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.clearRetryTimer(key)
|
||||||
|
p.clearDispatchAttempt(key)
|
||||||
|
|
||||||
|
p.retryReqMu.Lock()
|
||||||
|
defer p.retryReqMu.Unlock()
|
||||||
|
delete(p.cardRetryRequests, key)
|
||||||
|
delete(p.cardTokenRetryRequest, key)
|
||||||
|
}
|
||||||
|
|
||||||
func payoutAcceptedForState(state *model.CardPayout) bool {
|
func payoutAcceptedForState(state *model.CardPayout) bool {
|
||||||
if state == nil {
|
if state == nil {
|
||||||
return false
|
return false
|
||||||
@@ -433,6 +594,12 @@ func (p *cardPayoutProcessor) dispatchCardPayout(ctx context.Context, req *mntxv
|
|||||||
if err := p.waitDispatchSlot(ctx); err != nil {
|
if err := p.waitDispatchSlot(ctx); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
||||||
|
attempt := p.incrementDispatchAttempt(opRef)
|
||||||
|
p.logger.Info("Dispatching card payout attempt",
|
||||||
|
zap.String("operation_ref", opRef),
|
||||||
|
zap.Uint32("attempt", attempt),
|
||||||
|
)
|
||||||
client := monetix.NewClient(p.config, p.httpClient, p.logger)
|
client := monetix.NewClient(p.config, p.httpClient, p.logger)
|
||||||
apiReq := buildCardPayoutRequest(req.GetProjectId(), req)
|
apiReq := buildCardPayoutRequest(req.GetProjectId(), req)
|
||||||
return client.CreateCardPayout(ctx, apiReq)
|
return client.CreateCardPayout(ctx, apiReq)
|
||||||
@@ -448,6 +615,12 @@ func (p *cardPayoutProcessor) dispatchCardTokenPayout(ctx context.Context, req *
|
|||||||
if err := p.waitDispatchSlot(ctx); err != nil {
|
if err := p.waitDispatchSlot(ctx); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
||||||
|
attempt := p.incrementDispatchAttempt(opRef)
|
||||||
|
p.logger.Info("Dispatching card token payout attempt",
|
||||||
|
zap.String("operation_ref", opRef),
|
||||||
|
zap.Uint32("attempt", attempt),
|
||||||
|
)
|
||||||
client := monetix.NewClient(p.config, p.httpClient, p.logger)
|
client := monetix.NewClient(p.config, p.httpClient, p.logger)
|
||||||
apiReq := buildCardTokenPayoutRequest(req.GetProjectId(), req)
|
apiReq := buildCardTokenPayoutRequest(req.GetProjectId(), req)
|
||||||
return client.CreateCardTokenPayout(ctx, apiReq)
|
return client.CreateCardTokenPayout(ctx, apiReq)
|
||||||
@@ -516,6 +689,13 @@ func (p *cardPayoutProcessor) scheduleCardPayoutRetry(req *mntxv1.CardPayoutRequ
|
|||||||
if p.retryDelayFn != nil {
|
if p.retryDelayFn != nil {
|
||||||
delay = p.retryDelayFn(failedAttempt)
|
delay = p.retryDelayFn(failedAttempt)
|
||||||
}
|
}
|
||||||
|
p.logger.Info("Scheduling card payout retry",
|
||||||
|
zap.String("operation_ref", operationRef),
|
||||||
|
zap.Uint32("failed_attempt", failedAttempt),
|
||||||
|
zap.Uint32("next_attempt", nextAttempt),
|
||||||
|
zap.Uint32("max_attempts", maxAttempts),
|
||||||
|
zap.Duration("delay", delay),
|
||||||
|
)
|
||||||
p.scheduleRetryTimer(operationRef, delay, func() {
|
p.scheduleRetryTimer(operationRef, delay, func() {
|
||||||
p.runCardPayoutRetry(cloned, nextAttempt, maxAttempts)
|
p.runCardPayoutRetry(cloned, nextAttempt, maxAttempts)
|
||||||
})
|
})
|
||||||
@@ -539,6 +719,13 @@ func (p *cardPayoutProcessor) scheduleCardTokenPayoutRetry(req *mntxv1.CardToken
|
|||||||
if p.retryDelayFn != nil {
|
if p.retryDelayFn != nil {
|
||||||
delay = p.retryDelayFn(failedAttempt)
|
delay = p.retryDelayFn(failedAttempt)
|
||||||
}
|
}
|
||||||
|
p.logger.Info("Scheduling card token payout retry",
|
||||||
|
zap.String("operation_ref", operationRef),
|
||||||
|
zap.Uint32("failed_attempt", failedAttempt),
|
||||||
|
zap.Uint32("next_attempt", nextAttempt),
|
||||||
|
zap.Uint32("max_attempts", maxAttempts),
|
||||||
|
zap.Duration("delay", delay),
|
||||||
|
)
|
||||||
p.scheduleRetryTimer(operationRef, delay, func() {
|
p.scheduleRetryTimer(operationRef, delay, func() {
|
||||||
p.runCardTokenPayoutRetry(cloned, nextAttempt, maxAttempts)
|
p.runCardTokenPayoutRetry(cloned, nextAttempt, maxAttempts)
|
||||||
})
|
})
|
||||||
@@ -567,6 +754,11 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest,
|
|||||||
if operationRef == "" {
|
if operationRef == "" {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
p.logger.Info("Executing scheduled card payout retry",
|
||||||
|
zap.String("operation_ref", operationRef),
|
||||||
|
zap.Uint32("attempt", attempt),
|
||||||
|
zap.Uint32("max_attempts", maxAttempts),
|
||||||
|
)
|
||||||
ctx, cancel := p.retryContext()
|
ctx, cancel := p.retryContext()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
@@ -580,7 +772,7 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if isFinalStatus(state) {
|
if isFinalStatus(state) {
|
||||||
p.clearRetryTimer(operationRef)
|
p.clearRetryState(operationRef)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -608,7 +800,7 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest,
|
|||||||
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
||||||
p.logger.Warn("Failed to persist terminal payout transport failure", zap.Error(upErr))
|
p.logger.Warn("Failed to persist terminal payout transport failure", zap.Error(upErr))
|
||||||
}
|
}
|
||||||
p.clearRetryTimer(operationRef)
|
p.clearRetryState(operationRef)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -640,7 +832,7 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest,
|
|||||||
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
||||||
p.logger.Warn("Failed to persist terminal payout provider failure", zap.Error(upErr))
|
p.logger.Warn("Failed to persist terminal payout provider failure", zap.Error(upErr))
|
||||||
}
|
}
|
||||||
p.clearRetryTimer(operationRef)
|
p.clearRetryState(operationRef)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayoutRequest, attempt uint32, maxAttempts uint32) {
|
func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayoutRequest, attempt uint32, maxAttempts uint32) {
|
||||||
@@ -651,6 +843,11 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou
|
|||||||
if operationRef == "" {
|
if operationRef == "" {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
p.logger.Info("Executing scheduled card token payout retry",
|
||||||
|
zap.String("operation_ref", operationRef),
|
||||||
|
zap.Uint32("attempt", attempt),
|
||||||
|
zap.Uint32("max_attempts", maxAttempts),
|
||||||
|
)
|
||||||
ctx, cancel := p.retryContext()
|
ctx, cancel := p.retryContext()
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
@@ -664,7 +861,7 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if isFinalStatus(state) {
|
if isFinalStatus(state) {
|
||||||
p.clearRetryTimer(operationRef)
|
p.clearRetryState(operationRef)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -692,7 +889,7 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou
|
|||||||
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
||||||
p.logger.Warn("Failed to persist terminal token payout transport failure", zap.Error(upErr))
|
p.logger.Warn("Failed to persist terminal token payout transport failure", zap.Error(upErr))
|
||||||
}
|
}
|
||||||
p.clearRetryTimer(operationRef)
|
p.clearRetryState(operationRef)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -724,7 +921,7 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou
|
|||||||
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
||||||
p.logger.Warn("Failed to persist terminal token payout provider failure", zap.Error(upErr))
|
p.logger.Warn("Failed to persist terminal token payout provider failure", zap.Error(upErr))
|
||||||
}
|
}
|
||||||
p.clearRetryTimer(operationRef)
|
p.clearRetryState(operationRef)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) {
|
func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) {
|
||||||
@@ -798,20 +995,24 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Keep CreatedAt/refs if record already exists.
|
// Keep CreatedAt/refs if record already exists.
|
||||||
existing, _ := p.findAndMergePayoutState(ctx, state)
|
existing, err := p.findAndMergePayoutState(ctx, state)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if existing != nil {
|
if existing != nil {
|
||||||
switch existing.Status {
|
switch existing.Status {
|
||||||
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
|
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
|
||||||
return cardPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
|
return cardPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
p.rememberCardRetryRequest(req)
|
||||||
|
|
||||||
result, err := p.dispatchCardPayout(ctx, req)
|
result, err := p.dispatchCardPayout(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
decision := p.retryPolicy.decideTransportFailure()
|
decision := p.retryPolicy.decideTransportFailure()
|
||||||
state.ProviderMessage = err.Error()
|
state.ProviderMessage = err.Error()
|
||||||
state.UpdatedAt = p.clock.Now()
|
state.UpdatedAt = p.clock.Now()
|
||||||
maxAttempts := maxDispatchAttempts(0)
|
maxAttempts := p.maxDispatchAttempts()
|
||||||
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
||||||
state.Status = model.PayoutStatusProcessing
|
state.Status = model.PayoutStatusProcessing
|
||||||
state.FailureReason = ""
|
state.FailureReason = ""
|
||||||
@@ -833,7 +1034,7 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout
|
|||||||
}
|
}
|
||||||
fields := append([]zap.Field{zap.Error(err)}, payoutStateLogFields(state)...)
|
fields := append([]zap.Field{zap.Error(err)}, payoutStateLogFields(state)...)
|
||||||
p.logger.Warn("Monetix payout submission failed", fields...)
|
p.logger.Warn("Monetix payout submission failed", fields...)
|
||||||
p.clearRetryTimer(state.OperationRef)
|
p.clearRetryState(state.OperationRef)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -842,21 +1043,24 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout
|
|||||||
accepted := result.Accepted
|
accepted := result.Accepted
|
||||||
errorCode := strings.TrimSpace(result.ErrorCode)
|
errorCode := strings.TrimSpace(result.ErrorCode)
|
||||||
errorMessage := strings.TrimSpace(result.ErrorMessage)
|
errorMessage := strings.TrimSpace(result.ErrorMessage)
|
||||||
|
scheduleRetry := false
|
||||||
|
retryMaxAttempts := uint32(0)
|
||||||
|
|
||||||
if !result.Accepted {
|
if !result.Accepted {
|
||||||
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
|
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
|
||||||
maxAttempts := maxDispatchAttempts(0)
|
maxAttempts := p.maxDispatchAttempts()
|
||||||
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
||||||
state.Status = model.PayoutStatusProcessing
|
state.Status = model.PayoutStatusProcessing
|
||||||
state.FailureReason = ""
|
state.FailureReason = ""
|
||||||
accepted = true
|
accepted = true
|
||||||
errorCode = ""
|
errorCode = ""
|
||||||
errorMessage = ""
|
errorMessage = ""
|
||||||
p.scheduleCardPayoutRetry(req, 1, maxAttempts)
|
scheduleRetry = true
|
||||||
|
retryMaxAttempts = maxAttempts
|
||||||
} else {
|
} else {
|
||||||
state.Status = model.PayoutStatusFailed
|
state.Status = model.PayoutStatusFailed
|
||||||
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
|
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
|
||||||
p.clearRetryTimer(state.OperationRef)
|
p.clearRetryState(state.OperationRef)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
p.clearRetryTimer(state.OperationRef)
|
p.clearRetryTimer(state.OperationRef)
|
||||||
@@ -872,6 +1076,9 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout
|
|||||||
)
|
)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if scheduleRetry {
|
||||||
|
p.scheduleCardPayoutRetry(req, 1, retryMaxAttempts)
|
||||||
|
}
|
||||||
|
|
||||||
resp := cardPayoutResponseFromState(state, accepted, errorCode, errorMessage)
|
resp := cardPayoutResponseFromState(state, accepted, errorCode, errorMessage)
|
||||||
|
|
||||||
@@ -951,20 +1158,24 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT
|
|||||||
UpdatedAt: now,
|
UpdatedAt: now,
|
||||||
}
|
}
|
||||||
|
|
||||||
existing, _ := p.findAndMergePayoutState(ctx, state)
|
existing, err := p.findAndMergePayoutState(ctx, state)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if existing != nil {
|
if existing != nil {
|
||||||
switch existing.Status {
|
switch existing.Status {
|
||||||
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
|
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
|
||||||
return cardTokenPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
|
return cardTokenPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
p.rememberCardTokenRetryRequest(req)
|
||||||
|
|
||||||
result, err := p.dispatchCardTokenPayout(ctx, req)
|
result, err := p.dispatchCardTokenPayout(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
decision := p.retryPolicy.decideTransportFailure()
|
decision := p.retryPolicy.decideTransportFailure()
|
||||||
state.ProviderMessage = err.Error()
|
state.ProviderMessage = err.Error()
|
||||||
state.UpdatedAt = p.clock.Now()
|
state.UpdatedAt = p.clock.Now()
|
||||||
maxAttempts := maxDispatchAttempts(0)
|
maxAttempts := p.maxDispatchAttempts()
|
||||||
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
||||||
state.Status = model.PayoutStatusProcessing
|
state.Status = model.PayoutStatusProcessing
|
||||||
state.FailureReason = ""
|
state.FailureReason = ""
|
||||||
@@ -980,7 +1191,7 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT
|
|||||||
if e := p.updatePayoutStatus(ctx, state); e != nil {
|
if e := p.updatePayoutStatus(ctx, state); e != nil {
|
||||||
return nil, e
|
return nil, e
|
||||||
}
|
}
|
||||||
p.clearRetryTimer(state.OperationRef)
|
p.clearRetryState(state.OperationRef)
|
||||||
p.logger.Warn("Monetix token payout submission failed",
|
p.logger.Warn("Monetix token payout submission failed",
|
||||||
zap.String("payment_ref", state.PaymentRef),
|
zap.String("payment_ref", state.PaymentRef),
|
||||||
zap.String("customer_id", state.CustomerID),
|
zap.String("customer_id", state.CustomerID),
|
||||||
@@ -993,21 +1204,24 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT
|
|||||||
accepted := result.Accepted
|
accepted := result.Accepted
|
||||||
errorCode := strings.TrimSpace(result.ErrorCode)
|
errorCode := strings.TrimSpace(result.ErrorCode)
|
||||||
errorMessage := strings.TrimSpace(result.ErrorMessage)
|
errorMessage := strings.TrimSpace(result.ErrorMessage)
|
||||||
|
scheduleRetry := false
|
||||||
|
retryMaxAttempts := uint32(0)
|
||||||
|
|
||||||
if !result.Accepted {
|
if !result.Accepted {
|
||||||
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
|
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
|
||||||
maxAttempts := maxDispatchAttempts(0)
|
maxAttempts := p.maxDispatchAttempts()
|
||||||
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
||||||
state.Status = model.PayoutStatusProcessing
|
state.Status = model.PayoutStatusProcessing
|
||||||
state.FailureReason = ""
|
state.FailureReason = ""
|
||||||
accepted = true
|
accepted = true
|
||||||
errorCode = ""
|
errorCode = ""
|
||||||
errorMessage = ""
|
errorMessage = ""
|
||||||
p.scheduleCardTokenPayoutRetry(req, 1, maxAttempts)
|
scheduleRetry = true
|
||||||
|
retryMaxAttempts = maxAttempts
|
||||||
} else {
|
} else {
|
||||||
state.Status = model.PayoutStatusFailed
|
state.Status = model.PayoutStatusFailed
|
||||||
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
|
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
|
||||||
p.clearRetryTimer(state.OperationRef)
|
p.clearRetryState(state.OperationRef)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
p.clearRetryTimer(state.OperationRef)
|
p.clearRetryTimer(state.OperationRef)
|
||||||
@@ -1018,6 +1232,9 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT
|
|||||||
p.logger.Warn("Failed to update payout status", zap.Error(err))
|
p.logger.Warn("Failed to update payout status", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if scheduleRetry {
|
||||||
|
p.scheduleCardTokenPayoutRetry(req, 1, retryMaxAttempts)
|
||||||
|
}
|
||||||
|
|
||||||
resp := cardTokenPayoutResponseFromState(state, accepted, errorCode, errorMessage)
|
resp := cardTokenPayoutResponseFromState(state, accepted, errorCode, errorMessage)
|
||||||
|
|
||||||
@@ -1179,23 +1396,70 @@ func (p *cardPayoutProcessor) ProcessCallback(ctx context.Context, payload []byt
|
|||||||
)
|
)
|
||||||
return http.StatusInternalServerError, err
|
return http.StatusInternalServerError, err
|
||||||
}
|
}
|
||||||
if existing != nil {
|
operationRef := strings.TrimSpace(state.OperationRef)
|
||||||
// keep failure reason if you want, or override depending on callback semantics
|
if existing != nil && strings.TrimSpace(state.FailureReason) == "" {
|
||||||
if state.FailureReason == "" {
|
state.FailureReason = strings.TrimSpace(existing.FailureReason)
|
||||||
state.FailureReason = existing.FailureReason
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if state.Status == model.PayoutStatusFailed || state.Status == model.PayoutStatusCancelled {
|
|
||||||
if strings.TrimSpace(state.FailureReason) == "" {
|
|
||||||
state.FailureReason = payoutFailureReason(state.ProviderCode, state.ProviderMessage)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
retryScheduled := false
|
||||||
|
if state.Status == model.PayoutStatusFailed || state.Status == model.PayoutStatusCancelled {
|
||||||
|
decision := p.retryPolicy.decideProviderFailure(state.ProviderCode)
|
||||||
|
attemptsUsed := p.currentDispatchAttempt(operationRef)
|
||||||
|
maxAttempts := p.maxDispatchAttempts()
|
||||||
|
if decision.Action == payoutFailureActionRetry && attemptsUsed > 0 && attemptsUsed < maxAttempts {
|
||||||
|
if req := p.loadCardRetryRequest(operationRef); req != nil {
|
||||||
|
state.Status = model.PayoutStatusProcessing
|
||||||
|
state.FailureReason = ""
|
||||||
|
p.logger.Info("Callback decline is retryable; scheduling card payout retry",
|
||||||
|
zap.String("operation_ref", operationRef),
|
||||||
|
zap.String("provider_code", strings.TrimSpace(state.ProviderCode)),
|
||||||
|
zap.Uint32("attempts_used", attemptsUsed),
|
||||||
|
zap.Uint32("max_attempts", maxAttempts),
|
||||||
|
)
|
||||||
|
if err := p.updatePayoutStatus(ctx, state); err != nil {
|
||||||
|
p.logger.Warn("Failed to persist callback retry scheduling state", zap.Error(err))
|
||||||
|
return http.StatusInternalServerError, err
|
||||||
|
}
|
||||||
|
p.scheduleCardPayoutRetry(req, attemptsUsed, maxAttempts)
|
||||||
|
retryScheduled = true
|
||||||
|
} else if req := p.loadCardTokenRetryRequest(operationRef); req != nil {
|
||||||
|
state.Status = model.PayoutStatusProcessing
|
||||||
|
state.FailureReason = ""
|
||||||
|
p.logger.Info("Callback decline is retryable; scheduling card token payout retry",
|
||||||
|
zap.String("operation_ref", operationRef),
|
||||||
|
zap.String("provider_code", strings.TrimSpace(state.ProviderCode)),
|
||||||
|
zap.Uint32("attempts_used", attemptsUsed),
|
||||||
|
zap.Uint32("max_attempts", maxAttempts),
|
||||||
|
)
|
||||||
|
if err := p.updatePayoutStatus(ctx, state); err != nil {
|
||||||
|
p.logger.Warn("Failed to persist callback token retry scheduling state", zap.Error(err))
|
||||||
|
return http.StatusInternalServerError, err
|
||||||
|
}
|
||||||
|
p.scheduleCardTokenPayoutRetry(req, attemptsUsed, maxAttempts)
|
||||||
|
retryScheduled = true
|
||||||
|
} else {
|
||||||
|
p.logger.Warn("Retryable callback decline received but no retry request snapshot found",
|
||||||
|
zap.String("operation_ref", operationRef),
|
||||||
|
zap.String("provider_code", strings.TrimSpace(state.ProviderCode)),
|
||||||
|
zap.Uint32("attempts_used", attemptsUsed),
|
||||||
|
zap.Uint32("max_attempts", maxAttempts),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !retryScheduled && strings.TrimSpace(state.FailureReason) == "" {
|
||||||
|
state.FailureReason = payoutFailureReason(state.ProviderCode, state.ProviderMessage)
|
||||||
|
}
|
||||||
|
} else if state.Status == model.PayoutStatusSuccess {
|
||||||
|
state.FailureReason = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
if !retryScheduled {
|
||||||
if err := p.updatePayoutStatus(ctx, state); err != nil {
|
if err := p.updatePayoutStatus(ctx, state); err != nil {
|
||||||
p.logger.Warn("Failed to update payout state while processing callback", zap.Error(err))
|
p.logger.Warn("Failed to update payout state while processing callback", zap.Error(err))
|
||||||
}
|
}
|
||||||
if isFinalStatus(state) {
|
if isFinalStatus(state) {
|
||||||
p.clearRetryTimer(state.OperationRef)
|
p.clearRetryState(operationRef)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
monetix.ObserveCallback(statusLabel)
|
monetix.ObserveCallback(statusLabel)
|
||||||
|
|
||||||
@@ -1204,6 +1468,7 @@ func (p *cardPayoutProcessor) ProcessCallback(ctx context.Context, payload []byt
|
|||||||
zap.String("status", statusLabel),
|
zap.String("status", statusLabel),
|
||||||
zap.String("provider_code", state.ProviderCode),
|
zap.String("provider_code", state.ProviderCode),
|
||||||
zap.String("provider_message", state.ProviderMessage),
|
zap.String("provider_message", state.ProviderMessage),
|
||||||
|
zap.Bool("retry_scheduled", retryScheduled),
|
||||||
zap.String("masked_account", cb.Account.Number),
|
zap.String("masked_account", cb.Account.Number),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -538,3 +538,97 @@ func TestCardPayoutProcessor_Submit_RetriesProviderLimitDeclineThenFails(t *test
|
|||||||
t.Fatalf("unexpected provider call count: got=%d want=%d", got, want)
|
t.Fatalf("unexpected provider call count: got=%d want=%d", got, want)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCardPayoutProcessor_ProcessCallback_RetryableDeclineSchedulesRetry(t *testing.T) {
|
||||||
|
cfg := monetix.Config{
|
||||||
|
BaseURL: "https://monetix.test",
|
||||||
|
SecretKey: "secret",
|
||||||
|
ProjectID: 99,
|
||||||
|
StatusSuccess: "success",
|
||||||
|
StatusProcessing: "processing",
|
||||||
|
AllowedCurrencies: []string{"RUB"},
|
||||||
|
}
|
||||||
|
|
||||||
|
repo := newMockRepository()
|
||||||
|
var calls atomic.Int32
|
||||||
|
httpClient := &http.Client{
|
||||||
|
Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) {
|
||||||
|
n := calls.Add(1)
|
||||||
|
resp := monetix.APIResponse{}
|
||||||
|
if n == 1 {
|
||||||
|
resp.Operation.RequestID = "req-initial"
|
||||||
|
} else {
|
||||||
|
resp.Operation.RequestID = "req-after-callback-retry"
|
||||||
|
}
|
||||||
|
body, _ := json.Marshal(resp)
|
||||||
|
return &http.Response{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Body: io.NopCloser(bytes.NewReader(body)),
|
||||||
|
Header: http.Header{"Content-Type": []string{"application/json"}},
|
||||||
|
}, nil
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
processor := newCardPayoutProcessor(
|
||||||
|
zap.NewNop(),
|
||||||
|
cfg,
|
||||||
|
staticClock{now: time.Date(2026, 3, 4, 2, 0, 0, 0, time.UTC)},
|
||||||
|
repo,
|
||||||
|
httpClient,
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
defer processor.stopRetries()
|
||||||
|
processor.dispatchThrottleInterval = 0
|
||||||
|
processor.retryDelayFn = func(uint32) time.Duration { return 5 * time.Millisecond }
|
||||||
|
|
||||||
|
req := validCardPayoutRequest()
|
||||||
|
resp, err := processor.Submit(context.Background(), req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("submit returned error: %v", err)
|
||||||
|
}
|
||||||
|
if !resp.GetAccepted() {
|
||||||
|
t.Fatalf("expected accepted submit response")
|
||||||
|
}
|
||||||
|
|
||||||
|
cb := baseCallback()
|
||||||
|
cb.Payment.ID = req.GetPayoutId()
|
||||||
|
cb.Payment.Status = "failed"
|
||||||
|
cb.Operation.Status = "failed"
|
||||||
|
cb.Operation.Code = providerCodeDeclineAmountOrFrequencyLimit
|
||||||
|
cb.Operation.Message = "Decline due to amount or frequency limit"
|
||||||
|
cb.Payment.Sum.Currency = "RUB"
|
||||||
|
|
||||||
|
sig, err := monetix.SignPayload(cb, cfg.SecretKey)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to sign callback: %v", err)
|
||||||
|
}
|
||||||
|
cb.Signature = sig
|
||||||
|
payload, err := json.Marshal(cb)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to marshal callback: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
status, err := processor.ProcessCallback(context.Background(), payload)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("process callback returned error: %v", err)
|
||||||
|
}
|
||||||
|
if status != http.StatusOK {
|
||||||
|
t.Fatalf("unexpected callback status: %d", status)
|
||||||
|
}
|
||||||
|
|
||||||
|
deadline := time.Now().Add(2 * time.Second)
|
||||||
|
for {
|
||||||
|
state, ok := repo.payouts.Get(req.GetPayoutId())
|
||||||
|
if ok && state != nil && state.Status == model.PayoutStatusWaiting && state.ProviderPaymentID == "req-after-callback-retry" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if time.Now().After(deadline) {
|
||||||
|
t.Fatalf("timeout waiting for callback-scheduled retry result")
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got, want := calls.Load(), int32(2); got != want {
|
||||||
|
t.Fatalf("unexpected provider call count: got=%d want=%d", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user