package gateway

import (
	"context"
	"errors"
	"strings"
	"sync"

	"github.com/tech/sendico/gateway/mntx/storage/model"
)

// Names reported by the payout execution modes defined in this file.
const (
	payoutExecutionModeDefaultName        = "default"
	payoutExecutionModeStrictIsolatedName = "strict_isolated"
)

// errPayoutExecutionModeStopped is returned by BeforeDispatch of the strict
// isolated mode once Shutdown has been called.
var errPayoutExecutionModeStopped = errors.New("payout execution mode stopped")

// payoutExecutionMode is the set of hooks an execution mode exposes.
//
// Name returns the mode's identifier. BeforeDispatch may block or fail for
// the given operation reference. OnPersistedState is notified with an
// operation reference and a payout status. Shutdown stops the mode.
type payoutExecutionMode interface {
	Name() string
	BeforeDispatch(ctx context.Context, operationRef string) error
	OnPersistedState(operationRef string, status model.PayoutStatus)
	Shutdown()
}

// defaultPayoutExecutionMode is a stateless mode whose hooks do nothing.
type defaultPayoutExecutionMode struct{}

// newDefaultPayoutExecutionMode returns a fresh default (no-op) mode.
func newDefaultPayoutExecutionMode() payoutExecutionMode {
	return &defaultPayoutExecutionMode{}
}

// Name returns payoutExecutionModeDefaultName.
func (m *defaultPayoutExecutionMode) Name() string {
	return payoutExecutionModeDefaultName
}

// BeforeDispatch never blocks and always returns nil.
func (m *defaultPayoutExecutionMode) BeforeDispatch(_ context.Context, _ string) error {
	return nil
}

// OnPersistedState is a no-op.
func (m *defaultPayoutExecutionMode) OnPersistedState(_ string, _ model.PayoutStatus) {}

// Shutdown is a no-op.
func (m *defaultPayoutExecutionMode) Shutdown() {}

// strictIsolatedPayoutExecutionMode admits at most one operation reference at
// a time. While one reference owns the mode, BeforeDispatch calls made with a
// different reference block until the owner is released, their context ends,
// or the mode is shut down.
type strictIsolatedPayoutExecutionMode struct {
	mu sync.Mutex

	// activeOperation is the reference that currently owns the mode;
	// the empty string means the mode is free.
	activeOperation string
	// waitCh is closed and replaced by signalLocked to wake blocked callers.
	waitCh chan struct{}
	// stopped is set by Shutdown and never cleared.
	stopped bool
}

// newStrictIsolatedPayoutExecutionMode returns a strict isolated mode with no
// owner and a ready wait channel.
func newStrictIsolatedPayoutExecutionMode() payoutExecutionMode {
	return &strictIsolatedPayoutExecutionMode{
		waitCh: make(chan struct{}),
	}
}

// Name returns payoutExecutionModeStrictIsolatedName.
func (m *strictIsolatedPayoutExecutionMode) Name() string {
	return payoutExecutionModeStrictIsolatedName
}

// BeforeDispatch blocks until operationRef owns the mode.
//
// A blank reference (after trimming whitespace) is let through immediately
// without taking ownership. A nil ctx is treated as context.Background().
// It returns ctx.Err() if the context ends while waiting, and
// errPayoutExecutionModeStopped if the mode has been shut down.
func (m *strictIsolatedPayoutExecutionMode) BeforeDispatch(ctx context.Context, operationRef string) error {
	ref := strings.TrimSpace(operationRef)
	if ref == "" {
		return nil
	}
	if ctx == nil {
		ctx = context.Background()
	}

	for {
		changed, granted, err := m.tryAcquire(ref)
		switch {
		case granted:
			return nil
		case err != nil:
			return err
		}

		// Another reference owns the mode: sleep until the ownership state
		// changes, then retry from the top.
		select {
		case <-changed:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// OnPersistedState updates ownership from a persisted status.
//
// Blank references and calls made after Shutdown are ignored. When
// isFinalPayoutStatus reports true for status, ownership is released only if
// operationRef is the current owner. For any other status, operationRef takes
// ownership if the mode is currently free. Every ownership change wakes the
// callers blocked in BeforeDispatch.
func (m *strictIsolatedPayoutExecutionMode) OnPersistedState(operationRef string, status model.PayoutStatus) {
	ref := strings.TrimSpace(operationRef)
	if ref == "" {
		return
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	switch {
	case m.stopped:
		// Nothing to track once the mode has been shut down.
	case isFinalPayoutStatus(status):
		if m.activeOperation == ref {
			m.activeOperation = ""
			m.signalLocked()
		}
	case m.activeOperation == "":
		m.activeOperation = ref
		m.signalLocked()
	}
}

// Shutdown marks the mode as stopped, clears the owner and wakes every
// blocked caller. Calls after the first one have no effect.
func (m *strictIsolatedPayoutExecutionMode) Shutdown() {
	m.mu.Lock()
	defer m.mu.Unlock()

	if !m.stopped {
		m.stopped = true
		m.activeOperation = ""
		m.signalLocked()
	}
}

// tryAcquire makes one attempt to give ownership to operationRef.
//
// It returns (nil, true, nil) when operationRef owns the mode after the call,
// (nil, false, errPayoutExecutionModeStopped) when the mode is stopped, and
// (ch, false, nil) when a different reference owns it, where ch is the
// current wait channel that signalLocked will close on the next state change.
func (m *strictIsolatedPayoutExecutionMode) tryAcquire(operationRef string) (<-chan struct{}, bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.stopped {
		return nil, false, errPayoutExecutionModeStopped
	}

	current := strings.TrimSpace(m.activeOperation)
	if current == "" {
		m.activeOperation = operationRef
		m.signalLocked()
		return nil, true, nil
	}
	if current == operationRef {
		return nil, true, nil
	}
	return m.waitCh, false, nil
}

// signalLocked wakes all current waiters by closing the wait channel (if one
// exists) and installs a fresh channel for future waiters. Every caller in
// this file holds m.mu when calling it.
func (m *strictIsolatedPayoutExecutionMode) signalLocked() {
	if m.waitCh != nil {
		close(m.waitCh)
	}
	m.waitCh = make(chan struct{})
}

// normalizePayoutExecutionMode returns mode unchanged, or a new default mode
// when mode is nil.
func normalizePayoutExecutionMode(mode payoutExecutionMode) payoutExecutionMode {
	if mode != nil {
		return mode
	}
	return newDefaultPayoutExecutionMode()
}

// payoutExecutionModeName returns the whitespace-trimmed name of mode, or
// payoutExecutionModeDefaultName when mode is nil or its name is blank.
func payoutExecutionModeName(mode payoutExecutionMode) string {
	if mode != nil {
		if name := strings.TrimSpace(mode.Name()); name != "" {
			return name
		}
	}
	return payoutExecutionModeDefaultName
}