bettter message reaction and pending payments persistence #290

Merged
tech merged 1 commits from tgsettle-289 into main 2026-01-20 23:12:52 +00:00
7 changed files with 362 additions and 175 deletions

View File

@@ -40,3 +40,4 @@ gateway:
target_chat_id_env: TGSETTLE_GATEWAY_CHAT_ID target_chat_id_env: TGSETTLE_GATEWAY_CHAT_ID
timeout_seconds: 259200 timeout_seconds: 259200
accepted_user_ids: [] accepted_user_ids: []
success_reaction: "\U0001FAE1"

View File

@@ -39,6 +39,7 @@ type gatewayConfig struct {
TargetChatIDEnv string `yaml:"target_chat_id_env"` TargetChatIDEnv string `yaml:"target_chat_id_env"`
TimeoutSeconds int32 `yaml:"timeout_seconds"` TimeoutSeconds int32 `yaml:"timeout_seconds"`
AcceptedUserIDs []string `yaml:"accepted_user_ids"` AcceptedUserIDs []string `yaml:"accepted_user_ids"`
SuccessReaction string `yaml:"success_reaction"`
} }
func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) { func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) {
@@ -94,6 +95,7 @@ func (i *Imp) Start() error {
TargetChatIDEnv: cfg.Gateway.TargetChatIDEnv, TargetChatIDEnv: cfg.Gateway.TargetChatIDEnv,
TimeoutSeconds: cfg.Gateway.TimeoutSeconds, TimeoutSeconds: cfg.Gateway.TimeoutSeconds,
AcceptedUserIDs: cfg.Gateway.AcceptedUserIDs, AcceptedUserIDs: cfg.Gateway.AcceptedUserIDs,
SuccessReaction: cfg.Gateway.SuccessReaction,
InvokeURI: invokeURI, InvokeURI: invokeURI,
} }
svc := gateway.NewService(logger, repo, producer, broker, gwCfg) svc := gateway.NewService(logger, repo, producer, broker, gwCfg)

View File

@@ -5,7 +5,7 @@ import (
"errors" "errors"
"os" "os"
"strings" "strings"
"sync" "time"
"github.com/tech/sendico/gateway/tgsettle/storage" "github.com/tech/sendico/gateway/tgsettle/storage"
storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model" storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model"
@@ -36,8 +36,7 @@ import (
const ( const (
defaultConfirmationTimeoutSeconds = 259200 defaultConfirmationTimeoutSeconds = 259200
executedStatus = "executed" defaultTelegramSuccessReaction = "\U0001FAE1"
telegramSuccessReaction = "\u2705"
) )
const ( const (
@@ -54,6 +53,7 @@ type Config struct {
TargetChatIDEnv string TargetChatIDEnv string
TimeoutSeconds int32 TimeoutSeconds int32
AcceptedUserIDs []string AcceptedUserIDs []string
SuccessReaction string
InvokeURI string InvokeURI string
} }
@@ -67,9 +67,8 @@ type Service struct {
chatID string chatID string
announcer *discovery.Announcer announcer *discovery.Announcer
invokeURI string invokeURI string
successReaction string
mu sync.Mutex
pending map[string]*model.PaymentGatewayIntent
consumers []msg.Consumer consumers []msg.Consumer
connectorv1.UnimplementedConnectorServiceServer connectorv1.UnimplementedConnectorServiceServer
@@ -88,9 +87,12 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro
cfg: cfg, cfg: cfg,
rail: strings.TrimSpace(cfg.Rail), rail: strings.TrimSpace(cfg.Rail),
invokeURI: strings.TrimSpace(cfg.InvokeURI), invokeURI: strings.TrimSpace(cfg.InvokeURI),
pending: map[string]*model.PaymentGatewayIntent{},
} }
svc.chatID = strings.TrimSpace(readEnv(cfg.TargetChatIDEnv)) svc.chatID = strings.TrimSpace(readEnv(cfg.TargetChatIDEnv))
svc.successReaction = strings.TrimSpace(cfg.SuccessReaction)
if svc.successReaction == "" {
svc.successReaction = defaultTelegramSuccessReaction
}
svc.startConsumers() svc.startConsumers()
svc.startAnnouncer() svc.startAnnouncer()
return svc return svc
@@ -186,8 +188,13 @@ func (s *Service) SubmitTransfer(ctx context.Context, req *chainv1.SubmitTransfe
return nil, err return nil, err
} }
if existing != nil { if existing != nil {
s.logger.Info("Submit transfer idempotent hit", append(logFields, zap.String("status", strings.TrimSpace(existing.Status)))...) existing, err = s.expirePaymentIfNeeded(ctx, existing)
return &chainv1.SubmitTransferResponse{Transfer: transferFromExecution(existing, req)}, nil if err != nil {
s.logger.Warn("Submit transfer status refresh failed", append(logFields, zap.Error(err))...)
return nil, err
}
s.logger.Info("Submit transfer idempotent hit", append(logFields, zap.String("status", string(paymentStatus(existing))))...)
return &chainv1.SubmitTransferResponse{Transfer: transferFromPayment(existing, req)}, nil
} }
if err := s.onIntent(ctx, intent); err != nil { if err := s.onIntent(ctx, intent); err != nil {
s.logger.Warn("Submit transfer intent handling failed", append(logFields, zap.Error(err))...) s.logger.Warn("Submit transfer intent handling failed", append(logFields, zap.Error(err))...)
@@ -218,15 +225,16 @@ func (s *Service) GetTransfer(ctx context.Context, req *chainv1.GetTransferReque
return nil, err return nil, err
} }
if existing != nil { if existing != nil {
existing, err = s.expirePaymentIfNeeded(ctx, existing)
if err != nil {
s.logger.Warn("Get transfer status refresh failed", append(logFields, zap.Error(err))...)
return nil, err
}
s.logger.Info("Get transfer resolved from execution", append(logFields, s.logger.Info("Get transfer resolved from execution", append(logFields,
zap.String("payment_intent_id", strings.TrimSpace(existing.PaymentIntentID)), zap.String("payment_intent_id", strings.TrimSpace(existing.PaymentIntentID)),
zap.String("status", strings.TrimSpace(existing.Status)), zap.String("status", string(paymentStatus(existing))),
)...) )...)
return &chainv1.GetTransferResponse{Transfer: transferFromExecution(existing, nil)}, nil return &chainv1.GetTransferResponse{Transfer: transferFromPayment(existing, nil)}, nil
}
if s.hasPending(transferRef) {
s.logger.Info("Get transfer pending", logFields...)
return &chainv1.GetTransferResponse{Transfer: transferPending(transferRef)}, nil
} }
s.logger.Warn("Get transfer not found", logFields...) s.logger.Warn("Get transfer not found", logFields...)
return nil, status.Error(codes.NotFound, "transfer not found") return nil, status.Error(codes.NotFound, "transfer not found")
@@ -255,28 +263,40 @@ func (s *Service) onIntent(ctx context.Context, intent *model.PaymentGatewayInte
return merrors.Internal("payment gateway storage unavailable") return merrors.Internal("payment gateway storage unavailable")
} }
existing, err := s.repo.Payments().FindByIdempotencyKey(ctx, intent.IdempotencyKey)
if err != nil {
return err
}
if existing != nil {
s.logger.Info("Payment gateway intent already executed",
zap.String("idempotency_key", intent.IdempotencyKey),
zap.String("payment_intent_id", intent.PaymentIntentID),
zap.String("quote_ref", intent.QuoteRef),
zap.String("rail", intent.OutgoingLeg))
return nil
}
confirmReq, err := s.buildConfirmationRequest(intent) confirmReq, err := s.buildConfirmationRequest(intent)
if err != nil { if err != nil {
s.logger.Warn("Failed to build confirmation request", zap.Error(err), zap.String("idempotency_key", intent.IdempotencyKey), zap.String("payment_intent_id", intent.PaymentIntentID)) s.logger.Warn("Failed to build confirmation request", zap.Error(err), zap.String("idempotency_key", intent.IdempotencyKey), zap.String("payment_intent_id", intent.PaymentIntentID))
return err return err
} }
if err := s.sendConfirmationRequest(confirmReq); err != nil {
existing, err := s.repo.Payments().FindByIdempotencyKey(ctx, confirmReq.RequestID)
if err != nil {
return err
}
if existing != nil {
existing, err = s.expirePaymentIfNeeded(ctx, existing)
if err != nil {
return err
}
s.logger.Info("Payment gateway intent already recorded",
zap.String("idempotency_key", confirmReq.RequestID),
zap.String("payment_intent_id", confirmReq.PaymentIntentID),
zap.String("quote_ref", confirmReq.QuoteRef),
zap.String("rail", confirmReq.Rail),
zap.String("status", string(paymentStatus(existing))))
return nil
}
record := paymentRecordFromIntent(intent, confirmReq)
if err := s.repo.Payments().Upsert(ctx, record); err != nil {
s.logger.Warn("Failed to persist pending payment", zap.Error(err), zap.String("idempotency_key", confirmReq.RequestID))
return err
}
if err := s.sendConfirmationRequest(confirmReq); err != nil {
s.logger.Warn("Failed to publish confirmation request", zap.Error(err), zap.String("idempotency_key", confirmReq.RequestID))
s.markPaymentExpired(ctx, record, time.Now())
return err return err
} }
s.trackIntent(confirmReq.RequestID, intent)
return nil return nil
} }
@@ -290,44 +310,49 @@ func (s *Service) onConfirmationResult(ctx context.Context, result *model.Confir
s.logger.Warn("Confirmation result rejected", zap.String("reason", "request_id is required")) s.logger.Warn("Confirmation result rejected", zap.String("reason", "request_id is required"))
return merrors.InvalidArgument("confirmation request_id is required", "request_id") return merrors.InvalidArgument("confirmation request_id is required", "request_id")
} }
intent := s.lookupIntent(requestID) record, err := s.loadPayment(ctx, requestID)
if intent == nil { if err != nil {
s.logger.Warn("Confirmation result ignored: intent not found", zap.String("request_id", requestID)) s.logger.Warn("Confirmation result lookup failed", zap.Error(err), zap.String("request_id", requestID))
return err
}
if record == nil {
s.logger.Warn("Confirmation result ignored: payment not found", zap.String("request_id", requestID))
return nil return nil
} }
if result.RawReply != nil && s.repo != nil && s.repo.TelegramConfirmations() != nil { if result.RawReply != nil && s.repo != nil && s.repo.TelegramConfirmations() != nil {
if err := s.repo.TelegramConfirmations().Upsert(ctx, &storagemodel.TelegramConfirmation{ if err := s.repo.TelegramConfirmations().Upsert(ctx, &storagemodel.TelegramConfirmation{
RequestID: requestID, RequestID: requestID,
PaymentIntentID: intent.PaymentIntentID, PaymentIntentID: record.PaymentIntentID,
QuoteRef: intent.QuoteRef, QuoteRef: record.QuoteRef,
RawReply: result.RawReply, RawReply: result.RawReply,
}); err != nil { }); err != nil {
s.logger.Warn("Failed to store telegram confirmation", zap.Error(err), zap.String("request_id", requestID)) s.logger.Warn("Failed to store telegram confirmation", zap.Error(err), zap.String("request_id", requestID))
} else { } else {
s.logger.Info("Stored telegram confirmation", zap.String("request_id", requestID), s.logger.Info("Stored telegram confirmation", zap.String("request_id", requestID),
zap.String("payment_intent_id", intent.PaymentIntentID), zap.String("payment_intent_id", record.PaymentIntentID),
zap.String("reply_text", result.RawReply.Text), zap.String("reply_user_id", result.RawReply.FromUserID), zap.String("reply_text", result.RawReply.Text), zap.String("reply_user_id", result.RawReply.FromUserID),
zap.String("reply_user", result.RawReply.FromUsername)) zap.String("reply_user", result.RawReply.FromUsername))
} }
} }
if result.Status == model.ConfirmationStatusConfirmed || result.Status == model.ConfirmationStatusClarified { nextStatus := paymentStatusFromResult(result)
exec := &storagemodel.PaymentExecution{ currentStatus := paymentStatus(record)
IdempotencyKey: intent.IdempotencyKey, if currentStatus == storagemodel.PaymentStatusExecuted || currentStatus == storagemodel.PaymentStatusExpired {
PaymentIntentID: intent.PaymentIntentID, s.logger.Info("Confirmation result ignored: payment already finalized",
ExecutedMoney: result.Money, zap.String("request_id", requestID),
QuoteRef: intent.QuoteRef, zap.String("status", string(currentStatus)))
Status: executedStatus, return nil
} }
if err := s.repo.Payments().InsertExecution(ctx, exec); err != nil && err != storage.ErrDuplicate { s.applyPaymentResult(record, nextStatus, result)
if err := s.repo.Payments().Upsert(ctx, record); err != nil {
s.logger.Warn("Failed to persist payment status", zap.Error(err), zap.String("request_id", requestID))
return err return err
} }
}
intent := intentFromPayment(record)
s.publishExecution(intent, result) s.publishExecution(intent, result)
s.publishTelegramReaction(result) s.publishTelegramReaction(result)
s.removeIntent(requestID)
return nil return nil
} }
@@ -432,7 +457,7 @@ func (s *Service) publishTelegramReaction(result *model.ConfirmationResult) {
RequestID: strings.TrimSpace(result.RequestID), RequestID: strings.TrimSpace(result.RequestID),
ChatID: strings.TrimSpace(result.RawReply.ChatID), ChatID: strings.TrimSpace(result.RawReply.ChatID),
MessageID: strings.TrimSpace(result.RawReply.MessageID), MessageID: strings.TrimSpace(result.RawReply.MessageID),
Emoji: telegramSuccessReaction, Emoji: s.successReaction,
} }
if request.ChatID == "" || request.MessageID == "" || request.Emoji == "" { if request.ChatID == "" || request.MessageID == "" || request.Emoji == "" {
return return
@@ -454,48 +479,55 @@ func (s *Service) publishTelegramReaction(result *model.ConfirmationResult) {
zap.String("emoji", request.Emoji)) zap.String("emoji", request.Emoji))
} }
func (s *Service) trackIntent(requestID string, intent *model.PaymentGatewayIntent) { func (s *Service) loadPayment(ctx context.Context, requestID string) (*storagemodel.PaymentRecord, error) {
if s == nil || intent == nil { if s == nil || s.repo == nil || s.repo.Payments() == nil {
return return nil, merrors.Internal("payment gateway storage unavailable")
} }
requestID = strings.TrimSpace(requestID) requestID = strings.TrimSpace(requestID)
if requestID == "" { if requestID == "" {
return return nil, merrors.InvalidArgument("request_id is required", "request_id")
} }
s.mu.Lock() return s.repo.Payments().FindByIdempotencyKey(ctx, requestID)
s.pending[requestID] = intent
s.mu.Unlock()
} }
func (s *Service) lookupIntent(requestID string) *model.PaymentGatewayIntent { func (s *Service) expirePaymentIfNeeded(ctx context.Context, record *storagemodel.PaymentRecord) (*storagemodel.PaymentRecord, error) {
requestID = strings.TrimSpace(requestID) if record == nil {
if requestID == "" { return nil, nil
return nil
} }
s.mu.Lock() status := paymentStatus(record)
defer s.mu.Unlock() if status != storagemodel.PaymentStatusPending {
return s.pending[requestID] return record, nil
}
if record.ExpiresAt.IsZero() {
return record, nil
}
if time.Now().Before(record.ExpiresAt) {
return record, nil
}
record.Status = storagemodel.PaymentStatusExpired
if record.ExpiredAt.IsZero() {
record.ExpiredAt = time.Now()
}
if s != nil && s.repo != nil && s.repo.Payments() != nil {
if err := s.repo.Payments().Upsert(ctx, record); err != nil {
return record, err
}
}
return record, nil
} }
func (s *Service) removeIntent(requestID string) { func (s *Service) markPaymentExpired(ctx context.Context, record *storagemodel.PaymentRecord, when time.Time) {
requestID = strings.TrimSpace(requestID) if record == nil || s == nil || s.repo == nil || s.repo.Payments() == nil {
if requestID == "" {
return return
} }
s.mu.Lock() if when.IsZero() {
delete(s.pending, requestID) when = time.Now()
s.mu.Unlock() }
} record.Status = storagemodel.PaymentStatusExpired
record.ExpiredAt = when
func (s *Service) hasPending(requestID string) bool { if err := s.repo.Payments().Upsert(ctx, record); err != nil {
requestID = strings.TrimSpace(requestID) s.logger.Warn("Failed to mark payment as expired", zap.Error(err), zap.String("request_id", record.IdempotencyKey))
if requestID == "" {
return false
} }
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.pending[requestID]
return ok
} }
func (s *Service) startAnnouncer() { func (s *Service) startAnnouncer() {
@@ -533,6 +565,91 @@ func normalizeIntent(intent *model.PaymentGatewayIntent) *model.PaymentGatewayIn
return &cp return &cp
} }
func paymentStatus(record *storagemodel.PaymentRecord) storagemodel.PaymentStatus {
if record == nil {
return storagemodel.PaymentStatusPending
}
if record.Status != "" {
return record.Status
}
if record.ExecutedMoney != nil || !record.ExecutedAt.IsZero() {
return storagemodel.PaymentStatusExecuted
}
return storagemodel.PaymentStatusPending
}
func paymentStatusFromResult(result *model.ConfirmationResult) storagemodel.PaymentStatus {
if result == nil {
return storagemodel.PaymentStatusPending
}
switch result.Status {
case model.ConfirmationStatusConfirmed, model.ConfirmationStatusClarified:
return storagemodel.PaymentStatusExecuted
case model.ConfirmationStatusTimeout, model.ConfirmationStatusRejected:
return storagemodel.PaymentStatusExpired
default:
return storagemodel.PaymentStatusPending
}
}
func (s *Service) applyPaymentResult(record *storagemodel.PaymentRecord, status storagemodel.PaymentStatus, result *model.ConfirmationResult) {
if record == nil {
return
}
record.Status = status
switch status {
case storagemodel.PaymentStatusExecuted:
record.ExecutedMoney = result.Money
if record.ExecutedAt.IsZero() {
record.ExecutedAt = time.Now()
}
case storagemodel.PaymentStatusExpired:
if record.ExpiredAt.IsZero() {
record.ExpiredAt = time.Now()
}
}
}
func paymentRecordFromIntent(intent *model.PaymentGatewayIntent, confirmReq *model.ConfirmationRequest) *storagemodel.PaymentRecord {
record := &storagemodel.PaymentRecord{
Status: storagemodel.PaymentStatusPending,
}
if intent != nil {
record.IdempotencyKey = strings.TrimSpace(intent.IdempotencyKey)
record.PaymentIntentID = strings.TrimSpace(intent.PaymentIntentID)
record.QuoteRef = strings.TrimSpace(intent.QuoteRef)
record.OutgoingLeg = strings.TrimSpace(intent.OutgoingLeg)
record.TargetChatID = strings.TrimSpace(intent.TargetChatID)
record.RequestedMoney = intent.RequestedMoney
}
if confirmReq != nil {
record.IdempotencyKey = strings.TrimSpace(confirmReq.RequestID)
record.PaymentIntentID = strings.TrimSpace(confirmReq.PaymentIntentID)
record.QuoteRef = strings.TrimSpace(confirmReq.QuoteRef)
record.OutgoingLeg = strings.TrimSpace(confirmReq.Rail)
record.TargetChatID = strings.TrimSpace(confirmReq.TargetChatID)
record.RequestedMoney = confirmReq.RequestedMoney
if confirmReq.TimeoutSeconds > 0 {
record.ExpiresAt = time.Now().Add(time.Duration(confirmReq.TimeoutSeconds) * time.Second)
}
}
return record
}
func intentFromPayment(record *storagemodel.PaymentRecord) *model.PaymentGatewayIntent {
if record == nil {
return nil
}
return &model.PaymentGatewayIntent{
PaymentIntentID: strings.TrimSpace(record.PaymentIntentID),
IdempotencyKey: strings.TrimSpace(record.IdempotencyKey),
OutgoingLeg: strings.TrimSpace(record.OutgoingLeg),
QuoteRef: strings.TrimSpace(record.QuoteRef),
RequestedMoney: record.RequestedMoney,
TargetChatID: strings.TrimSpace(record.TargetChatID),
}
}
func intentFromSubmitTransfer(req *chainv1.SubmitTransferRequest, defaultRail, defaultChatID string) (*model.PaymentGatewayIntent, error) { func intentFromSubmitTransfer(req *chainv1.SubmitTransferRequest, defaultRail, defaultChatID string) (*model.PaymentGatewayIntent, error) {
if req == nil { if req == nil {
return nil, merrors.InvalidArgument("submit_transfer: request is required") return nil, merrors.InvalidArgument("submit_transfer: request is required")
@@ -603,22 +720,27 @@ func transferFromRequest(req *chainv1.SubmitTransferRequest) *chainv1.Transfer {
} }
} }
func transferFromExecution(exec *storagemodel.PaymentExecution, req *chainv1.SubmitTransferRequest) *chainv1.Transfer { func transferFromPayment(record *storagemodel.PaymentRecord, req *chainv1.SubmitTransferRequest) *chainv1.Transfer {
if exec == nil { if record == nil {
return nil return nil
} }
var requested *moneyv1.Money var requested *moneyv1.Money
if req != nil && req.GetAmount() != nil { if req != nil && req.GetAmount() != nil {
requested = req.GetAmount() requested = req.GetAmount()
} else {
requested = moneyFromPayment(record.RequestedMoney)
} }
net := moneyFromPayment(exec.ExecutedMoney) net := moneyFromPayment(record.ExecutedMoney)
status := chainv1.TransferStatus_TRANSFER_CONFIRMED status := chainv1.TransferStatus_TRANSFER_SUBMITTED
if strings.TrimSpace(exec.Status) != "" && !strings.EqualFold(exec.Status, executedStatus) { switch paymentStatus(record) {
status = chainv1.TransferStatus_TRANSFER_PENDING case storagemodel.PaymentStatusExecuted:
status = chainv1.TransferStatus_TRANSFER_CONFIRMED
case storagemodel.PaymentStatusExpired:
status = chainv1.TransferStatus_TRANSFER_CANCELLED
} }
transfer := &chainv1.Transfer{ transfer := &chainv1.Transfer{
TransferRef: strings.TrimSpace(exec.IdempotencyKey), TransferRef: strings.TrimSpace(record.IdempotencyKey),
IdempotencyKey: strings.TrimSpace(exec.IdempotencyKey), IdempotencyKey: strings.TrimSpace(record.IdempotencyKey),
RequestedAmount: requested, RequestedAmount: requested,
NetAmount: net, NetAmount: net,
Status: status, Status: status,
@@ -628,26 +750,20 @@ func transferFromExecution(exec *storagemodel.PaymentExecution, req *chainv1.Sub
transfer.SourceWalletRef = strings.TrimSpace(req.GetSourceWalletRef()) transfer.SourceWalletRef = strings.TrimSpace(req.GetSourceWalletRef())
transfer.Destination = req.GetDestination() transfer.Destination = req.GetDestination()
} }
if !exec.ExecutedAt.IsZero() { if !record.ExecutedAt.IsZero() {
ts := timestamppb.New(exec.ExecutedAt) ts := timestamppb.New(record.ExecutedAt)
transfer.CreatedAt = ts transfer.CreatedAt = ts
transfer.UpdatedAt = ts transfer.UpdatedAt = ts
} else if !record.UpdatedAt.IsZero() {
ts := timestamppb.New(record.UpdatedAt)
transfer.UpdatedAt = ts
if !record.CreatedAt.IsZero() {
transfer.CreatedAt = timestamppb.New(record.CreatedAt)
}
} }
return transfer return transfer
} }
func transferPending(requestID string) *chainv1.Transfer {
ref := strings.TrimSpace(requestID)
if ref == "" {
return nil
}
return &chainv1.Transfer{
TransferRef: ref,
IdempotencyKey: ref,
Status: chainv1.TransferStatus_TRANSFER_SUBMITTED,
}
}
func moneyFromPayment(m *paymenttypes.Money) *moneyv1.Money { func moneyFromPayment(m *paymenttypes.Money) *moneyv1.Money {
if m == nil { if m == nil {
return nil return nil

View File

@@ -20,25 +20,22 @@ import (
type fakePaymentsStore struct { type fakePaymentsStore struct {
mu sync.Mutex mu sync.Mutex
executions map[string]*storagemodel.PaymentExecution records map[string]*storagemodel.PaymentRecord
} }
func (f *fakePaymentsStore) FindByIdempotencyKey(_ context.Context, key string) (*storagemodel.PaymentExecution, error) { func (f *fakePaymentsStore) FindByIdempotencyKey(_ context.Context, key string) (*storagemodel.PaymentRecord, error) {
f.mu.Lock() f.mu.Lock()
defer f.mu.Unlock() defer f.mu.Unlock()
return f.executions[key], nil return f.records[key], nil
} }
func (f *fakePaymentsStore) InsertExecution(_ context.Context, exec *storagemodel.PaymentExecution) error { func (f *fakePaymentsStore) Upsert(_ context.Context, record *storagemodel.PaymentRecord) error {
f.mu.Lock() f.mu.Lock()
defer f.mu.Unlock() defer f.mu.Unlock()
if f.executions == nil { if f.records == nil {
f.executions = map[string]*storagemodel.PaymentExecution{} f.records = map[string]*storagemodel.PaymentRecord{}
} }
if _, ok := f.executions[exec.IdempotencyKey]; ok { f.records[record.IdempotencyKey] = record
return storage.ErrDuplicate
}
f.executions[exec.IdempotencyKey] = exec
return nil return nil
} }
@@ -147,6 +144,16 @@ func TestOnIntentCreatesConfirmationRequest(t *testing.T) {
if req.SourceService != string(mservice.PaymentGateway) || req.Rail != "card" { if req.SourceService != string(mservice.PaymentGateway) || req.Rail != "card" {
t.Fatalf("unexpected source/rail: %#v", req) t.Fatalf("unexpected source/rail: %#v", req)
} }
record := repo.payments.records["idem-1"]
if record == nil {
t.Fatalf("expected pending payment to be stored")
}
if record.Status != storagemodel.PaymentStatusPending {
t.Fatalf("expected pending status, got %q", record.Status)
}
if record.RequestedMoney == nil || record.RequestedMoney.Amount != "10.50" {
t.Fatalf("requested money not stored correctly: %#v", record.RequestedMoney)
}
} }
func TestIntentFromSubmitTransferUsesSourceMoney(t *testing.T) { func TestIntentFromSubmitTransferUsesSourceMoney(t *testing.T) {
@@ -180,7 +187,14 @@ func TestConfirmationResultPersistsExecutionAndReply(t *testing.T) {
OutgoingLeg: "card", OutgoingLeg: "card",
RequestedMoney: &paymenttypes.Money{Amount: "5", Currency: "EUR"}, RequestedMoney: &paymenttypes.Money{Amount: "5", Currency: "EUR"},
} }
svc.trackIntent("idem-2", intent) _ = repo.payments.Upsert(context.Background(), &storagemodel.PaymentRecord{
IdempotencyKey: "idem-2",
PaymentIntentID: intent.PaymentIntentID,
QuoteRef: intent.QuoteRef,
OutgoingLeg: intent.OutgoingLeg,
RequestedMoney: intent.RequestedMoney,
Status: storagemodel.PaymentStatusPending,
})
result := &model.ConfirmationResult{ result := &model.ConfirmationResult{
RequestID: "idem-2", RequestID: "idem-2",
@@ -191,11 +205,15 @@ func TestConfirmationResultPersistsExecutionAndReply(t *testing.T) {
if err := svc.onConfirmationResult(context.Background(), result); err != nil { if err := svc.onConfirmationResult(context.Background(), result); err != nil {
t.Fatalf("onConfirmationResult error: %v", err) t.Fatalf("onConfirmationResult error: %v", err)
} }
if repo.payments.executions["idem-2"] == nil { record := repo.payments.records["idem-2"]
t.Fatalf("expected payment execution to be stored") if record == nil {
t.Fatalf("expected payment record to be stored")
} }
if repo.payments.executions["idem-2"].ExecutedMoney == nil || repo.payments.executions["idem-2"].ExecutedMoney.Amount != "5" { if record.Status != storagemodel.PaymentStatusExecuted {
t.Fatalf("executed money not stored correctly") t.Fatalf("expected executed status, got %q", record.Status)
}
if record.ExecutedMoney == nil || record.ExecutedMoney.Amount != "5" {
t.Fatalf("executed money not stored correctly: %#v", record.ExecutedMoney)
} }
if repo.tg.records["idem-2"] == nil || repo.tg.records["idem-2"].RawReply.Text != "5 EUR" { if repo.tg.records["idem-2"] == nil || repo.tg.records["idem-2"].RawReply.Text != "5 EUR" {
t.Fatalf("telegram reply not stored correctly") t.Fatalf("telegram reply not stored correctly")
@@ -214,7 +232,14 @@ func TestClarifiedResultPersistsExecution(t *testing.T) {
OutgoingLeg: "card", OutgoingLeg: "card",
RequestedMoney: &paymenttypes.Money{Amount: "12", Currency: "USD"}, RequestedMoney: &paymenttypes.Money{Amount: "12", Currency: "USD"},
} }
svc.trackIntent("idem-clarified", intent) _ = repo.payments.Upsert(context.Background(), &storagemodel.PaymentRecord{
IdempotencyKey: "idem-clarified",
PaymentIntentID: intent.PaymentIntentID,
QuoteRef: intent.QuoteRef,
OutgoingLeg: intent.OutgoingLeg,
RequestedMoney: intent.RequestedMoney,
Status: storagemodel.PaymentStatusPending,
})
result := &model.ConfirmationResult{ result := &model.ConfirmationResult{
RequestID: "idem-clarified", RequestID: "idem-clarified",
@@ -225,15 +250,16 @@ func TestClarifiedResultPersistsExecution(t *testing.T) {
if err := svc.onConfirmationResult(context.Background(), result); err != nil { if err := svc.onConfirmationResult(context.Background(), result); err != nil {
t.Fatalf("onConfirmationResult error: %v", err) t.Fatalf("onConfirmationResult error: %v", err)
} }
if repo.payments.executions["idem-clarified"] == nil { record := repo.payments.records["idem-clarified"]
t.Fatalf("expected payment execution to be stored") if record == nil || record.Status != storagemodel.PaymentStatusExecuted {
t.Fatalf("expected payment executed status, got %#v", record)
} }
} }
func TestIdempotencyPreventsDuplicateWrites(t *testing.T) { func TestIdempotencyPreventsDuplicateWrites(t *testing.T) {
logger := mloggerfactory.NewLogger(false) logger := mloggerfactory.NewLogger(false)
repo := &fakeRepo{payments: &fakePaymentsStore{executions: map[string]*storagemodel.PaymentExecution{ repo := &fakeRepo{payments: &fakePaymentsStore{records: map[string]*storagemodel.PaymentRecord{
"idem-3": {IdempotencyKey: "idem-3"}, "idem-3": {IdempotencyKey: "idem-3", Status: storagemodel.PaymentStatusPending},
}}, tg: &fakeTelegramStore{}} }}, tg: &fakeTelegramStore{}}
prod := &captureProducer{} prod := &captureProducer{}
svc := NewService(logger, repo, prod, nil, Config{Rail: "card"}) svc := NewService(logger, repo, prod, nil, Config{Rail: "card"})
@@ -265,7 +291,14 @@ func TestTimeoutDoesNotPersistExecution(t *testing.T) {
OutgoingLeg: "card", OutgoingLeg: "card",
RequestedMoney: &paymenttypes.Money{Amount: "8", Currency: "USD"}, RequestedMoney: &paymenttypes.Money{Amount: "8", Currency: "USD"},
} }
svc.trackIntent("idem-4", intent) _ = repo.payments.Upsert(context.Background(), &storagemodel.PaymentRecord{
IdempotencyKey: "idem-4",
PaymentIntentID: intent.PaymentIntentID,
QuoteRef: intent.QuoteRef,
OutgoingLeg: intent.OutgoingLeg,
RequestedMoney: intent.RequestedMoney,
Status: storagemodel.PaymentStatusPending,
})
result := &model.ConfirmationResult{ result := &model.ConfirmationResult{
RequestID: "idem-4", RequestID: "idem-4",
@@ -274,8 +307,9 @@ func TestTimeoutDoesNotPersistExecution(t *testing.T) {
if err := svc.onConfirmationResult(context.Background(), result); err != nil { if err := svc.onConfirmationResult(context.Background(), result); err != nil {
t.Fatalf("onConfirmationResult error: %v", err) t.Fatalf("onConfirmationResult error: %v", err)
} }
if repo.payments.executions["idem-4"] != nil { record := repo.payments.records["idem-4"]
t.Fatalf("expected no execution record for timeout") if record == nil || record.Status != storagemodel.PaymentStatusExpired {
t.Fatalf("expected expired status for timeout, got %#v", record)
} }
} }
@@ -291,7 +325,14 @@ func TestRejectedDoesNotPersistExecution(t *testing.T) {
OutgoingLeg: "card", OutgoingLeg: "card",
RequestedMoney: &paymenttypes.Money{Amount: "3", Currency: "USD"}, RequestedMoney: &paymenttypes.Money{Amount: "3", Currency: "USD"},
} }
svc.trackIntent("idem-reject", intent) _ = repo.payments.Upsert(context.Background(), &storagemodel.PaymentRecord{
IdempotencyKey: "idem-reject",
PaymentIntentID: intent.PaymentIntentID,
QuoteRef: intent.QuoteRef,
OutgoingLeg: intent.OutgoingLeg,
RequestedMoney: intent.RequestedMoney,
Status: storagemodel.PaymentStatusPending,
})
result := &model.ConfirmationResult{ result := &model.ConfirmationResult{
RequestID: "idem-reject", RequestID: "idem-reject",
@@ -301,8 +342,9 @@ func TestRejectedDoesNotPersistExecution(t *testing.T) {
if err := svc.onConfirmationResult(context.Background(), result); err != nil { if err := svc.onConfirmationResult(context.Background(), result); err != nil {
t.Fatalf("onConfirmationResult error: %v", err) t.Fatalf("onConfirmationResult error: %v", err)
} }
if repo.payments.executions["idem-reject"] != nil { record := repo.payments.records["idem-reject"]
t.Fatalf("expected no execution record for rejection") if record == nil || record.Status != storagemodel.PaymentStatusExpired {
t.Fatalf("expected expired status for rejection, got %#v", record)
} }
if repo.tg.records["idem-reject"] == nil { if repo.tg.records["idem-reject"] == nil {
t.Fatalf("expected raw reply to be stored for rejection") t.Fatalf("expected raw reply to be stored for rejection")

View File

@@ -8,14 +8,29 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
) )
type PaymentExecution struct { type PaymentStatus string
const (
PaymentStatusPending PaymentStatus = "pending"
PaymentStatusExpired PaymentStatus = "expired"
PaymentStatusExecuted PaymentStatus = "executed"
)
type PaymentRecord struct {
ID primitive.ObjectID `bson:"_id,omitempty" json:"id"` ID primitive.ObjectID `bson:"_id,omitempty" json:"id"`
IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"` IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"`
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"` PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"`
QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"` QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"`
OutgoingLeg string `bson:"outgoingLeg,omitempty" json:"outgoing_leg,omitempty"`
TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"`
RequestedMoney *paymenttypes.Money `bson:"requestedMoney,omitempty" json:"requested_money,omitempty"`
ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"`
Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"`
CreatedAt time.Time `bson:"createdAt,omitempty" json:"created_at,omitempty"`
UpdatedAt time.Time `bson:"updatedAt,omitempty" json:"updated_at,omitempty"`
ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"` ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"`
Status string `bson:"status,omitempty" json:"status,omitempty"` ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"`
ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"`
} }
type TelegramConfirmation struct { type TelegramConfirmation struct {

View File

@@ -13,7 +13,9 @@ import (
"github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mlogger"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -53,44 +55,53 @@ func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) {
return p, nil return p, nil
} }
func (p *Payments) FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentExecution, error) { func (p *Payments) FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentRecord, error) {
key = strings.TrimSpace(key) key = strings.TrimSpace(key)
if key == "" { if key == "" {
return nil, merrors.InvalidArgument("idempotency key is required", "idempotency_key") return nil, merrors.InvalidArgument("idempotency key is required", "idempotency_key")
} }
var result model.PaymentExecution var result model.PaymentRecord
err := p.coll.FindOne(ctx, bson.M{fieldIdempotencyKey: key}).Decode(&result) err := p.coll.FindOne(ctx, bson.M{fieldIdempotencyKey: key}).Decode(&result)
if err == mongo.ErrNoDocuments { if err == mongo.ErrNoDocuments {
return nil, nil return nil, nil
} }
if err != nil { if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
p.logger.Warn("Payment execution lookup failed", zap.String("idempotency_key", key), zap.Error(err)) p.logger.Warn("Payment record lookup failed", zap.String("idempotency_key", key), zap.Error(err))
} }
return nil, err return nil, err
} }
return &result, nil return &result, nil
} }
func (p *Payments) InsertExecution(ctx context.Context, exec *model.PaymentExecution) error { func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) error {
if exec == nil { if record == nil {
return merrors.InvalidArgument("payment execution is nil", "execution") return merrors.InvalidArgument("payment record is nil", "record")
} }
exec.IdempotencyKey = strings.TrimSpace(exec.IdempotencyKey) record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey)
exec.PaymentIntentID = strings.TrimSpace(exec.PaymentIntentID) record.PaymentIntentID = strings.TrimSpace(record.PaymentIntentID)
exec.QuoteRef = strings.TrimSpace(exec.QuoteRef) record.QuoteRef = strings.TrimSpace(record.QuoteRef)
if exec.ExecutedAt.IsZero() { record.OutgoingLeg = strings.TrimSpace(record.OutgoingLeg)
exec.ExecutedAt = time.Now() record.TargetChatID = strings.TrimSpace(record.TargetChatID)
if record.IdempotencyKey == "" {
return merrors.InvalidArgument("idempotency key is required", "idempotency_key")
} }
if _, err := p.coll.InsertOne(ctx, exec); err != nil { now := time.Now()
if mongo.IsDuplicateKeyError(err) { if record.CreatedAt.IsZero() {
return storage.ErrDuplicate record.CreatedAt = now
} }
record.UpdatedAt = now
record.ID = primitive.NilObjectID
update := bson.M{
"$set": record,
}
_, err := p.coll.UpdateOne(ctx, bson.M{fieldIdempotencyKey: record.IdempotencyKey}, update, options.Update().SetUpsert(true))
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
p.logger.Warn("Failed to insert payment execution", p.logger.Warn("Failed to upsert payment record",
zap.String("idempotency_key", exec.IdempotencyKey), zap.String("idempotency_key", record.IdempotencyKey),
zap.String("payment_intent_id", exec.PaymentIntentID), zap.String("payment_intent_id", record.PaymentIntentID),
zap.String("quote_ref", exec.QuoteRef), zap.String("quote_ref", record.QuoteRef),
zap.Error(err)) zap.Error(err))
} }
return err return err

View File

@@ -15,8 +15,8 @@ type Repository interface {
} }
type PaymentsStore interface { type PaymentsStore interface {
FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentExecution, error) FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentRecord, error)
InsertExecution(ctx context.Context, exec *model.PaymentExecution) error Upsert(ctx context.Context, record *model.PaymentRecord) error
} }
type TelegramConfirmationsStore interface { type TelegramConfirmationsStore interface {