From eb8b7b34020e7289acc96a790b09d6c0e9f8fbdb Mon Sep 17 00:00:00 2001 From: Stephan D Date: Wed, 4 Mar 2026 10:32:37 +0100 Subject: [PATCH] serial payouts --- api/gateway/mntx/README.md | 1 + api/gateway/mntx/config.dev.yml | 1 + api/gateway/mntx/config.yml | 1 + .../internal/server/internal/serverimp.go | 3 + .../service/gateway/card_processor.go | 72 +++++++- .../service/gateway/card_processor_test.go | 108 +++++++++++ .../mntx/internal/service/gateway/options.go | 7 + .../service/gateway/payout_execution_mode.go | 168 ++++++++++++++++++ .../gateway/payout_execution_mode_test.go | 58 ++++++ .../mntx/internal/service/gateway/service.go | 4 + .../internal/service/gateway/service_test.go | 19 ++ .../service/gateway/transfer_notifications.go | 11 +- 12 files changed, 450 insertions(+), 3 deletions(-) create mode 100644 api/gateway/mntx/internal/service/gateway/payout_execution_mode.go create mode 100644 api/gateway/mntx/internal/service/gateway/payout_execution_mode_test.go create mode 100644 api/gateway/mntx/internal/service/gateway/service_test.go diff --git a/api/gateway/mntx/README.md b/api/gateway/mntx/README.md index bda3af63..92f4f131 100644 --- a/api/gateway/mntx/README.md +++ b/api/gateway/mntx/README.md @@ -13,6 +13,7 @@ This service now supports Monetix “payout by card”. - `MONETIX_PROJECT_ID` – integer project ID - `MONETIX_SECRET_KEY` – signature secret - Optional: `allowed_currencies`, `require_customer_address`, `request_timeout_seconds` +- Optional execution mode: `mcards.strict_operation_mode=true` to process only one unresolved payout operation at a time (strict isolated mode) - Gateway descriptor: `gateway.id`, optional `gateway.currencies`, `gateway.limits` (for per-payout minimum use `gateway.limits.per_tx_min_amount`) - Callback server: `MNTX_GATEWAY_HTTP_PORT` (exposed as 8084), `http.callback.path`, optional `allowed_cidrs` diff --git a/api/gateway/mntx/config.dev.yml b/api/gateway/mntx/config.dev.yml index 0bc18a93..e5b2b5ed 100644 --- a/api/gateway/mntx/config.dev.yml +++ b/api/gateway/mntx/config.dev.yml @@ -44,6 +44,7 @@ mcards: request_timeout_seconds: 15 status_success: "success" status_processing: "processing" + strict_operation_mode: false gateway: id: "mcards" diff --git a/api/gateway/mntx/config.yml b/api/gateway/mntx/config.yml index a9a44ff6..d53bd310 100644 --- a/api/gateway/mntx/config.yml +++ b/api/gateway/mntx/config.yml @@ -44,6 +44,7 @@ mcards: request_timeout_seconds: 15 status_success: "success" status_processing: "processing" + strict_operation_mode: true gateway: id: "mcards" diff --git a/api/gateway/mntx/internal/server/internal/serverimp.go b/api/gateway/mntx/internal/server/internal/serverimp.go index 796cd24a..84498933 100644 --- a/api/gateway/mntx/internal/server/internal/serverimp.go +++ b/api/gateway/mntx/internal/server/internal/serverimp.go @@ -59,6 +59,7 @@ type monetixConfig struct { RequestTimeoutSeconds int `yaml:"request_timeout_seconds"` StatusSuccess string `yaml:"status_success"` StatusProcessing string `yaml:"status_processing"` + StrictOperationMode bool `yaml:"strict_operation_mode"` } type gatewayConfig struct { @@ -167,6 +168,7 @@ func (i *Imp) Start() error { zap.Duration("request_timeout", monetixCfg.RequestTimeout), zap.String("status_success", monetixCfg.SuccessStatus()), zap.String("status_processing", monetixCfg.ProcessingStatus()), + zap.Bool("strict_operation_mode", cfg.Monetix.StrictOperationMode), ) gatewayDescriptor := resolveGatewayDescriptor(cfg.Gateway, monetixCfg) @@ -196,6 +198,7 @@ func (i *Imp) Start() error { mntxservice.WithDiscoveryInvokeURI(invokeURI), mntxservice.WithProducer(producer), mntxservice.WithMonetixConfig(monetixCfg), + mntxservice.WithStrictOperationIsolation(cfg.Monetix.StrictOperationMode), mntxservice.WithGatewayDescriptor(gatewayDescriptor), mntxservice.WithHTTPClient(&http.Client{Timeout: monetixCfg.Timeout()}), mntxservice.WithStorage(repo), diff --git a/api/gateway/mntx/internal/service/gateway/card_processor.go b/api/gateway/mntx/internal/service/gateway/card_processor.go index fe7beae7..6ad8d5c1 100644 --- a/api/gateway/mntx/internal/service/gateway/card_processor.go +++ b/api/gateway/mntx/internal/service/gateway/card_processor.go @@ -47,9 +47,11 @@ type cardPayoutProcessor struct { perTxMinAmountMinorByCurrency map[string]int64 dispatchThrottleInterval time.Duration dispatchMaxAttempts uint32 + executionMode payoutExecutionMode dispatchMu sync.Mutex nextDispatchAllowed time.Time + dispatchSerialGate chan struct{} retryPolicy payoutFailurePolicy retryDelayFn func(attempt uint32) time.Duration @@ -189,6 +191,8 @@ func newCardPayoutProcessor( producer: producer, dispatchThrottleInterval: defaultDispatchThrottleInterval, dispatchMaxAttempts: defaultMaxDispatchAttempts, + executionMode: newDefaultPayoutExecutionMode(), + dispatchSerialGate: make(chan struct{}, 1), retryPolicy: defaultPayoutFailurePolicy(), retryDelayFn: retryDelayDuration, retryTimers: map[string]*time.Timer{}, @@ -210,9 +214,28 @@ func (p *cardPayoutProcessor) applyGatewayDescriptor(descriptor *gatewayv1.Gatew p.dispatchThrottleInterval = dispatchThrottleIntervalFromDescriptor(descriptor, defaultDispatchThrottleInterval) p.logger.Info("Configured payout dispatch throttle", zap.Duration("dispatch_interval", p.dispatchThrottleInterval), + zap.Bool("sequential_dispatch", p.dispatchSerialGate != nil), + zap.String("execution_mode", payoutExecutionModeName(p.executionMode)), ) } +func (p *cardPayoutProcessor) setExecutionMode(mode payoutExecutionMode) { + if p == nil { + return + } + p.executionMode = normalizePayoutExecutionMode(mode) +} + +func (p *cardPayoutProcessor) observeExecutionState(state *model.CardPayout) { + if p == nil || state == nil { + return + } + if p.executionMode == nil { + return + } + p.executionMode.OnPersistedState(state.OperationRef, state.Status) +} + func perTxMinAmountPolicy(descriptor *gatewayv1.GatewayInstanceDescriptor) (int64, map[string]int64) { if descriptor == nil || descriptor.GetLimits() == nil { return 0, nil @@ -359,10 +382,33 @@ func (p *cardPayoutProcessor) waitDispatchSlot(ctx context.Context) error { } } +func (p *cardPayoutProcessor) acquireDispatchExecution(ctx context.Context) (func(), error) { + if p == nil { + return nil, merrors.Internal("card payout processor not initialised") + } + if ctx == nil { + ctx = context.Background() + } + if p.dispatchSerialGate == nil { + return func() {}, nil + } + select { + case p.dispatchSerialGate <- struct{}{}: + return func() { + <-p.dispatchSerialGate + }, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + func (p *cardPayoutProcessor) stopRetries() { if p == nil { return } + if p.executionMode != nil { + p.executionMode.Shutdown() + } if p.retryStop != nil { p.retryStop() } @@ -591,10 +637,20 @@ func (p *cardPayoutProcessor) dispatchCardPayout(ctx context.Context, req *mntxv if req == nil { return nil, merrors.InvalidArgument("card payout request is required") } + opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId()) + if mode := p.executionMode; mode != nil { + if err := mode.BeforeDispatch(ctx, opRef); err != nil { + return nil, err + } + } + release, err := p.acquireDispatchExecution(ctx) + if err != nil { + return nil, err + } + defer release() if err := p.waitDispatchSlot(ctx); err != nil { return nil, err } - opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId()) attempt := p.incrementDispatchAttempt(opRef) p.logger.Info("Dispatching card payout attempt", zap.String("operation_ref", opRef), @@ -612,10 +668,20 @@ func (p *cardPayoutProcessor) dispatchCardTokenPayout(ctx context.Context, req * if req == nil { return nil, merrors.InvalidArgument("card token payout request is required") } + opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId()) + if mode := p.executionMode; mode != nil { + if err := mode.BeforeDispatch(ctx, opRef); err != nil { + return nil, err + } + } + release, err := p.acquireDispatchExecution(ctx) + if err != nil { + return nil, err + } + defer release() if err := p.waitDispatchSlot(ctx); err != nil { return nil, err } - opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId()) attempt := p.incrementDispatchAttempt(opRef) p.logger.Info("Dispatching card token payout attempt", zap.String("operation_ref", opRef), @@ -1002,6 +1068,7 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout if existing != nil { switch existing.Status { case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled: + p.observeExecutionState(existing) return cardPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil } } @@ -1165,6 +1232,7 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT if existing != nil { switch existing.Status { case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled: + p.observeExecutionState(existing) return cardTokenPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil } } diff --git a/api/gateway/mntx/internal/service/gateway/card_processor_test.go b/api/gateway/mntx/internal/service/gateway/card_processor_test.go index 35d9a443..0ba14fa9 100644 --- a/api/gateway/mntx/internal/service/gateway/card_processor_test.go +++ b/api/gateway/mntx/internal/service/gateway/card_processor_test.go @@ -318,6 +318,114 @@ func TestCardPayoutProcessor_Submit_SameParentDifferentOperationsStoredSeparatel } } +func TestCardPayoutProcessor_StrictMode_BlocksSecondOperationUntilFirstFinalCallback(t *testing.T) { + cfg := monetix.Config{ + BaseURL: "https://monetix.test", + SecretKey: "secret", + ProjectID: 99, + StatusSuccess: "success", + StatusProcessing: "processing", + AllowedCurrencies: []string{"RUB"}, + } + + repo := newMockRepository() + var callN atomic.Int32 + httpClient := &http.Client{ + Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) { + n := callN.Add(1) + resp := monetix.APIResponse{} + resp.Operation.RequestID = fmt.Sprintf("req-%d", n) + body, _ := json.Marshal(resp) + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + }, nil + }), + } + + processor := newCardPayoutProcessor( + zap.NewNop(), + cfg, + staticClock{now: time.Date(2026, 3, 4, 2, 3, 4, 0, time.UTC)}, + repo, + httpClient, + nil, + ) + defer processor.stopRetries() + processor.dispatchThrottleInterval = 0 + processor.setExecutionMode(newStrictIsolatedPayoutExecutionMode()) + + req1 := validCardPayoutRequest() + req1.PayoutId = "" + req1.OperationRef = "op-strict-1" + req1.ParentPaymentRef = "payment-strict-1" + req1.IdempotencyKey = "idem-strict-1" + req1.CardPan = "2204310000002456" + + req2 := validCardPayoutRequest() + req2.PayoutId = "" + req2.OperationRef = "op-strict-2" + req2.ParentPaymentRef = "payment-strict-2" + req2.IdempotencyKey = "idem-strict-2" + req2.CardPan = "2204320000009754" + + if _, err := processor.Submit(context.Background(), req1); err != nil { + t.Fatalf("first submit failed: %v", err) + } + + secondDone := make(chan error, 1) + go func() { + _, err := processor.Submit(context.Background(), req2) + secondDone <- err + }() + + select { + case err := <-secondDone: + t.Fatalf("second submit should block before first operation is final, err=%v", err) + case <-time.After(120 * time.Millisecond): + } + + cb := baseCallback() + cb.Payment.ID = req1.GetOperationRef() + cb.Payment.Status = "success" + cb.Operation.Status = "success" + cb.Operation.Code = "0" + cb.Operation.Message = "Success" + cb.Payment.Sum.Currency = "RUB" + + sig, err := monetix.SignPayload(cb, cfg.SecretKey) + if err != nil { + t.Fatalf("failed to sign callback: %v", err) + } + cb.Signature = sig + payload, err := json.Marshal(cb) + if err != nil { + t.Fatalf("failed to marshal callback: %v", err) + } + + status, err := processor.ProcessCallback(context.Background(), payload) + if err != nil { + t.Fatalf("callback failed: %v", err) + } + if status != http.StatusOK { + t.Fatalf("unexpected callback status: %d", status) + } + + select { + case err := <-secondDone: + if err != nil { + t.Fatalf("second submit returned error: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for second submit to unblock") + } + + if got, want := callN.Load(), int32(2); got != want { + t.Fatalf("unexpected provider call count: got=%d want=%d", got, want) + } +} + func TestCardPayoutProcessor_ProcessCallback_UpdatesMatchingOperationWithinSameParent(t *testing.T) { cfg := monetix.Config{ SecretKey: "secret", diff --git a/api/gateway/mntx/internal/service/gateway/options.go b/api/gateway/mntx/internal/service/gateway/options.go index 6465333a..18664abd 100644 --- a/api/gateway/mntx/internal/service/gateway/options.go +++ b/api/gateway/mntx/internal/service/gateway/options.go @@ -77,3 +77,10 @@ func WithMessagingSettings(settings pmodel.SettingsT) Option { } } } + +// WithStrictOperationIsolation serialises payout processing to one unresolved operation at a time. +func WithStrictOperationIsolation(enabled bool) Option { + return func(s *Service) { + s.strictIsolation = enabled + } +} diff --git a/api/gateway/mntx/internal/service/gateway/payout_execution_mode.go b/api/gateway/mntx/internal/service/gateway/payout_execution_mode.go new file mode 100644 index 00000000..c27f9a7e --- /dev/null +++ b/api/gateway/mntx/internal/service/gateway/payout_execution_mode.go @@ -0,0 +1,168 @@ +package gateway + +import ( + "context" + "errors" + "strings" + "sync" + + "github.com/tech/sendico/gateway/mntx/storage/model" +) + +const ( + payoutExecutionModeDefaultName = "default" + payoutExecutionModeStrictIsolatedName = "strict_isolated" +) + +var errPayoutExecutionModeStopped = errors.New("payout execution mode stopped") + +type payoutExecutionMode interface { + Name() string + BeforeDispatch(ctx context.Context, operationRef string) error + OnPersistedState(operationRef string, status model.PayoutStatus) + Shutdown() +} + +type defaultPayoutExecutionMode struct{} + +func newDefaultPayoutExecutionMode() payoutExecutionMode { + return &defaultPayoutExecutionMode{} +} + +func (m *defaultPayoutExecutionMode) Name() string { + return payoutExecutionModeDefaultName +} + +func (m *defaultPayoutExecutionMode) BeforeDispatch(_ context.Context, _ string) error { + return nil +} + +func (m *defaultPayoutExecutionMode) OnPersistedState(_ string, _ model.PayoutStatus) {} + +func (m *defaultPayoutExecutionMode) Shutdown() {} + +type strictIsolatedPayoutExecutionMode struct { + mu sync.Mutex + activeOperation string + waitCh chan struct{} + stopped bool +} + +func newStrictIsolatedPayoutExecutionMode() payoutExecutionMode { + return &strictIsolatedPayoutExecutionMode{ + waitCh: make(chan struct{}), + } +} + +func (m *strictIsolatedPayoutExecutionMode) Name() string { + return payoutExecutionModeStrictIsolatedName +} + +func (m *strictIsolatedPayoutExecutionMode) BeforeDispatch(ctx context.Context, operationRef string) error { + opRef := strings.TrimSpace(operationRef) + if opRef == "" { + return nil + } + if ctx == nil { + ctx = context.Background() + } + + for { + waitCh, allowed, err := m.tryAcquire(opRef) + if allowed { + return nil + } + if err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-waitCh: + } + } +} + +func (m *strictIsolatedPayoutExecutionMode) OnPersistedState(operationRef string, status model.PayoutStatus) { + opRef := strings.TrimSpace(operationRef) + if opRef == "" { + return + } + + m.mu.Lock() + defer m.mu.Unlock() + if m.stopped { + return + } + + if isFinalPayoutStatus(status) { + if m.activeOperation == opRef { + m.activeOperation = "" + m.signalLocked() + } + return + } + + if m.activeOperation == "" { + m.activeOperation = opRef + m.signalLocked() + } +} + +func (m *strictIsolatedPayoutExecutionMode) Shutdown() { + m.mu.Lock() + defer m.mu.Unlock() + if m.stopped { + return + } + m.stopped = true + m.activeOperation = "" + m.signalLocked() +} + +func (m *strictIsolatedPayoutExecutionMode) tryAcquire(operationRef string) (<-chan struct{}, bool, error) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.stopped { + return nil, false, errPayoutExecutionModeStopped + } + + switch owner := strings.TrimSpace(m.activeOperation); { + case owner == "": + m.activeOperation = operationRef + m.signalLocked() + return nil, true, nil + case owner == operationRef: + return nil, true, nil + default: + return m.waitCh, false, nil + } +} + +func (m *strictIsolatedPayoutExecutionMode) signalLocked() { + if m.waitCh == nil { + m.waitCh = make(chan struct{}) + return + } + close(m.waitCh) + m.waitCh = make(chan struct{}) +} + +func normalizePayoutExecutionMode(mode payoutExecutionMode) payoutExecutionMode { + if mode == nil { + return newDefaultPayoutExecutionMode() + } + return mode +} + +func payoutExecutionModeName(mode payoutExecutionMode) string { + if mode == nil { + return payoutExecutionModeDefaultName + } + name := strings.TrimSpace(mode.Name()) + if name == "" { + return payoutExecutionModeDefaultName + } + return name +} diff --git a/api/gateway/mntx/internal/service/gateway/payout_execution_mode_test.go b/api/gateway/mntx/internal/service/gateway/payout_execution_mode_test.go new file mode 100644 index 00000000..57a0189a --- /dev/null +++ b/api/gateway/mntx/internal/service/gateway/payout_execution_mode_test.go @@ -0,0 +1,58 @@ +package gateway + +import ( + "context" + "testing" + "time" + + "github.com/tech/sendico/gateway/mntx/storage/model" +) + +func TestStrictIsolatedPayoutExecutionMode_BlocksOtherOperationUntilFinalStatus(t *testing.T) { + mode := newStrictIsolatedPayoutExecutionMode() + if err := mode.BeforeDispatch(context.Background(), "op-1"); err != nil { + t.Fatalf("first acquire failed: %v", err) + } + + waitCtx, waitCancel := context.WithTimeout(context.Background(), time.Second) + defer waitCancel() + secondDone := make(chan error, 1) + go func() { + secondDone <- mode.BeforeDispatch(waitCtx, "op-2") + }() + + select { + case err := <-secondDone: + t.Fatalf("second operation should be blocked before final status, got err=%v", err) + case <-time.After(80 * time.Millisecond): + } + + mode.OnPersistedState("op-1", model.PayoutStatusWaiting) + + select { + case err := <-secondDone: + t.Fatalf("second operation should remain blocked on non-final status, got err=%v", err) + case <-time.After(80 * time.Millisecond): + } + + mode.OnPersistedState("op-1", model.PayoutStatusSuccess) + + select { + case err := <-secondDone: + if err != nil { + t.Fatalf("second operation should proceed after final status, got err=%v", err) + } + case <-time.After(time.Second): + t.Fatalf("timeout waiting for second operation to proceed") + } +} + +func TestStrictIsolatedPayoutExecutionMode_AllowsSameOperationReentry(t *testing.T) { + mode := newStrictIsolatedPayoutExecutionMode() + if err := mode.BeforeDispatch(context.Background(), "op-1"); err != nil { + t.Fatalf("first acquire failed: %v", err) + } + if err := mode.BeforeDispatch(context.Background(), "op-1"); err != nil { + t.Fatalf("same operation should be re-entrant, got err=%v", err) + } +} diff --git a/api/gateway/mntx/internal/service/gateway/service.go b/api/gateway/mntx/internal/service/gateway/service.go index 5a7e1b04..e9eced5f 100644 --- a/api/gateway/mntx/internal/service/gateway/service.go +++ b/api/gateway/mntx/internal/service/gateway/service.go @@ -36,6 +36,7 @@ type Service struct { gatewayDescriptor *gatewayv1.GatewayInstanceDescriptor announcer *discovery.Announcer invokeURI string + strictIsolation bool connectorv1.UnimplementedConnectorServiceServer } @@ -90,6 +91,9 @@ func NewService(logger mlogger.Logger, opts ...Option) *Service { } svc.card = newCardPayoutProcessor(svc.logger, svc.config, svc.clock, svc.storage, svc.httpClient, svc.producer) + if svc.strictIsolation { + svc.card.setExecutionMode(newStrictIsolatedPayoutExecutionMode()) + } svc.card.outbox = &svc.outbox svc.card.msgCfg = svc.msgCfg if err := svc.card.startOutboxReliableProducer(); err != nil { diff --git a/api/gateway/mntx/internal/service/gateway/service_test.go b/api/gateway/mntx/internal/service/gateway/service_test.go new file mode 100644 index 00000000..f470dc1e --- /dev/null +++ b/api/gateway/mntx/internal/service/gateway/service_test.go @@ -0,0 +1,19 @@ +package gateway + +import ( + "testing" + + "go.uber.org/zap" +) + +func TestNewService_StrictOperationIsolationOption(t *testing.T) { + svc := NewService(zap.NewNop(), WithStrictOperationIsolation(true)) + t.Cleanup(svc.Shutdown) + + if svc.card == nil { + t.Fatalf("expected card processor to be initialised") + } + if got, want := payoutExecutionModeName(svc.card.executionMode), payoutExecutionModeStrictIsolatedName; got != want { + t.Fatalf("execution mode mismatch: got=%q want=%q", got, want) + } +} diff --git a/api/gateway/mntx/internal/service/gateway/transfer_notifications.go b/api/gateway/mntx/internal/service/gateway/transfer_notifications.go index cb8ba4b6..fbe53201 100644 --- a/api/gateway/mntx/internal/service/gateway/transfer_notifications.go +++ b/api/gateway/mntx/internal/service/gateway/transfer_notifications.go @@ -16,7 +16,14 @@ import ( ) func isFinalStatus(t *model.CardPayout) bool { - switch t.Status { + if t == nil { + return false + } + return isFinalPayoutStatus(t.Status) +} + +func isFinalPayoutStatus(status model.PayoutStatus) bool { + switch status { case model.PayoutStatusFailed, model.PayoutStatusSuccess, model.PayoutStatusCancelled: return true default: @@ -45,6 +52,7 @@ func (p *cardPayoutProcessor) updatePayoutStatus(ctx context.Context, state *mod ) return err } + p.observeExecutionState(state) return nil } @@ -65,6 +73,7 @@ func (p *cardPayoutProcessor) updatePayoutStatus(ctx context.Context, state *mod ) return err } + p.observeExecutionState(state) return nil } -- 2.49.1