serial payouts

This commit is contained in:
Stephan D
2026-03-04 10:32:37 +01:00
parent d92be5eedc
commit eb8b7b3402
12 changed files with 450 additions and 3 deletions

View File

@@ -13,6 +13,7 @@ This service now supports Monetix “payout by card”.
- `MONETIX_PROJECT_ID` integer project ID
- `MONETIX_SECRET_KEY` signature secret
- Optional: `allowed_currencies`, `require_customer_address`, `request_timeout_seconds`
- Optional execution mode: `mcards.strict_operation_mode=true` to process only one unresolved payout operation at a time (strict isolated mode)
- Gateway descriptor: `gateway.id`, optional `gateway.currencies`, `gateway.limits` (for per-payout minimum use `gateway.limits.per_tx_min_amount`)
- Callback server: `MNTX_GATEWAY_HTTP_PORT` (exposed as 8084), `http.callback.path`, optional `allowed_cidrs`

View File

@@ -44,6 +44,7 @@ mcards:
request_timeout_seconds: 15
status_success: "success"
status_processing: "processing"
strict_operation_mode: false
gateway:
id: "mcards"

View File

@@ -44,6 +44,7 @@ mcards:
request_timeout_seconds: 15
status_success: "success"
status_processing: "processing"
strict_operation_mode: true
gateway:
id: "mcards"

View File

@@ -59,6 +59,7 @@ type monetixConfig struct {
RequestTimeoutSeconds int `yaml:"request_timeout_seconds"`
StatusSuccess string `yaml:"status_success"`
StatusProcessing string `yaml:"status_processing"`
StrictOperationMode bool `yaml:"strict_operation_mode"`
}
type gatewayConfig struct {
@@ -167,6 +168,7 @@ func (i *Imp) Start() error {
zap.Duration("request_timeout", monetixCfg.RequestTimeout),
zap.String("status_success", monetixCfg.SuccessStatus()),
zap.String("status_processing", monetixCfg.ProcessingStatus()),
zap.Bool("strict_operation_mode", cfg.Monetix.StrictOperationMode),
)
gatewayDescriptor := resolveGatewayDescriptor(cfg.Gateway, monetixCfg)
@@ -196,6 +198,7 @@ func (i *Imp) Start() error {
mntxservice.WithDiscoveryInvokeURI(invokeURI),
mntxservice.WithProducer(producer),
mntxservice.WithMonetixConfig(monetixCfg),
mntxservice.WithStrictOperationIsolation(cfg.Monetix.StrictOperationMode),
mntxservice.WithGatewayDescriptor(gatewayDescriptor),
mntxservice.WithHTTPClient(&http.Client{Timeout: monetixCfg.Timeout()}),
mntxservice.WithStorage(repo),

View File

@@ -47,9 +47,11 @@ type cardPayoutProcessor struct {
perTxMinAmountMinorByCurrency map[string]int64
dispatchThrottleInterval time.Duration
dispatchMaxAttempts uint32
executionMode payoutExecutionMode
dispatchMu sync.Mutex
nextDispatchAllowed time.Time
dispatchSerialGate chan struct{}
retryPolicy payoutFailurePolicy
retryDelayFn func(attempt uint32) time.Duration
@@ -189,6 +191,8 @@ func newCardPayoutProcessor(
producer: producer,
dispatchThrottleInterval: defaultDispatchThrottleInterval,
dispatchMaxAttempts: defaultMaxDispatchAttempts,
executionMode: newDefaultPayoutExecutionMode(),
dispatchSerialGate: make(chan struct{}, 1),
retryPolicy: defaultPayoutFailurePolicy(),
retryDelayFn: retryDelayDuration,
retryTimers: map[string]*time.Timer{},
@@ -210,9 +214,28 @@ func (p *cardPayoutProcessor) applyGatewayDescriptor(descriptor *gatewayv1.Gatew
p.dispatchThrottleInterval = dispatchThrottleIntervalFromDescriptor(descriptor, defaultDispatchThrottleInterval)
p.logger.Info("Configured payout dispatch throttle",
zap.Duration("dispatch_interval", p.dispatchThrottleInterval),
zap.Bool("sequential_dispatch", p.dispatchSerialGate != nil),
zap.String("execution_mode", payoutExecutionModeName(p.executionMode)),
)
}
func (p *cardPayoutProcessor) setExecutionMode(mode payoutExecutionMode) {
if p == nil {
return
}
p.executionMode = normalizePayoutExecutionMode(mode)
}
func (p *cardPayoutProcessor) observeExecutionState(state *model.CardPayout) {
if p == nil || state == nil {
return
}
if p.executionMode == nil {
return
}
p.executionMode.OnPersistedState(state.OperationRef, state.Status)
}
func perTxMinAmountPolicy(descriptor *gatewayv1.GatewayInstanceDescriptor) (int64, map[string]int64) {
if descriptor == nil || descriptor.GetLimits() == nil {
return 0, nil
@@ -359,10 +382,33 @@ func (p *cardPayoutProcessor) waitDispatchSlot(ctx context.Context) error {
}
}
func (p *cardPayoutProcessor) acquireDispatchExecution(ctx context.Context) (func(), error) {
if p == nil {
return nil, merrors.Internal("card payout processor not initialised")
}
if ctx == nil {
ctx = context.Background()
}
if p.dispatchSerialGate == nil {
return func() {}, nil
}
select {
case p.dispatchSerialGate <- struct{}{}:
return func() {
<-p.dispatchSerialGate
}, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (p *cardPayoutProcessor) stopRetries() {
if p == nil {
return
}
if p.executionMode != nil {
p.executionMode.Shutdown()
}
if p.retryStop != nil {
p.retryStop()
}
@@ -591,10 +637,20 @@ func (p *cardPayoutProcessor) dispatchCardPayout(ctx context.Context, req *mntxv
if req == nil {
return nil, merrors.InvalidArgument("card payout request is required")
}
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
if mode := p.executionMode; mode != nil {
if err := mode.BeforeDispatch(ctx, opRef); err != nil {
return nil, err
}
}
release, err := p.acquireDispatchExecution(ctx)
if err != nil {
return nil, err
}
defer release()
if err := p.waitDispatchSlot(ctx); err != nil {
return nil, err
}
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
attempt := p.incrementDispatchAttempt(opRef)
p.logger.Info("Dispatching card payout attempt",
zap.String("operation_ref", opRef),
@@ -612,10 +668,20 @@ func (p *cardPayoutProcessor) dispatchCardTokenPayout(ctx context.Context, req *
if req == nil {
return nil, merrors.InvalidArgument("card token payout request is required")
}
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
if mode := p.executionMode; mode != nil {
if err := mode.BeforeDispatch(ctx, opRef); err != nil {
return nil, err
}
}
release, err := p.acquireDispatchExecution(ctx)
if err != nil {
return nil, err
}
defer release()
if err := p.waitDispatchSlot(ctx); err != nil {
return nil, err
}
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
attempt := p.incrementDispatchAttempt(opRef)
p.logger.Info("Dispatching card token payout attempt",
zap.String("operation_ref", opRef),
@@ -1002,6 +1068,7 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout
if existing != nil {
switch existing.Status {
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
p.observeExecutionState(existing)
return cardPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
}
}
@@ -1165,6 +1232,7 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT
if existing != nil {
switch existing.Status {
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
p.observeExecutionState(existing)
return cardTokenPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
}
}

View File

@@ -318,6 +318,114 @@ func TestCardPayoutProcessor_Submit_SameParentDifferentOperationsStoredSeparatel
}
}
func TestCardPayoutProcessor_StrictMode_BlocksSecondOperationUntilFirstFinalCallback(t *testing.T) {
cfg := monetix.Config{
BaseURL: "https://monetix.test",
SecretKey: "secret",
ProjectID: 99,
StatusSuccess: "success",
StatusProcessing: "processing",
AllowedCurrencies: []string{"RUB"},
}
repo := newMockRepository()
var callN atomic.Int32
httpClient := &http.Client{
Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) {
n := callN.Add(1)
resp := monetix.APIResponse{}
resp.Operation.RequestID = fmt.Sprintf("req-%d", n)
body, _ := json.Marshal(resp)
return &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(bytes.NewReader(body)),
Header: http.Header{"Content-Type": []string{"application/json"}},
}, nil
}),
}
processor := newCardPayoutProcessor(
zap.NewNop(),
cfg,
staticClock{now: time.Date(2026, 3, 4, 2, 3, 4, 0, time.UTC)},
repo,
httpClient,
nil,
)
defer processor.stopRetries()
processor.dispatchThrottleInterval = 0
processor.setExecutionMode(newStrictIsolatedPayoutExecutionMode())
req1 := validCardPayoutRequest()
req1.PayoutId = ""
req1.OperationRef = "op-strict-1"
req1.ParentPaymentRef = "payment-strict-1"
req1.IdempotencyKey = "idem-strict-1"
req1.CardPan = "2204310000002456"
req2 := validCardPayoutRequest()
req2.PayoutId = ""
req2.OperationRef = "op-strict-2"
req2.ParentPaymentRef = "payment-strict-2"
req2.IdempotencyKey = "idem-strict-2"
req2.CardPan = "2204320000009754"
if _, err := processor.Submit(context.Background(), req1); err != nil {
t.Fatalf("first submit failed: %v", err)
}
secondDone := make(chan error, 1)
go func() {
_, err := processor.Submit(context.Background(), req2)
secondDone <- err
}()
select {
case err := <-secondDone:
t.Fatalf("second submit should block before first operation is final, err=%v", err)
case <-time.After(120 * time.Millisecond):
}
cb := baseCallback()
cb.Payment.ID = req1.GetOperationRef()
cb.Payment.Status = "success"
cb.Operation.Status = "success"
cb.Operation.Code = "0"
cb.Operation.Message = "Success"
cb.Payment.Sum.Currency = "RUB"
sig, err := monetix.SignPayload(cb, cfg.SecretKey)
if err != nil {
t.Fatalf("failed to sign callback: %v", err)
}
cb.Signature = sig
payload, err := json.Marshal(cb)
if err != nil {
t.Fatalf("failed to marshal callback: %v", err)
}
status, err := processor.ProcessCallback(context.Background(), payload)
if err != nil {
t.Fatalf("callback failed: %v", err)
}
if status != http.StatusOK {
t.Fatalf("unexpected callback status: %d", status)
}
select {
case err := <-secondDone:
if err != nil {
t.Fatalf("second submit returned error: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting for second submit to unblock")
}
if got, want := callN.Load(), int32(2); got != want {
t.Fatalf("unexpected provider call count: got=%d want=%d", got, want)
}
}
func TestCardPayoutProcessor_ProcessCallback_UpdatesMatchingOperationWithinSameParent(t *testing.T) {
cfg := monetix.Config{
SecretKey: "secret",

View File

@@ -77,3 +77,10 @@ func WithMessagingSettings(settings pmodel.SettingsT) Option {
}
}
}
// WithStrictOperationIsolation serialises payout processing to one unresolved operation at a time.
func WithStrictOperationIsolation(enabled bool) Option {
return func(s *Service) {
s.strictIsolation = enabled
}
}

View File

@@ -0,0 +1,168 @@
package gateway
import (
"context"
"errors"
"strings"
"sync"
"github.com/tech/sendico/gateway/mntx/storage/model"
)
const (
payoutExecutionModeDefaultName = "default"
payoutExecutionModeStrictIsolatedName = "strict_isolated"
)
var errPayoutExecutionModeStopped = errors.New("payout execution mode stopped")
type payoutExecutionMode interface {
Name() string
BeforeDispatch(ctx context.Context, operationRef string) error
OnPersistedState(operationRef string, status model.PayoutStatus)
Shutdown()
}
type defaultPayoutExecutionMode struct{}
func newDefaultPayoutExecutionMode() payoutExecutionMode {
return &defaultPayoutExecutionMode{}
}
func (m *defaultPayoutExecutionMode) Name() string {
return payoutExecutionModeDefaultName
}
func (m *defaultPayoutExecutionMode) BeforeDispatch(_ context.Context, _ string) error {
return nil
}
func (m *defaultPayoutExecutionMode) OnPersistedState(_ string, _ model.PayoutStatus) {}
func (m *defaultPayoutExecutionMode) Shutdown() {}
type strictIsolatedPayoutExecutionMode struct {
mu sync.Mutex
activeOperation string
waitCh chan struct{}
stopped bool
}
func newStrictIsolatedPayoutExecutionMode() payoutExecutionMode {
return &strictIsolatedPayoutExecutionMode{
waitCh: make(chan struct{}),
}
}
func (m *strictIsolatedPayoutExecutionMode) Name() string {
return payoutExecutionModeStrictIsolatedName
}
func (m *strictIsolatedPayoutExecutionMode) BeforeDispatch(ctx context.Context, operationRef string) error {
opRef := strings.TrimSpace(operationRef)
if opRef == "" {
return nil
}
if ctx == nil {
ctx = context.Background()
}
for {
waitCh, allowed, err := m.tryAcquire(opRef)
if allowed {
return nil
}
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case <-waitCh:
}
}
}
func (m *strictIsolatedPayoutExecutionMode) OnPersistedState(operationRef string, status model.PayoutStatus) {
opRef := strings.TrimSpace(operationRef)
if opRef == "" {
return
}
m.mu.Lock()
defer m.mu.Unlock()
if m.stopped {
return
}
if isFinalPayoutStatus(status) {
if m.activeOperation == opRef {
m.activeOperation = ""
m.signalLocked()
}
return
}
if m.activeOperation == "" {
m.activeOperation = opRef
m.signalLocked()
}
}
func (m *strictIsolatedPayoutExecutionMode) Shutdown() {
m.mu.Lock()
defer m.mu.Unlock()
if m.stopped {
return
}
m.stopped = true
m.activeOperation = ""
m.signalLocked()
}
func (m *strictIsolatedPayoutExecutionMode) tryAcquire(operationRef string) (<-chan struct{}, bool, error) {
m.mu.Lock()
defer m.mu.Unlock()
if m.stopped {
return nil, false, errPayoutExecutionModeStopped
}
switch owner := strings.TrimSpace(m.activeOperation); {
case owner == "":
m.activeOperation = operationRef
m.signalLocked()
return nil, true, nil
case owner == operationRef:
return nil, true, nil
default:
return m.waitCh, false, nil
}
}
func (m *strictIsolatedPayoutExecutionMode) signalLocked() {
if m.waitCh == nil {
m.waitCh = make(chan struct{})
return
}
close(m.waitCh)
m.waitCh = make(chan struct{})
}
func normalizePayoutExecutionMode(mode payoutExecutionMode) payoutExecutionMode {
if mode == nil {
return newDefaultPayoutExecutionMode()
}
return mode
}
func payoutExecutionModeName(mode payoutExecutionMode) string {
if mode == nil {
return payoutExecutionModeDefaultName
}
name := strings.TrimSpace(mode.Name())
if name == "" {
return payoutExecutionModeDefaultName
}
return name
}

View File

@@ -0,0 +1,58 @@
package gateway
import (
"context"
"testing"
"time"
"github.com/tech/sendico/gateway/mntx/storage/model"
)
func TestStrictIsolatedPayoutExecutionMode_BlocksOtherOperationUntilFinalStatus(t *testing.T) {
mode := newStrictIsolatedPayoutExecutionMode()
if err := mode.BeforeDispatch(context.Background(), "op-1"); err != nil {
t.Fatalf("first acquire failed: %v", err)
}
waitCtx, waitCancel := context.WithTimeout(context.Background(), time.Second)
defer waitCancel()
secondDone := make(chan error, 1)
go func() {
secondDone <- mode.BeforeDispatch(waitCtx, "op-2")
}()
select {
case err := <-secondDone:
t.Fatalf("second operation should be blocked before final status, got err=%v", err)
case <-time.After(80 * time.Millisecond):
}
mode.OnPersistedState("op-1", model.PayoutStatusWaiting)
select {
case err := <-secondDone:
t.Fatalf("second operation should remain blocked on non-final status, got err=%v", err)
case <-time.After(80 * time.Millisecond):
}
mode.OnPersistedState("op-1", model.PayoutStatusSuccess)
select {
case err := <-secondDone:
if err != nil {
t.Fatalf("second operation should proceed after final status, got err=%v", err)
}
case <-time.After(time.Second):
t.Fatalf("timeout waiting for second operation to proceed")
}
}
func TestStrictIsolatedPayoutExecutionMode_AllowsSameOperationReentry(t *testing.T) {
mode := newStrictIsolatedPayoutExecutionMode()
if err := mode.BeforeDispatch(context.Background(), "op-1"); err != nil {
t.Fatalf("first acquire failed: %v", err)
}
if err := mode.BeforeDispatch(context.Background(), "op-1"); err != nil {
t.Fatalf("same operation should be re-entrant, got err=%v", err)
}
}

View File

@@ -36,6 +36,7 @@ type Service struct {
gatewayDescriptor *gatewayv1.GatewayInstanceDescriptor
announcer *discovery.Announcer
invokeURI string
strictIsolation bool
connectorv1.UnimplementedConnectorServiceServer
}
@@ -90,6 +91,9 @@ func NewService(logger mlogger.Logger, opts ...Option) *Service {
}
svc.card = newCardPayoutProcessor(svc.logger, svc.config, svc.clock, svc.storage, svc.httpClient, svc.producer)
if svc.strictIsolation {
svc.card.setExecutionMode(newStrictIsolatedPayoutExecutionMode())
}
svc.card.outbox = &svc.outbox
svc.card.msgCfg = svc.msgCfg
if err := svc.card.startOutboxReliableProducer(); err != nil {

View File

@@ -0,0 +1,19 @@
package gateway
import (
"testing"
"go.uber.org/zap"
)
func TestNewService_StrictOperationIsolationOption(t *testing.T) {
svc := NewService(zap.NewNop(), WithStrictOperationIsolation(true))
t.Cleanup(svc.Shutdown)
if svc.card == nil {
t.Fatalf("expected card processor to be initialised")
}
if got, want := payoutExecutionModeName(svc.card.executionMode), payoutExecutionModeStrictIsolatedName; got != want {
t.Fatalf("execution mode mismatch: got=%q want=%q", got, want)
}
}

View File

@@ -16,7 +16,14 @@ import (
)
func isFinalStatus(t *model.CardPayout) bool {
switch t.Status {
if t == nil {
return false
}
return isFinalPayoutStatus(t.Status)
}
func isFinalPayoutStatus(status model.PayoutStatus) bool {
switch status {
case model.PayoutStatusFailed, model.PayoutStatusSuccess, model.PayoutStatusCancelled:
return true
default:
@@ -45,6 +52,7 @@ func (p *cardPayoutProcessor) updatePayoutStatus(ctx context.Context, state *mod
)
return err
}
p.observeExecutionState(state)
return nil
}
@@ -65,6 +73,7 @@ func (p *cardPayoutProcessor) updatePayoutStatus(ctx context.Context, state *mod
)
return err
}
p.observeExecutionState(state)
return nil
}