1212 lines
37 KiB
Go
1212 lines
37 KiB
Go
package gateway
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/shopspring/decimal"
|
|
gatewayoutbox "github.com/tech/sendico/gateway/common/outbox"
|
|
"github.com/tech/sendico/gateway/mntx/internal/service/monetix"
|
|
"github.com/tech/sendico/gateway/mntx/storage"
|
|
"github.com/tech/sendico/gateway/mntx/storage/model"
|
|
clockpkg "github.com/tech/sendico/pkg/clock"
|
|
"github.com/tech/sendico/pkg/db/storable"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
msg "github.com/tech/sendico/pkg/messaging"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
pmodel "github.com/tech/sendico/pkg/model"
|
|
gatewayv1 "github.com/tech/sendico/pkg/proto/common/gateway/v1"
|
|
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
|
|
"go.mongodb.org/mongo-driver/v2/bson"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
// Defaults used by the card payout processor when nothing more specific is
// supplied by the gateway descriptor or the caller.
const (
	// defaultDispatchThrottleInterval is the fallback minimum spacing between
	// two provider dispatches.
	defaultDispatchThrottleInterval time.Duration = 150 * time.Millisecond
	// defaultMaxDispatchAttempts is the attempt cap applied when a caller
	// passes zero to maxDispatchAttempts.
	defaultMaxDispatchAttempts uint32 = 5
)
|
|
|
|
// cardPayoutProcessor bundles the dependencies and mutable runtime state used
// to submit card payouts to Monetix, throttle outgoing dispatches and schedule
// delayed retries. It embeds mutexes and therefore must not be copied.
type cardPayoutProcessor struct {
	// Dependencies injected by newCardPayoutProcessor.
	logger     mlogger.Logger
	config     monetix.Config
	clock      clockpkg.Clock
	store      storage.Repository
	httpClient *http.Client
	producer   msg.Producer
	// msgCfg and outbox are not populated by newCardPayoutProcessor.
	// NOTE(review): presumably wired up elsewhere — confirm.
	msgCfg pmodel.SettingsT
	outbox *gatewayoutbox.ReliableRuntime

	// Per-transaction minimums (minor units) and dispatch spacing; all three
	// are (re)assigned by applyGatewayDescriptor.
	perTxMinAmountMinor           int64
	perTxMinAmountMinorByCurrency map[string]int64
	dispatchThrottleInterval      time.Duration

	// dispatchMu guards nextDispatchAllowed, the earliest instant at which the
	// next dispatch may leave (see waitDispatchSlot).
	dispatchMu          sync.Mutex
	nextDispatchAllowed time.Time

	// retryPolicy decides whether a failure is retried; retryDelayFn maps the
	// failed attempt number to the delay before the next attempt.
	retryPolicy  payoutFailurePolicy
	retryDelayFn func(attempt uint32) time.Duration

	// retryMu guards retryTimers, which is keyed by trimmed operation reference.
	// retryCtx is the parent context of retry executions; retryStop cancels it
	// (see stopRetries).
	retryMu     sync.Mutex
	retryTimers map[string]*time.Timer
	retryCtx    context.Context
	retryStop   context.CancelFunc
}
|
|
|
|
func mergePayoutStateWithExisting(state, existing *model.CardPayout) {
|
|
if state == nil || existing == nil {
|
|
return
|
|
}
|
|
|
|
state.ID = existing.ID // preserve ID for upsert
|
|
if !existing.CreatedAt.IsZero() {
|
|
state.CreatedAt = existing.CreatedAt
|
|
}
|
|
if state.OperationRef == "" {
|
|
state.OperationRef = existing.OperationRef
|
|
}
|
|
if state.IdempotencyKey == "" {
|
|
state.IdempotencyKey = existing.IdempotencyKey
|
|
}
|
|
if state.IntentRef == "" {
|
|
state.IntentRef = existing.IntentRef
|
|
}
|
|
if existing.PaymentRef != "" {
|
|
state.PaymentRef = existing.PaymentRef
|
|
}
|
|
}
|
|
|
|
// findOperationRef returns the whitespace-trimmed operation reference, falling
// back to the trimmed payout ID when the operation reference is blank.
func findOperationRef(operationRef, payoutID string) string {
	if trimmed := strings.TrimSpace(operationRef); trimmed != "" {
		return trimmed
	}
	return strings.TrimSpace(payoutID)
}
|
|
|
|
func (p *cardPayoutProcessor) findExistingPayoutState(ctx context.Context, state *model.CardPayout) (*model.CardPayout, error) {
|
|
if p == nil || state == nil {
|
|
return nil, nil
|
|
}
|
|
if opRef := strings.TrimSpace(state.OperationRef); opRef != "" {
|
|
existing, err := p.store.Payouts().FindByOperationRef(ctx, opRef)
|
|
if err == nil {
|
|
if existing != nil {
|
|
return existing, nil
|
|
}
|
|
}
|
|
if !errors.Is(err, merrors.ErrNoData) {
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) findAndMergePayoutState(ctx context.Context, state *model.CardPayout) (*model.CardPayout, error) {
|
|
if p == nil || state == nil {
|
|
return nil, nil
|
|
}
|
|
existing, err := p.findExistingPayoutState(ctx, state)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
mergePayoutStateWithExisting(state, existing)
|
|
return existing, nil
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) resolveProjectID(requestProjectID int64, logFieldKey, logFieldValue string) (int64, error) {
|
|
projectID := requestProjectID
|
|
if projectID == 0 {
|
|
projectID = p.config.ProjectID
|
|
}
|
|
if projectID == 0 {
|
|
p.logger.Warn("Monetix project_id is not configured", zap.String(logFieldKey, logFieldValue))
|
|
return 0, merrors.Internal("mcards project_id is not configured")
|
|
}
|
|
return projectID, nil
|
|
}
|
|
|
|
func applyCardPayoutSendResult(state *model.CardPayout, result *monetix.CardPayoutSendResult) {
|
|
if state == nil || result == nil {
|
|
return
|
|
}
|
|
state.ProviderPaymentID = strings.TrimSpace(result.ProviderRequestID)
|
|
if result.Accepted {
|
|
state.Status = model.PayoutStatusWaiting
|
|
state.ProviderCode = ""
|
|
state.ProviderMessage = ""
|
|
return
|
|
}
|
|
state.Status = model.PayoutStatusFailed
|
|
state.ProviderCode = strings.TrimSpace(result.ErrorCode)
|
|
state.ProviderMessage = strings.TrimSpace(result.ErrorMessage)
|
|
}
|
|
|
|
func payoutStateLogFields(state *model.CardPayout) []zap.Field {
|
|
if state == nil {
|
|
return nil
|
|
}
|
|
return []zap.Field{
|
|
zap.String("payment_ref", state.PaymentRef),
|
|
zap.String("customer_id", state.CustomerID),
|
|
zap.String("operation_ref", state.OperationRef),
|
|
zap.String("idempotency_key", state.IdempotencyKey),
|
|
zap.String("intent_ref", state.IntentRef),
|
|
}
|
|
}
|
|
|
|
func newCardPayoutProcessor(
|
|
logger mlogger.Logger,
|
|
cfg monetix.Config,
|
|
clock clockpkg.Clock,
|
|
store storage.Repository,
|
|
client *http.Client,
|
|
producer msg.Producer,
|
|
) *cardPayoutProcessor {
|
|
retryCtx, retryStop := context.WithCancel(context.Background())
|
|
return &cardPayoutProcessor{
|
|
logger: logger.Named("card_payout_processor"),
|
|
config: cfg,
|
|
clock: clock,
|
|
store: store,
|
|
httpClient: client,
|
|
producer: producer,
|
|
dispatchThrottleInterval: defaultDispatchThrottleInterval,
|
|
retryPolicy: defaultPayoutFailurePolicy(),
|
|
retryDelayFn: retryDelayDuration,
|
|
retryTimers: map[string]*time.Timer{},
|
|
retryCtx: retryCtx,
|
|
retryStop: retryStop,
|
|
}
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) applyGatewayDescriptor(descriptor *gatewayv1.GatewayInstanceDescriptor) {
|
|
if p == nil {
|
|
return
|
|
}
|
|
minAmountMinor, perCurrency := perTxMinAmountPolicy(descriptor)
|
|
p.perTxMinAmountMinor = minAmountMinor
|
|
p.perTxMinAmountMinorByCurrency = perCurrency
|
|
p.dispatchThrottleInterval = dispatchThrottleIntervalFromDescriptor(descriptor, defaultDispatchThrottleInterval)
|
|
p.logger.Info("Configured payout dispatch throttle",
|
|
zap.Duration("dispatch_interval", p.dispatchThrottleInterval),
|
|
)
|
|
}
|
|
|
|
func perTxMinAmountPolicy(descriptor *gatewayv1.GatewayInstanceDescriptor) (int64, map[string]int64) {
|
|
if descriptor == nil || descriptor.GetLimits() == nil {
|
|
return 0, nil
|
|
}
|
|
limits := descriptor.GetLimits()
|
|
globalMin, _ := decimalAmountToMinor(firstNonEmpty(limits.GetPerTxMinAmount(), limits.GetMinAmount()))
|
|
perCurrency := map[string]int64{}
|
|
for currency, override := range limits.GetCurrencyLimits() {
|
|
if override == nil {
|
|
continue
|
|
}
|
|
minor, ok := decimalAmountToMinor(override.GetMinAmount())
|
|
if !ok {
|
|
continue
|
|
}
|
|
code := strings.ToUpper(strings.TrimSpace(currency))
|
|
if code == "" {
|
|
continue
|
|
}
|
|
perCurrency[code] = minor
|
|
}
|
|
if len(perCurrency) == 0 {
|
|
perCurrency = nil
|
|
}
|
|
return globalMin, perCurrency
|
|
}
|
|
|
|
func decimalAmountToMinor(raw string) (int64, bool) {
|
|
raw = strings.TrimSpace(raw)
|
|
if raw == "" {
|
|
return 0, false
|
|
}
|
|
value, err := decimal.NewFromString(raw)
|
|
if err != nil || !value.IsPositive() {
|
|
return 0, false
|
|
}
|
|
minor := value.Mul(decimal.NewFromInt(100)).Ceil().IntPart()
|
|
if minor <= 0 {
|
|
return 0, false
|
|
}
|
|
return minor, true
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) validatePerTxMinimum(amountMinor int64, currency string) error {
|
|
if p == nil {
|
|
return nil
|
|
}
|
|
minAmountMinor := p.perTxMinimum(currency)
|
|
if minAmountMinor <= 0 || amountMinor >= minAmountMinor {
|
|
return nil
|
|
}
|
|
return newPayoutError("amount_below_minimum", merrors.InvalidArgument(
|
|
fmt.Sprintf("amount_minor must be at least %d", minAmountMinor),
|
|
"amount_minor",
|
|
))
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) perTxMinimum(currency string) int64 {
|
|
if p == nil {
|
|
return 0
|
|
}
|
|
minAmountMinor := p.perTxMinAmountMinor
|
|
if len(p.perTxMinAmountMinorByCurrency) == 0 {
|
|
return minAmountMinor
|
|
}
|
|
code := strings.ToUpper(strings.TrimSpace(currency))
|
|
if code == "" {
|
|
return minAmountMinor
|
|
}
|
|
if override, ok := p.perTxMinAmountMinorByCurrency[code]; ok && override > 0 {
|
|
return override
|
|
}
|
|
return minAmountMinor
|
|
}
|
|
|
|
func dispatchThrottleIntervalFromDescriptor(
|
|
descriptor *gatewayv1.GatewayInstanceDescriptor,
|
|
fallback time.Duration,
|
|
) time.Duration {
|
|
if fallback < 0 {
|
|
fallback = 0
|
|
}
|
|
if descriptor == nil || descriptor.GetLimits() == nil {
|
|
return fallback
|
|
}
|
|
velocity := descriptor.GetLimits().GetVelocityLimit()
|
|
if len(velocity) == 0 {
|
|
return fallback
|
|
}
|
|
|
|
interval := time.Duration(0)
|
|
for bucket, maxOps := range velocity {
|
|
cleanBucket := strings.TrimSpace(bucket)
|
|
if cleanBucket == "" || maxOps <= 0 {
|
|
continue
|
|
}
|
|
window, err := time.ParseDuration(cleanBucket)
|
|
if err != nil || window <= 0 {
|
|
continue
|
|
}
|
|
candidate := window / time.Duration(maxOps)
|
|
if candidate <= 0 {
|
|
continue
|
|
}
|
|
if candidate > interval {
|
|
interval = candidate
|
|
}
|
|
}
|
|
if interval <= 0 {
|
|
return fallback
|
|
}
|
|
return interval
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) waitDispatchSlot(ctx context.Context) error {
|
|
if p == nil {
|
|
return merrors.Internal("card payout processor not initialised")
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
if p.dispatchThrottleInterval <= 0 {
|
|
return nil
|
|
}
|
|
|
|
for {
|
|
p.dispatchMu.Lock()
|
|
now := time.Now().UTC()
|
|
if p.nextDispatchAllowed.IsZero() || !p.nextDispatchAllowed.After(now) {
|
|
p.nextDispatchAllowed = now.Add(p.dispatchThrottleInterval)
|
|
p.dispatchMu.Unlock()
|
|
return nil
|
|
}
|
|
wait := p.nextDispatchAllowed.Sub(now)
|
|
p.dispatchMu.Unlock()
|
|
|
|
timer := time.NewTimer(wait)
|
|
select {
|
|
case <-ctx.Done():
|
|
timer.Stop()
|
|
return ctx.Err()
|
|
case <-timer.C:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) stopRetries() {
|
|
if p == nil {
|
|
return
|
|
}
|
|
if p.retryStop != nil {
|
|
p.retryStop()
|
|
}
|
|
p.retryMu.Lock()
|
|
defer p.retryMu.Unlock()
|
|
for key, timer := range p.retryTimers {
|
|
if timer != nil {
|
|
timer.Stop()
|
|
}
|
|
delete(p.retryTimers, key)
|
|
}
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) clearRetryTimer(operationRef string) {
|
|
if p == nil {
|
|
return
|
|
}
|
|
key := strings.TrimSpace(operationRef)
|
|
if key == "" {
|
|
return
|
|
}
|
|
p.retryMu.Lock()
|
|
defer p.retryMu.Unlock()
|
|
timer := p.retryTimers[key]
|
|
if timer != nil {
|
|
timer.Stop()
|
|
}
|
|
delete(p.retryTimers, key)
|
|
}
|
|
|
|
func payoutAcceptedForState(state *model.CardPayout) bool {
|
|
if state == nil {
|
|
return false
|
|
}
|
|
switch state.Status {
|
|
case model.PayoutStatusFailed, model.PayoutStatusCancelled:
|
|
return false
|
|
default:
|
|
return true
|
|
}
|
|
}
|
|
|
|
func cardPayoutResponseFromState(
|
|
state *model.CardPayout,
|
|
accepted bool,
|
|
errorCode string,
|
|
errorMessage string,
|
|
) *mntxv1.CardPayoutResponse {
|
|
return &mntxv1.CardPayoutResponse{
|
|
Payout: StateToProto(state),
|
|
Accepted: accepted,
|
|
ProviderRequestId: strings.TrimSpace(firstNonEmpty(state.ProviderPaymentID)),
|
|
ErrorCode: strings.TrimSpace(errorCode),
|
|
ErrorMessage: strings.TrimSpace(errorMessage),
|
|
}
|
|
}
|
|
|
|
func cardTokenPayoutResponseFromState(
|
|
state *model.CardPayout,
|
|
accepted bool,
|
|
errorCode string,
|
|
errorMessage string,
|
|
) *mntxv1.CardTokenPayoutResponse {
|
|
return &mntxv1.CardTokenPayoutResponse{
|
|
Payout: StateToProto(state),
|
|
Accepted: accepted,
|
|
ProviderRequestId: strings.TrimSpace(firstNonEmpty(state.ProviderPaymentID)),
|
|
ErrorCode: strings.TrimSpace(errorCode),
|
|
ErrorMessage: strings.TrimSpace(errorMessage),
|
|
}
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) dispatchCardPayout(ctx context.Context, req *mntxv1.CardPayoutRequest) (*monetix.CardPayoutSendResult, error) {
|
|
if p == nil {
|
|
return nil, merrors.Internal("card payout processor not initialised")
|
|
}
|
|
if req == nil {
|
|
return nil, merrors.InvalidArgument("card payout request is required")
|
|
}
|
|
if err := p.waitDispatchSlot(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
client := monetix.NewClient(p.config, p.httpClient, p.logger)
|
|
apiReq := buildCardPayoutRequest(req.GetProjectId(), req)
|
|
return client.CreateCardPayout(ctx, apiReq)
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) dispatchCardTokenPayout(ctx context.Context, req *mntxv1.CardTokenPayoutRequest) (*monetix.CardPayoutSendResult, error) {
|
|
if p == nil {
|
|
return nil, merrors.Internal("card payout processor not initialised")
|
|
}
|
|
if req == nil {
|
|
return nil, merrors.InvalidArgument("card token payout request is required")
|
|
}
|
|
if err := p.waitDispatchSlot(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
client := monetix.NewClient(p.config, p.httpClient, p.logger)
|
|
apiReq := buildCardTokenPayoutRequest(req.GetProjectId(), req)
|
|
return client.CreateCardTokenPayout(ctx, apiReq)
|
|
}
|
|
|
|
func maxDispatchAttempts(v uint32) uint32 {
|
|
if v == 0 {
|
|
return defaultMaxDispatchAttempts
|
|
}
|
|
return v
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) scheduleRetryTimer(operationRef string, delay time.Duration, run func()) {
|
|
if p == nil || run == nil {
|
|
return
|
|
}
|
|
key := strings.TrimSpace(operationRef)
|
|
if key == "" {
|
|
return
|
|
}
|
|
if delay < 0 {
|
|
delay = 0
|
|
}
|
|
p.retryMu.Lock()
|
|
defer p.retryMu.Unlock()
|
|
if old := p.retryTimers[key]; old != nil {
|
|
old.Stop()
|
|
}
|
|
|
|
var timer *time.Timer
|
|
timer = time.AfterFunc(delay, func() {
|
|
select {
|
|
case <-p.retryCtx.Done():
|
|
return
|
|
default:
|
|
}
|
|
p.retryMu.Lock()
|
|
if p.retryTimers[key] == timer {
|
|
delete(p.retryTimers, key)
|
|
}
|
|
p.retryMu.Unlock()
|
|
run()
|
|
})
|
|
p.retryTimers[key] = timer
|
|
}
|
|
|
|
func retryDelayDuration(attempt uint32) time.Duration {
|
|
return time.Duration(retryDelayForAttempt(attempt)) * time.Second
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) scheduleCardPayoutRetry(req *mntxv1.CardPayoutRequest, failedAttempt uint32, maxAttempts uint32) {
|
|
if p == nil || req == nil {
|
|
return
|
|
}
|
|
maxAttempts = maxDispatchAttempts(maxAttempts)
|
|
nextAttempt := failedAttempt + 1
|
|
if nextAttempt > maxAttempts {
|
|
return
|
|
}
|
|
cloned, ok := proto.Clone(req).(*mntxv1.CardPayoutRequest)
|
|
if !ok {
|
|
return
|
|
}
|
|
operationRef := findOperationRef(cloned.GetOperationRef(), cloned.GetPayoutId())
|
|
delay := retryDelayDuration(failedAttempt)
|
|
if p.retryDelayFn != nil {
|
|
delay = p.retryDelayFn(failedAttempt)
|
|
}
|
|
p.scheduleRetryTimer(operationRef, delay, func() {
|
|
p.runCardPayoutRetry(cloned, nextAttempt, maxAttempts)
|
|
})
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) scheduleCardTokenPayoutRetry(req *mntxv1.CardTokenPayoutRequest, failedAttempt uint32, maxAttempts uint32) {
|
|
if p == nil || req == nil {
|
|
return
|
|
}
|
|
maxAttempts = maxDispatchAttempts(maxAttempts)
|
|
nextAttempt := failedAttempt + 1
|
|
if nextAttempt > maxAttempts {
|
|
return
|
|
}
|
|
cloned, ok := proto.Clone(req).(*mntxv1.CardTokenPayoutRequest)
|
|
if !ok {
|
|
return
|
|
}
|
|
operationRef := findOperationRef(cloned.GetOperationRef(), cloned.GetPayoutId())
|
|
delay := retryDelayDuration(failedAttempt)
|
|
if p.retryDelayFn != nil {
|
|
delay = p.retryDelayFn(failedAttempt)
|
|
}
|
|
p.scheduleRetryTimer(operationRef, delay, func() {
|
|
p.runCardTokenPayoutRetry(cloned, nextAttempt, maxAttempts)
|
|
})
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) retryContext() (context.Context, context.CancelFunc) {
|
|
if p == nil {
|
|
return context.Background(), func() {}
|
|
}
|
|
ctx := p.retryCtx
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
timeout := p.config.Timeout()
|
|
if timeout <= 0 {
|
|
return ctx, func() {}
|
|
}
|
|
return context.WithTimeout(ctx, timeout)
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest, attempt uint32, maxAttempts uint32) {
|
|
if p == nil || req == nil {
|
|
return
|
|
}
|
|
operationRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
|
if operationRef == "" {
|
|
return
|
|
}
|
|
ctx, cancel := p.retryContext()
|
|
defer cancel()
|
|
|
|
state, err := p.store.Payouts().FindByOperationRef(ctx, operationRef)
|
|
if err != nil || state == nil {
|
|
p.logger.Warn("Retry payout state lookup failed",
|
|
zap.String("operation_ref", operationRef),
|
|
zap.Uint32("attempt", attempt),
|
|
zap.Error(err),
|
|
)
|
|
return
|
|
}
|
|
if isFinalStatus(state) {
|
|
p.clearRetryTimer(operationRef)
|
|
return
|
|
}
|
|
|
|
result, err := p.dispatchCardPayout(ctx, req)
|
|
now := p.clock.Now()
|
|
maxAttempts = maxDispatchAttempts(maxAttempts)
|
|
if err != nil {
|
|
decision := p.retryPolicy.decideTransportFailure()
|
|
state.ProviderCode = ""
|
|
state.ProviderMessage = err.Error()
|
|
state.UpdatedAt = now
|
|
if decision.Action == payoutFailureActionRetry && attempt < maxAttempts {
|
|
state.Status = model.PayoutStatusProcessing
|
|
state.FailureReason = ""
|
|
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
|
p.logger.Warn("Failed to persist retryable payout transport failure", zap.Error(upErr))
|
|
return
|
|
}
|
|
p.scheduleCardPayoutRetry(req, attempt, maxAttempts)
|
|
return
|
|
}
|
|
|
|
state.Status = model.PayoutStatusFailed
|
|
state.FailureReason = payoutFailureReason("", err.Error())
|
|
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
|
p.logger.Warn("Failed to persist terminal payout transport failure", zap.Error(upErr))
|
|
}
|
|
p.clearRetryTimer(operationRef)
|
|
return
|
|
}
|
|
|
|
applyCardPayoutSendResult(state, result)
|
|
state.UpdatedAt = now
|
|
if result.Accepted {
|
|
state.FailureReason = ""
|
|
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
|
p.logger.Warn("Failed to persist accepted payout retry result", zap.Error(upErr))
|
|
}
|
|
p.clearRetryTimer(operationRef)
|
|
return
|
|
}
|
|
|
|
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
|
|
if decision.Action == payoutFailureActionRetry && attempt < maxAttempts {
|
|
state.Status = model.PayoutStatusProcessing
|
|
state.FailureReason = ""
|
|
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
|
p.logger.Warn("Failed to persist retryable payout provider failure", zap.Error(upErr))
|
|
return
|
|
}
|
|
p.scheduleCardPayoutRetry(req, attempt, maxAttempts)
|
|
return
|
|
}
|
|
|
|
state.Status = model.PayoutStatusFailed
|
|
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
|
|
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
|
p.logger.Warn("Failed to persist terminal payout provider failure", zap.Error(upErr))
|
|
}
|
|
p.clearRetryTimer(operationRef)
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayoutRequest, attempt uint32, maxAttempts uint32) {
|
|
if p == nil || req == nil {
|
|
return
|
|
}
|
|
operationRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
|
if operationRef == "" {
|
|
return
|
|
}
|
|
ctx, cancel := p.retryContext()
|
|
defer cancel()
|
|
|
|
state, err := p.store.Payouts().FindByOperationRef(ctx, operationRef)
|
|
if err != nil || state == nil {
|
|
p.logger.Warn("Retry token payout state lookup failed",
|
|
zap.String("operation_ref", operationRef),
|
|
zap.Uint32("attempt", attempt),
|
|
zap.Error(err),
|
|
)
|
|
return
|
|
}
|
|
if isFinalStatus(state) {
|
|
p.clearRetryTimer(operationRef)
|
|
return
|
|
}
|
|
|
|
result, err := p.dispatchCardTokenPayout(ctx, req)
|
|
now := p.clock.Now()
|
|
maxAttempts = maxDispatchAttempts(maxAttempts)
|
|
if err != nil {
|
|
decision := p.retryPolicy.decideTransportFailure()
|
|
state.ProviderCode = ""
|
|
state.ProviderMessage = err.Error()
|
|
state.UpdatedAt = now
|
|
if decision.Action == payoutFailureActionRetry && attempt < maxAttempts {
|
|
state.Status = model.PayoutStatusProcessing
|
|
state.FailureReason = ""
|
|
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
|
p.logger.Warn("Failed to persist retryable token payout transport failure", zap.Error(upErr))
|
|
return
|
|
}
|
|
p.scheduleCardTokenPayoutRetry(req, attempt, maxAttempts)
|
|
return
|
|
}
|
|
|
|
state.Status = model.PayoutStatusFailed
|
|
state.FailureReason = payoutFailureReason("", err.Error())
|
|
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
|
p.logger.Warn("Failed to persist terminal token payout transport failure", zap.Error(upErr))
|
|
}
|
|
p.clearRetryTimer(operationRef)
|
|
return
|
|
}
|
|
|
|
applyCardPayoutSendResult(state, result)
|
|
state.UpdatedAt = now
|
|
if result.Accepted {
|
|
state.FailureReason = ""
|
|
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
|
p.logger.Warn("Failed to persist accepted token payout retry result", zap.Error(upErr))
|
|
}
|
|
p.clearRetryTimer(operationRef)
|
|
return
|
|
}
|
|
|
|
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
|
|
if decision.Action == payoutFailureActionRetry && attempt < maxAttempts {
|
|
state.Status = model.PayoutStatusProcessing
|
|
state.FailureReason = ""
|
|
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
|
p.logger.Warn("Failed to persist retryable token payout provider failure", zap.Error(upErr))
|
|
return
|
|
}
|
|
p.scheduleCardTokenPayoutRetry(req, attempt, maxAttempts)
|
|
return
|
|
}
|
|
|
|
state.Status = model.PayoutStatusFailed
|
|
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
|
|
if upErr := p.updatePayoutStatus(ctx, state); upErr != nil {
|
|
p.logger.Warn("Failed to persist terminal token payout provider failure", zap.Error(upErr))
|
|
}
|
|
p.clearRetryTimer(operationRef)
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayoutRequest) (*mntxv1.CardPayoutResponse, error) {
|
|
if p == nil {
|
|
return nil, merrors.Internal("card payout processor not initialised")
|
|
}
|
|
|
|
req = sanitizeCardPayoutRequest(req)
|
|
operationRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
|
parentPaymentRef := strings.TrimSpace(req.GetParentPaymentRef())
|
|
|
|
p.logger.Info("Submitting card payout",
|
|
zap.String("parent_payment_ref", parentPaymentRef),
|
|
zap.String("customer_id", strings.TrimSpace(req.GetCustomerId())),
|
|
zap.Int64("amount_minor", req.GetAmountMinor()),
|
|
zap.String("currency", strings.ToUpper(strings.TrimSpace(req.GetCurrency()))),
|
|
zap.String("operation_ref", operationRef),
|
|
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
|
|
)
|
|
|
|
if strings.TrimSpace(p.config.BaseURL) == "" || strings.TrimSpace(p.config.SecretKey) == "" {
|
|
p.logger.Warn("Monetix configuration is incomplete for payout submission")
|
|
return nil, merrors.Internal("monetix configuration is incomplete")
|
|
}
|
|
|
|
if err := validateCardPayoutRequest(req, p.config); err != nil {
|
|
p.logger.Warn("Card payout validation failed",
|
|
zap.String("parent_payment_ref", parentPaymentRef),
|
|
zap.String("operation_ref", operationRef),
|
|
zap.String("customer_id", req.GetCustomerId()),
|
|
zap.Error(err),
|
|
)
|
|
return nil, err
|
|
}
|
|
if err := p.validatePerTxMinimum(req.GetAmountMinor(), req.GetCurrency()); err != nil {
|
|
p.logger.Warn("Card payout amount below configured minimum",
|
|
zap.String("parent_payment_ref", parentPaymentRef),
|
|
zap.String("operation_ref", operationRef),
|
|
zap.String("customer_id", req.GetCustomerId()),
|
|
zap.Int64("amount_minor", req.GetAmountMinor()),
|
|
zap.String("currency", strings.ToUpper(strings.TrimSpace(req.GetCurrency()))),
|
|
zap.Int64("configured_min_amount_minor", p.perTxMinimum(req.GetCurrency())),
|
|
zap.Error(err),
|
|
)
|
|
return nil, err
|
|
}
|
|
|
|
projectID, err := p.resolveProjectID(req.GetProjectId(), "operation_ref", operationRef)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.ProjectId = projectID
|
|
|
|
now := p.clock.Now()
|
|
|
|
state := &model.CardPayout{
|
|
Base: storable.Base{
|
|
ID: bson.NilObjectID,
|
|
},
|
|
PaymentRef: parentPaymentRef,
|
|
OperationRef: operationRef,
|
|
IdempotencyKey: strings.TrimSpace(req.GetIdempotencyKey()),
|
|
IntentRef: strings.TrimSpace(req.GetIntentRef()),
|
|
ProjectID: projectID,
|
|
CustomerID: strings.TrimSpace(req.GetCustomerId()),
|
|
AmountMinor: req.GetAmountMinor(),
|
|
Currency: strings.ToUpper(strings.TrimSpace(req.GetCurrency())),
|
|
Status: model.PayoutStatusProcessing,
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
}
|
|
|
|
// Keep CreatedAt/refs if record already exists.
|
|
existing, _ := p.findAndMergePayoutState(ctx, state)
|
|
if existing != nil {
|
|
switch existing.Status {
|
|
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
|
|
return cardPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
|
|
}
|
|
}
|
|
|
|
result, err := p.dispatchCardPayout(ctx, req)
|
|
if err != nil {
|
|
decision := p.retryPolicy.decideTransportFailure()
|
|
state.ProviderMessage = err.Error()
|
|
state.UpdatedAt = p.clock.Now()
|
|
maxAttempts := maxDispatchAttempts(0)
|
|
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
|
state.Status = model.PayoutStatusProcessing
|
|
state.FailureReason = ""
|
|
if e := p.updatePayoutStatus(ctx, state); e != nil {
|
|
fields := append([]zap.Field{zap.Error(e)}, payoutStateLogFields(state)...)
|
|
p.logger.Warn("Failed to update payout status", fields...)
|
|
return nil, e
|
|
}
|
|
p.scheduleCardPayoutRetry(req, 1, maxAttempts)
|
|
return cardPayoutResponseFromState(state, true, "", ""), nil
|
|
}
|
|
|
|
state.Status = model.PayoutStatusFailed
|
|
state.FailureReason = payoutFailureReason("", err.Error())
|
|
if e := p.updatePayoutStatus(ctx, state); e != nil {
|
|
fields := append([]zap.Field{zap.Error(e)}, payoutStateLogFields(state)...)
|
|
p.logger.Warn("Failed to update payout status", fields...)
|
|
return nil, e
|
|
}
|
|
fields := append([]zap.Field{zap.Error(err)}, payoutStateLogFields(state)...)
|
|
p.logger.Warn("Monetix payout submission failed", fields...)
|
|
p.clearRetryTimer(state.OperationRef)
|
|
return nil, err
|
|
}
|
|
|
|
applyCardPayoutSendResult(state, result)
|
|
state.UpdatedAt = p.clock.Now()
|
|
accepted := result.Accepted
|
|
errorCode := strings.TrimSpace(result.ErrorCode)
|
|
errorMessage := strings.TrimSpace(result.ErrorMessage)
|
|
|
|
if !result.Accepted {
|
|
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
|
|
maxAttempts := maxDispatchAttempts(0)
|
|
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
|
state.Status = model.PayoutStatusProcessing
|
|
state.FailureReason = ""
|
|
accepted = true
|
|
errorCode = ""
|
|
errorMessage = ""
|
|
p.scheduleCardPayoutRetry(req, 1, maxAttempts)
|
|
} else {
|
|
state.Status = model.PayoutStatusFailed
|
|
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
|
|
p.clearRetryTimer(state.OperationRef)
|
|
}
|
|
} else {
|
|
p.clearRetryTimer(state.OperationRef)
|
|
}
|
|
|
|
if err := p.updatePayoutStatus(ctx, state); err != nil {
|
|
p.logger.Warn("Failed to store payout",
|
|
zap.Error(err),
|
|
zap.String("payment_ref", state.PaymentRef),
|
|
zap.String("customer_id", state.CustomerID),
|
|
zap.String("operation_ref", state.OperationRef),
|
|
zap.String("idempotency_key", state.IdempotencyKey),
|
|
)
|
|
return nil, err
|
|
}
|
|
|
|
resp := cardPayoutResponseFromState(state, accepted, errorCode, errorMessage)
|
|
|
|
p.logger.Info("Card payout submission stored",
|
|
zap.String("payment_ref", state.PaymentRef),
|
|
zap.String("status", string(state.Status)),
|
|
zap.Bool("accepted", accepted),
|
|
zap.String("provider_request_id", resp.GetProviderRequestId()),
|
|
)
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardTokenPayoutRequest) (*mntxv1.CardTokenPayoutResponse, error) {
|
|
if p == nil {
|
|
return nil, merrors.Internal("card payout processor not initialised")
|
|
}
|
|
|
|
req = sanitizeCardTokenPayoutRequest(req)
|
|
operationRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
|
parentPaymentRef := strings.TrimSpace(req.GetParentPaymentRef())
|
|
|
|
p.logger.Info("Submitting card token payout",
|
|
zap.String("parent_payment_ref", parentPaymentRef),
|
|
zap.String("customer_id", strings.TrimSpace(req.GetCustomerId())),
|
|
zap.Int64("amount_minor", req.GetAmountMinor()),
|
|
zap.String("currency", strings.ToUpper(strings.TrimSpace(req.GetCurrency()))),
|
|
zap.String("operation_ref", operationRef),
|
|
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
|
|
)
|
|
|
|
if strings.TrimSpace(p.config.BaseURL) == "" || strings.TrimSpace(p.config.SecretKey) == "" {
|
|
p.logger.Warn("Monetix configuration is incomplete for token payout submission")
|
|
return nil, merrors.Internal("monetix configuration is incomplete")
|
|
}
|
|
|
|
if err := validateCardTokenPayoutRequest(req, p.config); err != nil {
|
|
p.logger.Warn("Card token payout validation failed",
|
|
zap.String("parent_payment_ref", parentPaymentRef),
|
|
zap.String("operation_ref", operationRef),
|
|
zap.String("customer_id", req.GetCustomerId()),
|
|
zap.Error(err),
|
|
)
|
|
return nil, err
|
|
}
|
|
if err := p.validatePerTxMinimum(req.GetAmountMinor(), req.GetCurrency()); err != nil {
|
|
p.logger.Warn("Card token payout amount below configured minimum",
|
|
zap.String("parent_payment_ref", parentPaymentRef),
|
|
zap.String("operation_ref", operationRef),
|
|
zap.String("customer_id", req.GetCustomerId()),
|
|
zap.Int64("amount_minor", req.GetAmountMinor()),
|
|
zap.String("currency", strings.ToUpper(strings.TrimSpace(req.GetCurrency()))),
|
|
zap.Int64("configured_min_amount_minor", p.perTxMinimum(req.GetCurrency())),
|
|
zap.Error(err),
|
|
)
|
|
return nil, err
|
|
}
|
|
|
|
projectID, err := p.resolveProjectID(req.GetProjectId(), "operation_ref", operationRef)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.ProjectId = projectID
|
|
|
|
now := p.clock.Now()
|
|
state := &model.CardPayout{
|
|
PaymentRef: parentPaymentRef,
|
|
OperationRef: operationRef,
|
|
IdempotencyKey: strings.TrimSpace(req.GetIdempotencyKey()),
|
|
IntentRef: strings.TrimSpace(req.GetIntentRef()),
|
|
ProjectID: projectID,
|
|
CustomerID: strings.TrimSpace(req.GetCustomerId()),
|
|
AmountMinor: req.GetAmountMinor(),
|
|
Currency: strings.ToUpper(strings.TrimSpace(req.GetCurrency())),
|
|
Status: model.PayoutStatusProcessing,
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
}
|
|
|
|
existing, _ := p.findAndMergePayoutState(ctx, state)
|
|
if existing != nil {
|
|
switch existing.Status {
|
|
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
|
|
return cardTokenPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
|
|
}
|
|
}
|
|
|
|
result, err := p.dispatchCardTokenPayout(ctx, req)
|
|
if err != nil {
|
|
decision := p.retryPolicy.decideTransportFailure()
|
|
state.ProviderMessage = err.Error()
|
|
state.UpdatedAt = p.clock.Now()
|
|
maxAttempts := maxDispatchAttempts(0)
|
|
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
|
state.Status = model.PayoutStatusProcessing
|
|
state.FailureReason = ""
|
|
if e := p.updatePayoutStatus(ctx, state); e != nil {
|
|
return nil, e
|
|
}
|
|
p.scheduleCardTokenPayoutRetry(req, 1, maxAttempts)
|
|
return cardTokenPayoutResponseFromState(state, true, "", ""), nil
|
|
}
|
|
|
|
state.Status = model.PayoutStatusFailed
|
|
state.FailureReason = payoutFailureReason("", err.Error())
|
|
if e := p.updatePayoutStatus(ctx, state); e != nil {
|
|
return nil, e
|
|
}
|
|
p.clearRetryTimer(state.OperationRef)
|
|
p.logger.Warn("Monetix token payout submission failed",
|
|
zap.String("payment_ref", state.PaymentRef),
|
|
zap.String("customer_id", state.CustomerID),
|
|
zap.Error(err),
|
|
)
|
|
return nil, err
|
|
}
|
|
|
|
applyCardPayoutSendResult(state, result)
|
|
accepted := result.Accepted
|
|
errorCode := strings.TrimSpace(result.ErrorCode)
|
|
errorMessage := strings.TrimSpace(result.ErrorMessage)
|
|
|
|
if !result.Accepted {
|
|
decision := p.retryPolicy.decideProviderFailure(result.ErrorCode)
|
|
maxAttempts := maxDispatchAttempts(0)
|
|
if decision.Action == payoutFailureActionRetry && maxAttempts > 1 {
|
|
state.Status = model.PayoutStatusProcessing
|
|
state.FailureReason = ""
|
|
accepted = true
|
|
errorCode = ""
|
|
errorMessage = ""
|
|
p.scheduleCardTokenPayoutRetry(req, 1, maxAttempts)
|
|
} else {
|
|
state.Status = model.PayoutStatusFailed
|
|
state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage)
|
|
p.clearRetryTimer(state.OperationRef)
|
|
}
|
|
} else {
|
|
p.clearRetryTimer(state.OperationRef)
|
|
}
|
|
|
|
state.UpdatedAt = p.clock.Now()
|
|
if err := p.updatePayoutStatus(ctx, state); err != nil {
|
|
p.logger.Warn("Failed to update payout status", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
resp := cardTokenPayoutResponseFromState(state, accepted, errorCode, errorMessage)
|
|
|
|
p.logger.Info("Card token payout submission stored",
|
|
zap.String("payment_ref", state.PaymentRef),
|
|
zap.String("status", string(state.Status)),
|
|
zap.Bool("accepted", accepted),
|
|
zap.String("provider_request_id", resp.GetProviderRequestId()),
|
|
)
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) Tokenize(ctx context.Context, req *mntxv1.CardTokenizeRequest) (*mntxv1.CardTokenizeResponse, error) {
|
|
if p == nil {
|
|
return nil, merrors.Internal("card payout processor not initialised")
|
|
}
|
|
|
|
p.logger.Info("Submitting card tokenization",
|
|
zap.String("request_id", strings.TrimSpace(req.GetRequestId())),
|
|
zap.String("customer_id", strings.TrimSpace(req.GetCustomerId())),
|
|
)
|
|
|
|
cardInput, err := validateCardTokenizeRequest(req, p.config)
|
|
if err != nil {
|
|
p.logger.Warn("Card tokenization validation failed",
|
|
zap.String("request_id", req.GetRequestId()),
|
|
zap.String("customer_id", req.GetCustomerId()),
|
|
zap.Error(err),
|
|
)
|
|
return nil, err
|
|
}
|
|
|
|
projectID, err := p.resolveProjectID(req.GetProjectId(), "request_id", req.GetRequestId())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req = sanitizeCardTokenizeRequest(req)
|
|
cardInput = extractTokenizeCard(req)
|
|
|
|
client := monetix.NewClient(p.config, p.httpClient, p.logger)
|
|
apiReq := buildCardTokenizeRequest(projectID, req, cardInput)
|
|
|
|
result, err := client.CreateCardTokenization(ctx, apiReq)
|
|
if err != nil {
|
|
p.logger.Warn("Monetix tokenization request failed",
|
|
zap.String("request_id", req.GetRequestId()),
|
|
zap.String("customer_id", req.GetCustomerId()),
|
|
zap.Error(err),
|
|
)
|
|
return nil, err
|
|
}
|
|
|
|
resp := &mntxv1.CardTokenizeResponse{
|
|
RequestId: req.GetRequestId(),
|
|
Success: result.Accepted,
|
|
ErrorCode: result.ErrorCode,
|
|
ErrorMessage: result.ErrorMessage,
|
|
}
|
|
resp.Token = result.Token
|
|
resp.MaskedPan = result.MaskedPAN
|
|
resp.ExpiryMonth = result.ExpiryMonth
|
|
resp.ExpiryYear = result.ExpiryYear
|
|
resp.CardBrand = result.CardBrand
|
|
|
|
p.logger.Info("Card tokenization completed",
|
|
zap.String("request_id", resp.GetRequestId()),
|
|
zap.Bool("success", resp.GetSuccess()),
|
|
zap.String("provider_request_id", result.ProviderRequestID),
|
|
)
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) Status(ctx context.Context, payoutID string) (*mntxv1.CardPayoutState, error) {
|
|
if p == nil {
|
|
return nil, merrors.Internal("card payout processor not initialised")
|
|
}
|
|
|
|
id := strings.TrimSpace(payoutID)
|
|
p.logger.Info("Card payout status requested", zap.String("operation_ref", id))
|
|
|
|
if id == "" {
|
|
p.logger.Warn("Payout status requested with empty payout_id")
|
|
return nil, merrors.InvalidArgument("payout_id is required", "payout_id")
|
|
}
|
|
|
|
state, err := p.store.Payouts().FindByOperationRef(ctx, id)
|
|
if err != nil && !errors.Is(err, merrors.ErrNoData) {
|
|
p.logger.Warn("Payout status lookup by operation ref failed", zap.String("operation_ref", id), zap.Error(err))
|
|
return nil, err
|
|
}
|
|
if state == nil || errors.Is(err, merrors.ErrNoData) {
|
|
p.logger.Warn("Payout status not found", zap.String("operation_ref", id))
|
|
return nil, merrors.NoData("payout not found")
|
|
}
|
|
|
|
p.logger.Info("Card payout status resolved",
|
|
zap.String("payment_ref", state.PaymentRef),
|
|
zap.String("operation_ref", state.OperationRef),
|
|
zap.String("status", string(state.Status)),
|
|
)
|
|
|
|
return StateToProto(state), nil
|
|
}
|
|
|
|
func (p *cardPayoutProcessor) ProcessCallback(ctx context.Context, payload []byte) (int, error) {
|
|
if p == nil {
|
|
return http.StatusInternalServerError, merrors.Internal("card payout processor not initialised")
|
|
}
|
|
|
|
p.logger.Debug("Processing Monetix callback", zap.Int("payload_bytes", len(payload)))
|
|
|
|
if len(payload) == 0 {
|
|
p.logger.Warn("Received empty Monetix callback payload")
|
|
return http.StatusBadRequest, merrors.InvalidArgument("callback body is empty")
|
|
}
|
|
|
|
if strings.TrimSpace(p.config.SecretKey) == "" {
|
|
p.logger.Warn("Monetix secret key is not configured; cannot verify callback")
|
|
return http.StatusInternalServerError, merrors.Internal("monetix secret key is not configured")
|
|
}
|
|
|
|
var cb monetixCallback
|
|
if err := json.Unmarshal(payload, &cb); err != nil {
|
|
p.logger.Warn("Failed to unmarshal Monetix callback", zap.Error(err))
|
|
return http.StatusBadRequest, err
|
|
}
|
|
|
|
signature, err := verifyCallbackSignature(payload, p.config.SecretKey)
|
|
if err != nil {
|
|
status := http.StatusBadRequest
|
|
if errors.Is(err, merrors.ErrDataConflict) {
|
|
status = http.StatusForbidden
|
|
}
|
|
p.logger.Warn("Monetix callback signature check failed",
|
|
zap.String("payout_id", cb.Payment.ID),
|
|
zap.String("signature", signature),
|
|
zap.String("payload", string(payload)),
|
|
zap.Error(err),
|
|
)
|
|
return status, err
|
|
}
|
|
|
|
// mapCallbackToState currently returns proto-state in your code.
|
|
// Convert it to mongo model and preserve internal refs if record exists.
|
|
pbState, statusLabel := mapCallbackToState(p.clock, p.config, cb)
|
|
|
|
// Convert proto -> mongo (operationRef/idempotencyKey are internal; keep empty for now)
|
|
state := CardPayoutStateFromProto(p.clock, pbState)
|
|
|
|
// Preserve CreatedAt + internal keys from existing record if present.
|
|
existing, err := p.findAndMergePayoutState(ctx, state)
|
|
if err != nil {
|
|
p.logger.Warn("Failed to fetch payout state while processing callback",
|
|
zap.Error(err),
|
|
zap.String("payment_ref", state.PaymentRef),
|
|
)
|
|
return http.StatusInternalServerError, err
|
|
}
|
|
if existing != nil {
|
|
// keep failure reason if you want, or override depending on callback semantics
|
|
if state.FailureReason == "" {
|
|
state.FailureReason = existing.FailureReason
|
|
}
|
|
}
|
|
if state.Status == model.PayoutStatusFailed || state.Status == model.PayoutStatusCancelled {
|
|
if strings.TrimSpace(state.FailureReason) == "" {
|
|
state.FailureReason = payoutFailureReason(state.ProviderCode, state.ProviderMessage)
|
|
}
|
|
}
|
|
|
|
if err := p.updatePayoutStatus(ctx, state); err != nil {
|
|
p.logger.Warn("Failed to update payout state while processing callback", zap.Error(err))
|
|
}
|
|
if isFinalStatus(state) {
|
|
p.clearRetryTimer(state.OperationRef)
|
|
}
|
|
monetix.ObserveCallback(statusLabel)
|
|
|
|
p.logger.Info("Monetix payout callback processed",
|
|
zap.String("payment_ref", state.PaymentRef),
|
|
zap.String("status", statusLabel),
|
|
zap.String("provider_code", state.ProviderCode),
|
|
zap.String("provider_message", state.ProviderMessage),
|
|
zap.String("masked_account", cb.Account.Number),
|
|
)
|
|
|
|
return http.StatusOK, nil
|
|
}
|