Files
sendico/api/gateway/mntx/internal/service/gateway/payout_execution_mode.go
Stephan D e77d1ab793 linting
2026-03-10 12:31:09 +01:00

170 lines
3.5 KiB
Go

package gateway
import (
"context"
"errors"
"strings"
"sync"
"github.com/tech/sendico/gateway/mntx/storage/model"
)
const (
payoutExecutionModeDefaultName = "default"
payoutExecutionModeStrictIsolatedName = "strict_isolated"
)
var errPayoutExecutionModeStopped = errors.New("payout execution mode stopped")
type payoutExecutionMode interface {
Name() string
BeforeDispatch(ctx context.Context, operationRef string) error
OnPersistedState(operationRef string, status model.PayoutStatus)
Shutdown()
}
type defaultPayoutExecutionMode struct{}
func newDefaultPayoutExecutionMode() payoutExecutionMode {
return &defaultPayoutExecutionMode{}
}
func (m *defaultPayoutExecutionMode) Name() string {
return payoutExecutionModeDefaultName
}
func (m *defaultPayoutExecutionMode) BeforeDispatch(_ context.Context, _ string) error {
return nil
}
func (m *defaultPayoutExecutionMode) OnPersistedState(_ string, _ model.PayoutStatus) {}
func (m *defaultPayoutExecutionMode) Shutdown() {}
type strictIsolatedPayoutExecutionMode struct {
mu sync.Mutex
activeOperation string
waitCh chan struct{}
stopped bool
}
func newStrictIsolatedPayoutExecutionMode() payoutExecutionMode {
return &strictIsolatedPayoutExecutionMode{
waitCh: make(chan struct{}),
}
}
func (m *strictIsolatedPayoutExecutionMode) Name() string {
return payoutExecutionModeStrictIsolatedName
}
func (m *strictIsolatedPayoutExecutionMode) BeforeDispatch(ctx context.Context, operationRef string) error {
opRef := strings.TrimSpace(operationRef)
if opRef == "" {
return nil
}
if ctx == nil {
ctx = context.Background()
}
for {
waitCh, allowed, err := m.tryAcquire(opRef)
if allowed {
return nil
}
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case <-waitCh:
}
}
}
func (m *strictIsolatedPayoutExecutionMode) OnPersistedState(operationRef string, status model.PayoutStatus) {
opRef := strings.TrimSpace(operationRef)
if opRef == "" {
return
}
m.mu.Lock()
defer m.mu.Unlock()
if m.stopped {
return
}
if isFinalPayoutStatus(status) {
if m.activeOperation == opRef {
m.activeOperation = ""
m.signalLocked()
}
return
}
if m.activeOperation == "" {
m.activeOperation = opRef
m.signalLocked()
}
}
func (m *strictIsolatedPayoutExecutionMode) Shutdown() {
m.mu.Lock()
defer m.mu.Unlock()
if m.stopped {
return
}
m.stopped = true
m.activeOperation = ""
m.signalLocked()
}
func (m *strictIsolatedPayoutExecutionMode) tryAcquire(operationRef string) (<-chan struct{}, bool, error) {
m.mu.Lock()
defer m.mu.Unlock()
if m.stopped {
return nil, false, errPayoutExecutionModeStopped
}
owner := strings.TrimSpace(m.activeOperation)
switch owner {
case "":
m.activeOperation = operationRef
m.signalLocked()
return nil, true, nil
case operationRef:
return nil, true, nil
default:
return m.waitCh, false, nil
}
}
func (m *strictIsolatedPayoutExecutionMode) signalLocked() {
if m.waitCh == nil {
m.waitCh = make(chan struct{})
return
}
close(m.waitCh)
m.waitCh = make(chan struct{})
}
func normalizePayoutExecutionMode(mode payoutExecutionMode) payoutExecutionMode {
if mode == nil {
return newDefaultPayoutExecutionMode()
}
return mode
}
func payoutExecutionModeName(mode payoutExecutionMode) string {
if mode == nil {
return payoutExecutionModeDefaultName
}
name := strings.TrimSpace(mode.Name())
if name == "" {
return payoutExecutionModeDefaultName
}
return name
}