Merge pull request 'serial payouts' (#632) from mntx-627 into main
Some checks failed
ci/woodpecker/push/gateway_mntx Pipeline failed
Some checks failed
ci/woodpecker/push/gateway_mntx Pipeline failed
Reviewed-on: #632
This commit was merged in pull request #632.
This commit is contained in:
@@ -13,6 +13,7 @@ This service now supports Monetix “payout by card”.
|
|||||||
- `MONETIX_PROJECT_ID` – integer project ID
|
- `MONETIX_PROJECT_ID` – integer project ID
|
||||||
- `MONETIX_SECRET_KEY` – signature secret
|
- `MONETIX_SECRET_KEY` – signature secret
|
||||||
- Optional: `allowed_currencies`, `require_customer_address`, `request_timeout_seconds`
|
- Optional: `allowed_currencies`, `require_customer_address`, `request_timeout_seconds`
|
||||||
|
- Optional execution mode: `mcards.strict_operation_mode=true` to process only one unresolved payout operation at a time (strict isolated mode)
|
||||||
- Gateway descriptor: `gateway.id`, optional `gateway.currencies`, `gateway.limits` (for per-payout minimum use `gateway.limits.per_tx_min_amount`)
|
- Gateway descriptor: `gateway.id`, optional `gateway.currencies`, `gateway.limits` (for per-payout minimum use `gateway.limits.per_tx_min_amount`)
|
||||||
- Callback server: `MNTX_GATEWAY_HTTP_PORT` (exposed as 8084), `http.callback.path`, optional `allowed_cidrs`
|
- Callback server: `MNTX_GATEWAY_HTTP_PORT` (exposed as 8084), `http.callback.path`, optional `allowed_cidrs`
|
||||||
|
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ mcards:
|
|||||||
request_timeout_seconds: 15
|
request_timeout_seconds: 15
|
||||||
status_success: "success"
|
status_success: "success"
|
||||||
status_processing: "processing"
|
status_processing: "processing"
|
||||||
|
strict_operation_mode: false
|
||||||
|
|
||||||
gateway:
|
gateway:
|
||||||
id: "mcards"
|
id: "mcards"
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ mcards:
|
|||||||
request_timeout_seconds: 15
|
request_timeout_seconds: 15
|
||||||
status_success: "success"
|
status_success: "success"
|
||||||
status_processing: "processing"
|
status_processing: "processing"
|
||||||
|
strict_operation_mode: true
|
||||||
|
|
||||||
gateway:
|
gateway:
|
||||||
id: "mcards"
|
id: "mcards"
|
||||||
|
|||||||
@@ -59,6 +59,7 @@ type monetixConfig struct {
|
|||||||
RequestTimeoutSeconds int `yaml:"request_timeout_seconds"`
|
RequestTimeoutSeconds int `yaml:"request_timeout_seconds"`
|
||||||
StatusSuccess string `yaml:"status_success"`
|
StatusSuccess string `yaml:"status_success"`
|
||||||
StatusProcessing string `yaml:"status_processing"`
|
StatusProcessing string `yaml:"status_processing"`
|
||||||
|
StrictOperationMode bool `yaml:"strict_operation_mode"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type gatewayConfig struct {
|
type gatewayConfig struct {
|
||||||
@@ -167,6 +168,7 @@ func (i *Imp) Start() error {
|
|||||||
zap.Duration("request_timeout", monetixCfg.RequestTimeout),
|
zap.Duration("request_timeout", monetixCfg.RequestTimeout),
|
||||||
zap.String("status_success", monetixCfg.SuccessStatus()),
|
zap.String("status_success", monetixCfg.SuccessStatus()),
|
||||||
zap.String("status_processing", monetixCfg.ProcessingStatus()),
|
zap.String("status_processing", monetixCfg.ProcessingStatus()),
|
||||||
|
zap.Bool("strict_operation_mode", cfg.Monetix.StrictOperationMode),
|
||||||
)
|
)
|
||||||
|
|
||||||
gatewayDescriptor := resolveGatewayDescriptor(cfg.Gateway, monetixCfg)
|
gatewayDescriptor := resolveGatewayDescriptor(cfg.Gateway, monetixCfg)
|
||||||
@@ -196,6 +198,7 @@ func (i *Imp) Start() error {
|
|||||||
mntxservice.WithDiscoveryInvokeURI(invokeURI),
|
mntxservice.WithDiscoveryInvokeURI(invokeURI),
|
||||||
mntxservice.WithProducer(producer),
|
mntxservice.WithProducer(producer),
|
||||||
mntxservice.WithMonetixConfig(monetixCfg),
|
mntxservice.WithMonetixConfig(monetixCfg),
|
||||||
|
mntxservice.WithStrictOperationIsolation(cfg.Monetix.StrictOperationMode),
|
||||||
mntxservice.WithGatewayDescriptor(gatewayDescriptor),
|
mntxservice.WithGatewayDescriptor(gatewayDescriptor),
|
||||||
mntxservice.WithHTTPClient(&http.Client{Timeout: monetixCfg.Timeout()}),
|
mntxservice.WithHTTPClient(&http.Client{Timeout: monetixCfg.Timeout()}),
|
||||||
mntxservice.WithStorage(repo),
|
mntxservice.WithStorage(repo),
|
||||||
|
|||||||
@@ -47,9 +47,11 @@ type cardPayoutProcessor struct {
|
|||||||
perTxMinAmountMinorByCurrency map[string]int64
|
perTxMinAmountMinorByCurrency map[string]int64
|
||||||
dispatchThrottleInterval time.Duration
|
dispatchThrottleInterval time.Duration
|
||||||
dispatchMaxAttempts uint32
|
dispatchMaxAttempts uint32
|
||||||
|
executionMode payoutExecutionMode
|
||||||
|
|
||||||
dispatchMu sync.Mutex
|
dispatchMu sync.Mutex
|
||||||
nextDispatchAllowed time.Time
|
nextDispatchAllowed time.Time
|
||||||
|
dispatchSerialGate chan struct{}
|
||||||
|
|
||||||
retryPolicy payoutFailurePolicy
|
retryPolicy payoutFailurePolicy
|
||||||
retryDelayFn func(attempt uint32) time.Duration
|
retryDelayFn func(attempt uint32) time.Duration
|
||||||
@@ -189,6 +191,8 @@ func newCardPayoutProcessor(
|
|||||||
producer: producer,
|
producer: producer,
|
||||||
dispatchThrottleInterval: defaultDispatchThrottleInterval,
|
dispatchThrottleInterval: defaultDispatchThrottleInterval,
|
||||||
dispatchMaxAttempts: defaultMaxDispatchAttempts,
|
dispatchMaxAttempts: defaultMaxDispatchAttempts,
|
||||||
|
executionMode: newDefaultPayoutExecutionMode(),
|
||||||
|
dispatchSerialGate: make(chan struct{}, 1),
|
||||||
retryPolicy: defaultPayoutFailurePolicy(),
|
retryPolicy: defaultPayoutFailurePolicy(),
|
||||||
retryDelayFn: retryDelayDuration,
|
retryDelayFn: retryDelayDuration,
|
||||||
retryTimers: map[string]*time.Timer{},
|
retryTimers: map[string]*time.Timer{},
|
||||||
@@ -210,9 +214,28 @@ func (p *cardPayoutProcessor) applyGatewayDescriptor(descriptor *gatewayv1.Gatew
|
|||||||
p.dispatchThrottleInterval = dispatchThrottleIntervalFromDescriptor(descriptor, defaultDispatchThrottleInterval)
|
p.dispatchThrottleInterval = dispatchThrottleIntervalFromDescriptor(descriptor, defaultDispatchThrottleInterval)
|
||||||
p.logger.Info("Configured payout dispatch throttle",
|
p.logger.Info("Configured payout dispatch throttle",
|
||||||
zap.Duration("dispatch_interval", p.dispatchThrottleInterval),
|
zap.Duration("dispatch_interval", p.dispatchThrottleInterval),
|
||||||
|
zap.Bool("sequential_dispatch", p.dispatchSerialGate != nil),
|
||||||
|
zap.String("execution_mode", payoutExecutionModeName(p.executionMode)),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *cardPayoutProcessor) setExecutionMode(mode payoutExecutionMode) {
|
||||||
|
if p == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.executionMode = normalizePayoutExecutionMode(mode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *cardPayoutProcessor) observeExecutionState(state *model.CardPayout) {
|
||||||
|
if p == nil || state == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if p.executionMode == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.executionMode.OnPersistedState(state.OperationRef, state.Status)
|
||||||
|
}
|
||||||
|
|
||||||
func perTxMinAmountPolicy(descriptor *gatewayv1.GatewayInstanceDescriptor) (int64, map[string]int64) {
|
func perTxMinAmountPolicy(descriptor *gatewayv1.GatewayInstanceDescriptor) (int64, map[string]int64) {
|
||||||
if descriptor == nil || descriptor.GetLimits() == nil {
|
if descriptor == nil || descriptor.GetLimits() == nil {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
@@ -359,10 +382,33 @@ func (p *cardPayoutProcessor) waitDispatchSlot(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *cardPayoutProcessor) acquireDispatchExecution(ctx context.Context) (func(), error) {
|
||||||
|
if p == nil {
|
||||||
|
return nil, merrors.Internal("card payout processor not initialised")
|
||||||
|
}
|
||||||
|
if ctx == nil {
|
||||||
|
ctx = context.Background()
|
||||||
|
}
|
||||||
|
if p.dispatchSerialGate == nil {
|
||||||
|
return func() {}, nil
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case p.dispatchSerialGate <- struct{}{}:
|
||||||
|
return func() {
|
||||||
|
<-p.dispatchSerialGate
|
||||||
|
}, nil
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (p *cardPayoutProcessor) stopRetries() {
|
func (p *cardPayoutProcessor) stopRetries() {
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if p.executionMode != nil {
|
||||||
|
p.executionMode.Shutdown()
|
||||||
|
}
|
||||||
if p.retryStop != nil {
|
if p.retryStop != nil {
|
||||||
p.retryStop()
|
p.retryStop()
|
||||||
}
|
}
|
||||||
@@ -591,10 +637,20 @@ func (p *cardPayoutProcessor) dispatchCardPayout(ctx context.Context, req *mntxv
|
|||||||
if req == nil {
|
if req == nil {
|
||||||
return nil, merrors.InvalidArgument("card payout request is required")
|
return nil, merrors.InvalidArgument("card payout request is required")
|
||||||
}
|
}
|
||||||
|
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
||||||
|
if mode := p.executionMode; mode != nil {
|
||||||
|
if err := mode.BeforeDispatch(ctx, opRef); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
release, err := p.acquireDispatchExecution(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer release()
|
||||||
if err := p.waitDispatchSlot(ctx); err != nil {
|
if err := p.waitDispatchSlot(ctx); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
|
||||||
attempt := p.incrementDispatchAttempt(opRef)
|
attempt := p.incrementDispatchAttempt(opRef)
|
||||||
p.logger.Info("Dispatching card payout attempt",
|
p.logger.Info("Dispatching card payout attempt",
|
||||||
zap.String("operation_ref", opRef),
|
zap.String("operation_ref", opRef),
|
||||||
@@ -612,10 +668,20 @@ func (p *cardPayoutProcessor) dispatchCardTokenPayout(ctx context.Context, req *
|
|||||||
if req == nil {
|
if req == nil {
|
||||||
return nil, merrors.InvalidArgument("card token payout request is required")
|
return nil, merrors.InvalidArgument("card token payout request is required")
|
||||||
}
|
}
|
||||||
|
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
||||||
|
if mode := p.executionMode; mode != nil {
|
||||||
|
if err := mode.BeforeDispatch(ctx, opRef); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
release, err := p.acquireDispatchExecution(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer release()
|
||||||
if err := p.waitDispatchSlot(ctx); err != nil {
|
if err := p.waitDispatchSlot(ctx); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
opRef := findOperationRef(req.GetOperationRef(), req.GetPayoutId())
|
|
||||||
attempt := p.incrementDispatchAttempt(opRef)
|
attempt := p.incrementDispatchAttempt(opRef)
|
||||||
p.logger.Info("Dispatching card token payout attempt",
|
p.logger.Info("Dispatching card token payout attempt",
|
||||||
zap.String("operation_ref", opRef),
|
zap.String("operation_ref", opRef),
|
||||||
@@ -1002,6 +1068,7 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout
|
|||||||
if existing != nil {
|
if existing != nil {
|
||||||
switch existing.Status {
|
switch existing.Status {
|
||||||
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
|
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
|
||||||
|
p.observeExecutionState(existing)
|
||||||
return cardPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
|
return cardPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1165,6 +1232,7 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT
|
|||||||
if existing != nil {
|
if existing != nil {
|
||||||
switch existing.Status {
|
switch existing.Status {
|
||||||
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
|
case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled:
|
||||||
|
p.observeExecutionState(existing)
|
||||||
return cardTokenPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
|
return cardTokenPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -318,6 +318,114 @@ func TestCardPayoutProcessor_Submit_SameParentDifferentOperationsStoredSeparatel
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCardPayoutProcessor_StrictMode_BlocksSecondOperationUntilFirstFinalCallback(t *testing.T) {
|
||||||
|
cfg := monetix.Config{
|
||||||
|
BaseURL: "https://monetix.test",
|
||||||
|
SecretKey: "secret",
|
||||||
|
ProjectID: 99,
|
||||||
|
StatusSuccess: "success",
|
||||||
|
StatusProcessing: "processing",
|
||||||
|
AllowedCurrencies: []string{"RUB"},
|
||||||
|
}
|
||||||
|
|
||||||
|
repo := newMockRepository()
|
||||||
|
var callN atomic.Int32
|
||||||
|
httpClient := &http.Client{
|
||||||
|
Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) {
|
||||||
|
n := callN.Add(1)
|
||||||
|
resp := monetix.APIResponse{}
|
||||||
|
resp.Operation.RequestID = fmt.Sprintf("req-%d", n)
|
||||||
|
body, _ := json.Marshal(resp)
|
||||||
|
return &http.Response{
|
||||||
|
StatusCode: http.StatusOK,
|
||||||
|
Body: io.NopCloser(bytes.NewReader(body)),
|
||||||
|
Header: http.Header{"Content-Type": []string{"application/json"}},
|
||||||
|
}, nil
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
|
||||||
|
processor := newCardPayoutProcessor(
|
||||||
|
zap.NewNop(),
|
||||||
|
cfg,
|
||||||
|
staticClock{now: time.Date(2026, 3, 4, 2, 3, 4, 0, time.UTC)},
|
||||||
|
repo,
|
||||||
|
httpClient,
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
defer processor.stopRetries()
|
||||||
|
processor.dispatchThrottleInterval = 0
|
||||||
|
processor.setExecutionMode(newStrictIsolatedPayoutExecutionMode())
|
||||||
|
|
||||||
|
req1 := validCardPayoutRequest()
|
||||||
|
req1.PayoutId = ""
|
||||||
|
req1.OperationRef = "op-strict-1"
|
||||||
|
req1.ParentPaymentRef = "payment-strict-1"
|
||||||
|
req1.IdempotencyKey = "idem-strict-1"
|
||||||
|
req1.CardPan = "2204310000002456"
|
||||||
|
|
||||||
|
req2 := validCardPayoutRequest()
|
||||||
|
req2.PayoutId = ""
|
||||||
|
req2.OperationRef = "op-strict-2"
|
||||||
|
req2.ParentPaymentRef = "payment-strict-2"
|
||||||
|
req2.IdempotencyKey = "idem-strict-2"
|
||||||
|
req2.CardPan = "2204320000009754"
|
||||||
|
|
||||||
|
if _, err := processor.Submit(context.Background(), req1); err != nil {
|
||||||
|
t.Fatalf("first submit failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
secondDone := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
_, err := processor.Submit(context.Background(), req2)
|
||||||
|
secondDone <- err
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-secondDone:
|
||||||
|
t.Fatalf("second submit should block before first operation is final, err=%v", err)
|
||||||
|
case <-time.After(120 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
cb := baseCallback()
|
||||||
|
cb.Payment.ID = req1.GetOperationRef()
|
||||||
|
cb.Payment.Status = "success"
|
||||||
|
cb.Operation.Status = "success"
|
||||||
|
cb.Operation.Code = "0"
|
||||||
|
cb.Operation.Message = "Success"
|
||||||
|
cb.Payment.Sum.Currency = "RUB"
|
||||||
|
|
||||||
|
sig, err := monetix.SignPayload(cb, cfg.SecretKey)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to sign callback: %v", err)
|
||||||
|
}
|
||||||
|
cb.Signature = sig
|
||||||
|
payload, err := json.Marshal(cb)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to marshal callback: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
status, err := processor.ProcessCallback(context.Background(), payload)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("callback failed: %v", err)
|
||||||
|
}
|
||||||
|
if status != http.StatusOK {
|
||||||
|
t.Fatalf("unexpected callback status: %d", status)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-secondDone:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("second submit returned error: %v", err)
|
||||||
|
}
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatalf("timeout waiting for second submit to unblock")
|
||||||
|
}
|
||||||
|
|
||||||
|
if got, want := callN.Load(), int32(2); got != want {
|
||||||
|
t.Fatalf("unexpected provider call count: got=%d want=%d", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestCardPayoutProcessor_ProcessCallback_UpdatesMatchingOperationWithinSameParent(t *testing.T) {
|
func TestCardPayoutProcessor_ProcessCallback_UpdatesMatchingOperationWithinSameParent(t *testing.T) {
|
||||||
cfg := monetix.Config{
|
cfg := monetix.Config{
|
||||||
SecretKey: "secret",
|
SecretKey: "secret",
|
||||||
|
|||||||
@@ -77,3 +77,10 @@ func WithMessagingSettings(settings pmodel.SettingsT) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithStrictOperationIsolation serialises payout processing to one unresolved operation at a time.
|
||||||
|
func WithStrictOperationIsolation(enabled bool) Option {
|
||||||
|
return func(s *Service) {
|
||||||
|
s.strictIsolation = enabled
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,168 @@
|
|||||||
|
package gateway
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/tech/sendico/gateway/mntx/storage/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
payoutExecutionModeDefaultName = "default"
|
||||||
|
payoutExecutionModeStrictIsolatedName = "strict_isolated"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errPayoutExecutionModeStopped = errors.New("payout execution mode stopped")
|
||||||
|
|
||||||
|
type payoutExecutionMode interface {
|
||||||
|
Name() string
|
||||||
|
BeforeDispatch(ctx context.Context, operationRef string) error
|
||||||
|
OnPersistedState(operationRef string, status model.PayoutStatus)
|
||||||
|
Shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
type defaultPayoutExecutionMode struct{}
|
||||||
|
|
||||||
|
func newDefaultPayoutExecutionMode() payoutExecutionMode {
|
||||||
|
return &defaultPayoutExecutionMode{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *defaultPayoutExecutionMode) Name() string {
|
||||||
|
return payoutExecutionModeDefaultName
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *defaultPayoutExecutionMode) BeforeDispatch(_ context.Context, _ string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *defaultPayoutExecutionMode) OnPersistedState(_ string, _ model.PayoutStatus) {}
|
||||||
|
|
||||||
|
func (m *defaultPayoutExecutionMode) Shutdown() {}
|
||||||
|
|
||||||
|
type strictIsolatedPayoutExecutionMode struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
activeOperation string
|
||||||
|
waitCh chan struct{}
|
||||||
|
stopped bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStrictIsolatedPayoutExecutionMode() payoutExecutionMode {
|
||||||
|
return &strictIsolatedPayoutExecutionMode{
|
||||||
|
waitCh: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *strictIsolatedPayoutExecutionMode) Name() string {
|
||||||
|
return payoutExecutionModeStrictIsolatedName
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *strictIsolatedPayoutExecutionMode) BeforeDispatch(ctx context.Context, operationRef string) error {
|
||||||
|
opRef := strings.TrimSpace(operationRef)
|
||||||
|
if opRef == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if ctx == nil {
|
||||||
|
ctx = context.Background()
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
waitCh, allowed, err := m.tryAcquire(opRef)
|
||||||
|
if allowed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-waitCh:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *strictIsolatedPayoutExecutionMode) OnPersistedState(operationRef string, status model.PayoutStatus) {
|
||||||
|
opRef := strings.TrimSpace(operationRef)
|
||||||
|
if opRef == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
if m.stopped {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if isFinalPayoutStatus(status) {
|
||||||
|
if m.activeOperation == opRef {
|
||||||
|
m.activeOperation = ""
|
||||||
|
m.signalLocked()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.activeOperation == "" {
|
||||||
|
m.activeOperation = opRef
|
||||||
|
m.signalLocked()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *strictIsolatedPayoutExecutionMode) Shutdown() {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
if m.stopped {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.stopped = true
|
||||||
|
m.activeOperation = ""
|
||||||
|
m.signalLocked()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *strictIsolatedPayoutExecutionMode) tryAcquire(operationRef string) (<-chan struct{}, bool, error) {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
if m.stopped {
|
||||||
|
return nil, false, errPayoutExecutionModeStopped
|
||||||
|
}
|
||||||
|
|
||||||
|
switch owner := strings.TrimSpace(m.activeOperation); {
|
||||||
|
case owner == "":
|
||||||
|
m.activeOperation = operationRef
|
||||||
|
m.signalLocked()
|
||||||
|
return nil, true, nil
|
||||||
|
case owner == operationRef:
|
||||||
|
return nil, true, nil
|
||||||
|
default:
|
||||||
|
return m.waitCh, false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *strictIsolatedPayoutExecutionMode) signalLocked() {
|
||||||
|
if m.waitCh == nil {
|
||||||
|
m.waitCh = make(chan struct{})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
close(m.waitCh)
|
||||||
|
m.waitCh = make(chan struct{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func normalizePayoutExecutionMode(mode payoutExecutionMode) payoutExecutionMode {
|
||||||
|
if mode == nil {
|
||||||
|
return newDefaultPayoutExecutionMode()
|
||||||
|
}
|
||||||
|
return mode
|
||||||
|
}
|
||||||
|
|
||||||
|
func payoutExecutionModeName(mode payoutExecutionMode) string {
|
||||||
|
if mode == nil {
|
||||||
|
return payoutExecutionModeDefaultName
|
||||||
|
}
|
||||||
|
name := strings.TrimSpace(mode.Name())
|
||||||
|
if name == "" {
|
||||||
|
return payoutExecutionModeDefaultName
|
||||||
|
}
|
||||||
|
return name
|
||||||
|
}
|
||||||
@@ -0,0 +1,58 @@
|
|||||||
|
package gateway
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/tech/sendico/gateway/mntx/storage/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStrictIsolatedPayoutExecutionMode_BlocksOtherOperationUntilFinalStatus(t *testing.T) {
|
||||||
|
mode := newStrictIsolatedPayoutExecutionMode()
|
||||||
|
if err := mode.BeforeDispatch(context.Background(), "op-1"); err != nil {
|
||||||
|
t.Fatalf("first acquire failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
waitCtx, waitCancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
defer waitCancel()
|
||||||
|
secondDone := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
secondDone <- mode.BeforeDispatch(waitCtx, "op-2")
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-secondDone:
|
||||||
|
t.Fatalf("second operation should be blocked before final status, got err=%v", err)
|
||||||
|
case <-time.After(80 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
mode.OnPersistedState("op-1", model.PayoutStatusWaiting)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-secondDone:
|
||||||
|
t.Fatalf("second operation should remain blocked on non-final status, got err=%v", err)
|
||||||
|
case <-time.After(80 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
mode.OnPersistedState("op-1", model.PayoutStatusSuccess)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-secondDone:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("second operation should proceed after final status, got err=%v", err)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatalf("timeout waiting for second operation to proceed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStrictIsolatedPayoutExecutionMode_AllowsSameOperationReentry(t *testing.T) {
|
||||||
|
mode := newStrictIsolatedPayoutExecutionMode()
|
||||||
|
if err := mode.BeforeDispatch(context.Background(), "op-1"); err != nil {
|
||||||
|
t.Fatalf("first acquire failed: %v", err)
|
||||||
|
}
|
||||||
|
if err := mode.BeforeDispatch(context.Background(), "op-1"); err != nil {
|
||||||
|
t.Fatalf("same operation should be re-entrant, got err=%v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -36,6 +36,7 @@ type Service struct {
|
|||||||
gatewayDescriptor *gatewayv1.GatewayInstanceDescriptor
|
gatewayDescriptor *gatewayv1.GatewayInstanceDescriptor
|
||||||
announcer *discovery.Announcer
|
announcer *discovery.Announcer
|
||||||
invokeURI string
|
invokeURI string
|
||||||
|
strictIsolation bool
|
||||||
|
|
||||||
connectorv1.UnimplementedConnectorServiceServer
|
connectorv1.UnimplementedConnectorServiceServer
|
||||||
}
|
}
|
||||||
@@ -90,6 +91,9 @@ func NewService(logger mlogger.Logger, opts ...Option) *Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
svc.card = newCardPayoutProcessor(svc.logger, svc.config, svc.clock, svc.storage, svc.httpClient, svc.producer)
|
svc.card = newCardPayoutProcessor(svc.logger, svc.config, svc.clock, svc.storage, svc.httpClient, svc.producer)
|
||||||
|
if svc.strictIsolation {
|
||||||
|
svc.card.setExecutionMode(newStrictIsolatedPayoutExecutionMode())
|
||||||
|
}
|
||||||
svc.card.outbox = &svc.outbox
|
svc.card.outbox = &svc.outbox
|
||||||
svc.card.msgCfg = svc.msgCfg
|
svc.card.msgCfg = svc.msgCfg
|
||||||
if err := svc.card.startOutboxReliableProducer(); err != nil {
|
if err := svc.card.startOutboxReliableProducer(); err != nil {
|
||||||
|
|||||||
19
api/gateway/mntx/internal/service/gateway/service_test.go
Normal file
19
api/gateway/mntx/internal/service/gateway/service_test.go
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
package gateway
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewService_StrictOperationIsolationOption(t *testing.T) {
|
||||||
|
svc := NewService(zap.NewNop(), WithStrictOperationIsolation(true))
|
||||||
|
t.Cleanup(svc.Shutdown)
|
||||||
|
|
||||||
|
if svc.card == nil {
|
||||||
|
t.Fatalf("expected card processor to be initialised")
|
||||||
|
}
|
||||||
|
if got, want := payoutExecutionModeName(svc.card.executionMode), payoutExecutionModeStrictIsolatedName; got != want {
|
||||||
|
t.Fatalf("execution mode mismatch: got=%q want=%q", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -16,7 +16,14 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func isFinalStatus(t *model.CardPayout) bool {
|
func isFinalStatus(t *model.CardPayout) bool {
|
||||||
switch t.Status {
|
if t == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return isFinalPayoutStatus(t.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
func isFinalPayoutStatus(status model.PayoutStatus) bool {
|
||||||
|
switch status {
|
||||||
case model.PayoutStatusFailed, model.PayoutStatusSuccess, model.PayoutStatusCancelled:
|
case model.PayoutStatusFailed, model.PayoutStatusSuccess, model.PayoutStatusCancelled:
|
||||||
return true
|
return true
|
||||||
default:
|
default:
|
||||||
@@ -45,6 +52,7 @@ func (p *cardPayoutProcessor) updatePayoutStatus(ctx context.Context, state *mod
|
|||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
p.observeExecutionState(state)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -65,6 +73,7 @@ func (p *cardPayoutProcessor) updatePayoutStatus(ctx context.Context, state *mod
|
|||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
p.observeExecutionState(state)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user