bettter message reaction and pending payments persistence

This commit is contained in:
Stephan D
2026-01-21 00:12:32 +01:00
parent abc4ddfb5b
commit 0e933ace58
7 changed files with 362 additions and 175 deletions

View File

@@ -40,3 +40,4 @@ gateway:
target_chat_id_env: TGSETTLE_GATEWAY_CHAT_ID
timeout_seconds: 259200
accepted_user_ids: []
success_reaction: "\U0001FAE1"

View File

@@ -35,10 +35,11 @@ type config struct {
}
type gatewayConfig struct {
Rail string `yaml:"rail"`
TargetChatIDEnv string `yaml:"target_chat_id_env"`
TimeoutSeconds int32 `yaml:"timeout_seconds"`
AcceptedUserIDs []string `yaml:"accepted_user_ids"`
Rail string `yaml:"rail"`
TargetChatIDEnv string `yaml:"target_chat_id_env"`
TimeoutSeconds int32 `yaml:"timeout_seconds"`
AcceptedUserIDs []string `yaml:"accepted_user_ids"`
SuccessReaction string `yaml:"success_reaction"`
}
func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) {
@@ -94,6 +95,7 @@ func (i *Imp) Start() error {
TargetChatIDEnv: cfg.Gateway.TargetChatIDEnv,
TimeoutSeconds: cfg.Gateway.TimeoutSeconds,
AcceptedUserIDs: cfg.Gateway.AcceptedUserIDs,
SuccessReaction: cfg.Gateway.SuccessReaction,
InvokeURI: invokeURI,
}
svc := gateway.NewService(logger, repo, producer, broker, gwCfg)

View File

@@ -5,7 +5,7 @@ import (
"errors"
"os"
"strings"
"sync"
"time"
"github.com/tech/sendico/gateway/tgsettle/storage"
storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model"
@@ -36,8 +36,7 @@ import (
const (
defaultConfirmationTimeoutSeconds = 259200
executedStatus = "executed"
telegramSuccessReaction = "\u2705"
defaultTelegramSuccessReaction = "\U0001FAE1"
)
const (
@@ -54,22 +53,22 @@ type Config struct {
TargetChatIDEnv string
TimeoutSeconds int32
AcceptedUserIDs []string
SuccessReaction string
InvokeURI string
}
type Service struct {
logger mlogger.Logger
repo storage.Repository
producer msg.Producer
broker mb.Broker
cfg Config
rail string
chatID string
announcer *discovery.Announcer
invokeURI string
logger mlogger.Logger
repo storage.Repository
producer msg.Producer
broker mb.Broker
cfg Config
rail string
chatID string
announcer *discovery.Announcer
invokeURI string
successReaction string
mu sync.Mutex
pending map[string]*model.PaymentGatewayIntent
consumers []msg.Consumer
connectorv1.UnimplementedConnectorServiceServer
@@ -88,9 +87,12 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro
cfg: cfg,
rail: strings.TrimSpace(cfg.Rail),
invokeURI: strings.TrimSpace(cfg.InvokeURI),
pending: map[string]*model.PaymentGatewayIntent{},
}
svc.chatID = strings.TrimSpace(readEnv(cfg.TargetChatIDEnv))
svc.successReaction = strings.TrimSpace(cfg.SuccessReaction)
if svc.successReaction == "" {
svc.successReaction = defaultTelegramSuccessReaction
}
svc.startConsumers()
svc.startAnnouncer()
return svc
@@ -186,8 +188,13 @@ func (s *Service) SubmitTransfer(ctx context.Context, req *chainv1.SubmitTransfe
return nil, err
}
if existing != nil {
s.logger.Info("Submit transfer idempotent hit", append(logFields, zap.String("status", strings.TrimSpace(existing.Status)))...)
return &chainv1.SubmitTransferResponse{Transfer: transferFromExecution(existing, req)}, nil
existing, err = s.expirePaymentIfNeeded(ctx, existing)
if err != nil {
s.logger.Warn("Submit transfer status refresh failed", append(logFields, zap.Error(err))...)
return nil, err
}
s.logger.Info("Submit transfer idempotent hit", append(logFields, zap.String("status", string(paymentStatus(existing))))...)
return &chainv1.SubmitTransferResponse{Transfer: transferFromPayment(existing, req)}, nil
}
if err := s.onIntent(ctx, intent); err != nil {
s.logger.Warn("Submit transfer intent handling failed", append(logFields, zap.Error(err))...)
@@ -218,15 +225,16 @@ func (s *Service) GetTransfer(ctx context.Context, req *chainv1.GetTransferReque
return nil, err
}
if existing != nil {
existing, err = s.expirePaymentIfNeeded(ctx, existing)
if err != nil {
s.logger.Warn("Get transfer status refresh failed", append(logFields, zap.Error(err))...)
return nil, err
}
s.logger.Info("Get transfer resolved from execution", append(logFields,
zap.String("payment_intent_id", strings.TrimSpace(existing.PaymentIntentID)),
zap.String("status", strings.TrimSpace(existing.Status)),
zap.String("status", string(paymentStatus(existing))),
)...)
return &chainv1.GetTransferResponse{Transfer: transferFromExecution(existing, nil)}, nil
}
if s.hasPending(transferRef) {
s.logger.Info("Get transfer pending", logFields...)
return &chainv1.GetTransferResponse{Transfer: transferPending(transferRef)}, nil
return &chainv1.GetTransferResponse{Transfer: transferFromPayment(existing, nil)}, nil
}
s.logger.Warn("Get transfer not found", logFields...)
return nil, status.Error(codes.NotFound, "transfer not found")
@@ -255,28 +263,40 @@ func (s *Service) onIntent(ctx context.Context, intent *model.PaymentGatewayInte
return merrors.Internal("payment gateway storage unavailable")
}
existing, err := s.repo.Payments().FindByIdempotencyKey(ctx, intent.IdempotencyKey)
if err != nil {
return err
}
if existing != nil {
s.logger.Info("Payment gateway intent already executed",
zap.String("idempotency_key", intent.IdempotencyKey),
zap.String("payment_intent_id", intent.PaymentIntentID),
zap.String("quote_ref", intent.QuoteRef),
zap.String("rail", intent.OutgoingLeg))
return nil
}
confirmReq, err := s.buildConfirmationRequest(intent)
if err != nil {
s.logger.Warn("Failed to build confirmation request", zap.Error(err), zap.String("idempotency_key", intent.IdempotencyKey), zap.String("payment_intent_id", intent.PaymentIntentID))
return err
}
if err := s.sendConfirmationRequest(confirmReq); err != nil {
existing, err := s.repo.Payments().FindByIdempotencyKey(ctx, confirmReq.RequestID)
if err != nil {
return err
}
if existing != nil {
existing, err = s.expirePaymentIfNeeded(ctx, existing)
if err != nil {
return err
}
s.logger.Info("Payment gateway intent already recorded",
zap.String("idempotency_key", confirmReq.RequestID),
zap.String("payment_intent_id", confirmReq.PaymentIntentID),
zap.String("quote_ref", confirmReq.QuoteRef),
zap.String("rail", confirmReq.Rail),
zap.String("status", string(paymentStatus(existing))))
return nil
}
record := paymentRecordFromIntent(intent, confirmReq)
if err := s.repo.Payments().Upsert(ctx, record); err != nil {
s.logger.Warn("Failed to persist pending payment", zap.Error(err), zap.String("idempotency_key", confirmReq.RequestID))
return err
}
if err := s.sendConfirmationRequest(confirmReq); err != nil {
s.logger.Warn("Failed to publish confirmation request", zap.Error(err), zap.String("idempotency_key", confirmReq.RequestID))
s.markPaymentExpired(ctx, record, time.Now())
return err
}
s.trackIntent(confirmReq.RequestID, intent)
return nil
}
@@ -290,44 +310,49 @@ func (s *Service) onConfirmationResult(ctx context.Context, result *model.Confir
s.logger.Warn("Confirmation result rejected", zap.String("reason", "request_id is required"))
return merrors.InvalidArgument("confirmation request_id is required", "request_id")
}
intent := s.lookupIntent(requestID)
if intent == nil {
s.logger.Warn("Confirmation result ignored: intent not found", zap.String("request_id", requestID))
record, err := s.loadPayment(ctx, requestID)
if err != nil {
s.logger.Warn("Confirmation result lookup failed", zap.Error(err), zap.String("request_id", requestID))
return err
}
if record == nil {
s.logger.Warn("Confirmation result ignored: payment not found", zap.String("request_id", requestID))
return nil
}
if result.RawReply != nil && s.repo != nil && s.repo.TelegramConfirmations() != nil {
if err := s.repo.TelegramConfirmations().Upsert(ctx, &storagemodel.TelegramConfirmation{
RequestID: requestID,
PaymentIntentID: intent.PaymentIntentID,
QuoteRef: intent.QuoteRef,
PaymentIntentID: record.PaymentIntentID,
QuoteRef: record.QuoteRef,
RawReply: result.RawReply,
}); err != nil {
s.logger.Warn("Failed to store telegram confirmation", zap.Error(err), zap.String("request_id", requestID))
} else {
s.logger.Info("Stored telegram confirmation", zap.String("request_id", requestID),
zap.String("payment_intent_id", intent.PaymentIntentID),
zap.String("payment_intent_id", record.PaymentIntentID),
zap.String("reply_text", result.RawReply.Text), zap.String("reply_user_id", result.RawReply.FromUserID),
zap.String("reply_user", result.RawReply.FromUsername))
}
}
if result.Status == model.ConfirmationStatusConfirmed || result.Status == model.ConfirmationStatusClarified {
exec := &storagemodel.PaymentExecution{
IdempotencyKey: intent.IdempotencyKey,
PaymentIntentID: intent.PaymentIntentID,
ExecutedMoney: result.Money,
QuoteRef: intent.QuoteRef,
Status: executedStatus,
}
if err := s.repo.Payments().InsertExecution(ctx, exec); err != nil && err != storage.ErrDuplicate {
return err
}
nextStatus := paymentStatusFromResult(result)
currentStatus := paymentStatus(record)
if currentStatus == storagemodel.PaymentStatusExecuted || currentStatus == storagemodel.PaymentStatusExpired {
s.logger.Info("Confirmation result ignored: payment already finalized",
zap.String("request_id", requestID),
zap.String("status", string(currentStatus)))
return nil
}
s.applyPaymentResult(record, nextStatus, result)
if err := s.repo.Payments().Upsert(ctx, record); err != nil {
s.logger.Warn("Failed to persist payment status", zap.Error(err), zap.String("request_id", requestID))
return err
}
intent := intentFromPayment(record)
s.publishExecution(intent, result)
s.publishTelegramReaction(result)
s.removeIntent(requestID)
return nil
}
@@ -432,7 +457,7 @@ func (s *Service) publishTelegramReaction(result *model.ConfirmationResult) {
RequestID: strings.TrimSpace(result.RequestID),
ChatID: strings.TrimSpace(result.RawReply.ChatID),
MessageID: strings.TrimSpace(result.RawReply.MessageID),
Emoji: telegramSuccessReaction,
Emoji: s.successReaction,
}
if request.ChatID == "" || request.MessageID == "" || request.Emoji == "" {
return
@@ -454,48 +479,55 @@ func (s *Service) publishTelegramReaction(result *model.ConfirmationResult) {
zap.String("emoji", request.Emoji))
}
func (s *Service) trackIntent(requestID string, intent *model.PaymentGatewayIntent) {
if s == nil || intent == nil {
return
func (s *Service) loadPayment(ctx context.Context, requestID string) (*storagemodel.PaymentRecord, error) {
if s == nil || s.repo == nil || s.repo.Payments() == nil {
return nil, merrors.Internal("payment gateway storage unavailable")
}
requestID = strings.TrimSpace(requestID)
if requestID == "" {
return
return nil, merrors.InvalidArgument("request_id is required", "request_id")
}
s.mu.Lock()
s.pending[requestID] = intent
s.mu.Unlock()
return s.repo.Payments().FindByIdempotencyKey(ctx, requestID)
}
func (s *Service) lookupIntent(requestID string) *model.PaymentGatewayIntent {
requestID = strings.TrimSpace(requestID)
if requestID == "" {
return nil
func (s *Service) expirePaymentIfNeeded(ctx context.Context, record *storagemodel.PaymentRecord) (*storagemodel.PaymentRecord, error) {
if record == nil {
return nil, nil
}
s.mu.Lock()
defer s.mu.Unlock()
return s.pending[requestID]
status := paymentStatus(record)
if status != storagemodel.PaymentStatusPending {
return record, nil
}
if record.ExpiresAt.IsZero() {
return record, nil
}
if time.Now().Before(record.ExpiresAt) {
return record, nil
}
record.Status = storagemodel.PaymentStatusExpired
if record.ExpiredAt.IsZero() {
record.ExpiredAt = time.Now()
}
if s != nil && s.repo != nil && s.repo.Payments() != nil {
if err := s.repo.Payments().Upsert(ctx, record); err != nil {
return record, err
}
}
return record, nil
}
func (s *Service) removeIntent(requestID string) {
requestID = strings.TrimSpace(requestID)
if requestID == "" {
func (s *Service) markPaymentExpired(ctx context.Context, record *storagemodel.PaymentRecord, when time.Time) {
if record == nil || s == nil || s.repo == nil || s.repo.Payments() == nil {
return
}
s.mu.Lock()
delete(s.pending, requestID)
s.mu.Unlock()
}
func (s *Service) hasPending(requestID string) bool {
requestID = strings.TrimSpace(requestID)
if requestID == "" {
return false
if when.IsZero() {
when = time.Now()
}
record.Status = storagemodel.PaymentStatusExpired
record.ExpiredAt = when
if err := s.repo.Payments().Upsert(ctx, record); err != nil {
s.logger.Warn("Failed to mark payment as expired", zap.Error(err), zap.String("request_id", record.IdempotencyKey))
}
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.pending[requestID]
return ok
}
func (s *Service) startAnnouncer() {
@@ -533,6 +565,91 @@ func normalizeIntent(intent *model.PaymentGatewayIntent) *model.PaymentGatewayIn
return &cp
}
func paymentStatus(record *storagemodel.PaymentRecord) storagemodel.PaymentStatus {
if record == nil {
return storagemodel.PaymentStatusPending
}
if record.Status != "" {
return record.Status
}
if record.ExecutedMoney != nil || !record.ExecutedAt.IsZero() {
return storagemodel.PaymentStatusExecuted
}
return storagemodel.PaymentStatusPending
}
func paymentStatusFromResult(result *model.ConfirmationResult) storagemodel.PaymentStatus {
if result == nil {
return storagemodel.PaymentStatusPending
}
switch result.Status {
case model.ConfirmationStatusConfirmed, model.ConfirmationStatusClarified:
return storagemodel.PaymentStatusExecuted
case model.ConfirmationStatusTimeout, model.ConfirmationStatusRejected:
return storagemodel.PaymentStatusExpired
default:
return storagemodel.PaymentStatusPending
}
}
func (s *Service) applyPaymentResult(record *storagemodel.PaymentRecord, status storagemodel.PaymentStatus, result *model.ConfirmationResult) {
if record == nil {
return
}
record.Status = status
switch status {
case storagemodel.PaymentStatusExecuted:
record.ExecutedMoney = result.Money
if record.ExecutedAt.IsZero() {
record.ExecutedAt = time.Now()
}
case storagemodel.PaymentStatusExpired:
if record.ExpiredAt.IsZero() {
record.ExpiredAt = time.Now()
}
}
}
func paymentRecordFromIntent(intent *model.PaymentGatewayIntent, confirmReq *model.ConfirmationRequest) *storagemodel.PaymentRecord {
record := &storagemodel.PaymentRecord{
Status: storagemodel.PaymentStatusPending,
}
if intent != nil {
record.IdempotencyKey = strings.TrimSpace(intent.IdempotencyKey)
record.PaymentIntentID = strings.TrimSpace(intent.PaymentIntentID)
record.QuoteRef = strings.TrimSpace(intent.QuoteRef)
record.OutgoingLeg = strings.TrimSpace(intent.OutgoingLeg)
record.TargetChatID = strings.TrimSpace(intent.TargetChatID)
record.RequestedMoney = intent.RequestedMoney
}
if confirmReq != nil {
record.IdempotencyKey = strings.TrimSpace(confirmReq.RequestID)
record.PaymentIntentID = strings.TrimSpace(confirmReq.PaymentIntentID)
record.QuoteRef = strings.TrimSpace(confirmReq.QuoteRef)
record.OutgoingLeg = strings.TrimSpace(confirmReq.Rail)
record.TargetChatID = strings.TrimSpace(confirmReq.TargetChatID)
record.RequestedMoney = confirmReq.RequestedMoney
if confirmReq.TimeoutSeconds > 0 {
record.ExpiresAt = time.Now().Add(time.Duration(confirmReq.TimeoutSeconds) * time.Second)
}
}
return record
}
func intentFromPayment(record *storagemodel.PaymentRecord) *model.PaymentGatewayIntent {
if record == nil {
return nil
}
return &model.PaymentGatewayIntent{
PaymentIntentID: strings.TrimSpace(record.PaymentIntentID),
IdempotencyKey: strings.TrimSpace(record.IdempotencyKey),
OutgoingLeg: strings.TrimSpace(record.OutgoingLeg),
QuoteRef: strings.TrimSpace(record.QuoteRef),
RequestedMoney: record.RequestedMoney,
TargetChatID: strings.TrimSpace(record.TargetChatID),
}
}
func intentFromSubmitTransfer(req *chainv1.SubmitTransferRequest, defaultRail, defaultChatID string) (*model.PaymentGatewayIntent, error) {
if req == nil {
return nil, merrors.InvalidArgument("submit_transfer: request is required")
@@ -603,22 +720,27 @@ func transferFromRequest(req *chainv1.SubmitTransferRequest) *chainv1.Transfer {
}
}
func transferFromExecution(exec *storagemodel.PaymentExecution, req *chainv1.SubmitTransferRequest) *chainv1.Transfer {
if exec == nil {
func transferFromPayment(record *storagemodel.PaymentRecord, req *chainv1.SubmitTransferRequest) *chainv1.Transfer {
if record == nil {
return nil
}
var requested *moneyv1.Money
if req != nil && req.GetAmount() != nil {
requested = req.GetAmount()
} else {
requested = moneyFromPayment(record.RequestedMoney)
}
net := moneyFromPayment(exec.ExecutedMoney)
status := chainv1.TransferStatus_TRANSFER_CONFIRMED
if strings.TrimSpace(exec.Status) != "" && !strings.EqualFold(exec.Status, executedStatus) {
status = chainv1.TransferStatus_TRANSFER_PENDING
net := moneyFromPayment(record.ExecutedMoney)
status := chainv1.TransferStatus_TRANSFER_SUBMITTED
switch paymentStatus(record) {
case storagemodel.PaymentStatusExecuted:
status = chainv1.TransferStatus_TRANSFER_CONFIRMED
case storagemodel.PaymentStatusExpired:
status = chainv1.TransferStatus_TRANSFER_CANCELLED
}
transfer := &chainv1.Transfer{
TransferRef: strings.TrimSpace(exec.IdempotencyKey),
IdempotencyKey: strings.TrimSpace(exec.IdempotencyKey),
TransferRef: strings.TrimSpace(record.IdempotencyKey),
IdempotencyKey: strings.TrimSpace(record.IdempotencyKey),
RequestedAmount: requested,
NetAmount: net,
Status: status,
@@ -628,26 +750,20 @@ func transferFromExecution(exec *storagemodel.PaymentExecution, req *chainv1.Sub
transfer.SourceWalletRef = strings.TrimSpace(req.GetSourceWalletRef())
transfer.Destination = req.GetDestination()
}
if !exec.ExecutedAt.IsZero() {
ts := timestamppb.New(exec.ExecutedAt)
if !record.ExecutedAt.IsZero() {
ts := timestamppb.New(record.ExecutedAt)
transfer.CreatedAt = ts
transfer.UpdatedAt = ts
} else if !record.UpdatedAt.IsZero() {
ts := timestamppb.New(record.UpdatedAt)
transfer.UpdatedAt = ts
if !record.CreatedAt.IsZero() {
transfer.CreatedAt = timestamppb.New(record.CreatedAt)
}
}
return transfer
}
func transferPending(requestID string) *chainv1.Transfer {
ref := strings.TrimSpace(requestID)
if ref == "" {
return nil
}
return &chainv1.Transfer{
TransferRef: ref,
IdempotencyKey: ref,
Status: chainv1.TransferStatus_TRANSFER_SUBMITTED,
}
}
func moneyFromPayment(m *paymenttypes.Money) *moneyv1.Money {
if m == nil {
return nil

View File

@@ -19,26 +19,23 @@ import (
)
type fakePaymentsStore struct {
mu sync.Mutex
executions map[string]*storagemodel.PaymentExecution
mu sync.Mutex
records map[string]*storagemodel.PaymentRecord
}
func (f *fakePaymentsStore) FindByIdempotencyKey(_ context.Context, key string) (*storagemodel.PaymentExecution, error) {
func (f *fakePaymentsStore) FindByIdempotencyKey(_ context.Context, key string) (*storagemodel.PaymentRecord, error) {
f.mu.Lock()
defer f.mu.Unlock()
return f.executions[key], nil
return f.records[key], nil
}
func (f *fakePaymentsStore) InsertExecution(_ context.Context, exec *storagemodel.PaymentExecution) error {
func (f *fakePaymentsStore) Upsert(_ context.Context, record *storagemodel.PaymentRecord) error {
f.mu.Lock()
defer f.mu.Unlock()
if f.executions == nil {
f.executions = map[string]*storagemodel.PaymentExecution{}
if f.records == nil {
f.records = map[string]*storagemodel.PaymentRecord{}
}
if _, ok := f.executions[exec.IdempotencyKey]; ok {
return storage.ErrDuplicate
}
f.executions[exec.IdempotencyKey] = exec
f.records[record.IdempotencyKey] = record
return nil
}
@@ -147,6 +144,16 @@ func TestOnIntentCreatesConfirmationRequest(t *testing.T) {
if req.SourceService != string(mservice.PaymentGateway) || req.Rail != "card" {
t.Fatalf("unexpected source/rail: %#v", req)
}
record := repo.payments.records["idem-1"]
if record == nil {
t.Fatalf("expected pending payment to be stored")
}
if record.Status != storagemodel.PaymentStatusPending {
t.Fatalf("expected pending status, got %q", record.Status)
}
if record.RequestedMoney == nil || record.RequestedMoney.Amount != "10.50" {
t.Fatalf("requested money not stored correctly: %#v", record.RequestedMoney)
}
}
func TestIntentFromSubmitTransferUsesSourceMoney(t *testing.T) {
@@ -180,7 +187,14 @@ func TestConfirmationResultPersistsExecutionAndReply(t *testing.T) {
OutgoingLeg: "card",
RequestedMoney: &paymenttypes.Money{Amount: "5", Currency: "EUR"},
}
svc.trackIntent("idem-2", intent)
_ = repo.payments.Upsert(context.Background(), &storagemodel.PaymentRecord{
IdempotencyKey: "idem-2",
PaymentIntentID: intent.PaymentIntentID,
QuoteRef: intent.QuoteRef,
OutgoingLeg: intent.OutgoingLeg,
RequestedMoney: intent.RequestedMoney,
Status: storagemodel.PaymentStatusPending,
})
result := &model.ConfirmationResult{
RequestID: "idem-2",
@@ -191,11 +205,15 @@ func TestConfirmationResultPersistsExecutionAndReply(t *testing.T) {
if err := svc.onConfirmationResult(context.Background(), result); err != nil {
t.Fatalf("onConfirmationResult error: %v", err)
}
if repo.payments.executions["idem-2"] == nil {
t.Fatalf("expected payment execution to be stored")
record := repo.payments.records["idem-2"]
if record == nil {
t.Fatalf("expected payment record to be stored")
}
if repo.payments.executions["idem-2"].ExecutedMoney == nil || repo.payments.executions["idem-2"].ExecutedMoney.Amount != "5" {
t.Fatalf("executed money not stored correctly")
if record.Status != storagemodel.PaymentStatusExecuted {
t.Fatalf("expected executed status, got %q", record.Status)
}
if record.ExecutedMoney == nil || record.ExecutedMoney.Amount != "5" {
t.Fatalf("executed money not stored correctly: %#v", record.ExecutedMoney)
}
if repo.tg.records["idem-2"] == nil || repo.tg.records["idem-2"].RawReply.Text != "5 EUR" {
t.Fatalf("telegram reply not stored correctly")
@@ -214,7 +232,14 @@ func TestClarifiedResultPersistsExecution(t *testing.T) {
OutgoingLeg: "card",
RequestedMoney: &paymenttypes.Money{Amount: "12", Currency: "USD"},
}
svc.trackIntent("idem-clarified", intent)
_ = repo.payments.Upsert(context.Background(), &storagemodel.PaymentRecord{
IdempotencyKey: "idem-clarified",
PaymentIntentID: intent.PaymentIntentID,
QuoteRef: intent.QuoteRef,
OutgoingLeg: intent.OutgoingLeg,
RequestedMoney: intent.RequestedMoney,
Status: storagemodel.PaymentStatusPending,
})
result := &model.ConfirmationResult{
RequestID: "idem-clarified",
@@ -225,15 +250,16 @@ func TestClarifiedResultPersistsExecution(t *testing.T) {
if err := svc.onConfirmationResult(context.Background(), result); err != nil {
t.Fatalf("onConfirmationResult error: %v", err)
}
if repo.payments.executions["idem-clarified"] == nil {
t.Fatalf("expected payment execution to be stored")
record := repo.payments.records["idem-clarified"]
if record == nil || record.Status != storagemodel.PaymentStatusExecuted {
t.Fatalf("expected payment executed status, got %#v", record)
}
}
func TestIdempotencyPreventsDuplicateWrites(t *testing.T) {
logger := mloggerfactory.NewLogger(false)
repo := &fakeRepo{payments: &fakePaymentsStore{executions: map[string]*storagemodel.PaymentExecution{
"idem-3": {IdempotencyKey: "idem-3"},
repo := &fakeRepo{payments: &fakePaymentsStore{records: map[string]*storagemodel.PaymentRecord{
"idem-3": {IdempotencyKey: "idem-3", Status: storagemodel.PaymentStatusPending},
}}, tg: &fakeTelegramStore{}}
prod := &captureProducer{}
svc := NewService(logger, repo, prod, nil, Config{Rail: "card"})
@@ -265,7 +291,14 @@ func TestTimeoutDoesNotPersistExecution(t *testing.T) {
OutgoingLeg: "card",
RequestedMoney: &paymenttypes.Money{Amount: "8", Currency: "USD"},
}
svc.trackIntent("idem-4", intent)
_ = repo.payments.Upsert(context.Background(), &storagemodel.PaymentRecord{
IdempotencyKey: "idem-4",
PaymentIntentID: intent.PaymentIntentID,
QuoteRef: intent.QuoteRef,
OutgoingLeg: intent.OutgoingLeg,
RequestedMoney: intent.RequestedMoney,
Status: storagemodel.PaymentStatusPending,
})
result := &model.ConfirmationResult{
RequestID: "idem-4",
@@ -274,8 +307,9 @@ func TestTimeoutDoesNotPersistExecution(t *testing.T) {
if err := svc.onConfirmationResult(context.Background(), result); err != nil {
t.Fatalf("onConfirmationResult error: %v", err)
}
if repo.payments.executions["idem-4"] != nil {
t.Fatalf("expected no execution record for timeout")
record := repo.payments.records["idem-4"]
if record == nil || record.Status != storagemodel.PaymentStatusExpired {
t.Fatalf("expected expired status for timeout, got %#v", record)
}
}
@@ -291,7 +325,14 @@ func TestRejectedDoesNotPersistExecution(t *testing.T) {
OutgoingLeg: "card",
RequestedMoney: &paymenttypes.Money{Amount: "3", Currency: "USD"},
}
svc.trackIntent("idem-reject", intent)
_ = repo.payments.Upsert(context.Background(), &storagemodel.PaymentRecord{
IdempotencyKey: "idem-reject",
PaymentIntentID: intent.PaymentIntentID,
QuoteRef: intent.QuoteRef,
OutgoingLeg: intent.OutgoingLeg,
RequestedMoney: intent.RequestedMoney,
Status: storagemodel.PaymentStatusPending,
})
result := &model.ConfirmationResult{
RequestID: "idem-reject",
@@ -301,8 +342,9 @@ func TestRejectedDoesNotPersistExecution(t *testing.T) {
if err := svc.onConfirmationResult(context.Background(), result); err != nil {
t.Fatalf("onConfirmationResult error: %v", err)
}
if repo.payments.executions["idem-reject"] != nil {
t.Fatalf("expected no execution record for rejection")
record := repo.payments.records["idem-reject"]
if record == nil || record.Status != storagemodel.PaymentStatusExpired {
t.Fatalf("expected expired status for rejection, got %#v", record)
}
if repo.tg.records["idem-reject"] == nil {
t.Fatalf("expected raw reply to be stored for rejection")

View File

@@ -8,21 +8,36 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
)
type PaymentExecution struct {
ID primitive.ObjectID `bson:"_id,omitempty" json:"id"`
IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"`
type PaymentStatus string
const (
PaymentStatusPending PaymentStatus = "pending"
PaymentStatusExpired PaymentStatus = "expired"
PaymentStatusExecuted PaymentStatus = "executed"
)
type PaymentRecord struct {
ID primitive.ObjectID `bson:"_id,omitempty" json:"id"`
IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"`
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"`
QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"`
ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"`
Status string `bson:"status,omitempty" json:"status,omitempty"`
QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"`
OutgoingLeg string `bson:"outgoingLeg,omitempty" json:"outgoing_leg,omitempty"`
TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"`
RequestedMoney *paymenttypes.Money `bson:"requestedMoney,omitempty" json:"requested_money,omitempty"`
ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"`
Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"`
CreatedAt time.Time `bson:"createdAt,omitempty" json:"created_at,omitempty"`
UpdatedAt time.Time `bson:"updatedAt,omitempty" json:"updated_at,omitempty"`
ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"`
ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"`
ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"`
}
type TelegramConfirmation struct {
ID primitive.ObjectID `bson:"_id,omitempty" json:"id"`
RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"`
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"`
ID primitive.ObjectID `bson:"_id,omitempty" json:"id"`
RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"`
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"`
RawReply *model.TelegramMessage `bson:"rawReply,omitempty" json:"raw_reply,omitempty"`
ReceivedAt time.Time `bson:"receivedAt,omitempty" json:"received_at,omitempty"`
ReceivedAt time.Time `bson:"receivedAt,omitempty" json:"received_at,omitempty"`
}

View File

@@ -13,13 +13,15 @@ import (
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
const (
paymentsCollection = "payments"
fieldIdempotencyKey = "idempotencyKey"
paymentsCollection = "payments"
fieldIdempotencyKey = "idempotencyKey"
)
type Payments struct {
@@ -53,44 +55,53 @@ func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) {
return p, nil
}
func (p *Payments) FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentExecution, error) {
func (p *Payments) FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentRecord, error) {
key = strings.TrimSpace(key)
if key == "" {
return nil, merrors.InvalidArgument("idempotency key is required", "idempotency_key")
}
var result model.PaymentExecution
var result model.PaymentRecord
err := p.coll.FindOne(ctx, bson.M{fieldIdempotencyKey: key}).Decode(&result)
if err == mongo.ErrNoDocuments {
return nil, nil
}
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
p.logger.Warn("Payment execution lookup failed", zap.String("idempotency_key", key), zap.Error(err))
p.logger.Warn("Payment record lookup failed", zap.String("idempotency_key", key), zap.Error(err))
}
return nil, err
}
return &result, nil
}
func (p *Payments) InsertExecution(ctx context.Context, exec *model.PaymentExecution) error {
if exec == nil {
return merrors.InvalidArgument("payment execution is nil", "execution")
func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) error {
if record == nil {
return merrors.InvalidArgument("payment record is nil", "record")
}
exec.IdempotencyKey = strings.TrimSpace(exec.IdempotencyKey)
exec.PaymentIntentID = strings.TrimSpace(exec.PaymentIntentID)
exec.QuoteRef = strings.TrimSpace(exec.QuoteRef)
if exec.ExecutedAt.IsZero() {
exec.ExecutedAt = time.Now()
record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey)
record.PaymentIntentID = strings.TrimSpace(record.PaymentIntentID)
record.QuoteRef = strings.TrimSpace(record.QuoteRef)
record.OutgoingLeg = strings.TrimSpace(record.OutgoingLeg)
record.TargetChatID = strings.TrimSpace(record.TargetChatID)
if record.IdempotencyKey == "" {
return merrors.InvalidArgument("idempotency key is required", "idempotency_key")
}
if _, err := p.coll.InsertOne(ctx, exec); err != nil {
if mongo.IsDuplicateKeyError(err) {
return storage.ErrDuplicate
}
now := time.Now()
if record.CreatedAt.IsZero() {
record.CreatedAt = now
}
record.UpdatedAt = now
record.ID = primitive.NilObjectID
update := bson.M{
"$set": record,
}
_, err := p.coll.UpdateOne(ctx, bson.M{fieldIdempotencyKey: record.IdempotencyKey}, update, options.Update().SetUpsert(true))
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
p.logger.Warn("Failed to insert payment execution",
zap.String("idempotency_key", exec.IdempotencyKey),
zap.String("payment_intent_id", exec.PaymentIntentID),
zap.String("quote_ref", exec.QuoteRef),
p.logger.Warn("Failed to upsert payment record",
zap.String("idempotency_key", record.IdempotencyKey),
zap.String("payment_intent_id", record.PaymentIntentID),
zap.String("quote_ref", record.QuoteRef),
zap.Error(err))
}
return err

View File

@@ -15,8 +15,8 @@ type Repository interface {
}
type PaymentsStore interface {
FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentExecution, error)
InsertExecution(ctx context.Context, exec *model.PaymentExecution) error
FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentRecord, error)
Upsert(ctx context.Context, record *model.PaymentRecord) error
}
type TelegramConfirmationsStore interface {