Files
sendico/api/gateway/mntx/internal/service/gateway/card_processor.go
2026-03-06 12:14:32 +01:00

1574 lines
48 KiB
Go

package gateway
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/shopspring/decimal"
gatewayoutbox "github.com/tech/sendico/gateway/common/outbox"
"github.com/tech/sendico/gateway/mntx/internal/service/monetix"
"github.com/tech/sendico/gateway/mntx/storage"
"github.com/tech/sendico/gateway/mntx/storage/model"
clockpkg "github.com/tech/sendico/pkg/clock"
"github.com/tech/sendico/pkg/db/storable"
"github.com/tech/sendico/pkg/merrors"
msg "github.com/tech/sendico/pkg/messaging"
"github.com/tech/sendico/pkg/mlogger"
pmodel "github.com/tech/sendico/pkg/model"
gatewayv1 "github.com/tech/sendico/pkg/proto/common/gateway/v1"
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
"go.mongodb.org/mongo-driver/v2/bson"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
const (
defaultDispatchThrottleInterval = 150 * time.Millisecond
defaultMaxDispatchAttempts = uint32(5)
)
type cardPayoutProcessor struct {
logger mlogger.Logger
config monetix.Config
clock clockpkg.Clock
store storage.Repository
httpClient *http.Client
producer msg.Producer
msgCfg pmodel.SettingsT
outbox *gatewayoutbox.ReliableRuntime
perTxMinAmountMinor int64
perTxMinAmountMinorByCurrency map[string]int64
dispatchThrottleInterval time.Duration
dispatchMaxAttempts uint32
executionMode payoutExecutionMode
dispatchMu sync.Mutex
nextDispatchAllowed time.Time
dispatchSerialGate chan struct{}
retryPolicy payoutFailurePolicy
retryDelayFn func(attempt uint32, strategy payoutRetryStrategy) time.Duration
retryMu sync.Mutex
retryTimers map[string]*time.Timer
retryCtx context.Context
retryStop context.CancelFunc
retryReqMu sync.RWMutex
cardRetryRequests map[string]*mntxv1.CardPayoutRequest
cardTokenRetryRequest map[string]*mntxv1.CardTokenPayoutRequest
attemptMu sync.Mutex
dispatchAttempts map[string]uint32
}
func mergePayoutStateWithExisting(state, existing *model.CardPayout) {
if state == nil || existing == nil {
return
}
state.ID = existing.ID // preserve ID for upsert
if !existing.CreatedAt.IsZero() {
state.CreatedAt = existing.CreatedAt
}
if state.OperationRef == "" {
state.OperationRef = existing.OperationRef
}
if state.IdempotencyKey == "" {
state.IdempotencyKey = existing.IdempotencyKey
}
if state.IntentRef == "" {
state.IntentRef = existing.IntentRef
}
if existing.PaymentRef != "" {
state.PaymentRef = existing.PaymentRef
}
}
func findOperationRef(operationRef, payoutID string) string {
ref := strings.TrimSpace(operationRef)
if ref != "" {
return ref
}
return strings.TrimSpace(payoutID)
}
func (p *cardPayoutProcessor) findExistingPayoutState(ctx context.Context, state *model.CardPayout) (*model.CardPayout, error) {
if p == nil || state == nil {
return nil, nil
}
if opRef := strings.TrimSpace(state.OperationRef); opRef != "" {
existing, err := p.store.Payouts().FindByOperationRef(ctx, opRef)
if err == nil {
if existing != nil {
return existing, nil
}
}
if !errors.Is(err, merrors.ErrNoData) {
if err != nil {
return nil, err
}
}
}
return nil, nil
}
func (p *cardPayoutProcessor) findAndMergePayoutState(ctx context.Context, state *model.CardPayout) (*model.CardPayout, error) {
if p == nil || state == nil {
return nil, nil
}
existing, err := p.findExistingPayoutState(ctx, state)
if err != nil {
return nil, err
}
mergePayoutStateWithExisting(state, existing)
return existing, nil
}
func (p *cardPayoutProcessor) resolveProjectID(requestProjectID int64, logFieldKey, logFieldValue string) (int64, error) {
projectID := requestProjectID
if projectID == 0 {
projectID = p.config.ProjectID
}
if projectID == 0 {
p.logger.Warn("Monetix project_id is not configured", zap.String(logFieldKey, logFieldValue))
return 0, merrors.Internal("mcards project_id is not configured")
}
return projectID, nil
}
func applyCardPayoutSendResult(state *model.CardPayout, result *monetix.CardPayoutSendResult) {
if state == nil || result == nil {
return
}
state.ProviderPaymentID = strings.TrimSpace(result.ProviderRequestID)
state.ProviderCode = strings.TrimSpace(result.ErrorCode)
state.ProviderMessage = strings.TrimSpace(result.ErrorMessage)
if result.Accepted {
state.Status = model.PayoutStatusWaiting
return
}
state.Status = model.PayoutStatusFailed
}
func payoutStateLogFields(state *model.CardPayout) []zap.Field {
if state == nil {
return nil
}
return []zap.Field{
zap.String("payment_ref", state.PaymentRef),
zap.String("customer_id", state.CustomerID),
zap.String("operation_ref", state.OperationRef),
zap.String("idempotency_key", state.IdempotencyKey),
zap.String("intent_ref", state.IntentRef),
}
}
func newCardPayoutProcessor(
logger mlogger.Logger,
cfg monetix.Config,
clock clockpkg.Clock,
store storage.Repository,
client *http.Client,
producer msg.Producer,
) *cardPayoutProcessor {
retryCtx, retryStop := context.WithCancel(context.Background())
return &cardPayoutProcessor{
logger: logger.Named("card_payout_processor"),
config: cfg,
clock: clock,
store: store,
httpClient: client,
producer: producer,
dispatchThrottleInterval: defaultDispatchThrottleInterval,
dispatchMaxAttempts: defaultMaxDispatchAttempts,
executionMode: newDefaultPayoutExecutionMode(),
dispatchSerialGate: make(chan struct{}, 1),
retryPolicy: defaultPayoutFailurePolicy(),
retryDelayFn: retryDelayDuration,
retryTimers: map[string]*time.Timer{},
retryCtx: retryCtx,
retryStop: retryStop,
cardRetryRequests: map[string]*mntxv1.CardPayoutRequest{},
cardTokenRetryRequest: map[string]*mntxv1.CardTokenPayoutRequest{},
dispatchAttempts: map[string]uint32{},
}
}
func (p *cardPayoutProcessor) applyGatewayDescriptor(descriptor *gatewayv1.GatewayInstanceDescriptor) {
if p == nil {
return
}
minAmountMinor, perCurrency := perTxMinAmountPolicy(descriptor)
p.perTxMinAmountMinor = minAmountMinor
p.perTxMinAmountMinorByCurrency = perCurrency
p.dispatchThrottleInterval = dispatchThrottleIntervalFromDescriptor(descriptor, defaultDispatchThrottleInterval)
p.logger.Info("Configured payout dispatch throttle",
zap.Duration("dispatch_interval", p.dispatchThrottleInterval),
zap.Bool("sequential_dispatch", p.dispatchSerialGate != nil),
zap.String("execution_mode", payoutExecutionModeName(p.executionMode)),
)
}
func (p *cardPayoutProcessor) setExecutionMode(mode payoutExecutionMode) {
if p == nil {
return
}
p.executionMode = normalizePayoutExecutionMode(mode)
}
func (p *cardPayoutProcessor) observeExecutionState(state *model.CardPayout) {
if p == nil || state == nil {
return
}
if p.executionMode == nil {
return
}
p.executionMode.OnPersistedState(state.OperationRef, state.Status)
}
func perTxMinAmountPolicy(descriptor *gatewayv1.GatewayInstanceDescriptor) (int64, map[string]int64) {
if descriptor == nil || descriptor.GetLimits() == nil {
return 0, nil
}
limits := descriptor.GetLimits()
globalMin, _ := decimalAmountToMinor(firstNonEmpty(limits.GetPerTxMinAmount(), limits.GetMinAmount()))
perCurrency := map[string]int64{}
for currency, override := range limits.GetCurrencyLimits() {
if override == nil {
continue
}
minor, ok := decimalAmountToMinor(override.GetMinAmount())
if !ok {
continue
}
code := strings.ToUpper(strings.TrimSpace(currency))
if code == "" {
continue
}
perCurrency[code] = minor
}
if len(perCurrency) == 0 {
perCurrency = nil
}
return globalMin, perCurrency
}
func decimalAmountToMinor(raw string) (int64, bool) {
raw = strings.TrimSpace(raw)
if raw == "" {
return 0, false
}
value, err := decimal.NewFromString(raw)
if err != nil || !value.IsPositive() {
return 0, false
}
minor := value.Mul(decimal.NewFromInt(100)).Ceil().IntPart()
if minor <= 0 {
return 0, false
}
return minor, true
}
func (p *cardPayoutProcessor) validatePerTxMinimum(amountMinor int64, currency string) error {
if p == nil {
return nil
}
minAmountMinor := p.perTxMinimum(currency)
if minAmountMinor <= 0 || amountMinor >= minAmountMinor {
return nil
}
return newPayoutError("amount_below_minimum", merrors.InvalidArgument(
fmt.Sprintf("amount_minor must be at least %d", minAmountMinor),
"amount_minor",
))
}
func (p *cardPayoutProcessor) perTxMinimum(currency string) int64 {
if p == nil {
return 0
}
minAmountMinor := p.perTxMinAmountMinor
if len(p.perTxMinAmountMinorByCurrency) == 0 {
return minAmountMinor
}
code := strings.ToUpper(strings.TrimSpace(currency))
if code == "" {
return minAmountMinor
}
if override, ok := p.perTxMinAmountMinorByCurrency[code]; ok && override > 0 {
return override
}
return minAmountMinor
}
func dispatchThrottleIntervalFromDescriptor(
descriptor *gatewayv1.GatewayInstanceDescriptor,
fallback time.Duration,
) time.Duration {
if fallback < 0 {
fallback = 0
}
if descriptor == nil || descriptor.GetLimits() == nil {
return fallback
}
velocity := descriptor.GetLimits().GetVelocityLimit()
if len(velocity) == 0 {
return fallback
}
interval := time.Duration(0)
for bucket, maxOps := range velocity {
cleanBucket := strings.TrimSpace(bucket)
if cleanBucket == "" || maxOps <= 0 {
continue
}
window, err := time.ParseDuration(cleanBucket)
if err != nil || window <= 0 {
continue
}
candidate := window / time.Duration(maxOps)
if candidate <= 0 {
continue
}
if candidate > interval {
interval = candidate
}
}
if interval <= 0 {
return fallback
}
return interval
}
func (p *cardPayoutProcessor) waitDispatchSlot(ctx context.Context) error {
if p == nil {
return merrors.Internal("card payout processor not initialised")
}
if ctx == nil {
ctx = context.Background()
}
if p.dispatchThrottleInterval <= 0 {
return nil
}
for {
p.dispatchMu.Lock()
now := time.Now().UTC()
if p.nextDispatchAllowed.IsZero() || !p.nextDispatchAllowed.After(now) {
p.nextDispatchAllowed = now.Add(p.dispatchThrottleInterval)
p.dispatchMu.Unlock()
return nil
}
wait := p.nextDispatchAllowed.Sub(now)
p.dispatchMu.Unlock()
timer := time.NewTimer(wait)
select {
case <-ctx.Done():
timer.Stop()
return ctx.Err()
case <-timer.C:
}
}
}
func (p *cardPayoutProcessor) acquireDispatchExecution(ctx context.Context) (func(), error) {
if p == nil {
return nil, merrors.Internal("card payout processor not initialised")
}
if ctx == nil {
ctx = context.Background()
}
if p.dispatchSerialGate == nil {
return func() {}, nil
}
select {
case p.dispatchSerialGate <- struct{}{}:
return func() {
<-p.dispatchSerialGate
}, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (p *cardPayoutProcessor) stopRetries() {
if p == nil {
return
}
if p.executionMode != nil {
p.executionMode.Shutdown()
}
if p.retryStop != nil {
p.retryStop()
}
p.retryMu.Lock()
defer p.retryMu.Unlock()
for key, timer := range p.retryTimers {
if timer != nil {
timer.Stop()
}
delete(p.retryTimers, key)
}
p.retryReqMu.Lock()
p.cardRetryRequests = map[string]*mntxv1.CardPayoutRequest{}
p.cardTokenRetryRequest = map[string]*mntxv1.CardTokenPayoutRequest{}
p.retryReqMu.Unlock()
p.attemptMu.Lock()
p.dispatchAttempts = map[string]uint32{}
p.attemptMu.Unlock()
}
func (p *cardPayoutProcessor) clearRetryTimer(operationRef string) {
if p == nil {
return
}
key := strings.TrimSpace(operationRef)
if key == "" {
return
}
p.retryMu.Lock()
defer p.retryMu.Unlock()
timer := p.retryTimers[key]
if timer != nil {
timer.Stop()
}
delete(p.retryTimers, key)
}
func (p *cardPayoutProcessor) maxDispatchAttempts() uint32 {
if p == nil {
return defaultMaxDispatchAttempts
}
return maxDispatchAttempts(p.dispatchMaxAttempts)
}
func (p *cardPayoutProcessor) rememberCardRetryRequest(req *mntxv1.CardPayoutRequest) {
if p == nil || req == nil {
return
}
key := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
if key == "" {
return
}
cloned, ok := proto.Clone(req).(*mntxv1.CardPayoutRequest)
if !ok {
return
}
p.retryReqMu.Lock()
defer p.retryReqMu.Unlock()
p.cardRetryRequests[key] = cloned
}
func (p *cardPayoutProcessor) rememberCardTokenRetryRequest(req *mntxv1.CardTokenPayoutRequest) {
if p == nil || req == nil {
return
}
key := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
if key == "" {
return
}
cloned, ok := proto.Clone(req).(*mntxv1.CardTokenPayoutRequest)
if !ok {
return
}
p.retryReqMu.Lock()
defer p.retryReqMu.Unlock()
p.cardTokenRetryRequest[key] = cloned
}
func (p *cardPayoutProcessor) loadCardRetryRequest(operationRef string) *mntxv1.CardPayoutRequest {
if p == nil {
return nil
}
key := strings.TrimSpace(operationRef)
if key == "" {
return nil
}
p.retryReqMu.RLock()
defer p.retryReqMu.RUnlock()
req := p.cardRetryRequests[key]
if req == nil {
return nil
}
cloned, ok := proto.Clone(req).(*mntxv1.CardPayoutRequest)
if !ok {
return nil
}
return cloned
}
func (p *cardPayoutProcessor) loadCardTokenRetryRequest(operationRef string) *mntxv1.CardTokenPayoutRequest {
if p == nil {
return nil
}
key := strings.TrimSpace(operationRef)
if key == "" {
return nil
}
p.retryReqMu.RLock()
defer p.retryReqMu.RUnlock()
req := p.cardTokenRetryRequest[key]
if req == nil {
return nil
}
cloned, ok := proto.Clone(req).(*mntxv1.CardTokenPayoutRequest)
if !ok {
return nil
}
return cloned
}
func (p *cardPayoutProcessor) incrementDispatchAttempt(operationRef string) uint32 {
if p == nil {
return 0
}
key := strings.TrimSpace(operationRef)
if key == "" {
return 0
}
p.attemptMu.Lock()
defer p.attemptMu.Unlock()
next := p.dispatchAttempts[key] + 1
p.dispatchAttempts[key] = next
return next
}
func (p *cardPayoutProcessor) currentDispatchAttempt(operationRef string) uint32 {
if p == nil {
return 0
}
key := strings.TrimSpace(operationRef)
if key == "" {
return 0
}
p.attemptMu.Lock()
defer p.attemptMu.Unlock()
return p.dispatchAttempts[key]
}
func (p *cardPayoutProcessor) clearDispatchAttempt(operationRef string) {
if p == nil {
return
}
key := strings.TrimSpace(operationRef)
if key == "" {
return
}
p.attemptMu.Lock()
defer p.attemptMu.Unlock()
delete(p.dispatchAttempts, key)
}
func (p *cardPayoutProcessor) clearRetryState(operationRef string) {
if p == nil {
return
}
key := strings.TrimSpace(operationRef)
if key == "" {
return
}
p.clearRetryTimer(key)
p.clearDispatchAttempt(key)
p.retryReqMu.Lock()
defer p.retryReqMu.Unlock()
delete(p.cardRetryRequests, key)
delete(p.cardTokenRetryRequest, key)
}
func payoutAcceptedForState(state *model.CardPayout) bool {
if state == nil {
return false
}
switch state.Status {
case model.PayoutStatusFailed, model.PayoutStatusNeedsAttention, model.PayoutStatusCancelled:
return false
default:
return true
}
}
func terminalStatusAfterRetryExhausted(decision payoutFailureDecision) model.PayoutStatus {
if decision.Action == payoutFailureActionRetry {
return model.PayoutStatusNeedsAttention
}
return model.PayoutStatusFailed
}
func cardPayoutResponseFromState(
state *model.CardPayout,
accepted bool,
errorCode string,
errorMessage string,
) *mntxv1.CardPayoutResponse {
return &mntxv1.CardPayoutResponse{
Payout: StateToProto(state),
Accepted: accepted,
ProviderRequestId: strings.TrimSpace(firstNonEmpty(state.ProviderPaymentID)),
ErrorCode: strings.TrimSpace(errorCode),
ErrorMessage: strings.TrimSpace(errorMessage),
}
}
func cardTokenPayoutResponseFromState(
state *model.CardPayout,
accepted bool,
errorCode string,
errorMessage string,
) *mntxv1.CardTokenPayoutResponse {
return &mntxv1.CardTokenPayoutResponse{
Payout: StateToProto(state),
Accepted: accepted,
ProviderRequestId: strings.TrimSpace(firstNonEmpty(state.ProviderPaymentID)),
ErrorCode: strings.TrimSpace(errorCode),
ErrorMessage: strings.TrimSpace(errorMessage),
}
}
func (p *cardPayoutProcessor) dispatchCardPayout(ctx context.Context, req *mntxv1.CardPayoutRequest) (*monetix.CardPayoutSendResult, error) {
if p == nil {
return nil, merrors.Internal("card payout processor not initialised")
}
if req == nil {
return nil, merrors.InvalidArgument("card payout request is required")
}
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
if mode := p.executionMode; mode != nil {
if err := mode.BeforeDispatch(ctx, opRef); err != nil {
return nil, err
}
}
release, err := p.acquireDispatchExecution(ctx)
if err != nil {
return nil, err
}
defer release()
if err := p.waitDispatchSlot(ctx); err != nil {
return nil, err
}
attempt := p.incrementDispatchAttempt(opRef)
p.logger.Info("Dispatching card payout attempt",
zap.String("operation_ref", opRef),
zap.Uint32("attempt", attempt),
)
client := monetix.NewClient(p.config, p.httpClient, p.logger)
apiReq := buildCardPayoutRequest(req.GetProjectId(), req)
return client.CreateCardPayout(ctx, apiReq)
}
func (p *cardPayoutProcessor) dispatchCardTokenPayout(ctx context.Context, req *mntxv1.CardTokenPayoutRequest) (*monetix.CardPayoutSendResult, error) {
if p == nil {
return nil, merrors.Internal("card payout processor not initialised")
}
if req == nil {
return nil, merrors.InvalidArgument("card token payout request is required")
}
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
if mode := p.executionMode; mode != nil {
if err := mode.BeforeDispatch(ctx, opRef); err != nil {
return nil, err
}
}
release, err := p.acquireDispatchExecution(ctx)
if err != nil {
return nil, err
}
defer release()
if err := p.waitDispatchSlot(ctx); err != nil {
return nil, err
}
attempt := p.incrementDispatchAttempt(opRef)
p.logger.Info("Dispatching card token payout attempt",
zap.String("operation_ref", opRef),
zap.Uint32("attempt", attempt),
)
client := monetix.NewClient(p.config, p.httpClient, p.logger)
apiReq := buildCardTokenPayoutRequest(req.GetProjectId(), req)
return client.CreateCardTokenPayout(ctx, apiReq)
}
func maxDispatchAttempts(v uint32) uint32 {
if v == 0 {
return defaultMaxDispatchAttempts
}
return v
}
func (p *cardPayoutProcessor) scheduleRetryTimer(operationRef string, delay time.Duration, run func()) {
if p == nil || run == nil {
return
}
key := strings.TrimSpace(operationRef)
if key == "" {
return
}
if delay < 0 {
delay = 0
}
p.retryMu.Lock()
defer p.retryMu.Unlock()
if old := p.retryTimers[key]; old != nil {
old.Stop()
}
var timer *time.Timer
timer = time.AfterFunc(delay, func() {
select {
case <-p.retryCtx.Done():
return
default:
}
p.retryMu.Lock()
if p.retryTimers[key] == timer {
delete(p.retryTimers, key)
}
p.retryMu.Unlock()
run()
})
p.retryTimers[key] = timer
}
func retryDelayDuration(attempt uint32, strategy payoutRetryStrategy) time.Duration {
return time.Duration(retryDelayForAttempt(attempt, strategy)) * time.Second
}
func (p *cardPayoutProcessor) scheduleCardPayoutRetry(
req *mntxv1.CardPayoutRequest,
failedAttempt uint32,
maxAttempts uint32,
strategy payoutRetryStrategy,
) {
if p == nil || req == nil {
return
}
maxAttempts = maxDispatchAttempts(maxAttempts)
strategy = normalizeRetryStrategy(strategy)
nextAttempt := failedAttempt + 1
if nextAttempt > maxAttempts {
return
}
cloned, ok := proto.Clone(req).(*mntxv1.CardPayoutRequest)
if !ok {
return
}
operationRef := findOperationRef(cloned.GetOperationRef(), cloned.GetPayoutId())
delay := retryDelayDuration(failedAttempt, strategy)
if p.retryDelayFn != nil {
delay = p.retryDelayFn(failedAttempt, strategy)
}
p.logger.Info("Scheduling card payout retry",
zap.String("operation_ref", operationRef),
zap.String("strategy", strategy.String()),
zap.Uint32("failed_attempt", failedAttempt),
zap.Uint32("next_attempt", nextAttempt),
zap.Uint32("max_attempts", maxAttempts),
zap.Duration("delay", delay),
)
p.scheduleRetryTimer(operationRef, delay, func() {
p.runCardPayoutRetry(cloned, nextAttempt, maxAttempts)
})
}
func (p *cardPayoutProcessor) scheduleCardTokenPayoutRetry(
req *mntxv1.CardTokenPayoutRequest,
failedAttempt uint32,
maxAttempts uint32,
strategy payoutRetryStrategy,
) {
if p == nil || req == nil {
return
}
maxAttempts = maxDispatchAttempts(maxAttempts)
strategy = normalizeRetryStrategy(strategy)
nextAttempt := failedAttempt + 1
if nextAttempt > maxAttempts {
return
}
cloned, ok := proto.Clone(req).(*mntxv1.CardTokenPayoutRequest)
if !ok {
return
}
operationRef := findOperationRef(cloned.GetOperationRef(), cloned.GetPayoutId())
delay := retryDelayDuration(failedAttempt, strategy)
if p.retryDelayFn != nil {
delay = p.retryDelayFn(failedAttempt, strategy)
}
p.logger.Info("Scheduling card token payout retry",
zap.String("operation_ref", operationRef),
zap.String("strategy", strategy.String()),
zap.Uint32("failed_attempt", failedAttempt),
zap.Uint32("next_attempt", nextAttempt),
zap.Uint32("max_attempts", maxAttempts),
zap.Duration("delay", delay),
)
p.scheduleRetryTimer(operationRef, delay, func() {
p.runCardTokenPayoutRetry(cloned, nextAttempt, maxAttempts)
})
}
func (p *cardPayoutProcessor) retryContext() (context.Context, context.CancelFunc) {
if p == nil {
return context.Background(), func() {}
}
ctx := p.retryCtx
if ctx == nil {
ctx = context.Background()
}
timeout := p.config.Timeout()
if timeout <= 0 {
return ctx, func() {}
}
return context.WithTimeout(ctx, timeout)
}
func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest, attempt uint32, maxAttempts uint32) {
if p == nil || req == nil {
return
}
operationRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
if operationRef == "" {
return
}
p.logger.Info("Executing scheduled card payout retry",
zap.String("operation_ref", operationRef),
zap.Uint32("attempt", attempt),
zap.Uint32("max_attempts", maxAttempts),
)
ctx, cancel := p.retryContext()
defer cancel()
state, err := p.store.Payouts().FindByOperationRef(ctx, operationRef)
if err != nil || state == nil {
p.logger.Warn("Retry payout state lookup failed",
zap.String("operation_ref", operationRef),
zap.Uint32("attempt", attempt),
zap.Error(err),
)
return
}
if isFinalStatus(state) {
p.clearRetryState(operationRef)
return
}
result, err := p.dispatchCardPayout(ctx, req)
now := p.clock.Now()
maxAttempts = maxDispatchAttempts(maxAttempts)
if err != nil {
decision := p.retryPolicy.decideTransportFailure()
state.ProviderCode = ""
state.ProviderMessage = err.Error()
state.UpdatedAt = now
if decision.Action == payoutFailureActionRetry && attempt < maxAttempts {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
p.logger.Warn("Failed to persist retryable payout transport failure", zap.Error(upErr))
return
}
p.scheduleCardPayoutRetry(req, attempt, maxAttempts, decision.Strategy)
return
}
state.Status = terminalStatusAfterRetryExhausted(decision)
state.FailureReason = payoutFailureReason("", err.Error())
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
p.logger.Warn("Failed to persist terminal payout transport failure", zap.Error(upErr))
}
p.clearRetryState(operationRef)
return
}
applyCardPayoutSendResult(state, result)
state.UpdatedAt = now
if result.Accepted {
state.FailureReason = ""
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
p.logger.Warn("Failed to persist accepted payout retry result", zap.Error(upErr))
}
p.clearRetryTimer(operationRef)
return
}
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
if decision.Action == payoutFailureActionRetry && attempt < maxAttempts {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
p.logger.Warn("Failed to persist retryable payout provider failure", zap.Error(upErr))
return
}
p.scheduleCardPayoutRetry(req, attempt, maxAttempts, decision.Strategy)
return
}
state.Status = terminalStatusAfterRetryExhausted(decision)
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
p.logger.Warn("Failed to persist terminal payout provider failure", zap.Error(upErr))
}
p.clearRetryState(operationRef)
}
func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayoutRequest, attempt uint32, maxAttempts uint32) {
if p == nil || req == nil {
return
}
operationRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
if operationRef == "" {
return
}
p.logger.Info("Executing scheduled card token payout retry",
zap.String("operation_ref", operationRef),
zap.Uint32("attempt", attempt),
zap.Uint32("max_attempts", maxAttempts),
)
ctx, cancel := p.retryContext()
defer cancel()
state, err := p.store.Payouts().FindByOperationRef(ctx, operationRef)
if err != nil || state == nil {
p.logger.Warn("Retry token payout state lookup failed",
zap.String("operation_ref", operationRef),
zap.Uint32("attempt", attempt),
zap.Error(err),
)
return
}
if isFinalStatus(state) {
p.clearRetryState(operationRef)
return
}
result, err := p.dispatchCardTokenPayout(ctx, req)
now := p.clock.Now()
maxAttempts = maxDispatchAttempts(maxAttempts)
if err != nil {
decision := p.retryPolicy.decideTransportFailure()
state.ProviderCode = ""
state.ProviderMessage = err.Error()
state.UpdatedAt = now
if decision.Action == payoutFailureActionRetry && attempt < maxAttempts {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
p.logger.Warn("Failed to persist retryable token payout transport failure", zap.Error(upErr))
return
}
p.scheduleCardTokenPayoutRetry(req, attempt, maxAttempts, decision.Strategy)
return
}
state.Status = terminalStatusAfterRetryExhausted(decision)
state.FailureReason = payoutFailureReason("", err.Error())
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
p.logger.Warn("Failed to persist terminal token payout transport failure", zap.Error(upErr))
}
p.clearRetryState(operationRef)
return
}
applyCardPayoutSendResult(state, result)
state.UpdatedAt = now
if result.Accepted {
state.FailureReason = ""
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
p.logger.Warn("Failed to persist accepted token payout retry result", zap.Error(upErr))
}
p.clearRetryTimer(operationRef)
return
}
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
if decision.Action == payoutFailureActionRetry && attempt < maxAttempts {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
p.logger.Warn("Failed to persist retryable token payout provider failure", zap.Error(upErr))
return
}
p.scheduleCardTokenPayoutRetry(req, attempt, maxAttempts, decision.Strategy)
return
}
state.Status = terminalStatusAfterRetryExhausted(decision)
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
p.logger.Warn("Failed to persist terminal token payout provider failure", zap.Error(upErr))
}
p.clearRetryState(operationRef)
}
func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) {
if p == nil {
return nil, merrors.Internal("card payout processor not initialised")
}
req = sanitizeCardPayoutRequest(req)
operationRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
parentPaymentRef := strings.TrimSpace(req.GetParentPaymentRef())
p.logger.Info("Submitting card payout",
zap.String("parent_payment_ref", parentPaymentRef),
zap.String("customer_id", strings.TrimSpace(req.GetCustomerId())),
zap.Int64("amount_minor", req.GetAmountMinor()),
zap.String("currency", strings.ToUpper(strings.TrimSpace(req.GetCurrency()))),
zap.String("operation_ref", operationRef),
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
)
if strings.TrimSpace(p.config.BaseURL) == "" || strings.TrimSpace(p.config.SecretKey) == "" {
p.logger.Warn("Monetix configuration is incomplete for payout submission")
return nil, merrors.Internal("monetix configuration is incomplete")
}
if err := validateCardPayoutRequest(req, p.config); err != nil {
p.logger.Warn("Card payout validation failed",
zap.String("parent_payment_ref", parentPaymentRef),
zap.String("operation_ref", operationRef),
zap.String("customer_id", req.GetCustomerId()),
zap.Error(err),
)
return nil, err
}
if err := p.validatePerTxMinimum(req.GetAmountMinor(), req.GetCurrency()); err != nil {
p.logger.Warn("Card payout amount below configured minimum",
zap.String("parent_payment_ref", parentPaymentRef),
zap.String("operation_ref", operationRef),
zap.String("customer_id", req.GetCustomerId()),
zap.Int64("amount_minor", req.GetAmountMinor()),
zap.String("currency", strings.ToUpper(strings.TrimSpace(req.GetCurrency()))),
zap.Int64("configured_min_amount_minor", p.perTxMinimum(req.GetCurrency())),
zap.Error(err),
)
return nil, err
}
projectID, err := p.resolveProjectID(req.GetProjectId(), "operation_ref", operationRef)
if err != nil {
return nil, err
}
req.ProjectId = projectID
now := p.clock.Now()
state := &model.CardPayout{
Base: storable.Base{
ID: bson.NilObjectID,
},
PaymentRef: parentPaymentRef,
OperationRef: operationRef,
IdempotencyKey: strings.TrimSpace(req.GetIdempotencyKey()),
IntentRef: strings.TrimSpace(req.GetIntentRef()),
ProjectID: projectID,
CustomerID: strings.TrimSpace(req.GetCustomerId()),
AmountMinor: req.GetAmountMinor(),
Currency: strings.ToUpper(strings.TrimSpace(req.GetCurrency())),
Status: model.PayoutStatusProcessing,
CreatedAt: now,
UpdatedAt: now,
}
// Keep CreatedAt/refs if record already exists.
existing, err := p.findAndMergePayoutState(ctx, state)
if err != nil {
return nil, err
}
if existing != nil {
switch existing.Status {
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusNeedsAttention, model.PayoutStatusCancelled:
p.observeExecutionState(existing)
return cardPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
}
}
p.rememberCardRetryRequest(req)
result, err := p.dispatchCardPayout(ctx, req)
if err != nil {
decision := p.retryPolicy.decideTransportFailure()
state.ProviderMessage = err.Error()
state.UpdatedAt = p.clock.Now()
maxAttempts := p.maxDispatchAttempts()
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
if e := p.updatePayoutStatus(ctx, state); e != nil {
fields := append([]zap.Field{zap.Error(e)}, payoutStateLogFields(state)...)
p.logger.Warn("Failed to update payout status", fields...)
return nil, e
}
p.scheduleCardPayoutRetry(req, 1, maxAttempts, decision.Strategy)
return cardPayoutResponseFromState(state, true, "", ""), nil
}
state.Status = terminalStatusAfterRetryExhausted(decision)
state.FailureReason = payoutFailureReason("", err.Error())
if e := p.updatePayoutStatus(ctx, state); e != nil {
fields := append([]zap.Field{zap.Error(e)}, payoutStateLogFields(state)...)
p.logger.Warn("Failed to update payout status", fields...)
return nil, e
}
fields := append([]zap.Field{zap.Error(err)}, payoutStateLogFields(state)...)
p.logger.Warn("Monetix payout submission failed", fields...)
p.clearRetryState(state.OperationRef)
return nil, err
}
applyCardPayoutSendResult(state, result)
state.UpdatedAt = p.clock.Now()
accepted := result.Accepted
errorCode := strings.TrimSpace(result.ErrorCode)
errorMessage := strings.TrimSpace(result.ErrorMessage)
scheduleRetry := false
retryMaxAttempts := uint32(0)
retryStrategy := payoutRetryStrategyImmediate
if !result.Accepted {
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
maxAttempts := p.maxDispatchAttempts()
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
accepted = true
errorCode = ""
errorMessage = ""
scheduleRetry = true
retryMaxAttempts = maxAttempts
retryStrategy = decision.Strategy
} else {
state.Status = terminalStatusAfterRetryExhausted(decision)
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
p.clearRetryState(state.OperationRef)
}
} else {
p.clearRetryTimer(state.OperationRef)
}
if err := p.updatePayoutStatus(ctx, state); err != nil {
p.logger.Warn("Failed to store payout",
zap.Error(err),
zap.String("payment_ref", state.PaymentRef),
zap.String("customer_id", state.CustomerID),
zap.String("operation_ref", state.OperationRef),
zap.String("idempotency_key", state.IdempotencyKey),
)
return nil, err
}
if scheduleRetry {
p.scheduleCardPayoutRetry(req, 1, retryMaxAttempts, retryStrategy)
}
resp := cardPayoutResponseFromState(state, accepted, errorCode, errorMessage)
p.logger.Info("Card payout submission stored",
zap.String("payment_ref", state.PaymentRef),
zap.String("status", string(state.Status)),
zap.Bool("accepted", accepted),
zap.String("provider_request_id", resp.GetProviderRequestId()),
)
return resp, nil
}
func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardTokenPayoutRequest) (*mntxv1.CardTokenPayoutResponse, error) {
if p == nil {
return nil, merrors.Internal("card payout processor not initialised")
}
req = sanitizeCardTokenPayoutRequest(req)
operationRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
parentPaymentRef := strings.TrimSpace(req.GetParentPaymentRef())
p.logger.Info("Submitting card token payout",
zap.String("parent_payment_ref", parentPaymentRef),
zap.String("customer_id", strings.TrimSpace(req.GetCustomerId())),
zap.Int64("amount_minor", req.GetAmountMinor()),
zap.String("currency", strings.ToUpper(strings.TrimSpace(req.GetCurrency()))),
zap.String("operation_ref", operationRef),
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
)
if strings.TrimSpace(p.config.BaseURL) == "" || strings.TrimSpace(p.config.SecretKey) == "" {
p.logger.Warn("Monetix configuration is incomplete for token payout submission")
return nil, merrors.Internal("monetix configuration is incomplete")
}
if err := validateCardTokenPayoutRequest(req, p.config); err != nil {
p.logger.Warn("Card token payout validation failed",
zap.String("parent_payment_ref", parentPaymentRef),
zap.String("operation_ref", operationRef),
zap.String("customer_id", req.GetCustomerId()),
zap.Error(err),
)
return nil, err
}
if err := p.validatePerTxMinimum(req.GetAmountMinor(), req.GetCurrency()); err != nil {
p.logger.Warn("Card token payout amount below configured minimum",
zap.String("parent_payment_ref", parentPaymentRef),
zap.String("operation_ref", operationRef),
zap.String("customer_id", req.GetCustomerId()),
zap.Int64("amount_minor", req.GetAmountMinor()),
zap.String("currency", strings.ToUpper(strings.TrimSpace(req.GetCurrency()))),
zap.Int64("configured_min_amount_minor", p.perTxMinimum(req.GetCurrency())),
zap.Error(err),
)
return nil, err
}
projectID, err := p.resolveProjectID(req.GetProjectId(), "operation_ref", operationRef)
if err != nil {
return nil, err
}
req.ProjectId = projectID
now := p.clock.Now()
state := &model.CardPayout{
PaymentRef: parentPaymentRef,
OperationRef: operationRef,
IdempotencyKey: strings.TrimSpace(req.GetIdempotencyKey()),
IntentRef: strings.TrimSpace(req.GetIntentRef()),
ProjectID: projectID,
CustomerID: strings.TrimSpace(req.GetCustomerId()),
AmountMinor: req.GetAmountMinor(),
Currency: strings.ToUpper(strings.TrimSpace(req.GetCurrency())),
Status: model.PayoutStatusProcessing,
CreatedAt: now,
UpdatedAt: now,
}
existing, err := p.findAndMergePayoutState(ctx, state)
if err != nil {
return nil, err
}
if existing != nil {
switch existing.Status {
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusNeedsAttention, model.PayoutStatusCancelled:
p.observeExecutionState(existing)
return cardTokenPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
}
}
p.rememberCardTokenRetryRequest(req)
result, err := p.dispatchCardTokenPayout(ctx, req)
if err != nil {
decision := p.retryPolicy.decideTransportFailure()
state.ProviderMessage = err.Error()
state.UpdatedAt = p.clock.Now()
maxAttempts := p.maxDispatchAttempts()
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
if e := p.updatePayoutStatus(ctx, state); e != nil {
return nil, e
}
p.scheduleCardTokenPayoutRetry(req, 1, maxAttempts, decision.Strategy)
return cardTokenPayoutResponseFromState(state, true, "", ""), nil
}
state.Status = terminalStatusAfterRetryExhausted(decision)
state.FailureReason = payoutFailureReason("", err.Error())
if e := p.updatePayoutStatus(ctx, state); e != nil {
return nil, e
}
p.clearRetryState(state.OperationRef)
p.logger.Warn("Monetix token payout submission failed",
zap.String("payment_ref", state.PaymentRef),
zap.String("customer_id", state.CustomerID),
zap.Error(err),
)
return nil, err
}
applyCardPayoutSendResult(state, result)
accepted := result.Accepted
errorCode := strings.TrimSpace(result.ErrorCode)
errorMessage := strings.TrimSpace(result.ErrorMessage)
scheduleRetry := false
retryMaxAttempts := uint32(0)
retryStrategy := payoutRetryStrategyImmediate
if !result.Accepted {
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
maxAttempts := p.maxDispatchAttempts()
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
accepted = true
errorCode = ""
errorMessage = ""
scheduleRetry = true
retryMaxAttempts = maxAttempts
retryStrategy = decision.Strategy
} else {
state.Status = terminalStatusAfterRetryExhausted(decision)
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
p.clearRetryState(state.OperationRef)
}
} else {
p.clearRetryTimer(state.OperationRef)
}
state.UpdatedAt = p.clock.Now()
if err := p.updatePayoutStatus(ctx, state); err != nil {
p.logger.Warn("Failed to update payout status", zap.Error(err))
return nil, err
}
if scheduleRetry {
p.scheduleCardTokenPayoutRetry(req, 1, retryMaxAttempts, retryStrategy)
}
resp := cardTokenPayoutResponseFromState(state, accepted, errorCode, errorMessage)
p.logger.Info("Card token payout submission stored",
zap.String("payment_ref", state.PaymentRef),
zap.String("status", string(state.Status)),
zap.Bool("accepted", accepted),
zap.String("provider_request_id", resp.GetProviderRequestId()),
)
return resp, nil
}
func (p *cardPayoutProcessor) Tokenize(ctx context.Context, req *mntxv1.CardTokenizeRequest) (*mntxv1.CardTokenizeResponse, error) {
if p == nil {
return nil, merrors.Internal("card payout processor not initialised")
}
p.logger.Info("Submitting card tokenization",
zap.String("request_id", strings.TrimSpace(req.GetRequestId())),
zap.String("customer_id", strings.TrimSpace(req.GetCustomerId())),
)
cardInput, err := validateCardTokenizeRequest(req, p.config)
if err != nil {
p.logger.Warn("Card tokenization validation failed",
zap.String("request_id", req.GetRequestId()),
zap.String("customer_id", req.GetCustomerId()),
zap.Error(err),
)
return nil, err
}
projectID, err := p.resolveProjectID(req.GetProjectId(), "request_id", req.GetRequestId())
if err != nil {
return nil, err
}
req = sanitizeCardTokenizeRequest(req)
cardInput = extractTokenizeCard(req)
client := monetix.NewClient(p.config, p.httpClient, p.logger)
apiReq := buildCardTokenizeRequest(projectID, req, cardInput)
result, err := client.CreateCardTokenization(ctx, apiReq)
if err != nil {
p.logger.Warn("Monetix tokenization request failed",
zap.String("request_id", req.GetRequestId()),
zap.String("customer_id", req.GetCustomerId()),
zap.Error(err),
)
return nil, err
}
resp := &mntxv1.CardTokenizeResponse{
RequestId: req.GetRequestId(),
Success: result.Accepted,
ErrorCode: result.ErrorCode,
ErrorMessage: result.ErrorMessage,
}
resp.Token = result.Token
resp.MaskedPan = result.MaskedPAN
resp.ExpiryMonth = result.ExpiryMonth
resp.ExpiryYear = result.ExpiryYear
resp.CardBrand = result.CardBrand
p.logger.Info("Card tokenization completed",
zap.String("request_id", resp.GetRequestId()),
zap.Bool("success", resp.GetSuccess()),
zap.String("provider_request_id", result.ProviderRequestID),
)
return resp, nil
}
func (p *cardPayoutProcessor) Status(ctx context.Context, payoutID string) (*mntxv1.CardPayoutState, error) {
if p == nil {
return nil, merrors.Internal("card payout processor not initialised")
}
id := strings.TrimSpace(payoutID)
p.logger.Info("Card payout status requested", zap.String("operation_ref", id))
if id == "" {
p.logger.Warn("Payout status requested with empty payout_id")
return nil, merrors.InvalidArgument("payout_id is required", "payout_id")
}
state, err := p.store.Payouts().FindByOperationRef(ctx, id)
if err != nil && !errors.Is(err, merrors.ErrNoData) {
p.logger.Warn("Payout status lookup by operation ref failed", zap.String("operation_ref", id), zap.Error(err))
return nil, err
}
if state == nil || errors.Is(err, merrors.ErrNoData) {
p.logger.Warn("Payout status not found", zap.String("operation_ref", id))
return nil, merrors.NoData("payout not found")
}
p.logger.Info("Card payout status resolved",
zap.String("payment_ref", state.PaymentRef),
zap.String("operation_ref", state.OperationRef),
zap.String("status", string(state.Status)),
)
return StateToProto(state), nil
}
func (p *cardPayoutProcessor) ProcessCallback(ctx context.Context, payload []byte) (int, error) {
if p == nil {
return http.StatusInternalServerError, merrors.Internal("card payout processor not initialised")
}
p.logger.Debug("Processing Monetix callback", zap.Int("payload_bytes", len(payload)))
if len(payload) == 0 {
p.logger.Warn("Received empty Monetix callback payload")
return http.StatusBadRequest, merrors.InvalidArgument("callback body is empty")
}
if strings.TrimSpace(p.config.SecretKey) == "" {
p.logger.Warn("Monetix secret key is not configured; cannot verify callback")
return http.StatusInternalServerError, merrors.Internal("monetix secret key is not configured")
}
var cb monetixCallback
if err := json.Unmarshal(payload, &cb); err != nil {
p.logger.Warn("Failed to unmarshal Monetix callback", zap.Error(err))
return http.StatusBadRequest, err
}
signature, err := verifyCallbackSignature(payload, p.config.SecretKey)
if err != nil {
status := http.StatusBadRequest
if errors.Is(err, merrors.ErrDataConflict) {
status = http.StatusForbidden
}
p.logger.Warn("Monetix callback signature check failed",
zap.String("payout_id", cb.Payment.ID),
zap.String("signature", signature),
zap.String("payload", string(payload)),
zap.Error(err),
)
return status, err
}
// mapCallbackToState currently returns proto-state in your code.
// Convert it to mongo model and preserve internal refs if record exists.
pbState, statusLabel := mapCallbackToState(p.clock, p.config, cb)
// Convert proto -> mongo (operationRef/idempotencyKey are internal; keep empty for now)
state := CardPayoutStateFromProto(p.clock, pbState)
// Preserve CreatedAt + internal keys from existing record if present.
existing, err := p.findAndMergePayoutState(ctx, state)
if err != nil {
p.logger.Warn("Failed to fetch payout state while processing callback",
zap.Error(err),
zap.String("payment_ref", state.PaymentRef),
)
return http.StatusInternalServerError, err
}
operationRef := strings.TrimSpace(state.OperationRef)
if existing != nil && strings.TrimSpace(state.FailureReason) == "" {
state.FailureReason = strings.TrimSpace(existing.FailureReason)
}
retryScheduled := false
if state.Status == model.PayoutStatusFailed || state.Status == model.PayoutStatusCancelled || state.Status == model.PayoutStatusNeedsAttention {
decision := p.retryPolicy.decideProviderFailure(state.ProviderCode)
attemptsUsed := p.currentDispatchAttempt(operationRef)
maxAttempts := p.maxDispatchAttempts()
if decision.Action == payoutFailureActionRetry && attemptsUsed > 0 && attemptsUsed < maxAttempts {
if req := p.loadCardRetryRequest(operationRef); req != nil {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
p.logger.Info("Callback decline is retryable; scheduling card payout retry",
zap.String("operation_ref", operationRef),
zap.String("provider_code", strings.TrimSpace(state.ProviderCode)),
zap.Uint32("attempts_used", attemptsUsed),
zap.Uint32("max_attempts", maxAttempts),
)
if err := p.updatePayoutStatus(ctx, state); err != nil {
p.logger.Warn("Failed to persist callback retry scheduling state", zap.Error(err))
return http.StatusInternalServerError, err
}
p.scheduleCardPayoutRetry(req, attemptsUsed, maxAttempts, decision.Strategy)
retryScheduled = true
} else if req := p.loadCardTokenRetryRequest(operationRef); req != nil {
state.Status = model.PayoutStatusProcessing
state.FailureReason = ""
p.logger.Info("Callback decline is retryable; scheduling card token payout retry",
zap.String("operation_ref", operationRef),
zap.String("provider_code", strings.TrimSpace(state.ProviderCode)),
zap.Uint32("attempts_used", attemptsUsed),
zap.Uint32("max_attempts", maxAttempts),
)
if err := p.updatePayoutStatus(ctx, state); err != nil {
p.logger.Warn("Failed to persist callback token retry scheduling state", zap.Error(err))
return http.StatusInternalServerError, err
}
p.scheduleCardTokenPayoutRetry(req, attemptsUsed, maxAttempts, decision.Strategy)
retryScheduled = true
} else {
p.logger.Warn("Retryable callback decline received but no retry request snapshot found",
zap.String("operation_ref", operationRef),
zap.String("provider_code", strings.TrimSpace(state.ProviderCode)),
zap.Uint32("attempts_used", attemptsUsed),
zap.Uint32("max_attempts", maxAttempts),
)
}
}
if !retryScheduled && decision.Action == payoutFailureActionRetry {
state.Status = model.PayoutStatusNeedsAttention
}
if existing != nil && existing.Status == model.PayoutStatusNeedsAttention {
state.Status = model.PayoutStatusNeedsAttention
}
if !retryScheduled && strings.TrimSpace(state.FailureReason) == "" {
state.FailureReason = payoutFailureReason(state.ProviderCode, state.ProviderMessage)
}
} else if state.Status == model.PayoutStatusSuccess {
state.FailureReason = ""
}
if !retryScheduled {
if err := p.updatePayoutStatus(ctx, state); err != nil {
p.logger.Warn("Failed to update payout state while processing callback", zap.Error(err))
}
if isFinalStatus(state) {
p.clearRetryState(operationRef)
}
}
monetix.ObserveCallback(statusLabel)
p.logger.Info("Monetix payout callback processed",
zap.String("payment_ref", state.PaymentRef),
zap.String("status", statusLabel),
zap.String("provider_code", state.ProviderCode),
zap.String("provider_message", state.ProviderMessage),
zap.Bool("retry_scheduled", retryScheduled),
zap.String("masked_account", cb.Account.Number),
)
return http.StatusOK, nil
}