From 34e507b664101337cee84c65e2c5f087e67415f3 Mon Sep 17 00:00:00 2001 From: Stephan D Date: Fri, 6 Mar 2026 11:58:07 +0100 Subject: [PATCH 1/4] bff USDT ledger creation --- .../internal/server/accountapiimp/signup.go | 62 ++++++++++++------- .../accountapiimp/signup_ledger_test.go | 38 +++++++----- 2 files changed, 61 insertions(+), 39 deletions(-) diff --git a/api/edge/bff/internal/server/accountapiimp/signup.go b/api/edge/bff/internal/server/accountapiimp/signup.go index 61608aca..8bc3d3aa 100644 --- a/api/edge/bff/internal/server/accountapiimp/signup.go +++ b/api/edge/bff/internal/server/accountapiimp/signup.go @@ -338,9 +338,6 @@ func (a *AccountAPI) openOrgLedgerAccount(ctx context.Context, org *model.Organi return merrors.Internal("chain gateway default asset is not configured") } - // TODO: remove hardcode - currency := "RUB" - var describable *describablev1.Describable name := strings.TrimSpace(sr.LedgerWallet.Name) var description *string @@ -357,26 +354,47 @@ func (a *AccountAPI) openOrgLedgerAccount(ctx context.Context, org *model.Organi } } - resp, err := a.ledgerClient.CreateAccount(ctx, &ledgerv1.CreateAccountRequest{ - OrganizationRef: org.ID.Hex(), - AccountType: ledgerv1.AccountType_ACCOUNT_TYPE_ASSET, - Currency: currency, - Status: ledgerv1.AccountStatus_ACCOUNT_STATUS_ACTIVE, - Role: ledgerv1.AccountRole_ACCOUNT_ROLE_OPERATING, - Metadata: map[string]string{ - "source": "signup", - "login": sr.Account.Login, - }, - Describable: describable, - }) - if err != nil { - a.logger.Warn("Failed to create ledger account for organization", zap.Error(err), mzap.StorableRef(org)) - return err - } - if resp == nil || resp.GetAccount() == nil || strings.TrimSpace(resp.GetAccount().GetLedgerAccountRef()) == "" { - return merrors.Internal("ledger returned empty account reference") + currencies := []string{"RUB", "USDT"} + if chainTokenCurrency := strings.ToUpper(strings.TrimSpace(a.chainAsset.GetTokenSymbol())); chainTokenCurrency != "" { + currencies = append(currencies, chainTokenCurrency) + } + + seen := make(map[string]struct{}, len(currencies)) + for _, currency := range currencies { + currency = strings.ToUpper(strings.TrimSpace(currency)) + if currency == "" { + continue + } + if _, exists := seen[currency]; exists { + continue + } + seen[currency] = struct{}{} + + resp, err := a.ledgerClient.CreateAccount(ctx, &ledgerv1.CreateAccountRequest{ + OrganizationRef: org.ID.Hex(), + AccountType: ledgerv1.AccountType_ACCOUNT_TYPE_ASSET, + Currency: currency, + Status: ledgerv1.AccountStatus_ACCOUNT_STATUS_ACTIVE, + Role: ledgerv1.AccountRole_ACCOUNT_ROLE_OPERATING, + Metadata: map[string]string{ + "source": "signup", + "login": sr.Account.Login, + }, + Describable: describable, + }) + if err != nil { + a.logger.Warn("Failed to create ledger account for organization", zap.Error(err), mzap.StorableRef(org), zap.String("currency", currency)) + return err + } + if resp == nil || resp.GetAccount() == nil || strings.TrimSpace(resp.GetAccount().GetLedgerAccountRef()) == "" { + return merrors.Internal("ledger returned empty account reference") + } + + a.logger.Info("Ledger account created for organization", + mzap.StorableRef(org), + zap.String("currency", currency), + zap.String("ledger_account_ref", resp.GetAccount().GetLedgerAccountRef())) } - a.logger.Info("Ledger account created for organization", mzap.StorableRef(org), zap.String("ledger_account_ref", resp.GetAccount().GetLedgerAccountRef())) return nil } diff --git a/api/edge/bff/internal/server/accountapiimp/signup_ledger_test.go b/api/edge/bff/internal/server/accountapiimp/signup_ledger_test.go index fadb4407..5acb6810 100644 --- a/api/edge/bff/internal/server/accountapiimp/signup_ledger_test.go +++ b/api/edge/bff/internal/server/accountapiimp/signup_ledger_test.go @@ -16,13 +16,13 @@ import ( ) type stubLedgerAccountClient struct { - createReq *ledgerv1.CreateAccountRequest + createReqs []*ledgerv1.CreateAccountRequest createResp *ledgerv1.CreateAccountResponse createErr error } func (s *stubLedgerAccountClient) CreateAccount(_ context.Context, req *ledgerv1.CreateAccountRequest) (*ledgerv1.CreateAccountResponse, error) { - s.createReq = req + s.createReqs = append(s.createReqs, req) return s.createResp, s.createErr } @@ -31,7 +31,7 @@ func (s *stubLedgerAccountClient) Close() error { } func TestOpenOrgLedgerAccount(t *testing.T) { - t.Run("creates operating ledger account", func(t *testing.T) { + t.Run("creates operating ledger accounts for RUB and USDT", func(t *testing.T) { desc := " Main org ledger account " sr := &srequest.Signup{ Account: model.AccountData{ @@ -65,22 +65,26 @@ func TestOpenOrgLedgerAccount(t *testing.T) { err := api.openOrgLedgerAccount(context.Background(), org, sr) assert.NoError(t, err) - if assert.NotNil(t, ledgerStub.createReq) { - assert.Equal(t, org.ID.Hex(), ledgerStub.createReq.GetOrganizationRef()) - assert.Equal(t, "RUB", ledgerStub.createReq.GetCurrency()) - assert.Equal(t, ledgerv1.AccountType_ACCOUNT_TYPE_ASSET, ledgerStub.createReq.GetAccountType()) - assert.Equal(t, ledgerv1.AccountStatus_ACCOUNT_STATUS_ACTIVE, ledgerStub.createReq.GetStatus()) - assert.Equal(t, ledgerv1.AccountRole_ACCOUNT_ROLE_OPERATING, ledgerStub.createReq.GetRole()) - assert.Equal(t, map[string]string{ - "source": "signup", - "login": "owner@example.com", - }, ledgerStub.createReq.GetMetadata()) - if assert.NotNil(t, ledgerStub.createReq.GetDescribable()) { - assert.Equal(t, "Primary Ledger", ledgerStub.createReq.GetDescribable().GetName()) - if assert.NotNil(t, ledgerStub.createReq.GetDescribable().Description) { - assert.Equal(t, "Main org ledger account", ledgerStub.createReq.GetDescribable().GetDescription()) + if assert.Len(t, ledgerStub.createReqs, 2) { + currencies := make([]string, 0, len(ledgerStub.createReqs)) + for _, req := range ledgerStub.createReqs { + currencies = append(currencies, req.GetCurrency()) + assert.Equal(t, org.ID.Hex(), req.GetOrganizationRef()) + assert.Equal(t, ledgerv1.AccountType_ACCOUNT_TYPE_ASSET, req.GetAccountType()) + assert.Equal(t, ledgerv1.AccountStatus_ACCOUNT_STATUS_ACTIVE, req.GetStatus()) + assert.Equal(t, ledgerv1.AccountRole_ACCOUNT_ROLE_OPERATING, req.GetRole()) + assert.Equal(t, map[string]string{ + "source": "signup", + "login": "owner@example.com", + }, req.GetMetadata()) + if assert.NotNil(t, req.GetDescribable()) { + assert.Equal(t, "Primary Ledger", req.GetDescribable().GetName()) + if assert.NotNil(t, req.GetDescribable().Description) { + assert.Equal(t, "Main org ledger account", req.GetDescribable().GetDescription()) + } } } + assert.ElementsMatch(t, []string{"RUB", "USDT"}, currencies) } }) From c60e7d2329adac282a8c4268ec3cec08b94d8f65 Mon Sep 17 00:00:00 2001 From: Stephan D Date: Fri, 6 Mar 2026 12:14:32 +0100 Subject: [PATCH 2/4] mntx error codes update --- .../service/gateway/card_processor.go | 99 +++-- .../service/gateway/card_processor_test.go | 289 ++++++++++++- .../mntx/internal/service/gateway/helpers.go | 3 + .../service/gateway/helpers_status_test.go | 14 + .../service/gateway/payout_failure_policy.go | 249 ++++++++++- .../gateway/payout_failure_policy_doccodes.go | 402 ++++++++++++++++++ .../gateway/payout_failure_policy_test.go | 267 +++++++++++- .../service/gateway/transfer_notifications.go | 4 +- .../gateway/transfer_notifications_test.go | 24 ++ .../mntx/internal/service/monetix/sender.go | 72 ++-- .../internal/service/monetix/sender_test.go | 93 ++++ api/gateway/mntx/storage/model/status.go | 7 +- 12 files changed, 1405 insertions(+), 118 deletions(-) create mode 100644 api/gateway/mntx/internal/service/gateway/helpers_status_test.go create mode 100644 api/gateway/mntx/internal/service/gateway/payout_failure_policy_doccodes.go create mode 100644 api/gateway/mntx/internal/service/gateway/transfer_notifications_test.go diff --git a/api/gateway/mntx/internal/service/gateway/card_processor.go b/api/gateway/mntx/internal/service/gateway/card_processor.go index 6ad8d5c1..a2262423 100644 --- a/api/gateway/mntx/internal/service/gateway/card_processor.go +++ b/api/gateway/mntx/internal/service/gateway/card_processor.go @@ -54,7 +54,7 @@ type cardPayoutProcessor struct { dispatchSerialGate chan struct{} retryPolicy payoutFailurePolicy - retryDelayFn func(attempt uint32) time.Duration + retryDelayFn func(attempt uint32, strategy payoutRetryStrategy) time.Duration retryMu sync.Mutex retryTimers map[string]*time.Timer @@ -149,15 +149,13 @@ func applyCardPayoutSendResult(state *model.CardPayout, result *monetix.CardPayo return } state.ProviderPaymentID = strings.TrimSpace(result.ProviderRequestID) + state.ProviderCode = strings.TrimSpace(result.ErrorCode) + state.ProviderMessage = strings.TrimSpace(result.ErrorMessage) if result.Accepted { state.Status = model.PayoutStatusWaiting - state.ProviderCode = "" - state.ProviderMessage = "" return } state.Status = model.PayoutStatusFailed - state.ProviderCode = strings.TrimSpace(result.ErrorCode) - state.ProviderMessage = strings.TrimSpace(result.ErrorMessage) } func payoutStateLogFields(state *model.CardPayout) []zap.Field { @@ -593,13 +591,20 @@ func payoutAcceptedForState(state *model.CardPayout) bool { return false } switch state.Status { - case model.PayoutStatusFailed, model.PayoutStatusCancelled: + case model.PayoutStatusFailed, model.PayoutStatusNeedsAttention, model.PayoutStatusCancelled: return false default: return true } } +func terminalStatusAfterRetryExhausted(decision payoutFailureDecision) model.PayoutStatus { + if decision.Action == payoutFailureActionRetry { + return model.PayoutStatusNeedsAttention + } + return model.PayoutStatusFailed +} + func cardPayoutResponseFromState( state *model.CardPayout, accepted bool, @@ -733,15 +738,21 @@ func (p *cardPayoutProcessor) scheduleRetryTimer(operationRef string, delay time p.retryTimers[key] = timer } -func retryDelayDuration(attempt uint32) time.Duration { - return time.Duration(retryDelayForAttempt(attempt)) * time.Second +func retryDelayDuration(attempt uint32, strategy payoutRetryStrategy) time.Duration { + return time.Duration(retryDelayForAttempt(attempt, strategy)) * time.Second } -func (p *cardPayoutProcessor) scheduleCardPayoutRetry(req *mntxv1.CardPayoutRequest, failedAttempt uint32, maxAttempts uint32) { +func (p *cardPayoutProcessor) scheduleCardPayoutRetry( + req *mntxv1.CardPayoutRequest, + failedAttempt uint32, + maxAttempts uint32, + strategy payoutRetryStrategy, +) { if p == nil || req == nil { return } maxAttempts = maxDispatchAttempts(maxAttempts) + strategy = normalizeRetryStrategy(strategy) nextAttempt := failedAttempt + 1 if nextAttempt > maxAttempts { return @@ -751,12 +762,13 @@ func (p *cardPayoutProcessor) scheduleCardPayoutRetry(req *mntxv1.CardPayoutRequ return } operationRef := findOperationRef(cloned.GetOperationRef(), cloned.GetPayoutId()) - delay := retryDelayDuration(failedAttempt) + delay := retryDelayDuration(failedAttempt, strategy) if p.retryDelayFn != nil { - delay = p.retryDelayFn(failedAttempt) + delay = p.retryDelayFn(failedAttempt, strategy) } p.logger.Info("Scheduling card payout retry", zap.String("operation_ref", operationRef), + zap.String("strategy", strategy.String()), zap.Uint32("failed_attempt", failedAttempt), zap.Uint32("next_attempt", nextAttempt), zap.Uint32("max_attempts", maxAttempts), @@ -767,11 +779,17 @@ func (p *cardPayoutProcessor) scheduleCardPayoutRetry(req *mntxv1.CardPayoutRequ }) } -func (p *cardPayoutProcessor) scheduleCardTokenPayoutRetry(req *mntxv1.CardTokenPayoutRequest, failedAttempt uint32, maxAttempts uint32) { +func (p *cardPayoutProcessor) scheduleCardTokenPayoutRetry( + req *mntxv1.CardTokenPayoutRequest, + failedAttempt uint32, + maxAttempts uint32, + strategy payoutRetryStrategy, +) { if p == nil || req == nil { return } maxAttempts = maxDispatchAttempts(maxAttempts) + strategy = normalizeRetryStrategy(strategy) nextAttempt := failedAttempt + 1 if nextAttempt > maxAttempts { return @@ -781,12 +799,13 @@ func (p *cardPayoutProcessor) scheduleCardTokenPayoutRetry(req *mntxv1.CardToken return } operationRef := findOperationRef(cloned.GetOperationRef(), cloned.GetPayoutId()) - delay := retryDelayDuration(failedAttempt) + delay := retryDelayDuration(failedAttempt, strategy) if p.retryDelayFn != nil { - delay = p.retryDelayFn(failedAttempt) + delay = p.retryDelayFn(failedAttempt, strategy) } p.logger.Info("Scheduling card token payout retry", zap.String("operation_ref", operationRef), + zap.String("strategy", strategy.String()), zap.Uint32("failed_attempt", failedAttempt), zap.Uint32("next_attempt", nextAttempt), zap.Uint32("max_attempts", maxAttempts), @@ -857,11 +876,11 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest, p.logger.Warn("Failed to persist retryable payout transport failure", zap.Error(upErr)) return } - p.scheduleCardPayoutRetry(req, attempt, maxAttempts) + p.scheduleCardPayoutRetry(req, attempt, maxAttempts, decision.Strategy) return } - state.Status = model.PayoutStatusFailed + state.Status = terminalStatusAfterRetryExhausted(decision) state.FailureReason = payoutFailureReason("", err.Error()) if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { p.logger.Warn("Failed to persist terminal payout transport failure", zap.Error(upErr)) @@ -889,11 +908,11 @@ func (p *cardPayoutProcessor) runCardPayoutRetry(req *mntxv1.CardPayoutRequest, p.logger.Warn("Failed to persist retryable payout provider failure", zap.Error(upErr)) return } - p.scheduleCardPayoutRetry(req, attempt, maxAttempts) + p.scheduleCardPayoutRetry(req, attempt, maxAttempts, decision.Strategy) return } - state.Status = model.PayoutStatusFailed + state.Status = terminalStatusAfterRetryExhausted(decision) state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage) if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { p.logger.Warn("Failed to persist terminal payout provider failure", zap.Error(upErr)) @@ -946,11 +965,11 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou p.logger.Warn("Failed to persist retryable token payout transport failure", zap.Error(upErr)) return } - p.scheduleCardTokenPayoutRetry(req, attempt, maxAttempts) + p.scheduleCardTokenPayoutRetry(req, attempt, maxAttempts, decision.Strategy) return } - state.Status = model.PayoutStatusFailed + state.Status = terminalStatusAfterRetryExhausted(decision) state.FailureReason = payoutFailureReason("", err.Error()) if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { p.logger.Warn("Failed to persist terminal token payout transport failure", zap.Error(upErr)) @@ -978,11 +997,11 @@ func (p *cardPayoutProcessor) runCardTokenPayoutRetry(req *mntxv1.CardTokenPayou p.logger.Warn("Failed to persist retryable token payout provider failure", zap.Error(upErr)) return } - p.scheduleCardTokenPayoutRetry(req, attempt, maxAttempts) + p.scheduleCardTokenPayoutRetry(req, attempt, maxAttempts, decision.Strategy) return } - state.Status = model.PayoutStatusFailed + state.Status = terminalStatusAfterRetryExhausted(decision) state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage) if upErr := p.updatePayoutStatus(ctx, state); upErr != nil { p.logger.Warn("Failed to persist terminal token payout provider failure", zap.Error(upErr)) @@ -1067,7 +1086,7 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout } if existing != nil { switch existing.Status { - case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled: + case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusNeedsAttention, model.PayoutStatusCancelled: p.observeExecutionState(existing) return cardPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil } @@ -1088,11 +1107,11 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout p.logger.Warn("Failed to update payout status", fields...) return nil, e } - p.scheduleCardPayoutRetry(req, 1, maxAttempts) + p.scheduleCardPayoutRetry(req, 1, maxAttempts, decision.Strategy) return cardPayoutResponseFromState(state, true, "", ""), nil } - state.Status = model.PayoutStatusFailed + state.Status = terminalStatusAfterRetryExhausted(decision) state.FailureReason = payoutFailureReason("", err.Error()) if e := p.updatePayoutStatus(ctx, state); e != nil { fields := append([]zap.Field{zap.Error(e)}, payoutStateLogFields(state)...) @@ -1112,6 +1131,7 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout errorMessage := strings.TrimSpace(result.ErrorMessage) scheduleRetry := false retryMaxAttempts := uint32(0) + retryStrategy := payoutRetryStrategyImmediate if !result.Accepted { decision := p.retryPolicy.decideProviderFailure(result.ErrorCode) @@ -1124,8 +1144,9 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout errorMessage = "" scheduleRetry = true retryMaxAttempts = maxAttempts + retryStrategy = decision.Strategy } else { - state.Status = model.PayoutStatusFailed + state.Status = terminalStatusAfterRetryExhausted(decision) state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage) p.clearRetryState(state.OperationRef) } @@ -1144,7 +1165,7 @@ func (p *cardPayoutProcessor) Submit(ctx context.Context, req *mntxv1.CardPayout return nil, err } if scheduleRetry { - p.scheduleCardPayoutRetry(req, 1, retryMaxAttempts) + p.scheduleCardPayoutRetry(req, 1, retryMaxAttempts, retryStrategy) } resp := cardPayoutResponseFromState(state, accepted, errorCode, errorMessage) @@ -1231,7 +1252,7 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT } if existing != nil { switch existing.Status { - case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusCancelled: + case model.PayoutStatusProcessing, model.PayoutStatusWaiting, model.PayoutStatusSuccess, model.PayoutStatusFailed, model.PayoutStatusNeedsAttention, model.PayoutStatusCancelled: p.observeExecutionState(existing) return cardTokenPayoutResponseFromState(existing, payoutAcceptedForState(existing), "", ""), nil } @@ -1250,11 +1271,11 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT if e := p.updatePayoutStatus(ctx, state); e != nil { return nil, e } - p.scheduleCardTokenPayoutRetry(req, 1, maxAttempts) + p.scheduleCardTokenPayoutRetry(req, 1, maxAttempts, decision.Strategy) return cardTokenPayoutResponseFromState(state, true, "", ""), nil } - state.Status = model.PayoutStatusFailed + state.Status = terminalStatusAfterRetryExhausted(decision) state.FailureReason = payoutFailureReason("", err.Error()) if e := p.updatePayoutStatus(ctx, state); e != nil { return nil, e @@ -1274,6 +1295,7 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT errorMessage := strings.TrimSpace(result.ErrorMessage) scheduleRetry := false retryMaxAttempts := uint32(0) + retryStrategy := payoutRetryStrategyImmediate if !result.Accepted { decision := p.retryPolicy.decideProviderFailure(result.ErrorCode) @@ -1286,8 +1308,9 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT errorMessage = "" scheduleRetry = true retryMaxAttempts = maxAttempts + retryStrategy = decision.Strategy } else { - state.Status = model.PayoutStatusFailed + state.Status = terminalStatusAfterRetryExhausted(decision) state.FailureReason = payoutFailureReason(result.ErrorCode, result.ErrorMessage) p.clearRetryState(state.OperationRef) } @@ -1301,7 +1324,7 @@ func (p *cardPayoutProcessor) SubmitToken(ctx context.Context, req *mntxv1.CardT return nil, err } if scheduleRetry { - p.scheduleCardTokenPayoutRetry(req, 1, retryMaxAttempts) + p.scheduleCardTokenPayoutRetry(req, 1, retryMaxAttempts, retryStrategy) } resp := cardTokenPayoutResponseFromState(state, accepted, errorCode, errorMessage) @@ -1470,7 +1493,7 @@ func (p *cardPayoutProcessor) ProcessCallback(ctx context.Context, payload []byt } retryScheduled := false - if state.Status == model.PayoutStatusFailed || state.Status == model.PayoutStatusCancelled { + if state.Status == model.PayoutStatusFailed || state.Status == model.PayoutStatusCancelled || state.Status == model.PayoutStatusNeedsAttention { decision := p.retryPolicy.decideProviderFailure(state.ProviderCode) attemptsUsed := p.currentDispatchAttempt(operationRef) maxAttempts := p.maxDispatchAttempts() @@ -1488,7 +1511,7 @@ func (p *cardPayoutProcessor) ProcessCallback(ctx context.Context, payload []byt p.logger.Warn("Failed to persist callback retry scheduling state", zap.Error(err)) return http.StatusInternalServerError, err } - p.scheduleCardPayoutRetry(req, attemptsUsed, maxAttempts) + p.scheduleCardPayoutRetry(req, attemptsUsed, maxAttempts, decision.Strategy) retryScheduled = true } else if req := p.loadCardTokenRetryRequest(operationRef); req != nil { state.Status = model.PayoutStatusProcessing @@ -1503,7 +1526,7 @@ func (p *cardPayoutProcessor) ProcessCallback(ctx context.Context, payload []byt p.logger.Warn("Failed to persist callback token retry scheduling state", zap.Error(err)) return http.StatusInternalServerError, err } - p.scheduleCardTokenPayoutRetry(req, attemptsUsed, maxAttempts) + p.scheduleCardTokenPayoutRetry(req, attemptsUsed, maxAttempts, decision.Strategy) retryScheduled = true } else { p.logger.Warn("Retryable callback decline received but no retry request snapshot found", @@ -1514,6 +1537,12 @@ func (p *cardPayoutProcessor) ProcessCallback(ctx context.Context, payload []byt ) } } + if !retryScheduled && decision.Action == payoutFailureActionRetry { + state.Status = model.PayoutStatusNeedsAttention + } + if existing != nil && existing.Status == model.PayoutStatusNeedsAttention { + state.Status = model.PayoutStatusNeedsAttention + } if !retryScheduled && strings.TrimSpace(state.FailureReason) == "" { state.FailureReason = payoutFailureReason(state.ProviderCode, state.ProviderMessage) } diff --git a/api/gateway/mntx/internal/service/gateway/card_processor_test.go b/api/gateway/mntx/internal/service/gateway/card_processor_test.go index 0ba14fa9..d25b0795 100644 --- a/api/gateway/mntx/internal/service/gateway/card_processor_test.go +++ b/api/gateway/mntx/internal/service/gateway/card_processor_test.go @@ -101,6 +101,68 @@ func TestCardPayoutProcessor_Submit_Success(t *testing.T) { } } +func TestCardPayoutProcessor_Submit_AcceptedBodyErrorRemainsWaiting(t *testing.T) { + cfg := monetix.Config{ + BaseURL: "https://monetix.test", + SecretKey: "secret", + ProjectID: 99, + AllowedCurrencies: []string{"RUB"}, + } + + repo := newMockRepository() + httpClient := &http.Client{ + Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) { + resp := monetix.APIResponse{ + Status: "error", + Code: "3062", + Message: "Payment details not received", + } + body, _ := json.Marshal(resp) + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + }, nil + }), + } + + now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + processor := newCardPayoutProcessor(zap.NewNop(), cfg, staticClock{now: now}, repo, httpClient, nil) + + req := validCardPayoutRequest() + + resp, err := processor.Submit(context.Background(), req) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + if !resp.GetAccepted() { + t.Fatalf("expected accepted payout response") + } + if resp.GetPayout().GetStatus() != mntxv1.PayoutStatus_PAYOUT_STATUS_WAITING { + t.Fatalf("expected waiting status, got %v", resp.GetPayout().GetStatus()) + } + if got := resp.GetErrorCode(); got != "3062" { + t.Fatalf("expected response error code %q, got %q", "3062", got) + } + if got := resp.GetErrorMessage(); got != "Payment details not received" { + t.Fatalf("expected response error message, got %q", got) + } + + stored, ok := repo.payouts.Get(req.GetPayoutId()) + if !ok || stored == nil { + t.Fatalf("expected payout state stored") + } + if got := stored.Status; got != model.PayoutStatusWaiting { + t.Fatalf("expected stored waiting status, got %v", got) + } + if got := stored.ProviderCode; got != "3062" { + t.Fatalf("expected stored provider code %q, got %q", "3062", got) + } + if got := stored.ProviderMessage; got != "Payment details not received" { + t.Fatalf("expected stored provider message, got %q", got) + } +} + func TestCardPayoutProcessor_Submit_MissingConfig(t *testing.T) { cfg := monetix.Config{ AllowedCurrencies: []string{"RUB"}, @@ -525,7 +587,7 @@ func TestCardPayoutProcessor_Submit_RetriesProviderLimitDeclineUntilSuccess(t *t n := calls.Add(1) resp := monetix.APIResponse{} if n == 1 { - resp.Code = providerCodeDeclineAmountOrFrequencyLimit + resp.Code = "10101" resp.Message = "Decline due to amount or frequency limit" body, _ := json.Marshal(resp) return &http.Response{ @@ -554,7 +616,7 @@ func TestCardPayoutProcessor_Submit_RetriesProviderLimitDeclineUntilSuccess(t *t ) defer processor.stopRetries() processor.dispatchThrottleInterval = 0 - processor.retryDelayFn = func(uint32) time.Duration { return 10 * time.Millisecond } + processor.retryDelayFn = func(uint32, payoutRetryStrategy) time.Duration { return 10 * time.Millisecond } req := validCardPayoutRequest() resp, err := processor.Submit(context.Background(), req) @@ -581,7 +643,7 @@ func TestCardPayoutProcessor_Submit_RetriesProviderLimitDeclineUntilSuccess(t *t } } -func TestCardPayoutProcessor_Submit_RetriesProviderLimitDeclineThenFails(t *testing.T) { +func TestCardPayoutProcessor_Submit_ProviderRetryUsesDelayedStrategy(t *testing.T) { cfg := monetix.Config{ BaseURL: "https://monetix.test", SecretKey: "secret", @@ -590,12 +652,10 @@ func TestCardPayoutProcessor_Submit_RetriesProviderLimitDeclineThenFails(t *test } repo := newMockRepository() - var calls atomic.Int32 httpClient := &http.Client{ Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) { - _ = calls.Add(1) resp := monetix.APIResponse{ - Code: providerCodeDeclineAmountOrFrequencyLimit, + Code: "10101", Message: "Decline due to amount or frequency limit", } body, _ := json.Marshal(resp) @@ -617,7 +677,159 @@ func TestCardPayoutProcessor_Submit_RetriesProviderLimitDeclineThenFails(t *test ) defer processor.stopRetries() processor.dispatchThrottleInterval = 0 - processor.retryDelayFn = func(uint32) time.Duration { return time.Millisecond } + + capturedStrategy := payoutRetryStrategy(0) + processor.retryDelayFn = func(_ uint32, strategy payoutRetryStrategy) time.Duration { + capturedStrategy = strategy + return time.Hour + } + + resp, err := processor.Submit(context.Background(), validCardPayoutRequest()) + if err != nil { + t.Fatalf("submit returned error: %v", err) + } + if !resp.GetAccepted() { + t.Fatalf("expected accepted response when retry is scheduled") + } + if got := normalizeRetryStrategy(capturedStrategy); got != payoutRetryStrategyDelayed { + t.Fatalf("unexpected retry strategy: got=%v want=%v", got, payoutRetryStrategyDelayed) + } +} + +func TestCardPayoutProcessor_Submit_StatusRefreshRetryUsesStatusRefreshStrategy(t *testing.T) { + cfg := monetix.Config{ + BaseURL: "https://monetix.test", + SecretKey: "secret", + ProjectID: 99, + AllowedCurrencies: []string{"RUB"}, + } + + repo := newMockRepository() + httpClient := &http.Client{ + Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) { + resp := monetix.APIResponse{ + Code: "3061", + Message: "Transaction not found", + } + body, _ := json.Marshal(resp) + return &http.Response{ + StatusCode: http.StatusBadRequest, + Body: io.NopCloser(bytes.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + }, nil + }), + } + + processor := newCardPayoutProcessor( + zap.NewNop(), + cfg, + staticClock{now: time.Date(2026, 3, 4, 1, 2, 3, 0, time.UTC)}, + repo, + httpClient, + nil, + ) + defer processor.stopRetries() + processor.dispatchThrottleInterval = 0 + + capturedStrategy := payoutRetryStrategy(0) + processor.retryDelayFn = func(_ uint32, strategy payoutRetryStrategy) time.Duration { + capturedStrategy = strategy + return time.Hour + } + + resp, err := processor.Submit(context.Background(), validCardPayoutRequest()) + if err != nil { + t.Fatalf("submit returned error: %v", err) + } + if !resp.GetAccepted() { + t.Fatalf("expected accepted response when retry is scheduled") + } + if got := normalizeRetryStrategy(capturedStrategy); got != payoutRetryStrategyStatusRefresh { + t.Fatalf("unexpected retry strategy: got=%v want=%v", got, payoutRetryStrategyStatusRefresh) + } +} + +func TestCardPayoutProcessor_Submit_TransportRetryUsesImmediateStrategy(t *testing.T) { + cfg := monetix.Config{ + BaseURL: "https://monetix.test", + SecretKey: "secret", + ProjectID: 99, + AllowedCurrencies: []string{"RUB"}, + } + + repo := newMockRepository() + httpClient := &http.Client{ + Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) { + return nil, errors.New("transport timeout") + }), + } + + processor := newCardPayoutProcessor( + zap.NewNop(), + cfg, + staticClock{now: time.Date(2026, 3, 4, 1, 2, 3, 0, time.UTC)}, + repo, + httpClient, + nil, + ) + defer processor.stopRetries() + processor.dispatchThrottleInterval = 0 + + capturedStrategy := payoutRetryStrategy(0) + processor.retryDelayFn = func(_ uint32, strategy payoutRetryStrategy) time.Duration { + capturedStrategy = strategy + return time.Hour + } + + resp, err := processor.Submit(context.Background(), validCardPayoutRequest()) + if err != nil { + t.Fatalf("submit returned error: %v", err) + } + if !resp.GetAccepted() { + t.Fatalf("expected accepted response when retry is scheduled") + } + if got := normalizeRetryStrategy(capturedStrategy); got != payoutRetryStrategyImmediate { + t.Fatalf("unexpected retry strategy: got=%v want=%v", got, payoutRetryStrategyImmediate) + } +} + +func TestCardPayoutProcessor_Submit_RetriesProviderLimitDeclineThenNeedsAttention(t *testing.T) { + cfg := monetix.Config{ + BaseURL: "https://monetix.test", + SecretKey: "secret", + ProjectID: 99, + AllowedCurrencies: []string{"RUB"}, + } + + repo := newMockRepository() + var calls atomic.Int32 + httpClient := &http.Client{ + Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) { + _ = calls.Add(1) + resp := monetix.APIResponse{ + Code: "10101", + Message: "Decline due to amount or frequency limit", + } + body, _ := json.Marshal(resp) + return &http.Response{ + StatusCode: http.StatusTooManyRequests, + Body: io.NopCloser(bytes.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + }, nil + }), + } + + processor := newCardPayoutProcessor( + zap.NewNop(), + cfg, + staticClock{now: time.Date(2026, 3, 4, 1, 2, 3, 0, time.UTC)}, + repo, + httpClient, + nil, + ) + defer processor.stopRetries() + processor.dispatchThrottleInterval = 0 + processor.retryDelayFn = func(uint32, payoutRetryStrategy) time.Duration { return time.Millisecond } req := validCardPayoutRequest() resp, err := processor.Submit(context.Background(), req) @@ -631,14 +843,14 @@ func TestCardPayoutProcessor_Submit_RetriesProviderLimitDeclineThenFails(t *test deadline := time.Now().Add(2 * time.Second) for { state, ok := repo.payouts.Get(req.GetPayoutId()) - if ok && state != nil && state.Status == model.PayoutStatusFailed { - if !strings.Contains(state.FailureReason, providerCodeDeclineAmountOrFrequencyLimit) { + if ok && state != nil && state.Status == model.PayoutStatusNeedsAttention { + if !strings.Contains(state.FailureReason, "10101") { t.Fatalf("expected failure reason to include provider code, got=%q", state.FailureReason) } break } if time.Now().After(deadline) { - t.Fatalf("timeout waiting for terminal failed status") + t.Fatalf("timeout waiting for terminal needs_attention status") } time.Sleep(10 * time.Millisecond) } @@ -647,6 +859,59 @@ func TestCardPayoutProcessor_Submit_RetriesProviderLimitDeclineThenFails(t *test } } +func TestCardPayoutProcessor_Submit_NonRetryProviderDeclineRemainsFailed(t *testing.T) { + cfg := monetix.Config{ + BaseURL: "https://monetix.test", + SecretKey: "secret", + ProjectID: 99, + AllowedCurrencies: []string{"RUB"}, + } + + repo := newMockRepository() + httpClient := &http.Client{ + Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) { + resp := monetix.APIResponse{ + Code: "10003", + Message: "Decline by anti-fraud policy", + } + body, _ := json.Marshal(resp) + return &http.Response{ + StatusCode: http.StatusBadRequest, + Body: io.NopCloser(bytes.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + }, nil + }), + } + + processor := newCardPayoutProcessor( + zap.NewNop(), + cfg, + staticClock{now: time.Date(2026, 3, 4, 1, 2, 3, 0, time.UTC)}, + repo, + httpClient, + nil, + ) + defer processor.stopRetries() + processor.dispatchThrottleInterval = 0 + + req := validCardPayoutRequest() + resp, err := processor.Submit(context.Background(), req) + if err != nil { + t.Fatalf("submit returned error: %v", err) + } + if resp.GetAccepted() { + t.Fatalf("expected non-accepted response for non-retryable provider decline") + } + + state, ok := repo.payouts.Get(req.GetPayoutId()) + if !ok || state == nil { + t.Fatal("expected stored payout state") + } + if got, want := state.Status, model.PayoutStatusFailed; got != want { + t.Fatalf("unexpected payout status: got=%q want=%q", got, want) + } +} + func TestCardPayoutProcessor_ProcessCallback_RetryableDeclineSchedulesRetry(t *testing.T) { cfg := monetix.Config{ BaseURL: "https://monetix.test", @@ -687,7 +952,7 @@ func TestCardPayoutProcessor_ProcessCallback_RetryableDeclineSchedulesRetry(t *t ) defer processor.stopRetries() processor.dispatchThrottleInterval = 0 - processor.retryDelayFn = func(uint32) time.Duration { return 5 * time.Millisecond } + processor.retryDelayFn = func(uint32, payoutRetryStrategy) time.Duration { return 5 * time.Millisecond } req := validCardPayoutRequest() resp, err := processor.Submit(context.Background(), req) @@ -702,7 +967,7 @@ func TestCardPayoutProcessor_ProcessCallback_RetryableDeclineSchedulesRetry(t *t cb.Payment.ID = req.GetPayoutId() cb.Payment.Status = "failed" cb.Operation.Status = "failed" - cb.Operation.Code = providerCodeDeclineAmountOrFrequencyLimit + cb.Operation.Code = "10101" cb.Operation.Message = "Decline due to amount or frequency limit" cb.Payment.Sum.Currency = "RUB" diff --git a/api/gateway/mntx/internal/service/gateway/helpers.go b/api/gateway/mntx/internal/service/gateway/helpers.go index 629b4ff2..981ad450 100644 --- a/api/gateway/mntx/internal/service/gateway/helpers.go +++ b/api/gateway/mntx/internal/service/gateway/helpers.go @@ -69,6 +69,9 @@ func payoutStatusToProto(s model.PayoutStatus) mntxv1.PayoutStatus { return mntxv1.PayoutStatus_PAYOUT_STATUS_SUCCESS case model.PayoutStatusFailed: return mntxv1.PayoutStatus_PAYOUT_STATUS_FAILED + case model.PayoutStatusNeedsAttention: + // Connector/gateway proto does not expose needs_attention yet; map it to failed externally. + return mntxv1.PayoutStatus_PAYOUT_STATUS_FAILED case model.PayoutStatusCancelled: return mntxv1.PayoutStatus_PAYOUT_STATUS_CANCELLED default: diff --git a/api/gateway/mntx/internal/service/gateway/helpers_status_test.go b/api/gateway/mntx/internal/service/gateway/helpers_status_test.go new file mode 100644 index 00000000..da2dc5c5 --- /dev/null +++ b/api/gateway/mntx/internal/service/gateway/helpers_status_test.go @@ -0,0 +1,14 @@ +package gateway + +import ( + "testing" + + "github.com/tech/sendico/gateway/mntx/storage/model" + mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1" +) + +func TestPayoutStatusToProto_NeedsAttentionMapsToFailed(t *testing.T) { + if got, want := payoutStatusToProto(model.PayoutStatusNeedsAttention), mntxv1.PayoutStatus_PAYOUT_STATUS_FAILED; got != want { + t.Fatalf("unexpected proto status: got=%v want=%v", got, want) + } +} diff --git a/api/gateway/mntx/internal/service/gateway/payout_failure_policy.go b/api/gateway/mntx/internal/service/gateway/payout_failure_policy.go index 83b84e9d..ed325935 100644 --- a/api/gateway/mntx/internal/service/gateway/payout_failure_policy.go +++ b/api/gateway/mntx/internal/service/gateway/payout_failure_policy.go @@ -1,13 +1,11 @@ package gateway import ( + "sort" + "strconv" "strings" ) -const ( - providerCodeDeclineAmountOrFrequencyLimit = "10101" -) - type payoutFailureAction int const ( @@ -15,47 +13,137 @@ const ( payoutFailureActionRetry ) +type payoutRetryStrategy int + +const ( + payoutRetryStrategyImmediate payoutRetryStrategy = iota + 1 + payoutRetryStrategyDelayed + payoutRetryStrategyStatusRefresh +) + type payoutFailureDecision struct { - Action payoutFailureAction - Reason string + Action payoutFailureAction + Strategy payoutRetryStrategy + Reason string } type payoutFailurePolicy struct { - providerCodeActions map[string]payoutFailureAction + providerCodeStrategies map[string]payoutRetryStrategy + documentedProviderCodes map[string]struct{} } -func defaultPayoutFailurePolicy() payoutFailurePolicy { - return payoutFailurePolicy{ - providerCodeActions: map[string]payoutFailureAction{ - providerCodeDeclineAmountOrFrequencyLimit: payoutFailureActionRetry, +type retryCodeBucket struct { + strategy payoutRetryStrategy + retryable bool + codes []string +} + +var providerRetryOnlyCodeBuckets = []retryCodeBucket{ + // GTX "repeat request now / temporary issue" style codes. + { + strategy: payoutRetryStrategyImmediate, + retryable: true, + codes: []string{ + // General codes. + "104", "108", "301", "320", "601", "602", "603", "3025", "3198", + // External card PS codes. + "10000", "10100", "10104", "10105", "10107", "10202", "102051", "10301", "105012", "10505", "10601", "10602", "10603", + // External alternate PS codes. + "20000", "20100", "20104", "20105", "20202", "20301", "20304", "20601", "20602", "20603", }, + }, + // GTX "retry later / limits / period restrictions" style codes. + { + strategy: payoutRetryStrategyDelayed, + retryable: true, + codes: []string{ + // General codes. + "312", "314", "315", "316", "325", "2466", + "3106", "3108", "3109", "3110", "3111", "3112", + "3285", "3297", "3298", + "3305", "3306", "3307", "3308", "3309", "3310", "3311", "3312", "3313", "3314", "3315", "3316", "3317", "3318", "3319", "3320", "3321", "3322", "3323", "3324", "3325", "3326", "3327", "3328", "3329", "3330", "3331", "3332", "3333", "3334", "3335", "3336", "3337", "3338", "3339", "3340", + "3342", "3343", "3344", "3345", "3346", "3347", "3348", "3349", "3350", "3351", "3352", "3353", "3355", "3357", + "3407", "3408", "3450", "3451", "3452", "3613", + // External card PS codes. + "10101", "10109", "10112", "10114", "101012", "101013", "101014", + // External alternate PS codes. + "20109", "20206", "20505", "201012", "201013", "201014", + }, + }, + // GTX status refresh/polling conditions. + { + strategy: payoutRetryStrategyStatusRefresh, + retryable: true, + codes: []string{ + "3061", "3062", + "9999", "19999", "20802", "29999", + }, + }, +} + +var providerDocumentedNonRetryCodes = buildDocumentedNonRetryCodes(providerDocumentedCodes, providerRetryOnlyCodeBuckets) + +var providerRetryCodeBuckets = func() []retryCodeBucket { + buckets := make([]retryCodeBucket, 0, len(providerRetryOnlyCodeBuckets)+1) + buckets = append(buckets, providerRetryOnlyCodeBuckets...) + buckets = append(buckets, retryCodeBucket{ + strategy: payoutRetryStrategyImmediate, + retryable: false, + codes: providerDocumentedNonRetryCodes, + }) + return buckets +}() + +func defaultPayoutFailurePolicy() payoutFailurePolicy { + strategies := map[string]payoutRetryStrategy{} + for _, bucket := range providerRetryCodeBuckets { + if !bucket.retryable { + continue + } + registerRetryStrategy(strategies, bucket.strategy, bucket.codes...) + } + + return payoutFailurePolicy{ + providerCodeStrategies: strategies, + documentedProviderCodes: newCodeSet(providerDocumentedCodes), } } func (p payoutFailurePolicy) decideProviderFailure(code string) payoutFailureDecision { - normalized := strings.TrimSpace(code) + normalized := normalizeProviderCode(code) if normalized == "" { return payoutFailureDecision{ - Action: payoutFailureActionFail, - Reason: "provider_failure", + Action: payoutFailureActionFail, + Strategy: payoutRetryStrategyImmediate, + Reason: "provider_failure", } } - if action, ok := p.providerCodeActions[normalized]; ok { + if strategy, ok := p.providerCodeStrategies[normalized]; ok { return payoutFailureDecision{ - Action: action, - Reason: "provider_code_" + normalized, + Action: payoutFailureActionRetry, + Strategy: strategy, + Reason: "provider_code_" + normalized, + } + } + if _, ok := p.documentedProviderCodes[normalized]; ok { + return payoutFailureDecision{ + Action: payoutFailureActionFail, + Strategy: payoutRetryStrategyImmediate, + Reason: "provider_code_" + normalized + "_documented_non_retry", } } return payoutFailureDecision{ - Action: payoutFailureActionFail, - Reason: "provider_code_" + normalized, + Action: payoutFailureActionFail, + Strategy: payoutRetryStrategyImmediate, + Reason: "provider_code_" + normalized + "_unknown", } } func (p payoutFailurePolicy) decideTransportFailure() payoutFailureDecision { return payoutFailureDecision{ - Action: payoutFailureActionRetry, - Reason: "transport_failure", + Action: payoutFailureActionRetry, + Strategy: payoutRetryStrategyImmediate, + Reason: "transport_failure", } } @@ -72,8 +160,40 @@ func payoutFailureReason(code, message string) string { } } -func retryDelayForAttempt(attempt uint32) int { - // Backoff in seconds by attempt number (attempt starts at 1). +func retryDelayForAttempt(attempt uint32, strategy payoutRetryStrategy) int { + strategy = normalizeRetryStrategy(strategy) + + // Backoff in seconds by strategy and attempt number (attempt starts at 1). + if strategy == payoutRetryStrategyStatusRefresh { + switch { + case attempt <= 1: + return 10 + case attempt == 2: + return 20 + case attempt == 3: + return 40 + case attempt == 4: + return 80 + default: + return 160 + } + } + + if strategy == payoutRetryStrategyDelayed { + switch { + case attempt <= 1: + return 30 + case attempt == 2: + return 120 + case attempt == 3: + return 600 + case attempt == 4: + return 1800 + default: + return 7200 + } + } + switch { case attempt <= 1: return 5 @@ -85,3 +205,86 @@ func retryDelayForAttempt(attempt uint32) int { return 60 } } + +func registerRetryStrategy(dst map[string]payoutRetryStrategy, strategy payoutRetryStrategy, codes ...string) { + if dst == nil || len(codes) == 0 { + return + } + strategy = normalizeRetryStrategy(strategy) + for _, code := range codes { + normalized := normalizeProviderCode(code) + if normalized == "" { + continue + } + dst[normalized] = strategy + } +} + +func newCodeSet(codes []string) map[string]struct{} { + set := map[string]struct{}{} + for _, code := range codes { + normalized := normalizeProviderCode(code) + if normalized == "" { + continue + } + set[normalized] = struct{}{} + } + return set +} + +func buildDocumentedNonRetryCodes(documented []string, retryBuckets []retryCodeBucket) []string { + documentedSet := newCodeSet(documented) + retrySet := map[string]struct{}{} + for _, bucket := range retryBuckets { + for _, code := range bucket.codes { + normalized := normalizeProviderCode(code) + if normalized == "" { + continue + } + retrySet[normalized] = struct{}{} + } + } + + nonRetry := make([]string, 0, len(documentedSet)) + for code := range documentedSet { + if _, ok := retrySet[code]; ok { + continue + } + nonRetry = append(nonRetry, code) + } + + sort.Slice(nonRetry, func(i, j int) bool { + left, leftErr := strconv.Atoi(nonRetry[i]) + right, rightErr := strconv.Atoi(nonRetry[j]) + if leftErr != nil || rightErr != nil { + return nonRetry[i] < nonRetry[j] + } + return left < right + }) + + return nonRetry +} + +func normalizeProviderCode(code string) string { + return strings.TrimSpace(code) +} + +func normalizeRetryStrategy(strategy payoutRetryStrategy) payoutRetryStrategy { + switch strategy { + case payoutRetryStrategyDelayed, payoutRetryStrategyStatusRefresh: + return strategy + default: + return payoutRetryStrategyImmediate + } +} + +func (s payoutRetryStrategy) String() string { + switch normalizeRetryStrategy(s) { + case payoutRetryStrategyDelayed: + return "delayed" + case payoutRetryStrategyStatusRefresh: + return "status_refresh" + default: + return "immediate" + } +} diff --git a/api/gateway/mntx/internal/service/gateway/payout_failure_policy_doccodes.go b/api/gateway/mntx/internal/service/gateway/payout_failure_policy_doccodes.go new file mode 100644 index 00000000..c75325d9 --- /dev/null +++ b/api/gateway/mntx/internal/service/gateway/payout_failure_policy_doccodes.go @@ -0,0 +1,402 @@ +package gateway + +// providerDocumentedCodes is the normalized list of numeric response codes documented in +// https://developers.gtxpoint.com/ru/ru_gate_statuses_and_response_codes.html +// (all response-code tables). +var providerDocumentedCodes = []string{ + "0", + "100", + "104", + "108", + "109", + "301", + "303", + "309", + "310", + "311", + "312", + "313", + "314", + "315", + "316", + "320", + "325", + "402", + "501", + "502", + "504", + "601", + "602", + "603", + "702", + "903", + "904", + "1337", + "1401", + "1402", + "1403", + "1404", + "1405", + "1406", + "1407", + "1408", + "1409", + "1410", + "1411", + "1412", + "1413", + "1415", + "1416", + "1417", + "1418", + "1419", + "1420", + "1421", + "1422", + "1423", + "1424", + "1425", + "1426", + "1427", + "1428", + "1429", + "1430", + "1431", + "1432", + "1433", + "1434", + "1435", + "1436", + "1437", + "1438", + "1439", + "1441", + "1451", + "1452", + "1453", + "1454", + "1455", + "1456", + "1457", + "1461", + "1462", + "1463", + "1464", + "1499", + "2003", + "2004", + "2005", + "2008", + "2014", + "2061", + "2123", + "2124", + "2154", + "2164", + "2261", + "2426", + "2442", + "2466", + "2541", + "2606", + "2609", + "2610", + "2611", + "2641", + "2642", + "2701", + "2801", + "2881", + "2945", + "2949", + "3001", + "3002", + "3003", + "3004", + "3019", + "3020", + "3021", + "3022", + "3023", + "3024", + "3025", + "3026", + "3027", + "3028", + "3029", + "3030", + "3041", + "3059", + "3060", + "3061", + "3062", + "3081", + "3101", + "3102", + "3103", + "3104", + "3105", + "3106", + "3107", + "3108", + "3109", + "3110", + "3111", + "3112", + "3118", + "3119", + "3120", + "3121", + "3122", + "3123", + "3124", + "3141", + "3161", + "3181", + "3182", + "3183", + "3184", + "3191", + "3192", + "3193", + "3194", + "3195", + "3196", + "3197", + "3198", + "3199", + "3200", + "3201", + "3221", + "3230", + "3241", + "3242", + "3243", + "3244", + "3261", + "3262", + "3281", + "3283", + "3284", + "3285", + "3286", + "3287", + "3288", + "3289", + "3291", + "3292", + "3293", + "3297", + "3298", + "3299", + "3301", + "3303", + "3304", + "3305", + "3306", + "3307", + "3308", + "3309", + "3310", + "3311", + "3312", + "3313", + "3314", + "3315", + "3316", + "3317", + "3318", + "3319", + "3320", + "3321", + "3322", + "3323", + "3324", + "3325", + "3326", + "3327", + "3328", + "3329", + "3330", + "3331", + "3332", + "3333", + "3334", + "3335", + "3336", + "3337", + "3338", + "3339", + "3340", + "3341", + "3342", + "3343", + "3344", + "3345", + "3346", + "3347", + "3348", + "3349", + "3350", + "3351", + "3352", + "3353", + "3355", + "3356", + "3357", + "3358", + "3360", + "3400", + "3402", + "3403", + "3404", + "3405", + "3406", + "3407", + "3408", + "3409", + "3410", + "3411", + "3412", + "3413", + "3414", + "3415", + "3416", + "3417", + "3418", + "3419", + "3431", + "3432", + "3433", + "3434", + "3435", + "3436", + "3437", + "3438", + "3439", + "3450", + "3451", + "3452", + "3470", + "3471", + "3472", + "3480", + "3485", + "3490", + "3491", + "3609", + "3610", + "3611", + "3612", + "3613", + "9999", + "10000", + "10100", + "10101", + "10102", + "10103", + "10104", + "10105", + "10106", + "10107", + "10108", + "10109", + "10110", + "10111", + "10112", + "10113", + "10114", + "10201", + "10202", + "10203", + "10204", + "10205", + "10301", + "10401", + "10402", + "10403", + "10404", + "10405", + "10501", + "10502", + "10503", + "10504", + "10505", + "10601", + "10602", + "10603", + "10701", + "10702", + "10703", + "10704", + "10705", + "10706", + "10707", + "10708", + "10709", + "10722", + "10801", + "10805", + "10806", + "10807", + "10811", + "10812", + "19999", + "20000", + "20100", + "20101", + "20102", + "20103", + "20104", + "20105", + "20106", + "20107", + "20109", + "20201", + "20202", + "20203", + "20204", + "20205", + "20206", + "20301", + "20302", + "20303", + "20304", + "20401", + "20402", + "20501", + "20502", + "20503", + "20504", + "20505", + "20601", + "20602", + "20603", + "20604", + "20701", + "20702", + "20703", + "20705", + "20706", + "20801", + "20802", + "29999", + "30000", + "30100", + "30301", + "30302", + "30303", + "30401", + "101011", + "101012", + "101013", + "101014", + "101021", + "102051", + "105012", + "108010", + "201011", + "201012", + "201013", + "201014", +} diff --git a/api/gateway/mntx/internal/service/gateway/payout_failure_policy_test.go b/api/gateway/mntx/internal/service/gateway/payout_failure_policy_test.go index d5566f57..1d35914b 100644 --- a/api/gateway/mntx/internal/service/gateway/payout_failure_policy_test.go +++ b/api/gateway/mntx/internal/service/gateway/payout_failure_policy_test.go @@ -2,28 +2,73 @@ package gateway import "testing" +func retryBucketCodeSet() map[string]struct{} { + set := map[string]struct{}{} + for _, bucket := range providerRetryCodeBuckets { + if !bucket.retryable { + continue + } + for _, code := range bucket.codes { + set[normalizeProviderCode(code)] = struct{}{} + } + } + return set +} + +func allBucketCodeSet() map[string]struct{} { + set := map[string]struct{}{} + for _, bucket := range providerRetryCodeBuckets { + for _, code := range bucket.codes { + set[normalizeProviderCode(code)] = struct{}{} + } + } + return set +} + func TestPayoutFailurePolicy_DecideProviderFailure(t *testing.T) { policy := defaultPayoutFailurePolicy() cases := []struct { - name string - code string - action payoutFailureAction + name string + code string + action payoutFailureAction + strategy payoutRetryStrategy }{ { - name: "retryable provider limit code", - code: providerCodeDeclineAmountOrFrequencyLimit, - action: payoutFailureActionRetry, + name: "immediate retry strategy code", + code: "10000", + action: payoutFailureActionRetry, + strategy: payoutRetryStrategyImmediate, }, { - name: "unknown provider code", - code: "99999", - action: payoutFailureActionFail, + name: "delayed retry strategy code", + code: "10101", + action: payoutFailureActionRetry, + strategy: payoutRetryStrategyDelayed, }, { - name: "empty provider code", - code: "", - action: payoutFailureActionFail, + name: "status refresh retry strategy code", + code: "3061", + action: payoutFailureActionRetry, + strategy: payoutRetryStrategyStatusRefresh, + }, + { + name: "status refresh retry strategy payment details missing code", + code: "3062", + action: payoutFailureActionRetry, + strategy: payoutRetryStrategyStatusRefresh, + }, + { + name: "unknown provider code", + code: "99999", + action: payoutFailureActionFail, + strategy: payoutRetryStrategyImmediate, + }, + { + name: "empty provider code", + code: "", + action: payoutFailureActionFail, + strategy: payoutRetryStrategyImmediate, }, } @@ -35,6 +80,204 @@ func TestPayoutFailurePolicy_DecideProviderFailure(t *testing.T) { if got.Action != tc.action { t.Fatalf("action mismatch: got=%v want=%v", got.Action, tc.action) } + if got.Strategy != tc.strategy { + t.Fatalf("strategy mismatch: got=%v want=%v", got.Strategy, tc.strategy) + } + }) + } +} + +func TestPayoutFailurePolicy_DocumentRetryCoverage(t *testing.T) { + policy := defaultPayoutFailurePolicy() + + // Parsed from GTX response-code tables (General, RCS, external card PS, external alternate PS, merchant system): + // 32 immediate + 84 delayed + 6 status-refresh = 122 retryable codes. + if got, want := len(policy.providerCodeStrategies), 122; got != want { + t.Fatalf("retry catalog size mismatch: got=%d want=%d", got, want) + } + if got, want := len(policy.documentedProviderCodes), 395; got != want { + t.Fatalf("documented code catalog size mismatch: got=%d want=%d", got, want) + } + + cases := []struct { + code string + strategy payoutRetryStrategy + }{ + // Immediate retry examples. + {code: "3025", strategy: payoutRetryStrategyImmediate}, + {code: "3198", strategy: payoutRetryStrategyImmediate}, + {code: "105012", strategy: payoutRetryStrategyImmediate}, + {code: "20603", strategy: payoutRetryStrategyImmediate}, + // Delayed retry examples, including previously missed high-range limits. + {code: "3106", strategy: payoutRetryStrategyDelayed}, + {code: "3337", strategy: payoutRetryStrategyDelayed}, + {code: "3407", strategy: payoutRetryStrategyDelayed}, + {code: "3613", strategy: payoutRetryStrategyDelayed}, + {code: "201014", strategy: payoutRetryStrategyDelayed}, + // Status refresh examples. + {code: "3061", strategy: payoutRetryStrategyStatusRefresh}, + {code: "3062", strategy: payoutRetryStrategyStatusRefresh}, + {code: "20802", strategy: payoutRetryStrategyStatusRefresh}, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.code, func(t *testing.T) { + t.Helper() + got := policy.decideProviderFailure(tc.code) + if got.Action != payoutFailureActionRetry { + t.Fatalf("action mismatch: got=%v want=%v", got.Action, payoutFailureActionRetry) + } + if got.Strategy != tc.strategy { + t.Fatalf("strategy mismatch: got=%v want=%v", got.Strategy, tc.strategy) + } + }) + } +} + +func TestPayoutFailurePolicy_DocumentedCodeCoverageByPolicy(t *testing.T) { + policy := defaultPayoutFailurePolicy() + retrySet := retryBucketCodeSet() + + if got, want := len(retrySet), len(policy.providerCodeStrategies); got != want { + t.Fatalf("retry set size mismatch: got=%d want=%d", got, want) + } + + documentedNonRetry := 0 + for _, code := range providerDocumentedCodes { + code := normalizeProviderCode(code) + decision := policy.decideProviderFailure(code) + + if _, isRetry := retrySet[code]; isRetry { + if decision.Action != payoutFailureActionRetry { + t.Fatalf("documented retry code %s unexpectedly classified as non-retry", code) + } + continue + } + + documentedNonRetry++ + if decision.Action != payoutFailureActionFail { + t.Fatalf("documented non-retry code %s unexpectedly classified as retry", code) + } + if decision.Reason != "provider_code_"+code+"_documented_non_retry" { + t.Fatalf("documented non-retry code %s has unexpected reason: %q", code, decision.Reason) + } + } + + if got, want := len(retrySet)+documentedNonRetry, len(providerDocumentedCodes); got != want { + t.Fatalf("coverage mismatch: retry(%d)+non_retry(%d) != documented(%d)", len(retrySet), documentedNonRetry, len(providerDocumentedCodes)) + } +} + +func TestProviderRetryCodeBuckets_DoNotOverlapAndCoverDocumentedCodes(t *testing.T) { + seen := map[string]int{} + for bucketIdx, bucket := range providerRetryCodeBuckets { + for _, rawCode := range bucket.codes { + code := normalizeProviderCode(rawCode) + if code == "" { + t.Fatalf("empty code in bucket #%d", bucketIdx) + } + if prevIdx, ok := seen[code]; ok { + t.Fatalf("overlap detected for code %s between bucket #%d and bucket #%d", code, prevIdx, bucketIdx) + } + seen[code] = bucketIdx + } + } + + allBucketCodes := allBucketCodeSet() + documented := newCodeSet(providerDocumentedCodes) + if got, want := len(allBucketCodes), len(documented); got != want { + t.Fatalf("union size mismatch: buckets=%d documented=%d", got, want) + } + + for code := range documented { + if _, ok := allBucketCodes[code]; !ok { + t.Fatalf("documented code %s is missing from providerRetryCodeBuckets union", code) + } + } + for code := range allBucketCodes { + if _, ok := documented[code]; !ok { + t.Fatalf("bucket code %s is not present in documented code list", code) + } + } +} + +func TestPayoutFailurePolicy_DecideProviderFailure_DocumentedNonRetryCode(t *testing.T) { + policy := defaultPayoutFailurePolicy() + + got := policy.decideProviderFailure("3059") + if got.Action != payoutFailureActionFail { + t.Fatalf("action mismatch: got=%v want=%v", got.Action, payoutFailureActionFail) + } + if got.Strategy != payoutRetryStrategyImmediate { + t.Fatalf("strategy mismatch: got=%v want=%v", got.Strategy, payoutRetryStrategyImmediate) + } + if got.Reason != "provider_code_3059_documented_non_retry" { + t.Fatalf("reason mismatch: got=%q", got.Reason) + } +} + +func TestPayoutFailurePolicy_DecideProviderFailure_UnknownCode(t *testing.T) { + policy := defaultPayoutFailurePolicy() + + got := policy.decideProviderFailure("99999") + if got.Action != payoutFailureActionFail { + t.Fatalf("action mismatch: got=%v want=%v", got.Action, payoutFailureActionFail) + } + if got.Strategy != payoutRetryStrategyImmediate { + t.Fatalf("strategy mismatch: got=%v want=%v", got.Strategy, payoutRetryStrategyImmediate) + } + if got.Reason != "provider_code_99999_unknown" { + t.Fatalf("reason mismatch: got=%q", got.Reason) + } +} + +func TestPayoutFailurePolicy_DecideTransportFailure(t *testing.T) { + policy := defaultPayoutFailurePolicy() + + got := policy.decideTransportFailure() + if got.Action != payoutFailureActionRetry { + t.Fatalf("action mismatch: got=%v want=%v", got.Action, payoutFailureActionRetry) + } + if got.Strategy != payoutRetryStrategyImmediate { + t.Fatalf("strategy mismatch: got=%v want=%v", got.Strategy, payoutRetryStrategyImmediate) + } +} + +func TestRetryDelayForAttempt_ByStrategy(t *testing.T) { + cases := []struct { + name string + attempt uint32 + strategy payoutRetryStrategy + wantDelay int + }{ + { + name: "immediate first attempt", + attempt: 1, + strategy: payoutRetryStrategyImmediate, + wantDelay: 5, + }, + { + name: "delayed second attempt", + attempt: 2, + strategy: payoutRetryStrategyDelayed, + wantDelay: 120, + }, + { + name: "status refresh third attempt", + attempt: 3, + strategy: payoutRetryStrategyStatusRefresh, + wantDelay: 40, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Helper() + if got := retryDelayForAttempt(tc.attempt, tc.strategy); got != tc.wantDelay { + t.Fatalf("delay mismatch: got=%d want=%d", got, tc.wantDelay) + } }) } } diff --git a/api/gateway/mntx/internal/service/gateway/transfer_notifications.go b/api/gateway/mntx/internal/service/gateway/transfer_notifications.go index fbe53201..5a7ed279 100644 --- a/api/gateway/mntx/internal/service/gateway/transfer_notifications.go +++ b/api/gateway/mntx/internal/service/gateway/transfer_notifications.go @@ -24,7 +24,7 @@ func isFinalStatus(t *model.CardPayout) bool { func isFinalPayoutStatus(status model.PayoutStatus) bool { switch status { - case model.PayoutStatusFailed, model.PayoutStatusSuccess, model.PayoutStatusCancelled: + case model.PayoutStatusFailed, model.PayoutStatusNeedsAttention, model.PayoutStatusSuccess, model.PayoutStatusCancelled: return true default: return false @@ -35,6 +35,8 @@ func toOpStatus(t *model.CardPayout) (rail.OperationResult, error) { switch t.Status { case model.PayoutStatusFailed: return rail.OperationResultFailed, nil + case model.PayoutStatusNeedsAttention: + return rail.OperationResultFailed, nil case model.PayoutStatusSuccess: return rail.OperationResultSuccess, nil case model.PayoutStatusCancelled: diff --git a/api/gateway/mntx/internal/service/gateway/transfer_notifications_test.go b/api/gateway/mntx/internal/service/gateway/transfer_notifications_test.go new file mode 100644 index 00000000..0b644f64 --- /dev/null +++ b/api/gateway/mntx/internal/service/gateway/transfer_notifications_test.go @@ -0,0 +1,24 @@ +package gateway + +import ( + "testing" + + "github.com/tech/sendico/gateway/mntx/storage/model" + "github.com/tech/sendico/pkg/payments/rail" +) + +func TestIsFinalPayoutStatus_NeedsAttentionIsFinal(t *testing.T) { + if !isFinalPayoutStatus(model.PayoutStatusNeedsAttention) { + t.Fatal("expected needs_attention to be final") + } +} + +func TestToOpStatus_NeedsAttentionMapsToFailed(t *testing.T) { + status, err := toOpStatus(&model.CardPayout{Status: model.PayoutStatusNeedsAttention}) + if err != nil { + t.Fatalf("toOpStatus returned error: %v", err) + } + if status != rail.OperationResultFailed { + t.Fatalf("unexpected operation result: got=%q want=%q", status, rail.OperationResultFailed) + } +} diff --git a/api/gateway/mntx/internal/service/monetix/sender.go b/api/gateway/mntx/internal/service/monetix/sender.go index b89b0b31..6b3c6e4a 100644 --- a/api/gateway/mntx/internal/service/monetix/sender.go +++ b/api/gateway/mntx/internal/service/monetix/sender.go @@ -166,25 +166,16 @@ func (c *Client) sendTokenization(ctx context.Context, req CardTokenizeRequest) } } - if apiResp.Operation.RequestID != "" { - result.ProviderRequestID = apiResp.Operation.RequestID - } else if apiResp.RequestID != "" { - result.ProviderRequestID = apiResp.RequestID - } - result.ProviderStatus = strings.TrimSpace(apiResp.Status) - if result.ProviderStatus == "" { - result.ProviderStatus = strings.TrimSpace(apiResp.Operation.Status) - } + result.ProviderRequestID = providerRequestID(apiResp) + result.ProviderStatus = providerStatus(apiResp) - if !result.Accepted { - result.ErrorCode = apiResp.Code - if result.ErrorCode == "" { + errorCode, errorMessage := providerError(apiResp) + if !result.Accepted || isProviderStatusError(result.ProviderStatus) { + result.ErrorCode = errorCode + if !result.Accepted && result.ErrorCode == "" { result.ErrorCode = http.StatusText(resp.StatusCode) } - result.ErrorMessage = apiResp.Message - if result.ErrorMessage == "" { - result.ErrorMessage = apiResp.Operation.Message - } + result.ErrorMessage = errorMessage } c.logger.Info("Monetix tokenization response", @@ -288,25 +279,16 @@ func (c *Client) send(ctx context.Context, req any, path string, dispatchLog fun } } - if apiResp.Operation.RequestID != "" { - result.ProviderRequestID = apiResp.Operation.RequestID - } else if apiResp.RequestID != "" { - result.ProviderRequestID = apiResp.RequestID - } - result.ProviderStatus = strings.TrimSpace(apiResp.Status) - if result.ProviderStatus == "" { - result.ProviderStatus = strings.TrimSpace(apiResp.Operation.Status) - } + result.ProviderRequestID = providerRequestID(apiResp) + result.ProviderStatus = providerStatus(apiResp) - if !result.Accepted { - result.ErrorCode = apiResp.Code - if result.ErrorCode == "" { + errorCode, errorMessage := providerError(apiResp) + if !result.Accepted || isProviderStatusError(result.ProviderStatus) { + result.ErrorCode = errorCode + if !result.Accepted && result.ErrorCode == "" { result.ErrorCode = http.StatusText(resp.StatusCode) } - result.ErrorMessage = apiResp.Message - if result.ErrorMessage == "" { - result.ErrorMessage = apiResp.Operation.Message - } + result.ErrorMessage = errorMessage } if responseLog != nil { @@ -324,6 +306,32 @@ func normalizeExpiryYear(year int) int { return year } +func providerRequestID(resp APIResponse) string { + return firstNonEmpty(resp.Operation.RequestID, resp.RequestID) +} + +func providerStatus(resp APIResponse) string { + return firstNonEmpty(resp.Status, resp.Operation.Status) +} + +func providerError(resp APIResponse) (code, message string) { + return firstNonEmpty(resp.Code, resp.Operation.Code), firstNonEmpty(resp.Message, resp.Operation.Message) +} + +func isProviderStatusError(status string) bool { + return strings.EqualFold(strings.TrimSpace(status), "error") +} + +func firstNonEmpty(values ...string) string { + for _, value := range values { + trimmed := strings.TrimSpace(value) + if trimmed != "" { + return trimmed + } + } + return "" +} + func normalizeRequestExpiryYear(req any) { switch r := req.(type) { case *CardPayoutRequest: diff --git a/api/gateway/mntx/internal/service/monetix/sender_test.go b/api/gateway/mntx/internal/service/monetix/sender_test.go index 236de6ab..b02ebdad 100644 --- a/api/gateway/mntx/internal/service/monetix/sender_test.go +++ b/api/gateway/mntx/internal/service/monetix/sender_test.go @@ -175,6 +175,99 @@ func TestSendCardPayout_HTTPError(t *testing.T) { } } +func TestSendCardPayout_HTTPAcceptedBodyErrorStillAccepted(t *testing.T) { + httpClient := &http.Client{ + Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) { + body := `{"status":"error","code":"3062","message":"Payment details not received"}` + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + }, nil + }), + } + + cfg := Config{ + BaseURL: "https://monetix.test", + SecretKey: "secret", + } + client := NewClient(cfg, httpClient, zap.NewNop()) + + req := CardPayoutRequest{ + General: General{ProjectID: 1, PaymentID: "payout-1"}, + Customer: Customer{ + ID: "cust-1", + FirstName: "Jane", + LastName: "Doe", + IP: "203.0.113.10", + }, + Payment: Payment{Amount: 1000, Currency: "RUB"}, + Card: Card{PAN: "4111111111111111", Year: 2030, Month: 12, CardHolder: "JANE DOE"}, + } + + result, err := client.CreateCardPayout(context.Background(), req) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + if !result.Accepted { + t.Fatalf("expected accepted response") + } + if result.ProviderStatus != "error" { + t.Fatalf("expected provider status error, got %q", result.ProviderStatus) + } + if result.ErrorCode != "3062" { + t.Fatalf("expected error code %q, got %q", "3062", result.ErrorCode) + } + if result.ErrorMessage != "Payment details not received" { + t.Fatalf("expected error message, got %q", result.ErrorMessage) + } +} + +func TestSendCardPayout_HTTPErrorFallsBackToOperationCode(t *testing.T) { + httpClient := &http.Client{ + Transport: roundTripperFunc(func(r *http.Request) (*http.Response, error) { + body := `{"operation":{"code":"3061","message":"Transaction not found"}}` + return &http.Response{ + StatusCode: http.StatusBadRequest, + Body: io.NopCloser(strings.NewReader(body)), + Header: http.Header{"Content-Type": []string{"application/json"}}, + }, nil + }), + } + + cfg := Config{ + BaseURL: "https://monetix.test", + SecretKey: "secret", + } + client := NewClient(cfg, httpClient, zap.NewNop()) + + req := CardPayoutRequest{ + General: General{ProjectID: 1, PaymentID: "payout-1"}, + Customer: Customer{ + ID: "cust-1", + FirstName: "Jane", + LastName: "Doe", + IP: "203.0.113.10", + }, + Payment: Payment{Amount: 1000, Currency: "RUB"}, + Card: Card{PAN: "4111111111111111", Year: 2030, Month: 12, CardHolder: "JANE DOE"}, + } + + result, err := client.CreateCardPayout(context.Background(), req) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + if result.Accepted { + t.Fatalf("expected rejected response") + } + if result.ErrorCode != "3061" { + t.Fatalf("expected error code %q, got %q", "3061", result.ErrorCode) + } + if result.ErrorMessage != "Transaction not found" { + t.Fatalf("expected error message, got %q", result.ErrorMessage) + } +} + type errorReadCloser struct { err error } diff --git a/api/gateway/mntx/storage/model/status.go b/api/gateway/mntx/storage/model/status.go index 1fc14d70..b64335fc 100644 --- a/api/gateway/mntx/storage/model/status.go +++ b/api/gateway/mntx/storage/model/status.go @@ -7,7 +7,8 @@ const ( PayoutStatusProcessing PayoutStatus = "processing" // we are working on it PayoutStatusWaiting PayoutStatus = "waiting" // waiting external world - PayoutStatusSuccess PayoutStatus = "success" // final success - PayoutStatusFailed PayoutStatus = "failed" // final failure - PayoutStatusCancelled PayoutStatus = "cancelled" // final cancelled + PayoutStatusSuccess PayoutStatus = "success" // final success + PayoutStatusFailed PayoutStatus = "failed" // final failure + PayoutStatusCancelled PayoutStatus = "cancelled" // final cancelled + PayoutStatusNeedsAttention PayoutStatus = "needs_attention" // final, manual review required ) From 4295456f631e1bfaa88428eeb45e7b291eb93230 Mon Sep 17 00:00:00 2001 From: Stephan D Date: Fri, 6 Mar 2026 13:50:13 +0100 Subject: [PATCH 3/4] fixed tgsettle upsert logic --- api/gateway/tgsettle/go.mod | 2 +- api/gateway/tgsettle/go.sum | 4 +- .../tgsettle/storage/mongo/store/payments.go | 43 +-- .../storage/mongo/store/payments_test.go | 251 ++++++++++++++++++ 4 files changed, 278 insertions(+), 22 deletions(-) create mode 100644 api/gateway/tgsettle/storage/mongo/store/payments_test.go diff --git a/api/gateway/tgsettle/go.mod b/api/gateway/tgsettle/go.mod index 37efef2d..cab2452f 100644 --- a/api/gateway/tgsettle/go.mod +++ b/api/gateway/tgsettle/go.mod @@ -11,7 +11,7 @@ require ( github.com/tech/sendico/pkg v0.1.0 go.mongodb.org/mongo-driver/v2 v2.5.0 go.uber.org/zap v1.27.1 - google.golang.org/grpc v1.79.1 + google.golang.org/grpc v1.79.2 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/api/gateway/tgsettle/go.sum b/api/gateway/tgsettle/go.sum index 5261ce70..f8c3c7eb 100644 --- a/api/gateway/tgsettle/go.sum +++ b/api/gateway/tgsettle/go.sum @@ -210,8 +210,8 @@ gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 h1:ggcbiqK8WWh6l1dnltU4BgWGIGo+EVYxCaAPih/zQXQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= -google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY= -google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= +google.golang.org/grpc v1.79.2 h1:fRMD94s2tITpyJGtBBn7MkMseNpOZU8ZxgC3MMBaXRU= +google.golang.org/grpc v1.79.2/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/api/gateway/tgsettle/storage/mongo/store/payments.go b/api/gateway/tgsettle/storage/mongo/store/payments.go index 8b3c0fbe..11759af5 100644 --- a/api/gateway/tgsettle/storage/mongo/store/payments.go +++ b/api/gateway/tgsettle/storage/mongo/store/payments.go @@ -119,25 +119,30 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro return merrors.InvalidArgument("intention reference key is required", "intent_ref") } - filter := repository.Filter(fieldIdempotencyKey, record.IdempotencyKey) - err := p.repo.Insert(ctx, record, filter) - if errors.Is(err, merrors.ErrDataConflict) { - patch := repository.Patch(). - Set(repository.Field(fieldOperationRef), record.OperationRef). - Set(repository.Field("paymentIntentId"), record.PaymentIntentID). - Set(repository.Field("quoteRef"), record.QuoteRef). - Set(repository.Field("intentRef"), record.IntentRef). - Set(repository.Field("paymentRef"), record.PaymentRef). - Set(repository.Field("outgoingLeg"), record.OutgoingLeg). - Set(repository.Field("targetChatId"), record.TargetChatID). - Set(repository.Field("requestedMoney"), record.RequestedMoney). - Set(repository.Field("executedMoney"), record.ExecutedMoney). - Set(repository.Field("status"), record.Status). - Set(repository.Field("failureReason"), record.FailureReason). - Set(repository.Field("executedAt"), record.ExecutedAt). - Set(repository.Field("expiresAt"), record.ExpiresAt). - Set(repository.Field("expiredAt"), record.ExpiredAt) - _, err = p.repo.PatchMany(ctx, filter, patch) + existing, err := p.FindByIdempotencyKey(ctx, record.IdempotencyKey) + if err != nil { + return err + } + if existing != nil { + record.ID = existing.ID + if record.CreatedAt.IsZero() { + record.CreatedAt = existing.CreatedAt + } + } + + err = p.repo.Upsert(ctx, record) + if mongo.IsDuplicateKeyError(err) { + // Concurrent insert by idempotency key: resolve existing ID and retry replace-by-ID. + existing, lookupErr := p.FindByIdempotencyKey(ctx, record.IdempotencyKey) + if lookupErr != nil { + err = lookupErr + } else if existing != nil { + record.ID = existing.ID + if record.CreatedAt.IsZero() { + record.CreatedAt = existing.CreatedAt + } + err = p.repo.Upsert(ctx, record) + } } if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { diff --git a/api/gateway/tgsettle/storage/mongo/store/payments_test.go b/api/gateway/tgsettle/storage/mongo/store/payments_test.go new file mode 100644 index 00000000..6097c434 --- /dev/null +++ b/api/gateway/tgsettle/storage/mongo/store/payments_test.go @@ -0,0 +1,251 @@ +package store + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/tech/sendico/gateway/tgsettle/storage/model" + "github.com/tech/sendico/pkg/db/repository" + "github.com/tech/sendico/pkg/db/storable" + "github.com/tech/sendico/pkg/merrors" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.uber.org/zap" +) + +type fakePaymentsRepo struct { + repository.Repository + + records map[string]*model.PaymentRecord + findErrByCall map[int]error + duplicateWhenZeroID bool + findCalls int + upsertCalls int + upsertIDs []bson.ObjectID + upsertIdempotencyKey []string +} + +func (f *fakePaymentsRepo) FindOneByFilter(_ context.Context, query repository.FilterQuery, result storable.Storable) error { + f.findCalls++ + if err, ok := f.findErrByCall[f.findCalls]; ok { + return err + } + + rec, ok := result.(*model.PaymentRecord) + if !ok { + return merrors.InvalidDataType("expected *model.PaymentRecord") + } + + doc := query.BuildQuery() + if key := stringField(doc, fieldIdempotencyKey); key != "" { + stored, ok := f.records[key] + if !ok { + return merrors.NoData("payment not found by filter") + } + *rec = *stored + return nil + } + if operationRef := stringField(doc, fieldOperationRef); operationRef != "" { + for _, stored := range f.records { + if strings.TrimSpace(stored.OperationRef) == operationRef { + *rec = *stored + return nil + } + } + return merrors.NoData("payment not found by operation ref") + } + + return merrors.NoData("payment not found") +} + +func (f *fakePaymentsRepo) Upsert(_ context.Context, obj storable.Storable) error { + f.upsertCalls++ + + rec, ok := obj.(*model.PaymentRecord) + if !ok { + return merrors.InvalidDataType("expected *model.PaymentRecord") + } + f.upsertIDs = append(f.upsertIDs, rec.ID) + f.upsertIdempotencyKey = append(f.upsertIdempotencyKey, rec.IdempotencyKey) + + if f.duplicateWhenZeroID && rec.ID.IsZero() { + if _, exists := f.records[rec.IdempotencyKey]; exists { + return mongo.WriteException{ + WriteErrors: mongo.WriteErrors{ + { + Code: 11000, + Message: "E11000 duplicate key error collection: tgsettle_gateway.payments", + }, + }, + } + } + } + + copyRec := *rec + if copyRec.ID.IsZero() { + copyRec.ID = bson.NewObjectID() + } + if copyRec.CreatedAt.IsZero() { + copyRec.CreatedAt = time.Now().UTC() + } + copyRec.UpdatedAt = time.Now().UTC() + if f.records == nil { + f.records = map[string]*model.PaymentRecord{} + } + f.records[copyRec.IdempotencyKey] = ©Rec + *rec = copyRec + return nil +} + +func TestPaymentsUpsert_ReusesExistingIDFromIdempotencyLookup(t *testing.T) { + key := "idem-existing" + existingID := bson.NewObjectID() + existingCreatedAt := time.Date(2026, 3, 6, 10, 0, 0, 0, time.UTC) + + repo := &fakePaymentsRepo{ + records: map[string]*model.PaymentRecord{ + key: { + Base: storable.Base{ + ID: existingID, + CreatedAt: existingCreatedAt, + UpdatedAt: existingCreatedAt, + }, + IdempotencyKey: key, + PaymentIntentID: "pi-old", + IntentRef: "intent-old", + }, + }, + duplicateWhenZeroID: true, + } + store := &Payments{logger: zap.NewNop(), repo: repo} + + record := &model.PaymentRecord{ + IdempotencyKey: key, + PaymentIntentID: "pi-new", + QuoteRef: "quote-new", + IntentRef: "intent-new", + } + + if err := store.Upsert(context.Background(), record); err != nil { + t.Fatalf("upsert failed: %v", err) + } + + if repo.upsertCalls != 1 { + t.Fatalf("expected one upsert call, got %d", repo.upsertCalls) + } + if len(repo.upsertIDs) != 1 || repo.upsertIDs[0] != existingID { + t.Fatalf("expected upsert to reuse existing id %s, got %+v", existingID.Hex(), repo.upsertIDs) + } + if record.ID != existingID { + t.Fatalf("record ID mismatch: got %s want %s", record.ID.Hex(), existingID.Hex()) + } +} + +func TestPaymentsUpsert_RetriesAfterDuplicateKeyRace(t *testing.T) { + key := "idem-race" + existingID := bson.NewObjectID() + + repo := &fakePaymentsRepo{ + records: map[string]*model.PaymentRecord{ + key: { + Base: storable.Base{ + ID: existingID, + CreatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC), + UpdatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC), + }, + IdempotencyKey: key, + PaymentIntentID: "pi-existing", + IntentRef: "intent-existing", + }, + }, + findErrByCall: map[int]error{ + 1: merrors.NoData("payment not found by filter"), + }, + duplicateWhenZeroID: true, + } + store := &Payments{logger: zap.NewNop(), repo: repo} + + record := &model.PaymentRecord{ + IdempotencyKey: key, + PaymentIntentID: "pi-new", + QuoteRef: "quote-new", + IntentRef: "intent-new", + } + + if err := store.Upsert(context.Background(), record); err != nil { + t.Fatalf("upsert failed: %v", err) + } + + if repo.upsertCalls != 2 { + t.Fatalf("expected two upsert calls, got %d", repo.upsertCalls) + } + if len(repo.upsertIDs) != 2 { + t.Fatalf("expected two upsert IDs, got %d", len(repo.upsertIDs)) + } + if !repo.upsertIDs[0].IsZero() { + t.Fatalf("expected first upsert to use zero id due stale read, got %s", repo.upsertIDs[0].Hex()) + } + if repo.upsertIDs[1] != existingID { + t.Fatalf("expected retry to use existing id %s, got %s", existingID.Hex(), repo.upsertIDs[1].Hex()) + } +} + +func TestPaymentsUpsert_PropagatesNoSuchTransactionAfterDuplicate(t *testing.T) { + key := "idem-nosuchtx" + + repo := &fakePaymentsRepo{ + records: map[string]*model.PaymentRecord{ + key: { + Base: storable.Base{ + ID: bson.NewObjectID(), + CreatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC), + UpdatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC), + }, + IdempotencyKey: key, + PaymentIntentID: "pi-existing", + IntentRef: "intent-existing", + }, + }, + findErrByCall: map[int]error{ + 1: merrors.NoData("payment not found by filter"), + 2: mongo.CommandError{ + Code: 251, + Name: "NoSuchTransaction", + Message: "Transaction with { txnNumber: 2 } has been aborted.", + }, + }, + duplicateWhenZeroID: true, + } + store := &Payments{logger: zap.NewNop(), repo: repo} + + record := &model.PaymentRecord{ + IdempotencyKey: key, + PaymentIntentID: "pi-new", + QuoteRef: "quote-new", + IntentRef: "intent-new", + } + + err := store.Upsert(context.Background(), record) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "NoSuchTransaction") { + t.Fatalf("expected NoSuchTransaction error, got %v", err) + } + if repo.upsertCalls != 1 { + t.Fatalf("expected one upsert attempt before lookup failure, got %d", repo.upsertCalls) + } +} + +func stringField(doc bson.D, key string) string { + for _, entry := range doc { + if entry.Key != key { + continue + } + res, _ := entry.Value.(string) + return strings.TrimSpace(res) + } + return "" +} From 3295b9d9f0375b943f05dcbb3cd64da523eee7fa Mon Sep 17 00:00:00 2001 From: Stephan D Date: Fri, 6 Mar 2026 15:12:14 +0100 Subject: [PATCH 4/4] fixed po <-> tgsettle contract --- .../internal/service/gateway/connector.go | 13 +- .../service/gateway/connector_test.go | 119 ++++++++++ .../tgsettle/storage/model/execution.go | 35 +-- .../tgsettle/storage/mongo/store/payments.go | 7 +- .../storage/mongo/store/payments_test.go | 36 ++- api/payments/orchestrator/go.mod | 2 +- api/payments/orchestrator/go.sum | 4 +- .../service/orchestrationv2/agg/module.go | 6 + .../service/orchestrationv2/agg/service.go | 6 + .../orchestrationv2/agg/service_test.go | 20 +- .../service/orchestrationv2/psvc/execute.go | 3 + .../service/orchestrationv2/psvc/runtime.go | 9 + .../service/orchestrationv2/ssched/input.go | 17 ++ .../service/orchestrator/external_runtime.go | 125 +++++++--- .../orchestrator/external_runtime_test.go | 214 +++++++++++++++++- 15 files changed, 540 insertions(+), 76 deletions(-) create mode 100644 api/gateway/tgsettle/internal/service/gateway/connector_test.go diff --git a/api/gateway/tgsettle/internal/service/gateway/connector.go b/api/gateway/tgsettle/internal/service/gateway/connector.go index f287fd9d..8a7d24e4 100644 --- a/api/gateway/tgsettle/internal/service/gateway/connector.go +++ b/api/gateway/tgsettle/internal/service/gateway/connector.go @@ -136,13 +136,22 @@ func (s *Service) SubmitOperation(ctx context.Context, req *connectorv1.SubmitOp return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(mapErrorCode(err), err.Error(), op, "")}}, nil } transfer := resp.GetTransfer() + operationID := strings.TrimSpace(transfer.GetOperationRef()) + if operationID == "" { + s.logger.Warn("Submit operation transfer response missing operation_ref", append(logFields, + zap.String("transfer_ref", strings.TrimSpace(transfer.GetTransferRef())), + )...) + return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{ + Error: connectorError(connectorv1.ErrorCode_TEMPORARY_UNAVAILABLE, "submit_operation: operation_ref is missing in transfer response", op, ""), + }}, nil + } s.logger.Info("Submit operation transfer submitted", append(logFields, zap.String("transfer_ref", strings.TrimSpace(transfer.GetTransferRef())), zap.String("status", transfer.GetStatus().String()), )...) return &connectorv1.SubmitOperationResponse{ Receipt: &connectorv1.OperationReceipt{ - OperationId: strings.TrimSpace(transfer.GetTransferRef()), + OperationId: operationID, Status: transferStatusToOperation(transfer.GetStatus()), ProviderRef: strings.TrimSpace(transfer.GetTransferRef()), }, @@ -224,7 +233,7 @@ func transferToOperation(transfer *chainv1.Transfer) *connectorv1.Operation { return nil } op := &connectorv1.Operation{ - OperationId: strings.TrimSpace(transfer.GetTransferRef()), + OperationId: strings.TrimSpace(transfer.GetOperationRef()), Type: connectorv1.OperationType_TRANSFER, Status: transferStatusToOperation(transfer.GetStatus()), Money: transfer.GetRequestedAmount(), diff --git a/api/gateway/tgsettle/internal/service/gateway/connector_test.go b/api/gateway/tgsettle/internal/service/gateway/connector_test.go new file mode 100644 index 00000000..4916e606 --- /dev/null +++ b/api/gateway/tgsettle/internal/service/gateway/connector_test.go @@ -0,0 +1,119 @@ +package gateway + +import ( + "context" + "testing" + + storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model" + paymenttypes "github.com/tech/sendico/pkg/payments/types" + moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1" + connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestSubmitOperation_UsesOperationRefAsOperationID(t *testing.T) { + svc, _, _ := newTestService(t) + svc.chatID = "1" + + req := &connectorv1.SubmitOperationRequest{ + Operation: &connectorv1.Operation{ + Type: connectorv1.OperationType_TRANSFER, + IdempotencyKey: "idem-settlement-1", + OperationRef: "payment-1:hop_2_settlement_fx_convert", + IntentRef: "intent-1", + Money: &moneyv1.Money{Amount: "1.00", Currency: "USDT"}, + From: &connectorv1.OperationParty{ + Ref: &connectorv1.OperationParty_Account{Account: &connectorv1.AccountRef{ + ConnectorId: tgsettleConnectorID, + AccountId: "wallet-src", + }}, + }, + To: &connectorv1.OperationParty{ + Ref: &connectorv1.OperationParty_Account{Account: &connectorv1.AccountRef{ + ConnectorId: tgsettleConnectorID, + AccountId: "wallet-dst", + }}, + }, + Params: structFromMap(map[string]interface{}{ + "payment_ref": "payment-1", + "organization_ref": "org-1", + }), + }, + } + + resp, err := svc.SubmitOperation(context.Background(), req) + if err != nil { + t.Fatalf("SubmitOperation returned error: %v", err) + } + if resp.GetReceipt() == nil { + t.Fatal("expected receipt") + } + if got := resp.GetReceipt().GetError(); got != nil { + t.Fatalf("expected no connector error, got: %v", got) + } + if got, want := resp.GetReceipt().GetOperationId(), "payment-1:hop_2_settlement_fx_convert"; got != want { + t.Fatalf("operation_id mismatch: got=%q want=%q", got, want) + } + if got, want := resp.GetReceipt().GetProviderRef(), "idem-settlement-1"; got != want { + t.Fatalf("provider_ref mismatch: got=%q want=%q", got, want) + } +} + +func TestGetOperation_UsesOperationRefIdentity(t *testing.T) { + svc, repo, _ := newTestService(t) + + record := &storagemodel.PaymentRecord{ + IdempotencyKey: "idem-settlement-2", + OperationRef: "payment-2:hop_2_settlement_fx_convert", + PaymentIntentID: "pi-2", + PaymentRef: "payment-2", + RequestedMoney: &paymenttypes.Money{Amount: "5.00", Currency: "USDT"}, + Status: storagemodel.PaymentStatusSuccess, + } + if err := repo.payments.Upsert(context.Background(), record); err != nil { + t.Fatalf("failed to seed payment record: %v", err) + } + + resp, err := svc.GetOperation(context.Background(), &connectorv1.GetOperationRequest{ + OperationId: "payment-2:hop_2_settlement_fx_convert", + }) + if err != nil { + t.Fatalf("GetOperation returned error: %v", err) + } + if resp.GetOperation() == nil { + t.Fatal("expected operation") + } + if got, want := resp.GetOperation().GetOperationId(), "payment-2:hop_2_settlement_fx_convert"; got != want { + t.Fatalf("operation_id mismatch: got=%q want=%q", got, want) + } + if got, want := resp.GetOperation().GetProviderRef(), "idem-settlement-2"; got != want { + t.Fatalf("provider_ref mismatch: got=%q want=%q", got, want) + } +} + +func TestGetOperation_DoesNotResolveByIdempotencyKey(t *testing.T) { + svc, repo, _ := newTestService(t) + + record := &storagemodel.PaymentRecord{ + IdempotencyKey: "idem-settlement-3", + OperationRef: "payment-3:hop_2_settlement_fx_convert", + PaymentIntentID: "pi-3", + PaymentRef: "payment-3", + RequestedMoney: &paymenttypes.Money{Amount: "5.00", Currency: "USDT"}, + Status: storagemodel.PaymentStatusSuccess, + } + if err := repo.payments.Upsert(context.Background(), record); err != nil { + t.Fatalf("failed to seed payment record: %v", err) + } + + _, err := svc.GetOperation(context.Background(), &connectorv1.GetOperationRequest{ + OperationId: "idem-settlement-3", + }) + if err == nil { + t.Fatal("expected not found error") + } + if status.Code(err) != codes.NotFound { + t.Fatalf("unexpected error code: got=%s want=%s", status.Code(err), codes.NotFound) + } +} diff --git a/api/gateway/tgsettle/storage/model/execution.go b/api/gateway/tgsettle/storage/model/execution.go index fea1ffc2..194d7a19 100644 --- a/api/gateway/tgsettle/storage/model/execution.go +++ b/api/gateway/tgsettle/storage/model/execution.go @@ -20,22 +20,25 @@ const ( ) type PaymentRecord struct { - storable.Base `bson:",inline" json:",inline"` - OperationRef string `bson:"operationRef,omitempty" json:"operation_ref,omitempty"` - IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"` - PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"` - QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"` - IntentRef string `bson:"intentRef,omitempty" json:"intent_ref,omitempty"` - PaymentRef string `bson:"paymentRef,omitempty" json:"payment_ref,omitempty"` - OutgoingLeg string `bson:"outgoingLeg,omitempty" json:"outgoing_leg,omitempty"` - TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"` - RequestedMoney *paymenttypes.Money `bson:"requestedMoney,omitempty" json:"requested_money,omitempty"` - ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"` - Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"` - FailureReason string `bson:"failureReason,omitempty" json:"Failure_reason,omitempty"` - ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"` - ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"` - ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"` + storable.Base `bson:",inline" json:",inline"` + OperationRef string `bson:"operationRef,omitempty" json:"operation_ref,omitempty"` + IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"` + ConfirmationRef string `bson:"confirmationRef,omitempty" json:"confirmation_ref,omitempty"` + ConfirmationMessageID string `bson:"confirmationMessageId,omitempty" json:"confirmation_message_id,omitempty"` + ConfirmationReplyMessageID string `bson:"confirmationReplyMessageId,omitempty" json:"confirmation_reply_message_id,omitempty"` + PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"` + QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"` + IntentRef string `bson:"intentRef,omitempty" json:"intent_ref,omitempty"` + PaymentRef string `bson:"paymentRef,omitempty" json:"payment_ref,omitempty"` + OutgoingLeg string `bson:"outgoingLeg,omitempty" json:"outgoing_leg,omitempty"` + TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"` + RequestedMoney *paymenttypes.Money `bson:"requestedMoney,omitempty" json:"requested_money,omitempty"` + ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"` + Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"` + FailureReason string `bson:"failureReason,omitempty" json:"Failure_reason,omitempty"` + ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"` + ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"` + ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"` } type TelegramConfirmation struct { diff --git a/api/gateway/tgsettle/storage/mongo/store/payments.go b/api/gateway/tgsettle/storage/mongo/store/payments.go index 11759af5..3a8f83e6 100644 --- a/api/gateway/tgsettle/storage/mongo/store/payments.go +++ b/api/gateway/tgsettle/storage/mongo/store/payments.go @@ -103,14 +103,13 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro return merrors.InvalidArgument("payment record is nil", "record") } record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey) - record.PaymentIntentID = strings.TrimSpace(record.PaymentIntentID) record.QuoteRef = strings.TrimSpace(record.QuoteRef) record.OutgoingLeg = strings.TrimSpace(record.OutgoingLeg) record.TargetChatID = strings.TrimSpace(record.TargetChatID) record.IntentRef = strings.TrimSpace(record.IntentRef) record.OperationRef = strings.TrimSpace(record.OperationRef) - if record.PaymentIntentID == "" { - return merrors.InvalidArgument("intention reference is required", "payment_intent_ref") + if record.IntentRef == "" { + return merrors.InvalidArgument("intention reference is required", "intent_ref") } if record.IdempotencyKey == "" { return merrors.InvalidArgument("idempotency key is required", "idempotency_key") @@ -148,7 +147,7 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { p.logger.Warn("Failed to upsert payment record", zap.String("idempotency_key", record.IdempotencyKey), - zap.String("payment_intent_id", record.PaymentIntentID), + zap.String("intent_ref", record.IntentRef), zap.String("quote_ref", record.QuoteRef), zap.Error(err)) } diff --git a/api/gateway/tgsettle/storage/mongo/store/payments_test.go b/api/gateway/tgsettle/storage/mongo/store/payments_test.go index 6097c434..d11d810e 100644 --- a/api/gateway/tgsettle/storage/mongo/store/payments_test.go +++ b/api/gateway/tgsettle/storage/mongo/store/payments_test.go @@ -112,9 +112,8 @@ func TestPaymentsUpsert_ReusesExistingIDFromIdempotencyLookup(t *testing.T) { CreatedAt: existingCreatedAt, UpdatedAt: existingCreatedAt, }, - IdempotencyKey: key, - PaymentIntentID: "pi-old", - IntentRef: "intent-old", + IdempotencyKey: key, + IntentRef: "pi-old", }, }, duplicateWhenZeroID: true, @@ -122,10 +121,9 @@ func TestPaymentsUpsert_ReusesExistingIDFromIdempotencyLookup(t *testing.T) { store := &Payments{logger: zap.NewNop(), repo: repo} record := &model.PaymentRecord{ - IdempotencyKey: key, - PaymentIntentID: "pi-new", - QuoteRef: "quote-new", - IntentRef: "intent-new", + IdempotencyKey: key, + IntentRef: "pi-new", + QuoteRef: "quote-new", } if err := store.Upsert(context.Background(), record); err != nil { @@ -155,9 +153,8 @@ func TestPaymentsUpsert_RetriesAfterDuplicateKeyRace(t *testing.T) { CreatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC), UpdatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC), }, - IdempotencyKey: key, - PaymentIntentID: "pi-existing", - IntentRef: "intent-existing", + IdempotencyKey: key, + IntentRef: "pi-existing", }, }, findErrByCall: map[int]error{ @@ -168,10 +165,9 @@ func TestPaymentsUpsert_RetriesAfterDuplicateKeyRace(t *testing.T) { store := &Payments{logger: zap.NewNop(), repo: repo} record := &model.PaymentRecord{ - IdempotencyKey: key, - PaymentIntentID: "pi-new", - QuoteRef: "quote-new", - IntentRef: "intent-new", + IdempotencyKey: key, + IntentRef: "pi-new", + QuoteRef: "quote-new", } if err := store.Upsert(context.Background(), record); err != nil { @@ -203,9 +199,8 @@ func TestPaymentsUpsert_PropagatesNoSuchTransactionAfterDuplicate(t *testing.T) CreatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC), UpdatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC), }, - IdempotencyKey: key, - PaymentIntentID: "pi-existing", - IntentRef: "intent-existing", + IdempotencyKey: key, + IntentRef: "pi-existing", }, }, findErrByCall: map[int]error{ @@ -221,10 +216,9 @@ func TestPaymentsUpsert_PropagatesNoSuchTransactionAfterDuplicate(t *testing.T) store := &Payments{logger: zap.NewNop(), repo: repo} record := &model.PaymentRecord{ - IdempotencyKey: key, - PaymentIntentID: "pi-new", - QuoteRef: "quote-new", - IntentRef: "intent-new", + IdempotencyKey: key, + IntentRef: "pi-new", + QuoteRef: "quote-new", } err := store.Upsert(context.Background(), record) diff --git a/api/payments/orchestrator/go.mod b/api/payments/orchestrator/go.mod index 8552be8d..d4fe9d97 100644 --- a/api/payments/orchestrator/go.mod +++ b/api/payments/orchestrator/go.mod @@ -27,7 +27,7 @@ require ( github.com/tech/sendico/pkg v0.1.0 go.mongodb.org/mongo-driver/v2 v2.5.0 go.uber.org/zap v1.27.1 - google.golang.org/grpc v1.79.1 + google.golang.org/grpc v1.79.2 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/api/payments/orchestrator/go.sum b/api/payments/orchestrator/go.sum index 4fb8cd3c..edeb912c 100644 --- a/api/payments/orchestrator/go.sum +++ b/api/payments/orchestrator/go.sum @@ -213,8 +213,8 @@ gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 h1:ggcbiqK8WWh6l1dnltU4BgWGIGo+EVYxCaAPih/zQXQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= -google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY= -google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= +google.golang.org/grpc v1.79.2 h1:fRMD94s2tITpyJGtBBn7MkMseNpOZU8ZxgC3MMBaXRU= +google.golang.org/grpc v1.79.2/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/agg/module.go b/api/payments/orchestrator/internal/service/orchestrationv2/agg/module.go index 2402ee11..0a60069a 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/agg/module.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/agg/module.go @@ -45,6 +45,9 @@ const ( type StepShell struct { StepRef string `bson:"stepRef" json:"stepRef"` StepCode string `bson:"stepCode" json:"stepCode"` + Rail model.Rail `bson:"rail,omitempty" json:"rail,omitempty"` + Gateway string `bson:"gateway,omitempty" json:"gateway,omitempty"` + InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"` ReportVisibility model.ReportVisibility `bson:"reportVisibility,omitempty" json:"reportVisibility,omitempty"` UserLabel string `bson:"userLabel,omitempty" json:"userLabel,omitempty"` } @@ -53,6 +56,9 @@ type StepShell struct { type StepExecution struct { StepRef string `bson:"stepRef" json:"stepRef"` StepCode string `bson:"stepCode" json:"stepCode"` + Rail model.Rail `bson:"rail,omitempty" json:"rail,omitempty"` + Gateway string `bson:"gateway,omitempty" json:"gateway,omitempty"` + InstanceID string `bson:"instanceId,omitempty" json:"instanceId,omitempty"` ReportVisibility model.ReportVisibility `bson:"reportVisibility,omitempty" json:"reportVisibility,omitempty"` UserLabel string `bson:"userLabel,omitempty" json:"userLabel,omitempty"` State StepState `bson:"state" json:"state"` diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/agg/service.go b/api/payments/orchestrator/internal/service/orchestrationv2/agg/service.go index b3affe03..a6104db6 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/agg/service.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/agg/service.go @@ -143,10 +143,16 @@ func buildInitialStepTelemetry(shell []StepShell) ([]StepExecution, error) { return nil, merrors.InvalidArgument("steps[" + itoa(i) + "].report_visibility is invalid") } userLabel := strings.TrimSpace(shell[i].UserLabel) + railValue := model.ParseRail(string(shell[i].Rail)) + gatewayID := strings.TrimSpace(shell[i].Gateway) + instanceID := strings.TrimSpace(shell[i].InstanceID) out = append(out, StepExecution{ StepRef: stepRef, StepCode: stepCode, + Rail: railValue, + Gateway: gatewayID, + InstanceID: instanceID, ReportVisibility: visibility, UserLabel: userLabel, State: StepStatePending, diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/agg/service_test.go b/api/payments/orchestrator/internal/service/orchestrationv2/agg/service_test.go index fd551e90..e4946fd5 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/agg/service_test.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/agg/service_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/tech/sendico/payments/storage/model" + "github.com/tech/sendico/pkg/discovery" "github.com/tech/sendico/pkg/merrors" paymenttypes "github.com/tech/sendico/pkg/payments/types" "go.mongodb.org/mongo-driver/v2/bson" @@ -42,7 +43,15 @@ func TestCreate_OK(t *testing.T) { QuoteSnapshot: quote, Steps: []StepShell{ {StepRef: " s1 ", StepCode: " reserve_funds ", ReportVisibility: model.ReportVisibilityHidden}, - {StepRef: "s2", StepCode: "submit_gateway", ReportVisibility: model.ReportVisibilityUser, UserLabel: " Card payout "}, + { + StepRef: "s2", + StepCode: "submit_gateway", + Rail: discovery.RailProviderSettlement, + Gateway: "payment_gateway_settlement", + InstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12", + ReportVisibility: model.ReportVisibilityUser, + UserLabel: " Card payout ", + }, }, }) if err != nil { @@ -111,6 +120,15 @@ func TestCreate_OK(t *testing.T) { if got, want := payment.StepExecutions[1].UserLabel, "Card payout"; got != want { t.Fatalf("unexpected second step user label: got=%q want=%q", got, want) } + if got, want := payment.StepExecutions[1].Rail, model.Rail(discovery.RailProviderSettlement); got != want { + t.Fatalf("unexpected second step rail: got=%q want=%q", got, want) + } + if got, want := payment.StepExecutions[1].Gateway, "payment_gateway_settlement"; got != want { + t.Fatalf("unexpected second step gateway: got=%q want=%q", got, want) + } + if got, want := payment.StepExecutions[1].InstanceID, "04a54fec-20f4-4250-a715-eb9886e13e12"; got != want { + t.Fatalf("unexpected second step instance_id: got=%q want=%q", got, want) + } // Verify immutable snapshot semantics by ensuring clones were created. payment.IntentSnapshot.Ref = "changed" diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute.go index 273da8bd..0d834db6 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/execute.go @@ -221,6 +221,9 @@ func toStepShells(graph *xplan.Graph) []agg.StepShell { out = append(out, agg.StepShell{ StepRef: graph.Steps[i].StepRef, StepCode: graph.Steps[i].StepCode, + Rail: graph.Steps[i].Rail, + Gateway: graph.Steps[i].Gateway, + InstanceID: graph.Steps[i].InstanceID, ReportVisibility: graph.Steps[i].Visibility, UserLabel: graph.Steps[i].UserLabel, }) diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/runtime.go b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/runtime.go index 98ad3b5c..39fd00b3 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/psvc/runtime.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/psvc/runtime.go @@ -408,6 +408,15 @@ func stepExecutionEqual(left, right agg.StepExecution) bool { if left.StepRef != right.StepRef || left.StepCode != right.StepCode { return false } + if left.Rail != right.Rail { + return false + } + if strings.TrimSpace(left.Gateway) != strings.TrimSpace(right.Gateway) { + return false + } + if strings.TrimSpace(left.InstanceID) != strings.TrimSpace(right.InstanceID) { + return false + } if left.State != right.State || left.Attempt != right.Attempt { return false } diff --git a/api/payments/orchestrator/internal/service/orchestrationv2/ssched/input.go b/api/payments/orchestrator/internal/service/orchestrationv2/ssched/input.go index 0def2253..edc87c4f 100644 --- a/api/payments/orchestrator/internal/service/orchestrationv2/ssched/input.go +++ b/api/payments/orchestrator/internal/service/orchestrationv2/ssched/input.go @@ -6,6 +6,7 @@ import ( "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan" "github.com/tech/sendico/payments/storage/model" + "github.com/tech/sendico/pkg/discovery" "github.com/tech/sendico/pkg/merrors" ) @@ -144,6 +145,16 @@ func (s *svc) normalizeStepExecutions( stepCode = stepsByRef[stepRef].StepCode } exec.StepCode = stepCode + step := stepsByRef[stepRef] + if exec.Rail == discovery.RailUnspecified { + exec.Rail = step.Rail + } + if strings.TrimSpace(exec.Gateway) == "" { + exec.Gateway = strings.TrimSpace(step.Gateway) + } + if strings.TrimSpace(exec.InstanceID) == "" { + exec.InstanceID = strings.TrimSpace(step.InstanceID) + } exec.ReportVisibility = effectiveStepVisibility(exec.ReportVisibility, stepsByRef[stepRef].Visibility) exec.UserLabel = firstNonEmpty(exec.UserLabel, stepsByRef[stepRef].UserLabel) cloned := cloneStepExecution(exec) @@ -158,6 +169,9 @@ func (s *svc) normalizeStepExecution(exec agg.StepExecution, index int) (agg.Ste exec.FailureCode = strings.TrimSpace(exec.FailureCode) exec.FailureMsg = strings.TrimSpace(exec.FailureMsg) exec.UserLabel = strings.TrimSpace(exec.UserLabel) + exec.Gateway = strings.TrimSpace(exec.Gateway) + exec.InstanceID = strings.TrimSpace(exec.InstanceID) + exec.Rail = model.ParseRail(string(exec.Rail)) exec.ReportVisibility = model.NormalizeReportVisibility(exec.ReportVisibility) exec.ExternalRefs = cloneExternalRefs(exec.ExternalRefs) if exec.StepRef == "" { @@ -197,6 +211,9 @@ func seedMissingExecutions( executionsByRef[stepRef] = &agg.StepExecution{ StepRef: step.StepRef, StepCode: step.StepCode, + Rail: step.Rail, + Gateway: strings.TrimSpace(step.Gateway), + InstanceID: strings.TrimSpace(step.InstanceID), ReportVisibility: effectiveStepVisibility(model.ReportVisibilityUnspecified, step.Visibility), UserLabel: strings.TrimSpace(step.UserLabel), State: agg.StepStatePending, diff --git a/api/payments/orchestrator/internal/service/orchestrator/external_runtime.go b/api/payments/orchestrator/internal/service/orchestrator/external_runtime.go index b9770c38..add5f3c1 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/external_runtime.go +++ b/api/payments/orchestrator/internal/service/orchestrator/external_runtime.go @@ -13,7 +13,6 @@ import ( "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc" - "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/xplan" "github.com/tech/sendico/payments/storage/model" cons "github.com/tech/sendico/pkg/messaging/consumer" paymentgatewaynotifications "github.com/tech/sendico/pkg/messaging/notifications/paymentgateway" @@ -412,6 +411,9 @@ func buildObserveCandidate(step agg.StepExecution) (runningObserveCandidate, boo } } } + if candidate.gatewayInstanceID == "" { + candidate.gatewayInstanceID = strings.TrimSpace(step.InstanceID) + } if candidate.stepRef == "" || candidate.transferRef == "" { return runningObserveCandidate{}, false } @@ -475,7 +477,7 @@ func (s *Service) pollObserveCandidate(ctx context.Context, payment *agg.Payment StepRef: candidate.stepRef, OperationRef: firstNonEmpty(strings.TrimSpace(transfer.GetOperationRef()), candidate.operationRef), TransferRef: strings.TrimSpace(candidate.transferRef), - GatewayInstanceID: firstNonEmpty(candidate.gatewayInstanceID, strings.TrimSpace(gateway.InstanceID), strings.TrimSpace(gateway.ID)), + GatewayInstanceID: resolvedObserveGatewayID(candidate.gatewayInstanceID, gateway), Status: status, } switch status { @@ -517,39 +519,106 @@ func (s *Service) pollObserveCandidate(ctx context.Context, payment *agg.Payment } func (s *Service) resolveObserveGateway(ctx context.Context, payment *agg.Payment, candidate runningObserveCandidate) (*model.GatewayInstanceDescriptor, error) { + if s == nil || s.gatewayRegistry == nil { + return nil, errors.New("observe polling: gateway registry is unavailable") + } + items, err := s.gatewayRegistry.List(ctx) + if err != nil { + return nil, err + } + hint, hasHint := observeStepGatewayHint(payment, candidate.stepRef) + expectedRail := model.Rail(discovery.RailUnspecified) + if hasHint { + expectedRail = hint.rail + } if gatewayID := strings.TrimSpace(candidate.gatewayInstanceID); gatewayID != "" { - items, err := s.gatewayRegistry.List(ctx) - if err == nil { - for i := range items { - item := items[i] - if item == nil || !item.IsEnabled { - continue - } - if !strings.EqualFold(strings.TrimSpace(item.ID), gatewayID) && !strings.EqualFold(strings.TrimSpace(item.InstanceID), gatewayID) { - continue - } - if strings.TrimSpace(item.InvokeURI) == "" { - continue - } - return item, nil - } + if item := findEnabledGatewayDescriptor(items, gatewayID, expectedRail); item != nil { + return item, nil } } + if hasHint { + if item := findEnabledGatewayDescriptor(items, hint.instanceID, hint.rail); item != nil { + return item, nil + } + if item := findEnabledGatewayDescriptor(items, hint.gatewayID, hint.rail); item != nil { + return item, nil + } + } + return nil, errors.New("observe polling: gateway instance not found") +} - executor := gatewayCryptoExecutor{ - gatewayRegistry: s.gatewayRegistry, +type observeStepHint struct { + rail model.Rail + gatewayID string + instanceID string +} + +func observeStepGatewayHint(payment *agg.Payment, stepRef string) (observeStepHint, bool) { + if payment == nil { + return observeStepHint{}, false } - step := xplan.Step{ - Rail: discovery.RailCrypto, + key := strings.TrimSpace(stepRef) + if key == "" { + return observeStepHint{}, false } - if gatewayID := strings.TrimSpace(candidate.gatewayInstanceID); gatewayID != "" { - step.InstanceID = gatewayID - step.Gateway = gatewayID - } else if gateway, instanceID, ok := sourceCryptoHop(payment); ok { - step.Gateway = strings.TrimSpace(gateway) - step.InstanceID = strings.TrimSpace(instanceID) + for i := range payment.StepExecutions { + step := payment.StepExecutions[i] + if !strings.EqualFold(strings.TrimSpace(step.StepRef), key) { + continue + } + hint := observeStepHint{ + rail: model.ParseRail(string(step.Rail)), + gatewayID: strings.TrimSpace(step.Gateway), + instanceID: strings.TrimSpace(step.InstanceID), + } + if hint.gatewayID == "" && hint.instanceID == "" { + return observeStepHint{}, false + } + return hint, true } - return executor.resolveGateway(ctx, step) + return observeStepHint{}, false +} + +func findEnabledGatewayDescriptor(items []*model.GatewayInstanceDescriptor, identifier string, rail model.Rail) *model.GatewayInstanceDescriptor { + key := strings.TrimSpace(identifier) + if key == "" { + return nil + } + for i := range items { + item := items[i] + if item == nil || !item.IsEnabled || strings.TrimSpace(item.InvokeURI) == "" { + continue + } + if rail != model.Rail(discovery.RailUnspecified) && model.ParseRail(string(item.Rail)) != rail { + continue + } + if strings.EqualFold(strings.TrimSpace(item.ID), key) || strings.EqualFold(strings.TrimSpace(item.InstanceID), key) { + return item + } + } + return nil +} + +func resolvedObserveGatewayID(candidateGatewayID string, gateway *model.GatewayInstanceDescriptor) string { + candidateID := strings.TrimSpace(candidateGatewayID) + if candidateID != "" && gatewayIdentifierMatches(gateway, candidateID) { + return candidateID + } + if gateway == nil { + return "" + } + return firstNonEmpty(strings.TrimSpace(gateway.InstanceID), strings.TrimSpace(gateway.ID)) +} + +func gatewayIdentifierMatches(gateway *model.GatewayInstanceDescriptor, identifier string) bool { + if gateway == nil { + return false + } + key := strings.TrimSpace(identifier) + if key == "" { + return false + } + return strings.EqualFold(strings.TrimSpace(gateway.ID), key) || strings.EqualFold(strings.TrimSpace(gateway.InstanceID), key) } func mapTransferStatus(status chainv1.TransferStatus) (gatewayStatus erecon.GatewayStatus, terminal bool, ok bool) { diff --git a/api/payments/orchestrator/internal/service/orchestrator/external_runtime_test.go b/api/payments/orchestrator/internal/service/orchestrator/external_runtime_test.go index fa0ad19f..ba0e19a8 100644 --- a/api/payments/orchestrator/internal/service/orchestrator/external_runtime_test.go +++ b/api/payments/orchestrator/internal/service/orchestrator/external_runtime_test.go @@ -3,14 +3,15 @@ package orchestrator import ( "context" "errors" - "github.com/tech/sendico/pkg/discovery" "testing" + chainclient "github.com/tech/sendico/gateway/chain/client" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/agg" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/erecon" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/prepo" "github.com/tech/sendico/payments/orchestrator/internal/service/orchestrationv2/psvc" "github.com/tech/sendico/payments/storage/model" + "github.com/tech/sendico/pkg/discovery" pm "github.com/tech/sendico/pkg/model" "github.com/tech/sendico/pkg/payments/rail" paymenttypes "github.com/tech/sendico/pkg/payments/types" @@ -412,6 +413,30 @@ func TestRunningObserveCandidates_UsesCardPayoutRefAsTransfer(t *testing.T) { } } +func TestRunningObserveCandidates_UsesPlannedStepInstanceWhenExternalRefGatewayMissing(t *testing.T) { + payment := &agg.Payment{ + StepExecutions: []agg.StepExecution{ + { + StepRef: "hop_2_settlement_observe", + StepCode: "hop.2.settlement.observe", + InstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12", + State: agg.StepStateRunning, + ExternalRefs: []agg.ExternalRef{ + {Kind: erecon.ExternalRefKindTransfer, Ref: "trf-2"}, + }, + }, + }, + } + + candidates := runningObserveCandidates(payment) + if len(candidates) != 1 { + t.Fatalf("candidate count mismatch: got=%d want=1", len(candidates)) + } + if got, want := candidates[0].gatewayInstanceID, "04a54fec-20f4-4250-a715-eb9886e13e12"; got != want { + t.Fatalf("gateway_instance_id mismatch: got=%q want=%q", got, want) + } +} + func TestResolveObserveGateway_UsesExternalRefGatewayInstanceAcrossRails(t *testing.T) { svc := &Service{ gatewayRegistry: &fakeGatewayRegistry{ @@ -466,5 +491,192 @@ func TestResolveObserveGateway_UsesExternalRefGatewayInstanceAcrossRails(t *test } } +func TestResolveObserveGateway_UsesPlannedStepGatewayWhenExternalRefInstanceIsStale(t *testing.T) { + svc := &Service{ + gatewayRegistry: &fakeGatewayRegistry{ + items: []*model.GatewayInstanceDescriptor{ + { + ID: "payment_gateway_settlement", + InstanceID: "ea2600ce-3de6-4cc5-bd1e-e26ebaceb6b4", + Rail: discovery.RailProviderSettlement, + InvokeURI: "grpc://tgsettle-gateway-new", + IsEnabled: true, + }, + { + ID: "crypto_rail_gateway_tron_mainnet", + InstanceID: "fbef2c3b-ff66-447e-8bba-fa666a955855", + Rail: discovery.RailCrypto, + InvokeURI: "grpc://tron-gateway", + IsEnabled: true, + }, + }, + }, + } + + payment := &agg.Payment{ + StepExecutions: []agg.StepExecution{ + { + StepRef: "hop_2_settlement_observe", + StepCode: "hop.2.settlement.observe", + Rail: discovery.RailProviderSettlement, + Gateway: "payment_gateway_settlement", + InstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12", + }, + }, + } + + gateway, err := svc.resolveObserveGateway(context.Background(), payment, runningObserveCandidate{ + stepRef: "hop_2_settlement_observe", + transferRef: "trf-1", + gatewayInstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12", + }) + if err != nil { + t.Fatalf("resolveObserveGateway returned error: %v", err) + } + if gateway == nil { + t.Fatal("expected gateway") + } + if got, want := gateway.ID, "payment_gateway_settlement"; got != want { + t.Fatalf("gateway id mismatch: got=%q want=%q", got, want) + } + if got, want := gateway.InstanceID, "ea2600ce-3de6-4cc5-bd1e-e26ebaceb6b4"; got != want { + t.Fatalf("gateway instance mismatch: got=%q want=%q", got, want) + } +} + +func TestResolveObserveGateway_FailsWhenPlannedGatewayMetadataIsMissing(t *testing.T) { + svc := &Service{ + gatewayRegistry: &fakeGatewayRegistry{ + items: []*model.GatewayInstanceDescriptor{ + { + ID: "crypto_rail_gateway_tron_mainnet", + InstanceID: "fbef2c3b-ff66-447e-8bba-fa666a955855", + Rail: discovery.RailCrypto, + InvokeURI: "grpc://tron-gateway", + IsEnabled: true, + }, + }, + }, + } + + payment := &agg.Payment{ + QuoteSnapshot: &model.PaymentQuoteSnapshot{ + Route: &paymenttypes.QuoteRouteSpecification{ + Hops: []*paymenttypes.QuoteRouteHop{ + { + Index: 1, + Rail: "CRYPTO", + Gateway: "crypto_rail_gateway_tron_mainnet", + InstanceID: "fbef2c3b-ff66-447e-8bba-fa666a955855", + Role: paymenttypes.QuoteRouteHopRoleSource, + }, + }, + }, + }, + } + + gateway, err := svc.resolveObserveGateway(context.Background(), payment, runningObserveCandidate{ + stepRef: "hop_2_settlement_observe", + transferRef: "trf-1", + gatewayInstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12", + }) + if err == nil { + t.Fatal("expected gateway resolution error") + } + if gateway != nil { + t.Fatal("expected nil gateway on resolution failure") + } +} + +func TestPollObserveCandidate_UsesResolvedGatewayAfterInstanceRotation(t *testing.T) { + orgID := bson.NewObjectID() + transferRef := "b6874b55-20b0-425d-9e47-d430964b1616:hop_2_settlement_fx_convert" + operationRef := "69aabf823555e083d23b2964:hop_2_settlement_fx_convert" + + var requestedTransferRef string + client := &chainclient.Fake{ + GetTransferFn: func(_ context.Context, req *chainv1.GetTransferRequest) (*chainv1.GetTransferResponse, error) { + requestedTransferRef = req.GetTransferRef() + return &chainv1.GetTransferResponse{ + Transfer: &chainv1.Transfer{ + TransferRef: req.GetTransferRef(), + OperationRef: operationRef, + Status: chainv1.TransferStatus_TRANSFER_SUCCESS, + }, + }, nil + }, + } + resolver := &fakeGatewayInvokeResolver{client: client} + v2 := &fakeExternalRuntimeV2{} + svc := &Service{ + logger: zap.NewNop(), + v2: v2, + gatewayInvokeResolver: resolver, + gatewayRegistry: &fakeGatewayRegistry{ + items: []*model.GatewayInstanceDescriptor{ + { + ID: "payment_gateway_settlement", + InstanceID: "ea2600ce-3de6-4cc5-bd1e-e26ebaceb6b4", + Rail: discovery.RailProviderSettlement, + InvokeURI: "grpc://tgsettle-gateway-new", + IsEnabled: true, + }, + }, + }, + } + + payment := &agg.Payment{ + OrganizationBoundBase: pm.OrganizationBoundBase{OrganizationRef: orgID}, + PaymentRef: "69aabf823555e083d23b2964", + StepExecutions: []agg.StepExecution{ + { + StepRef: "hop_2_settlement_observe", + StepCode: "hop.2.settlement.observe", + Rail: discovery.RailProviderSettlement, + Gateway: "payment_gateway_settlement", + InstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12", + State: agg.StepStateRunning, + ExternalRefs: []agg.ExternalRef{ + { + GatewayInstanceID: "04a54fec-20f4-4250-a715-eb9886e13e12", + Kind: erecon.ExternalRefKindTransfer, + Ref: transferRef, + }, + }, + }, + }, + } + + candidates := runningObserveCandidates(payment) + if len(candidates) != 1 { + t.Fatalf("candidate count mismatch: got=%d want=1", len(candidates)) + } + + svc.pollObserveCandidate(context.Background(), payment, candidates[0]) + + if got, want := resolver.lastInvokeURI, "grpc://tgsettle-gateway-new"; got != want { + t.Fatalf("invoke uri mismatch: got=%q want=%q", got, want) + } + if got, want := requestedTransferRef, transferRef; got != want { + t.Fatalf("transfer_ref lookup mismatch: got=%q want=%q", got, want) + } + if v2.reconcileInput == nil || v2.reconcileInput.Event.Gateway == nil { + t.Fatal("expected reconcile gateway event") + } + gw := v2.reconcileInput.Event.Gateway + if got, want := gw.StepRef, "hop_2_settlement_observe"; got != want { + t.Fatalf("step_ref mismatch: got=%q want=%q", got, want) + } + if got, want := gw.Status, erecon.GatewayStatusSuccess; got != want { + t.Fatalf("status mismatch: got=%q want=%q", got, want) + } + if got, want := gw.OperationRef, operationRef; got != want { + t.Fatalf("operation_ref mismatch: got=%q want=%q", got, want) + } + if got, want := gw.GatewayInstanceID, "ea2600ce-3de6-4cc5-bd1e-e26ebaceb6b4"; got != want { + t.Fatalf("gateway_instance_id mismatch: got=%q want=%q", got, want) + } +} + var _ prepo.Repository = (*fakeExternalRuntimeRepo)(nil) var _ psvc.Service = (*fakeExternalRuntimeV2)(nil)