diff --git a/api/gateway/tgsettle/config.yml b/api/gateway/tgsettle/config.yml index e3208662..32a8072a 100644 --- a/api/gateway/tgsettle/config.yml +++ b/api/gateway/tgsettle/config.yml @@ -40,3 +40,4 @@ gateway: target_chat_id_env: TGSETTLE_GATEWAY_CHAT_ID timeout_seconds: 259200 accepted_user_ids: [] + success_reaction: "\U0001FAE1" diff --git a/api/gateway/tgsettle/internal/server/internal/serverimp.go b/api/gateway/tgsettle/internal/server/internal/serverimp.go index ee43abd8..a114be45 100644 --- a/api/gateway/tgsettle/internal/server/internal/serverimp.go +++ b/api/gateway/tgsettle/internal/server/internal/serverimp.go @@ -35,10 +35,11 @@ type config struct { } type gatewayConfig struct { - Rail string `yaml:"rail"` - TargetChatIDEnv string `yaml:"target_chat_id_env"` - TimeoutSeconds int32 `yaml:"timeout_seconds"` - AcceptedUserIDs []string `yaml:"accepted_user_ids"` + Rail string `yaml:"rail"` + TargetChatIDEnv string `yaml:"target_chat_id_env"` + TimeoutSeconds int32 `yaml:"timeout_seconds"` + AcceptedUserIDs []string `yaml:"accepted_user_ids"` + SuccessReaction string `yaml:"success_reaction"` } func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) { @@ -94,6 +95,7 @@ func (i *Imp) Start() error { TargetChatIDEnv: cfg.Gateway.TargetChatIDEnv, TimeoutSeconds: cfg.Gateway.TimeoutSeconds, AcceptedUserIDs: cfg.Gateway.AcceptedUserIDs, + SuccessReaction: cfg.Gateway.SuccessReaction, InvokeURI: invokeURI, } svc := gateway.NewService(logger, repo, producer, broker, gwCfg) diff --git a/api/gateway/tgsettle/internal/service/gateway/service.go b/api/gateway/tgsettle/internal/service/gateway/service.go index 463cc784..bb77e175 100644 --- a/api/gateway/tgsettle/internal/service/gateway/service.go +++ b/api/gateway/tgsettle/internal/service/gateway/service.go @@ -5,7 +5,7 @@ import ( "errors" "os" "strings" - "sync" + "time" "github.com/tech/sendico/gateway/tgsettle/storage" storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model" @@ -36,8 +36,7 @@ import ( const ( defaultConfirmationTimeoutSeconds = 259200 - executedStatus = "executed" - telegramSuccessReaction = "\u2705" + defaultTelegramSuccessReaction = "\U0001FAE1" ) const ( @@ -54,22 +53,22 @@ type Config struct { TargetChatIDEnv string TimeoutSeconds int32 AcceptedUserIDs []string + SuccessReaction string InvokeURI string } type Service struct { - logger mlogger.Logger - repo storage.Repository - producer msg.Producer - broker mb.Broker - cfg Config - rail string - chatID string - announcer *discovery.Announcer - invokeURI string + logger mlogger.Logger + repo storage.Repository + producer msg.Producer + broker mb.Broker + cfg Config + rail string + chatID string + announcer *discovery.Announcer + invokeURI string + successReaction string - mu sync.Mutex - pending map[string]*model.PaymentGatewayIntent consumers []msg.Consumer connectorv1.UnimplementedConnectorServiceServer @@ -88,9 +87,12 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro cfg: cfg, rail: strings.TrimSpace(cfg.Rail), invokeURI: strings.TrimSpace(cfg.InvokeURI), - pending: map[string]*model.PaymentGatewayIntent{}, } svc.chatID = strings.TrimSpace(readEnv(cfg.TargetChatIDEnv)) + svc.successReaction = strings.TrimSpace(cfg.SuccessReaction) + if svc.successReaction == "" { + svc.successReaction = defaultTelegramSuccessReaction + } svc.startConsumers() svc.startAnnouncer() return svc @@ -186,8 +188,13 @@ func (s *Service) SubmitTransfer(ctx context.Context, req *chainv1.SubmitTransfe return nil, err } if existing != nil { - s.logger.Info("Submit transfer idempotent hit", append(logFields, zap.String("status", strings.TrimSpace(existing.Status)))...) - return &chainv1.SubmitTransferResponse{Transfer: transferFromExecution(existing, req)}, nil + existing, err = s.expirePaymentIfNeeded(ctx, existing) + if err != nil { + s.logger.Warn("Submit transfer status refresh failed", append(logFields, zap.Error(err))...) + return nil, err + } + s.logger.Info("Submit transfer idempotent hit", append(logFields, zap.String("status", string(paymentStatus(existing))))...) + return &chainv1.SubmitTransferResponse{Transfer: transferFromPayment(existing, req)}, nil } if err := s.onIntent(ctx, intent); err != nil { s.logger.Warn("Submit transfer intent handling failed", append(logFields, zap.Error(err))...) @@ -218,15 +225,16 @@ func (s *Service) GetTransfer(ctx context.Context, req *chainv1.GetTransferReque return nil, err } if existing != nil { + existing, err = s.expirePaymentIfNeeded(ctx, existing) + if err != nil { + s.logger.Warn("Get transfer status refresh failed", append(logFields, zap.Error(err))...) + return nil, err + } s.logger.Info("Get transfer resolved from execution", append(logFields, zap.String("payment_intent_id", strings.TrimSpace(existing.PaymentIntentID)), - zap.String("status", strings.TrimSpace(existing.Status)), + zap.String("status", string(paymentStatus(existing))), )...) - return &chainv1.GetTransferResponse{Transfer: transferFromExecution(existing, nil)}, nil - } - if s.hasPending(transferRef) { - s.logger.Info("Get transfer pending", logFields...) - return &chainv1.GetTransferResponse{Transfer: transferPending(transferRef)}, nil + return &chainv1.GetTransferResponse{Transfer: transferFromPayment(existing, nil)}, nil } s.logger.Warn("Get transfer not found", logFields...) return nil, status.Error(codes.NotFound, "transfer not found") @@ -255,28 +263,40 @@ func (s *Service) onIntent(ctx context.Context, intent *model.PaymentGatewayInte return merrors.Internal("payment gateway storage unavailable") } - existing, err := s.repo.Payments().FindByIdempotencyKey(ctx, intent.IdempotencyKey) - if err != nil { - return err - } - if existing != nil { - s.logger.Info("Payment gateway intent already executed", - zap.String("idempotency_key", intent.IdempotencyKey), - zap.String("payment_intent_id", intent.PaymentIntentID), - zap.String("quote_ref", intent.QuoteRef), - zap.String("rail", intent.OutgoingLeg)) - return nil - } - confirmReq, err := s.buildConfirmationRequest(intent) if err != nil { s.logger.Warn("Failed to build confirmation request", zap.Error(err), zap.String("idempotency_key", intent.IdempotencyKey), zap.String("payment_intent_id", intent.PaymentIntentID)) return err } - if err := s.sendConfirmationRequest(confirmReq); err != nil { + + existing, err := s.repo.Payments().FindByIdempotencyKey(ctx, confirmReq.RequestID) + if err != nil { + return err + } + if existing != nil { + existing, err = s.expirePaymentIfNeeded(ctx, existing) + if err != nil { + return err + } + s.logger.Info("Payment gateway intent already recorded", + zap.String("idempotency_key", confirmReq.RequestID), + zap.String("payment_intent_id", confirmReq.PaymentIntentID), + zap.String("quote_ref", confirmReq.QuoteRef), + zap.String("rail", confirmReq.Rail), + zap.String("status", string(paymentStatus(existing)))) + return nil + } + + record := paymentRecordFromIntent(intent, confirmReq) + if err := s.repo.Payments().Upsert(ctx, record); err != nil { + s.logger.Warn("Failed to persist pending payment", zap.Error(err), zap.String("idempotency_key", confirmReq.RequestID)) + return err + } + if err := s.sendConfirmationRequest(confirmReq); err != nil { + s.logger.Warn("Failed to publish confirmation request", zap.Error(err), zap.String("idempotency_key", confirmReq.RequestID)) + s.markPaymentExpired(ctx, record, time.Now()) return err } - s.trackIntent(confirmReq.RequestID, intent) return nil } @@ -290,44 +310,49 @@ func (s *Service) onConfirmationResult(ctx context.Context, result *model.Confir s.logger.Warn("Confirmation result rejected", zap.String("reason", "request_id is required")) return merrors.InvalidArgument("confirmation request_id is required", "request_id") } - intent := s.lookupIntent(requestID) - if intent == nil { - s.logger.Warn("Confirmation result ignored: intent not found", zap.String("request_id", requestID)) + record, err := s.loadPayment(ctx, requestID) + if err != nil { + s.logger.Warn("Confirmation result lookup failed", zap.Error(err), zap.String("request_id", requestID)) + return err + } + if record == nil { + s.logger.Warn("Confirmation result ignored: payment not found", zap.String("request_id", requestID)) return nil } if result.RawReply != nil && s.repo != nil && s.repo.TelegramConfirmations() != nil { if err := s.repo.TelegramConfirmations().Upsert(ctx, &storagemodel.TelegramConfirmation{ RequestID: requestID, - PaymentIntentID: intent.PaymentIntentID, - QuoteRef: intent.QuoteRef, + PaymentIntentID: record.PaymentIntentID, + QuoteRef: record.QuoteRef, RawReply: result.RawReply, }); err != nil { s.logger.Warn("Failed to store telegram confirmation", zap.Error(err), zap.String("request_id", requestID)) } else { s.logger.Info("Stored telegram confirmation", zap.String("request_id", requestID), - zap.String("payment_intent_id", intent.PaymentIntentID), + zap.String("payment_intent_id", record.PaymentIntentID), zap.String("reply_text", result.RawReply.Text), zap.String("reply_user_id", result.RawReply.FromUserID), zap.String("reply_user", result.RawReply.FromUsername)) } } - if result.Status == model.ConfirmationStatusConfirmed || result.Status == model.ConfirmationStatusClarified { - exec := &storagemodel.PaymentExecution{ - IdempotencyKey: intent.IdempotencyKey, - PaymentIntentID: intent.PaymentIntentID, - ExecutedMoney: result.Money, - QuoteRef: intent.QuoteRef, - Status: executedStatus, - } - if err := s.repo.Payments().InsertExecution(ctx, exec); err != nil && err != storage.ErrDuplicate { - return err - } + nextStatus := paymentStatusFromResult(result) + currentStatus := paymentStatus(record) + if currentStatus == storagemodel.PaymentStatusExecuted || currentStatus == storagemodel.PaymentStatusExpired { + s.logger.Info("Confirmation result ignored: payment already finalized", + zap.String("request_id", requestID), + zap.String("status", string(currentStatus))) + return nil + } + s.applyPaymentResult(record, nextStatus, result) + if err := s.repo.Payments().Upsert(ctx, record); err != nil { + s.logger.Warn("Failed to persist payment status", zap.Error(err), zap.String("request_id", requestID)) + return err } + intent := intentFromPayment(record) s.publishExecution(intent, result) s.publishTelegramReaction(result) - s.removeIntent(requestID) return nil } @@ -432,7 +457,7 @@ func (s *Service) publishTelegramReaction(result *model.ConfirmationResult) { RequestID: strings.TrimSpace(result.RequestID), ChatID: strings.TrimSpace(result.RawReply.ChatID), MessageID: strings.TrimSpace(result.RawReply.MessageID), - Emoji: telegramSuccessReaction, + Emoji: s.successReaction, } if request.ChatID == "" || request.MessageID == "" || request.Emoji == "" { return @@ -454,48 +479,55 @@ func (s *Service) publishTelegramReaction(result *model.ConfirmationResult) { zap.String("emoji", request.Emoji)) } -func (s *Service) trackIntent(requestID string, intent *model.PaymentGatewayIntent) { - if s == nil || intent == nil { - return +func (s *Service) loadPayment(ctx context.Context, requestID string) (*storagemodel.PaymentRecord, error) { + if s == nil || s.repo == nil || s.repo.Payments() == nil { + return nil, merrors.Internal("payment gateway storage unavailable") } requestID = strings.TrimSpace(requestID) if requestID == "" { - return + return nil, merrors.InvalidArgument("request_id is required", "request_id") } - s.mu.Lock() - s.pending[requestID] = intent - s.mu.Unlock() + return s.repo.Payments().FindByIdempotencyKey(ctx, requestID) } -func (s *Service) lookupIntent(requestID string) *model.PaymentGatewayIntent { - requestID = strings.TrimSpace(requestID) - if requestID == "" { - return nil +func (s *Service) expirePaymentIfNeeded(ctx context.Context, record *storagemodel.PaymentRecord) (*storagemodel.PaymentRecord, error) { + if record == nil { + return nil, nil } - s.mu.Lock() - defer s.mu.Unlock() - return s.pending[requestID] + status := paymentStatus(record) + if status != storagemodel.PaymentStatusPending { + return record, nil + } + if record.ExpiresAt.IsZero() { + return record, nil + } + if time.Now().Before(record.ExpiresAt) { + return record, nil + } + record.Status = storagemodel.PaymentStatusExpired + if record.ExpiredAt.IsZero() { + record.ExpiredAt = time.Now() + } + if s != nil && s.repo != nil && s.repo.Payments() != nil { + if err := s.repo.Payments().Upsert(ctx, record); err != nil { + return record, err + } + } + return record, nil } -func (s *Service) removeIntent(requestID string) { - requestID = strings.TrimSpace(requestID) - if requestID == "" { +func (s *Service) markPaymentExpired(ctx context.Context, record *storagemodel.PaymentRecord, when time.Time) { + if record == nil || s == nil || s.repo == nil || s.repo.Payments() == nil { return } - s.mu.Lock() - delete(s.pending, requestID) - s.mu.Unlock() -} - -func (s *Service) hasPending(requestID string) bool { - requestID = strings.TrimSpace(requestID) - if requestID == "" { - return false + if when.IsZero() { + when = time.Now() + } + record.Status = storagemodel.PaymentStatusExpired + record.ExpiredAt = when + if err := s.repo.Payments().Upsert(ctx, record); err != nil { + s.logger.Warn("Failed to mark payment as expired", zap.Error(err), zap.String("request_id", record.IdempotencyKey)) } - s.mu.Lock() - defer s.mu.Unlock() - _, ok := s.pending[requestID] - return ok } func (s *Service) startAnnouncer() { @@ -533,6 +565,91 @@ func normalizeIntent(intent *model.PaymentGatewayIntent) *model.PaymentGatewayIn return &cp } +func paymentStatus(record *storagemodel.PaymentRecord) storagemodel.PaymentStatus { + if record == nil { + return storagemodel.PaymentStatusPending + } + if record.Status != "" { + return record.Status + } + if record.ExecutedMoney != nil || !record.ExecutedAt.IsZero() { + return storagemodel.PaymentStatusExecuted + } + return storagemodel.PaymentStatusPending +} + +func paymentStatusFromResult(result *model.ConfirmationResult) storagemodel.PaymentStatus { + if result == nil { + return storagemodel.PaymentStatusPending + } + switch result.Status { + case model.ConfirmationStatusConfirmed, model.ConfirmationStatusClarified: + return storagemodel.PaymentStatusExecuted + case model.ConfirmationStatusTimeout, model.ConfirmationStatusRejected: + return storagemodel.PaymentStatusExpired + default: + return storagemodel.PaymentStatusPending + } +} + +func (s *Service) applyPaymentResult(record *storagemodel.PaymentRecord, status storagemodel.PaymentStatus, result *model.ConfirmationResult) { + if record == nil { + return + } + record.Status = status + switch status { + case storagemodel.PaymentStatusExecuted: + record.ExecutedMoney = result.Money + if record.ExecutedAt.IsZero() { + record.ExecutedAt = time.Now() + } + case storagemodel.PaymentStatusExpired: + if record.ExpiredAt.IsZero() { + record.ExpiredAt = time.Now() + } + } +} + +func paymentRecordFromIntent(intent *model.PaymentGatewayIntent, confirmReq *model.ConfirmationRequest) *storagemodel.PaymentRecord { + record := &storagemodel.PaymentRecord{ + Status: storagemodel.PaymentStatusPending, + } + if intent != nil { + record.IdempotencyKey = strings.TrimSpace(intent.IdempotencyKey) + record.PaymentIntentID = strings.TrimSpace(intent.PaymentIntentID) + record.QuoteRef = strings.TrimSpace(intent.QuoteRef) + record.OutgoingLeg = strings.TrimSpace(intent.OutgoingLeg) + record.TargetChatID = strings.TrimSpace(intent.TargetChatID) + record.RequestedMoney = intent.RequestedMoney + } + if confirmReq != nil { + record.IdempotencyKey = strings.TrimSpace(confirmReq.RequestID) + record.PaymentIntentID = strings.TrimSpace(confirmReq.PaymentIntentID) + record.QuoteRef = strings.TrimSpace(confirmReq.QuoteRef) + record.OutgoingLeg = strings.TrimSpace(confirmReq.Rail) + record.TargetChatID = strings.TrimSpace(confirmReq.TargetChatID) + record.RequestedMoney = confirmReq.RequestedMoney + if confirmReq.TimeoutSeconds > 0 { + record.ExpiresAt = time.Now().Add(time.Duration(confirmReq.TimeoutSeconds) * time.Second) + } + } + return record +} + +func intentFromPayment(record *storagemodel.PaymentRecord) *model.PaymentGatewayIntent { + if record == nil { + return nil + } + return &model.PaymentGatewayIntent{ + PaymentIntentID: strings.TrimSpace(record.PaymentIntentID), + IdempotencyKey: strings.TrimSpace(record.IdempotencyKey), + OutgoingLeg: strings.TrimSpace(record.OutgoingLeg), + QuoteRef: strings.TrimSpace(record.QuoteRef), + RequestedMoney: record.RequestedMoney, + TargetChatID: strings.TrimSpace(record.TargetChatID), + } +} + func intentFromSubmitTransfer(req *chainv1.SubmitTransferRequest, defaultRail, defaultChatID string) (*model.PaymentGatewayIntent, error) { if req == nil { return nil, merrors.InvalidArgument("submit_transfer: request is required") @@ -603,22 +720,27 @@ func transferFromRequest(req *chainv1.SubmitTransferRequest) *chainv1.Transfer { } } -func transferFromExecution(exec *storagemodel.PaymentExecution, req *chainv1.SubmitTransferRequest) *chainv1.Transfer { - if exec == nil { +func transferFromPayment(record *storagemodel.PaymentRecord, req *chainv1.SubmitTransferRequest) *chainv1.Transfer { + if record == nil { return nil } var requested *moneyv1.Money if req != nil && req.GetAmount() != nil { requested = req.GetAmount() + } else { + requested = moneyFromPayment(record.RequestedMoney) } - net := moneyFromPayment(exec.ExecutedMoney) - status := chainv1.TransferStatus_TRANSFER_CONFIRMED - if strings.TrimSpace(exec.Status) != "" && !strings.EqualFold(exec.Status, executedStatus) { - status = chainv1.TransferStatus_TRANSFER_PENDING + net := moneyFromPayment(record.ExecutedMoney) + status := chainv1.TransferStatus_TRANSFER_SUBMITTED + switch paymentStatus(record) { + case storagemodel.PaymentStatusExecuted: + status = chainv1.TransferStatus_TRANSFER_CONFIRMED + case storagemodel.PaymentStatusExpired: + status = chainv1.TransferStatus_TRANSFER_CANCELLED } transfer := &chainv1.Transfer{ - TransferRef: strings.TrimSpace(exec.IdempotencyKey), - IdempotencyKey: strings.TrimSpace(exec.IdempotencyKey), + TransferRef: strings.TrimSpace(record.IdempotencyKey), + IdempotencyKey: strings.TrimSpace(record.IdempotencyKey), RequestedAmount: requested, NetAmount: net, Status: status, @@ -628,26 +750,20 @@ func transferFromExecution(exec *storagemodel.PaymentExecution, req *chainv1.Sub transfer.SourceWalletRef = strings.TrimSpace(req.GetSourceWalletRef()) transfer.Destination = req.GetDestination() } - if !exec.ExecutedAt.IsZero() { - ts := timestamppb.New(exec.ExecutedAt) + if !record.ExecutedAt.IsZero() { + ts := timestamppb.New(record.ExecutedAt) transfer.CreatedAt = ts transfer.UpdatedAt = ts + } else if !record.UpdatedAt.IsZero() { + ts := timestamppb.New(record.UpdatedAt) + transfer.UpdatedAt = ts + if !record.CreatedAt.IsZero() { + transfer.CreatedAt = timestamppb.New(record.CreatedAt) + } } return transfer } -func transferPending(requestID string) *chainv1.Transfer { - ref := strings.TrimSpace(requestID) - if ref == "" { - return nil - } - return &chainv1.Transfer{ - TransferRef: ref, - IdempotencyKey: ref, - Status: chainv1.TransferStatus_TRANSFER_SUBMITTED, - } -} - func moneyFromPayment(m *paymenttypes.Money) *moneyv1.Money { if m == nil { return nil diff --git a/api/gateway/tgsettle/internal/service/gateway/service_test.go b/api/gateway/tgsettle/internal/service/gateway/service_test.go index 3d5d4448..e7a27e45 100644 --- a/api/gateway/tgsettle/internal/service/gateway/service_test.go +++ b/api/gateway/tgsettle/internal/service/gateway/service_test.go @@ -19,26 +19,23 @@ import ( ) type fakePaymentsStore struct { - mu sync.Mutex - executions map[string]*storagemodel.PaymentExecution + mu sync.Mutex + records map[string]*storagemodel.PaymentRecord } -func (f *fakePaymentsStore) FindByIdempotencyKey(_ context.Context, key string) (*storagemodel.PaymentExecution, error) { +func (f *fakePaymentsStore) FindByIdempotencyKey(_ context.Context, key string) (*storagemodel.PaymentRecord, error) { f.mu.Lock() defer f.mu.Unlock() - return f.executions[key], nil + return f.records[key], nil } -func (f *fakePaymentsStore) InsertExecution(_ context.Context, exec *storagemodel.PaymentExecution) error { +func (f *fakePaymentsStore) Upsert(_ context.Context, record *storagemodel.PaymentRecord) error { f.mu.Lock() defer f.mu.Unlock() - if f.executions == nil { - f.executions = map[string]*storagemodel.PaymentExecution{} + if f.records == nil { + f.records = map[string]*storagemodel.PaymentRecord{} } - if _, ok := f.executions[exec.IdempotencyKey]; ok { - return storage.ErrDuplicate - } - f.executions[exec.IdempotencyKey] = exec + f.records[record.IdempotencyKey] = record return nil } @@ -147,6 +144,16 @@ func TestOnIntentCreatesConfirmationRequest(t *testing.T) { if req.SourceService != string(mservice.PaymentGateway) || req.Rail != "card" { t.Fatalf("unexpected source/rail: %#v", req) } + record := repo.payments.records["idem-1"] + if record == nil { + t.Fatalf("expected pending payment to be stored") + } + if record.Status != storagemodel.PaymentStatusPending { + t.Fatalf("expected pending status, got %q", record.Status) + } + if record.RequestedMoney == nil || record.RequestedMoney.Amount != "10.50" { + t.Fatalf("requested money not stored correctly: %#v", record.RequestedMoney) + } } func TestIntentFromSubmitTransferUsesSourceMoney(t *testing.T) { @@ -180,7 +187,14 @@ func TestConfirmationResultPersistsExecutionAndReply(t *testing.T) { OutgoingLeg: "card", RequestedMoney: &paymenttypes.Money{Amount: "5", Currency: "EUR"}, } - svc.trackIntent("idem-2", intent) + _ = repo.payments.Upsert(context.Background(), &storagemodel.PaymentRecord{ + IdempotencyKey: "idem-2", + PaymentIntentID: intent.PaymentIntentID, + QuoteRef: intent.QuoteRef, + OutgoingLeg: intent.OutgoingLeg, + RequestedMoney: intent.RequestedMoney, + Status: storagemodel.PaymentStatusPending, + }) result := &model.ConfirmationResult{ RequestID: "idem-2", @@ -191,11 +205,15 @@ func TestConfirmationResultPersistsExecutionAndReply(t *testing.T) { if err := svc.onConfirmationResult(context.Background(), result); err != nil { t.Fatalf("onConfirmationResult error: %v", err) } - if repo.payments.executions["idem-2"] == nil { - t.Fatalf("expected payment execution to be stored") + record := repo.payments.records["idem-2"] + if record == nil { + t.Fatalf("expected payment record to be stored") } - if repo.payments.executions["idem-2"].ExecutedMoney == nil || repo.payments.executions["idem-2"].ExecutedMoney.Amount != "5" { - t.Fatalf("executed money not stored correctly") + if record.Status != storagemodel.PaymentStatusExecuted { + t.Fatalf("expected executed status, got %q", record.Status) + } + if record.ExecutedMoney == nil || record.ExecutedMoney.Amount != "5" { + t.Fatalf("executed money not stored correctly: %#v", record.ExecutedMoney) } if repo.tg.records["idem-2"] == nil || repo.tg.records["idem-2"].RawReply.Text != "5 EUR" { t.Fatalf("telegram reply not stored correctly") @@ -214,7 +232,14 @@ func TestClarifiedResultPersistsExecution(t *testing.T) { OutgoingLeg: "card", RequestedMoney: &paymenttypes.Money{Amount: "12", Currency: "USD"}, } - svc.trackIntent("idem-clarified", intent) + _ = repo.payments.Upsert(context.Background(), &storagemodel.PaymentRecord{ + IdempotencyKey: "idem-clarified", + PaymentIntentID: intent.PaymentIntentID, + QuoteRef: intent.QuoteRef, + OutgoingLeg: intent.OutgoingLeg, + RequestedMoney: intent.RequestedMoney, + Status: storagemodel.PaymentStatusPending, + }) result := &model.ConfirmationResult{ RequestID: "idem-clarified", @@ -225,15 +250,16 @@ func TestClarifiedResultPersistsExecution(t *testing.T) { if err := svc.onConfirmationResult(context.Background(), result); err != nil { t.Fatalf("onConfirmationResult error: %v", err) } - if repo.payments.executions["idem-clarified"] == nil { - t.Fatalf("expected payment execution to be stored") + record := repo.payments.records["idem-clarified"] + if record == nil || record.Status != storagemodel.PaymentStatusExecuted { + t.Fatalf("expected payment executed status, got %#v", record) } } func TestIdempotencyPreventsDuplicateWrites(t *testing.T) { logger := mloggerfactory.NewLogger(false) - repo := &fakeRepo{payments: &fakePaymentsStore{executions: map[string]*storagemodel.PaymentExecution{ - "idem-3": {IdempotencyKey: "idem-3"}, + repo := &fakeRepo{payments: &fakePaymentsStore{records: map[string]*storagemodel.PaymentRecord{ + "idem-3": {IdempotencyKey: "idem-3", Status: storagemodel.PaymentStatusPending}, }}, tg: &fakeTelegramStore{}} prod := &captureProducer{} svc := NewService(logger, repo, prod, nil, Config{Rail: "card"}) @@ -265,7 +291,14 @@ func TestTimeoutDoesNotPersistExecution(t *testing.T) { OutgoingLeg: "card", RequestedMoney: &paymenttypes.Money{Amount: "8", Currency: "USD"}, } - svc.trackIntent("idem-4", intent) + _ = repo.payments.Upsert(context.Background(), &storagemodel.PaymentRecord{ + IdempotencyKey: "idem-4", + PaymentIntentID: intent.PaymentIntentID, + QuoteRef: intent.QuoteRef, + OutgoingLeg: intent.OutgoingLeg, + RequestedMoney: intent.RequestedMoney, + Status: storagemodel.PaymentStatusPending, + }) result := &model.ConfirmationResult{ RequestID: "idem-4", @@ -274,8 +307,9 @@ func TestTimeoutDoesNotPersistExecution(t *testing.T) { if err := svc.onConfirmationResult(context.Background(), result); err != nil { t.Fatalf("onConfirmationResult error: %v", err) } - if repo.payments.executions["idem-4"] != nil { - t.Fatalf("expected no execution record for timeout") + record := repo.payments.records["idem-4"] + if record == nil || record.Status != storagemodel.PaymentStatusExpired { + t.Fatalf("expected expired status for timeout, got %#v", record) } } @@ -291,7 +325,14 @@ func TestRejectedDoesNotPersistExecution(t *testing.T) { OutgoingLeg: "card", RequestedMoney: &paymenttypes.Money{Amount: "3", Currency: "USD"}, } - svc.trackIntent("idem-reject", intent) + _ = repo.payments.Upsert(context.Background(), &storagemodel.PaymentRecord{ + IdempotencyKey: "idem-reject", + PaymentIntentID: intent.PaymentIntentID, + QuoteRef: intent.QuoteRef, + OutgoingLeg: intent.OutgoingLeg, + RequestedMoney: intent.RequestedMoney, + Status: storagemodel.PaymentStatusPending, + }) result := &model.ConfirmationResult{ RequestID: "idem-reject", @@ -301,8 +342,9 @@ func TestRejectedDoesNotPersistExecution(t *testing.T) { if err := svc.onConfirmationResult(context.Background(), result); err != nil { t.Fatalf("onConfirmationResult error: %v", err) } - if repo.payments.executions["idem-reject"] != nil { - t.Fatalf("expected no execution record for rejection") + record := repo.payments.records["idem-reject"] + if record == nil || record.Status != storagemodel.PaymentStatusExpired { + t.Fatalf("expected expired status for rejection, got %#v", record) } if repo.tg.records["idem-reject"] == nil { t.Fatalf("expected raw reply to be stored for rejection") diff --git a/api/gateway/tgsettle/storage/model/execution.go b/api/gateway/tgsettle/storage/model/execution.go index 672e4f40..600cc861 100644 --- a/api/gateway/tgsettle/storage/model/execution.go +++ b/api/gateway/tgsettle/storage/model/execution.go @@ -8,21 +8,36 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" ) -type PaymentExecution struct { - ID primitive.ObjectID `bson:"_id,omitempty" json:"id"` - IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"` +type PaymentStatus string + +const ( + PaymentStatusPending PaymentStatus = "pending" + PaymentStatusExpired PaymentStatus = "expired" + PaymentStatusExecuted PaymentStatus = "executed" +) + +type PaymentRecord struct { + ID primitive.ObjectID `bson:"_id,omitempty" json:"id"` + IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"` PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"` - ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"` - QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"` - ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"` - Status string `bson:"status,omitempty" json:"status,omitempty"` + QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"` + OutgoingLeg string `bson:"outgoingLeg,omitempty" json:"outgoing_leg,omitempty"` + TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"` + RequestedMoney *paymenttypes.Money `bson:"requestedMoney,omitempty" json:"requested_money,omitempty"` + ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"` + Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"` + CreatedAt time.Time `bson:"createdAt,omitempty" json:"created_at,omitempty"` + UpdatedAt time.Time `bson:"updatedAt,omitempty" json:"updated_at,omitempty"` + ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"` + ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"` + ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"` } type TelegramConfirmation struct { - ID primitive.ObjectID `bson:"_id,omitempty" json:"id"` - RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"` - PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"` - QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"` + ID primitive.ObjectID `bson:"_id,omitempty" json:"id"` + RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"` + PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"` + QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"` RawReply *model.TelegramMessage `bson:"rawReply,omitempty" json:"raw_reply,omitempty"` - ReceivedAt time.Time `bson:"receivedAt,omitempty" json:"received_at,omitempty"` + ReceivedAt time.Time `bson:"receivedAt,omitempty" json:"received_at,omitempty"` } diff --git a/api/gateway/tgsettle/storage/mongo/store/payments.go b/api/gateway/tgsettle/storage/mongo/store/payments.go index 4f567dac..53cd50a5 100644 --- a/api/gateway/tgsettle/storage/mongo/store/payments.go +++ b/api/gateway/tgsettle/storage/mongo/store/payments.go @@ -13,13 +13,15 @@ import ( "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mlogger" "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" ) const ( - paymentsCollection = "payments" - fieldIdempotencyKey = "idempotencyKey" + paymentsCollection = "payments" + fieldIdempotencyKey = "idempotencyKey" ) type Payments struct { @@ -53,44 +55,53 @@ func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) { return p, nil } -func (p *Payments) FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentExecution, error) { +func (p *Payments) FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentRecord, error) { key = strings.TrimSpace(key) if key == "" { return nil, merrors.InvalidArgument("idempotency key is required", "idempotency_key") } - var result model.PaymentExecution + var result model.PaymentRecord err := p.coll.FindOne(ctx, bson.M{fieldIdempotencyKey: key}).Decode(&result) if err == mongo.ErrNoDocuments { return nil, nil } if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { - p.logger.Warn("Payment execution lookup failed", zap.String("idempotency_key", key), zap.Error(err)) + p.logger.Warn("Payment record lookup failed", zap.String("idempotency_key", key), zap.Error(err)) } return nil, err } return &result, nil } -func (p *Payments) InsertExecution(ctx context.Context, exec *model.PaymentExecution) error { - if exec == nil { - return merrors.InvalidArgument("payment execution is nil", "execution") +func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) error { + if record == nil { + return merrors.InvalidArgument("payment record is nil", "record") } - exec.IdempotencyKey = strings.TrimSpace(exec.IdempotencyKey) - exec.PaymentIntentID = strings.TrimSpace(exec.PaymentIntentID) - exec.QuoteRef = strings.TrimSpace(exec.QuoteRef) - if exec.ExecutedAt.IsZero() { - exec.ExecutedAt = time.Now() + record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey) + record.PaymentIntentID = strings.TrimSpace(record.PaymentIntentID) + record.QuoteRef = strings.TrimSpace(record.QuoteRef) + record.OutgoingLeg = strings.TrimSpace(record.OutgoingLeg) + record.TargetChatID = strings.TrimSpace(record.TargetChatID) + if record.IdempotencyKey == "" { + return merrors.InvalidArgument("idempotency key is required", "idempotency_key") } - if _, err := p.coll.InsertOne(ctx, exec); err != nil { - if mongo.IsDuplicateKeyError(err) { - return storage.ErrDuplicate - } + now := time.Now() + if record.CreatedAt.IsZero() { + record.CreatedAt = now + } + record.UpdatedAt = now + record.ID = primitive.NilObjectID + update := bson.M{ + "$set": record, + } + _, err := p.coll.UpdateOne(ctx, bson.M{fieldIdempotencyKey: record.IdempotencyKey}, update, options.Update().SetUpsert(true)) + if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { - p.logger.Warn("Failed to insert payment execution", - zap.String("idempotency_key", exec.IdempotencyKey), - zap.String("payment_intent_id", exec.PaymentIntentID), - zap.String("quote_ref", exec.QuoteRef), + p.logger.Warn("Failed to upsert payment record", + zap.String("idempotency_key", record.IdempotencyKey), + zap.String("payment_intent_id", record.PaymentIntentID), + zap.String("quote_ref", record.QuoteRef), zap.Error(err)) } return err diff --git a/api/gateway/tgsettle/storage/storage.go b/api/gateway/tgsettle/storage/storage.go index 38741d1a..3001ee0e 100644 --- a/api/gateway/tgsettle/storage/storage.go +++ b/api/gateway/tgsettle/storage/storage.go @@ -15,8 +15,8 @@ type Repository interface { } type PaymentsStore interface { - FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentExecution, error) - InsertExecution(ctx context.Context, exec *model.PaymentExecution) error + FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentRecord, error) + Upsert(ctx context.Context, record *model.PaymentRecord) error } type TelegramConfirmationsStore interface {