refactored notificatoin / tgsettle responsibility boundaries
This commit is contained in:
@@ -38,6 +38,6 @@ messaging:
|
||||
gateway:
|
||||
rail: "provider_settlement"
|
||||
target_chat_id_env: TGSETTLE_GATEWAY_CHAT_ID
|
||||
timeout_seconds: 259200
|
||||
timeout_seconds: 345600
|
||||
accepted_user_ids: []
|
||||
success_reaction: "\U0001FAE1"
|
||||
|
||||
@@ -38,6 +38,6 @@ messaging:
|
||||
gateway:
|
||||
rail: "provider_settlement"
|
||||
target_chat_id_env: TGSETTLE_GATEWAY_CHAT_ID
|
||||
timeout_seconds: 259200
|
||||
timeout_seconds: 345600
|
||||
accepted_user_ids: []
|
||||
success_reaction: "\U0001FAE1"
|
||||
|
||||
@@ -0,0 +1,370 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
confirmations "github.com/tech/sendico/pkg/messaging/notifications/confirmations"
|
||||
tnotifications "github.com/tech/sendico/pkg/messaging/notifications/telegram"
|
||||
"github.com/tech/sendico/pkg/model"
|
||||
"github.com/tech/sendico/pkg/mservice"
|
||||
paymenttypes "github.com/tech/sendico/pkg/payments/types"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var amountPattern = regexp.MustCompile(`^[0-9]+(\.[0-9]+)?$`)
|
||||
var currencyPattern = regexp.MustCompile(`^[A-Za-z]{3,10}$`)
|
||||
|
||||
func (s *Service) startConfirmationTimeoutWatcher() {
|
||||
if s == nil || s.repo == nil || s.repo.PendingConfirmations() == nil {
|
||||
return
|
||||
}
|
||||
if s.timeoutCancel != nil {
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.timeoutCtx = ctx
|
||||
s.timeoutCancel = cancel
|
||||
s.timeoutWG.Add(1)
|
||||
go func() {
|
||||
defer s.timeoutWG.Done()
|
||||
ticker := time.NewTicker(defaultConfirmationSweepInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.sweepExpiredConfirmations(ctx)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Service) sweepExpiredConfirmations(ctx context.Context) {
|
||||
if s == nil || s.repo == nil || s.repo.PendingConfirmations() == nil {
|
||||
return
|
||||
}
|
||||
expired, err := s.repo.PendingConfirmations().ListExpired(ctx, time.Now(), 100)
|
||||
if err != nil {
|
||||
s.logger.Warn("Failed to list expired pending confirmations", zap.Error(err))
|
||||
return
|
||||
}
|
||||
for _, pending := range expired {
|
||||
if pending == nil || strings.TrimSpace(pending.RequestID) == "" {
|
||||
continue
|
||||
}
|
||||
result := &model.ConfirmationResult{
|
||||
RequestID: pending.RequestID,
|
||||
Status: model.ConfirmationStatusTimeout,
|
||||
}
|
||||
if err := s.publishPendingConfirmationResult(pending, result); err != nil {
|
||||
s.logger.Warn("Failed to publish timeout confirmation result", zap.Error(err), zap.String("request_id", pending.RequestID))
|
||||
continue
|
||||
}
|
||||
if err := s.clearPendingConfirmation(ctx, pending.RequestID); err != nil {
|
||||
s.logger.Warn("Failed to remove expired pending confirmation", zap.Error(err), zap.String("request_id", pending.RequestID))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) persistPendingConfirmation(ctx context.Context, request *model.ConfirmationRequest) error {
|
||||
if request == nil {
|
||||
return merrors.InvalidArgument("confirmation request is nil", "request")
|
||||
}
|
||||
if s == nil || s.repo == nil || s.repo.PendingConfirmations() == nil {
|
||||
return merrors.Internal("pending confirmations store unavailable")
|
||||
}
|
||||
timeout := request.TimeoutSeconds
|
||||
if timeout <= 0 {
|
||||
timeout = int32(defaultConfirmationTimeoutSeconds)
|
||||
}
|
||||
pending := &storagemodel.PendingConfirmation{
|
||||
RequestID: strings.TrimSpace(request.RequestID),
|
||||
TargetChatID: strings.TrimSpace(request.TargetChatID),
|
||||
AcceptedUserIDs: normalizeStringList(request.AcceptedUserIDs),
|
||||
RequestedMoney: request.RequestedMoney,
|
||||
SourceService: strings.TrimSpace(request.SourceService),
|
||||
Rail: strings.TrimSpace(request.Rail),
|
||||
ExpiresAt: time.Now().Add(time.Duration(timeout) * time.Second),
|
||||
}
|
||||
return s.repo.PendingConfirmations().Upsert(ctx, pending)
|
||||
}
|
||||
|
||||
func (s *Service) clearPendingConfirmation(ctx context.Context, requestID string) error {
|
||||
if s == nil || s.repo == nil || s.repo.PendingConfirmations() == nil {
|
||||
return nil
|
||||
}
|
||||
requestID = strings.TrimSpace(requestID)
|
||||
if requestID == "" {
|
||||
return nil
|
||||
}
|
||||
return s.repo.PendingConfirmations().DeleteByRequestID(ctx, requestID)
|
||||
}
|
||||
|
||||
func (s *Service) onConfirmationDispatch(ctx context.Context, dispatch *model.ConfirmationRequestDispatch) error {
|
||||
if dispatch == nil {
|
||||
return merrors.InvalidArgument("confirmation dispatch is nil", "dispatch")
|
||||
}
|
||||
if s == nil || s.repo == nil || s.repo.PendingConfirmations() == nil {
|
||||
return merrors.Internal("pending confirmations store unavailable")
|
||||
}
|
||||
requestID := strings.TrimSpace(dispatch.RequestID)
|
||||
messageID := strings.TrimSpace(dispatch.MessageID)
|
||||
if requestID == "" {
|
||||
return merrors.InvalidArgument("confirmation request_id is required", "request_id")
|
||||
}
|
||||
if messageID == "" {
|
||||
return merrors.InvalidArgument("confirmation message_id is required", "message_id")
|
||||
}
|
||||
if err := s.repo.PendingConfirmations().AttachMessage(ctx, requestID, messageID); err != nil {
|
||||
if errors.Is(err, merrors.ErrNoData) {
|
||||
s.logger.Info("Confirmation dispatch ignored: pending request not found",
|
||||
zap.String("request_id", requestID),
|
||||
zap.String("message_id", messageID))
|
||||
return nil
|
||||
}
|
||||
s.logger.Warn("Failed to attach confirmation message id", zap.Error(err), zap.String("request_id", requestID), zap.String("message_id", messageID))
|
||||
return err
|
||||
}
|
||||
s.logger.Info("Pending confirmation message attached", zap.String("request_id", requestID), zap.String("message_id", messageID))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) onTelegramUpdate(ctx context.Context, update *model.TelegramWebhookUpdate) error {
|
||||
if update == nil || update.Message == nil {
|
||||
return nil
|
||||
}
|
||||
if s == nil || s.repo == nil || s.repo.PendingConfirmations() == nil {
|
||||
return merrors.Internal("pending confirmations store unavailable")
|
||||
}
|
||||
message := update.Message
|
||||
replyToID := strings.TrimSpace(message.ReplyToMessageID)
|
||||
if replyToID == "" {
|
||||
return nil
|
||||
}
|
||||
pending, err := s.repo.PendingConfirmations().FindByMessageID(ctx, replyToID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if pending == nil {
|
||||
s.logger.Debug("Telegram reply ignored: no pending confirmation for message", zap.String("reply_to_message_id", replyToID), zap.Int64("update_id", update.UpdateID))
|
||||
return nil
|
||||
}
|
||||
|
||||
if !pending.ExpiresAt.IsZero() && time.Now().After(pending.ExpiresAt) {
|
||||
result := &model.ConfirmationResult{
|
||||
RequestID: pending.RequestID,
|
||||
Status: model.ConfirmationStatusTimeout,
|
||||
}
|
||||
if err := s.publishPendingConfirmationResult(pending, result); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.clearPendingConfirmation(ctx, pending.RequestID)
|
||||
}
|
||||
|
||||
if strings.TrimSpace(message.ChatID) != strings.TrimSpace(pending.TargetChatID) {
|
||||
s.logger.Debug("Telegram reply ignored: chat mismatch",
|
||||
zap.String("request_id", pending.RequestID),
|
||||
zap.String("expected_chat_id", pending.TargetChatID),
|
||||
zap.String("chat_id", strings.TrimSpace(message.ChatID)))
|
||||
return nil
|
||||
}
|
||||
|
||||
if !isUserAllowed(message.FromUserID, pending.AcceptedUserIDs) {
|
||||
result := &model.ConfirmationResult{
|
||||
RequestID: pending.RequestID,
|
||||
Status: model.ConfirmationStatusRejected,
|
||||
ParseError: "unauthorized_user",
|
||||
RawReply: message,
|
||||
}
|
||||
if err := s.publishPendingConfirmationResult(pending, result); err != nil {
|
||||
return err
|
||||
}
|
||||
_ = s.sendTelegramText(ctx, &model.TelegramTextRequest{
|
||||
RequestID: pending.RequestID,
|
||||
ChatID: pending.TargetChatID,
|
||||
ReplyToMessageID: message.MessageID,
|
||||
Text: "Only approved users can confirm this payment.",
|
||||
})
|
||||
return s.clearPendingConfirmation(ctx, pending.RequestID)
|
||||
}
|
||||
|
||||
money, reason, err := parseConfirmationReply(message.Text)
|
||||
if err != nil {
|
||||
if markErr := s.repo.PendingConfirmations().MarkClarified(ctx, pending.RequestID); markErr != nil {
|
||||
s.logger.Warn("Failed to mark confirmation as clarified", zap.Error(markErr), zap.String("request_id", pending.RequestID))
|
||||
}
|
||||
_ = s.sendTelegramText(ctx, &model.TelegramTextRequest{
|
||||
RequestID: pending.RequestID,
|
||||
ChatID: pending.TargetChatID,
|
||||
ReplyToMessageID: message.MessageID,
|
||||
Text: clarificationMessage(reason),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
status := model.ConfirmationStatusConfirmed
|
||||
if pending.Clarified {
|
||||
status = model.ConfirmationStatusClarified
|
||||
}
|
||||
result := &model.ConfirmationResult{
|
||||
RequestID: pending.RequestID,
|
||||
Money: money,
|
||||
RawReply: message,
|
||||
Status: status,
|
||||
}
|
||||
if err := s.publishPendingConfirmationResult(pending, result); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.clearPendingConfirmation(ctx, pending.RequestID)
|
||||
}
|
||||
|
||||
func (s *Service) publishPendingConfirmationResult(pending *storagemodel.PendingConfirmation, result *model.ConfirmationResult) error {
|
||||
if pending == nil || result == nil {
|
||||
return merrors.InvalidArgument("pending confirmation context is required")
|
||||
}
|
||||
if s == nil || s.producer == nil {
|
||||
return merrors.Internal("messaging producer is not configured")
|
||||
}
|
||||
sourceService := strings.TrimSpace(pending.SourceService)
|
||||
if sourceService == "" {
|
||||
sourceService = string(mservice.PaymentGateway)
|
||||
}
|
||||
rail := strings.TrimSpace(pending.Rail)
|
||||
if rail == "" {
|
||||
rail = s.rail
|
||||
}
|
||||
env := confirmations.ConfirmationResult(string(mservice.PaymentGateway), result, sourceService, rail)
|
||||
if err := s.producer.SendMessage(env); err != nil {
|
||||
s.logger.Warn("Failed to publish confirmation result", zap.Error(err),
|
||||
zap.String("request_id", strings.TrimSpace(result.RequestID)),
|
||||
zap.String("status", string(result.Status)),
|
||||
zap.String("source_service", sourceService),
|
||||
zap.String("rail", rail))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) sendTelegramText(ctx context.Context, request *model.TelegramTextRequest) error {
|
||||
if request == nil {
|
||||
return merrors.InvalidArgument("telegram text request is nil", "request")
|
||||
}
|
||||
if s == nil || s.producer == nil {
|
||||
return merrors.Internal("messaging producer is not configured")
|
||||
}
|
||||
request.ChatID = strings.TrimSpace(request.ChatID)
|
||||
request.Text = strings.TrimSpace(request.Text)
|
||||
request.ReplyToMessageID = strings.TrimSpace(request.ReplyToMessageID)
|
||||
if request.ChatID == "" || request.Text == "" {
|
||||
return merrors.InvalidArgument("telegram chat_id and text are required", "chat_id", "text")
|
||||
}
|
||||
env := tnotifications.TelegramText(string(mservice.PaymentGateway), request)
|
||||
if err := s.producer.SendMessage(env); err != nil {
|
||||
s.logger.Warn("Failed to publish telegram text request", zap.Error(err),
|
||||
zap.String("request_id", request.RequestID),
|
||||
zap.String("chat_id", request.ChatID),
|
||||
zap.String("reply_to_message_id", request.ReplyToMessageID))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func isFinalConfirmationStatus(status model.ConfirmationStatus) bool {
|
||||
switch status {
|
||||
case model.ConfirmationStatusConfirmed, model.ConfirmationStatusRejected, model.ConfirmationStatusTimeout, model.ConfirmationStatusClarified:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func isUserAllowed(userID string, allowed []string) bool {
|
||||
allowed = normalizeStringList(allowed)
|
||||
if len(allowed) == 0 {
|
||||
return true
|
||||
}
|
||||
userID = strings.TrimSpace(userID)
|
||||
if userID == "" {
|
||||
return false
|
||||
}
|
||||
for _, id := range allowed {
|
||||
if id == userID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func parseConfirmationReply(text string) (*paymenttypes.Money, string, error) {
|
||||
text = strings.TrimSpace(text)
|
||||
if text == "" {
|
||||
return nil, "empty", merrors.InvalidArgument("empty reply")
|
||||
}
|
||||
parts := strings.Fields(text)
|
||||
if len(parts) < 2 {
|
||||
if len(parts) == 1 && amountPattern.MatchString(parts[0]) {
|
||||
return nil, "missing_currency", merrors.InvalidArgument("currency is required")
|
||||
}
|
||||
return nil, "missing_amount", merrors.InvalidArgument("amount is required")
|
||||
}
|
||||
if len(parts) > 2 {
|
||||
return nil, "format", merrors.InvalidArgument("reply format is invalid")
|
||||
}
|
||||
amount := parts[0]
|
||||
currency := parts[1]
|
||||
if !amountPattern.MatchString(amount) {
|
||||
return nil, "invalid_amount", merrors.InvalidArgument("amount format is invalid")
|
||||
}
|
||||
if !currencyPattern.MatchString(currency) {
|
||||
return nil, "invalid_currency", merrors.InvalidArgument("currency format is invalid")
|
||||
}
|
||||
return &paymenttypes.Money{
|
||||
Amount: amount,
|
||||
Currency: strings.ToUpper(currency),
|
||||
}, "", nil
|
||||
}
|
||||
|
||||
func clarificationMessage(reason string) string {
|
||||
switch reason {
|
||||
case "missing_currency":
|
||||
return "Currency code is required. Reply with \"<amount> <currency>\" (e.g., 12.34 USD)."
|
||||
case "missing_amount":
|
||||
return "Amount is required. Reply with \"<amount> <currency>\" (e.g., 12.34 USD)."
|
||||
case "invalid_amount":
|
||||
return "Amount must be a decimal number. Reply with \"<amount> <currency>\" (e.g., 12.34 USD)."
|
||||
case "invalid_currency":
|
||||
return "Currency must be a code like USD or EUR. Reply with \"<amount> <currency>\"."
|
||||
default:
|
||||
return "Reply with \"<amount> <currency>\" (e.g., 12.34 USD)."
|
||||
}
|
||||
}
|
||||
|
||||
func normalizeStringList(values []string) []string {
|
||||
if len(values) == 0 {
|
||||
return nil
|
||||
}
|
||||
result := make([]string, 0, len(values))
|
||||
seen := map[string]struct{}{}
|
||||
for _, value := range values {
|
||||
value = strings.TrimSpace(value)
|
||||
if value == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[value]; ok {
|
||||
continue
|
||||
}
|
||||
seen[value] = struct{}{}
|
||||
result = append(result, value)
|
||||
}
|
||||
if len(result) == 0 {
|
||||
return nil
|
||||
}
|
||||
return result
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
gatewayoutbox "github.com/tech/sendico/gateway/common/outbox"
|
||||
@@ -36,8 +37,9 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
defaultConfirmationTimeoutSeconds = 259200
|
||||
defaultConfirmationTimeoutSeconds = 345600
|
||||
defaultTelegramSuccessReaction = "\U0001FAE1"
|
||||
defaultConfirmationSweepInterval = 5 * time.Second
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -73,7 +75,10 @@ type Service struct {
|
||||
successReaction string
|
||||
outbox gatewayoutbox.ReliableRuntime
|
||||
|
||||
consumers []msg.Consumer
|
||||
consumers []msg.Consumer
|
||||
timeoutCtx context.Context
|
||||
timeoutCancel context.CancelFunc
|
||||
timeoutWG sync.WaitGroup
|
||||
|
||||
connectorv1.UnimplementedConnectorServiceServer
|
||||
}
|
||||
@@ -103,6 +108,7 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro
|
||||
}
|
||||
svc.startConsumers()
|
||||
svc.startAnnouncer()
|
||||
svc.startConfirmationTimeoutWatcher()
|
||||
return svc
|
||||
}
|
||||
|
||||
@@ -125,6 +131,10 @@ func (s *Service) Shutdown() {
|
||||
consumer.Close()
|
||||
}
|
||||
}
|
||||
if s.timeoutCancel != nil {
|
||||
s.timeoutCancel()
|
||||
}
|
||||
s.timeoutWG.Wait()
|
||||
}
|
||||
|
||||
func (s *Service) startConsumers() {
|
||||
@@ -136,6 +146,10 @@ func (s *Service) startConsumers() {
|
||||
}
|
||||
resultProcessor := confirmations.NewConfirmationResultProcessor(s.logger, string(mservice.PaymentGateway), s.rail, s.onConfirmationResult)
|
||||
s.consumeProcessor(resultProcessor)
|
||||
dispatchProcessor := confirmations.NewConfirmationDispatchProcessor(s.logger, string(mservice.PaymentGateway), s.rail, s.onConfirmationDispatch)
|
||||
s.consumeProcessor(dispatchProcessor)
|
||||
updateProcessor := tnotifications.NewTelegramUpdateProcessor(s.logger, s.onTelegramUpdate)
|
||||
s.consumeProcessor(updateProcessor)
|
||||
}
|
||||
|
||||
func (s *Service) consumeProcessor(processor np.EnvelopeProcessor) {
|
||||
@@ -300,8 +314,13 @@ func (s *Service) onIntent(ctx context.Context, intent *model.PaymentGatewayInte
|
||||
zap.String("idempotency_key", confirmReq.RequestID), zap.String("intent_ref", record.IntentRef))
|
||||
return err
|
||||
}
|
||||
if err := s.persistPendingConfirmation(ctx, confirmReq); err != nil {
|
||||
s.logger.Warn("Failed to persist pending confirmation", zap.Error(err), zap.String("request_id", confirmReq.RequestID))
|
||||
return err
|
||||
}
|
||||
if err := s.sendConfirmationRequest(confirmReq); err != nil {
|
||||
s.logger.Warn("Failed to publish confirmation request", zap.Error(err), zap.String("idempotency_key", confirmReq.RequestID))
|
||||
_ = s.clearPendingConfirmation(ctx, confirmReq.RequestID)
|
||||
// If the confirmation request was not sent, we keep the record in waiting
|
||||
// (or it can be marked as failed — depending on your semantics).
|
||||
// Here, failed is chosen to avoid it hanging indefinitely.
|
||||
@@ -392,6 +411,10 @@ func (s *Service) onConfirmationResult(ctx context.Context, result *model.Confir
|
||||
return err
|
||||
}
|
||||
|
||||
if isFinalConfirmationStatus(result.Status) {
|
||||
_ = s.clearPendingConfirmation(ctx, requestID)
|
||||
}
|
||||
|
||||
s.publishTelegramReaction(result)
|
||||
|
||||
return nil
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/tech/sendico/gateway/tgsettle/storage"
|
||||
storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model"
|
||||
@@ -61,6 +62,7 @@ func (f *fakeTelegramStore) Upsert(_ context.Context, record *storagemodel.Teleg
|
||||
type fakeRepo struct {
|
||||
payments *fakePaymentsStore
|
||||
tg *fakeTelegramStore
|
||||
pending *fakePendingStore
|
||||
}
|
||||
|
||||
func (f *fakeRepo) Payments() storage.PaymentsStore {
|
||||
@@ -71,6 +73,93 @@ func (f *fakeRepo) TelegramConfirmations() storage.TelegramConfirmationsStore {
|
||||
return f.tg
|
||||
}
|
||||
|
||||
func (f *fakeRepo) PendingConfirmations() storage.PendingConfirmationsStore {
|
||||
return f.pending
|
||||
}
|
||||
|
||||
type fakePendingStore struct {
|
||||
mu sync.Mutex
|
||||
records map[string]*storagemodel.PendingConfirmation
|
||||
}
|
||||
|
||||
func (f *fakePendingStore) Upsert(_ context.Context, record *storagemodel.PendingConfirmation) error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
if f.records == nil {
|
||||
f.records = map[string]*storagemodel.PendingConfirmation{}
|
||||
}
|
||||
cp := *record
|
||||
f.records[record.RequestID] = &cp
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakePendingStore) FindByRequestID(_ context.Context, requestID string) (*storagemodel.PendingConfirmation, error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
if f.records == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return f.records[requestID], nil
|
||||
}
|
||||
|
||||
func (f *fakePendingStore) FindByMessageID(_ context.Context, messageID string) (*storagemodel.PendingConfirmation, error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
for _, record := range f.records {
|
||||
if record != nil && record.MessageID == messageID {
|
||||
return record, nil
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (f *fakePendingStore) MarkClarified(_ context.Context, requestID string) error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
if record := f.records[requestID]; record != nil {
|
||||
record.Clarified = true
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakePendingStore) AttachMessage(_ context.Context, requestID string, messageID string) error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
if record := f.records[requestID]; record != nil {
|
||||
if record.MessageID == "" {
|
||||
record.MessageID = messageID
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakePendingStore) DeleteByRequestID(_ context.Context, requestID string) error {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
delete(f.records, requestID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakePendingStore) ListExpired(_ context.Context, now time.Time, limit int64) ([]*storagemodel.PendingConfirmation, error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
if limit <= 0 {
|
||||
limit = 100
|
||||
}
|
||||
result := make([]*storagemodel.PendingConfirmation, 0)
|
||||
for _, record := range f.records {
|
||||
if record == nil || record.ExpiresAt.IsZero() || record.ExpiresAt.After(now) {
|
||||
continue
|
||||
}
|
||||
cp := *record
|
||||
result = append(result, &cp)
|
||||
if int64(len(result)) >= limit {
|
||||
break
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
//
|
||||
// FAKE BROKER (ОБЯЗАТЕЛЕН ДЛЯ СЕРВИСА)
|
||||
//
|
||||
@@ -119,6 +208,7 @@ func newTestService(_ *testing.T) (*Service, *fakeRepo, *captureProducer) {
|
||||
repo := &fakeRepo{
|
||||
payments: &fakePaymentsStore{},
|
||||
tg: &fakeTelegramStore{},
|
||||
pending: &fakePendingStore{},
|
||||
}
|
||||
|
||||
sigEnv := tnotifications.TelegramReaction(string(mservice.PaymentGateway), &model.TelegramReactionRequest{
|
||||
|
||||
@@ -48,3 +48,18 @@ type TelegramConfirmation struct {
|
||||
RawReply *model.TelegramMessage `bson:"rawReply,omitempty" json:"raw_reply,omitempty"`
|
||||
ReceivedAt time.Time `bson:"receivedAt,omitempty" json:"received_at,omitempty"`
|
||||
}
|
||||
|
||||
type PendingConfirmation struct {
|
||||
ID bson.ObjectID `bson:"_id,omitempty" json:"id"`
|
||||
RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"`
|
||||
MessageID string `bson:"messageId,omitempty" json:"message_id,omitempty"`
|
||||
TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"`
|
||||
AcceptedUserIDs []string `bson:"acceptedUserIds,omitempty" json:"accepted_user_ids,omitempty"`
|
||||
RequestedMoney *paymenttypes.Money `bson:"requestedMoney,omitempty" json:"requested_money,omitempty"`
|
||||
SourceService string `bson:"sourceService,omitempty" json:"source_service,omitempty"`
|
||||
Rail string `bson:"rail,omitempty" json:"rail,omitempty"`
|
||||
Clarified bool `bson:"clarified,omitempty" json:"clarified,omitempty"`
|
||||
ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"`
|
||||
CreatedAt time.Time `bson:"createdAt,omitempty" json:"created_at,omitempty"`
|
||||
UpdatedAt time.Time `bson:"updatedAt,omitempty" json:"updated_at,omitempty"`
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ type Repository struct {
|
||||
|
||||
payments storage.PaymentsStore
|
||||
tg storage.TelegramConfirmationsStore
|
||||
pending storage.PendingConfirmationsStore
|
||||
outbox gatewayoutbox.Store
|
||||
}
|
||||
|
||||
@@ -68,6 +69,11 @@ func New(logger mlogger.Logger, conn *db.MongoConnection) (*Repository, error) {
|
||||
result.logger.Error("Failed to initialise telegram confirmations store", zap.Error(err), zap.String("store", "telegram_confirmations"))
|
||||
return nil, err
|
||||
}
|
||||
pendingStore, err := store.NewPendingConfirmations(result.logger, result.db)
|
||||
if err != nil {
|
||||
result.logger.Error("Failed to initialise pending confirmations store", zap.Error(err), zap.String("store", "pending_confirmations"))
|
||||
return nil, err
|
||||
}
|
||||
outboxStore, err := gatewayoutbox.NewMongoStore(result.logger, result.db)
|
||||
if err != nil {
|
||||
result.logger.Error("Failed to initialise outbox store", zap.Error(err), zap.String("store", "outbox"))
|
||||
@@ -75,6 +81,7 @@ func New(logger mlogger.Logger, conn *db.MongoConnection) (*Repository, error) {
|
||||
}
|
||||
result.payments = paymentsStore
|
||||
result.tg = tgStore
|
||||
result.pending = pendingStore
|
||||
result.outbox = outboxStore
|
||||
result.logger.Info("Payment gateway MongoDB storage initialised")
|
||||
return result, nil
|
||||
@@ -88,6 +95,10 @@ func (r *Repository) TelegramConfirmations() storage.TelegramConfirmationsStore
|
||||
return r.tg
|
||||
}
|
||||
|
||||
func (r *Repository) PendingConfirmations() storage.PendingConfirmationsStore {
|
||||
return r.pending
|
||||
}
|
||||
|
||||
func (r *Repository) Outbox() gatewayoutbox.Store {
|
||||
return r.outbox
|
||||
}
|
||||
|
||||
@@ -0,0 +1,240 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/tech/sendico/gateway/tgsettle/storage"
|
||||
"github.com/tech/sendico/gateway/tgsettle/storage/model"
|
||||
"github.com/tech/sendico/pkg/db/repository"
|
||||
ri "github.com/tech/sendico/pkg/db/repository/index"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
"go.mongodb.org/mongo-driver/v2/bson"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
pendingConfirmationsCollection = "pending_confirmations"
|
||||
fieldPendingRequestID = "requestId"
|
||||
fieldPendingMessageID = "messageId"
|
||||
fieldPendingExpiresAt = "expiresAt"
|
||||
)
|
||||
|
||||
type PendingConfirmations struct {
|
||||
logger mlogger.Logger
|
||||
coll *mongo.Collection
|
||||
}
|
||||
|
||||
func NewPendingConfirmations(logger mlogger.Logger, db *mongo.Database) (*PendingConfirmations, error) {
|
||||
if db == nil {
|
||||
return nil, merrors.InvalidArgument("mongo database is nil")
|
||||
}
|
||||
if logger == nil {
|
||||
logger = zap.NewNop()
|
||||
}
|
||||
logger = logger.Named("pending_confirmations").With(zap.String("collection", pendingConfirmationsCollection))
|
||||
|
||||
repo := repository.CreateMongoRepository(db, pendingConfirmationsCollection)
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{{Field: fieldPendingRequestID, Sort: ri.Asc}},
|
||||
Unique: true,
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create pending confirmations request_id index", zap.Error(err), zap.String("index_field", fieldPendingRequestID))
|
||||
return nil, err
|
||||
}
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{{Field: fieldPendingMessageID, Sort: ri.Asc}},
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create pending confirmations message_id index", zap.Error(err), zap.String("index_field", fieldPendingMessageID))
|
||||
return nil, err
|
||||
}
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{{Field: fieldPendingExpiresAt, Sort: ri.Asc}},
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create pending confirmations expires_at index", zap.Error(err), zap.String("index_field", fieldPendingExpiresAt))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := &PendingConfirmations{
|
||||
logger: logger,
|
||||
coll: db.Collection(pendingConfirmationsCollection),
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *PendingConfirmations) Upsert(ctx context.Context, record *model.PendingConfirmation) error {
|
||||
if record == nil {
|
||||
return merrors.InvalidArgument("pending confirmation is nil", "record")
|
||||
}
|
||||
record.RequestID = strings.TrimSpace(record.RequestID)
|
||||
record.MessageID = strings.TrimSpace(record.MessageID)
|
||||
record.TargetChatID = strings.TrimSpace(record.TargetChatID)
|
||||
record.SourceService = strings.TrimSpace(record.SourceService)
|
||||
record.Rail = strings.TrimSpace(record.Rail)
|
||||
if record.RequestID == "" {
|
||||
return merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
if record.TargetChatID == "" {
|
||||
return merrors.InvalidArgument("target_chat_id is required", "target_chat_id")
|
||||
}
|
||||
if record.ExpiresAt.IsZero() {
|
||||
return merrors.InvalidArgument("expires_at is required", "expires_at")
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
createdAt := record.CreatedAt
|
||||
if createdAt.IsZero() {
|
||||
createdAt = now
|
||||
}
|
||||
record.UpdatedAt = now
|
||||
record.CreatedAt = createdAt
|
||||
record.ID = bson.NilObjectID
|
||||
|
||||
// Explicit map avoids accidentally overriding immutable fields from stale callers.
|
||||
update := bson.M{
|
||||
"$set": bson.M{
|
||||
"messageId": record.MessageID,
|
||||
"targetChatId": record.TargetChatID,
|
||||
"acceptedUserIds": record.AcceptedUserIDs,
|
||||
"requestedMoney": record.RequestedMoney,
|
||||
"sourceService": record.SourceService,
|
||||
"rail": record.Rail,
|
||||
"clarified": record.Clarified,
|
||||
"expiresAt": record.ExpiresAt,
|
||||
"updatedAt": record.UpdatedAt,
|
||||
},
|
||||
"$setOnInsert": bson.M{
|
||||
"createdAt": createdAt,
|
||||
},
|
||||
}
|
||||
|
||||
_, err := p.coll.UpdateOne(ctx, bson.M{fieldPendingRequestID: record.RequestID}, update, options.UpdateOne().SetUpsert(true))
|
||||
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
p.logger.Warn("Failed to upsert pending confirmation", zap.Error(err), zap.String("request_id", record.RequestID))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *PendingConfirmations) FindByRequestID(ctx context.Context, requestID string) (*model.PendingConfirmation, error) {
|
||||
requestID = strings.TrimSpace(requestID)
|
||||
if requestID == "" {
|
||||
return nil, merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
var result model.PendingConfirmation
|
||||
err := p.coll.FindOne(ctx, bson.M{fieldPendingRequestID: requestID}).Decode(&result)
|
||||
if err == mongo.ErrNoDocuments {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (p *PendingConfirmations) FindByMessageID(ctx context.Context, messageID string) (*model.PendingConfirmation, error) {
|
||||
messageID = strings.TrimSpace(messageID)
|
||||
if messageID == "" {
|
||||
return nil, merrors.InvalidArgument("message_id is required", "message_id")
|
||||
}
|
||||
var result model.PendingConfirmation
|
||||
err := p.coll.FindOne(ctx, bson.M{fieldPendingMessageID: messageID}).Decode(&result)
|
||||
if err == mongo.ErrNoDocuments {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (p *PendingConfirmations) MarkClarified(ctx context.Context, requestID string) error {
|
||||
requestID = strings.TrimSpace(requestID)
|
||||
if requestID == "" {
|
||||
return merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
_, err := p.coll.UpdateOne(ctx, bson.M{fieldPendingRequestID: requestID}, bson.M{
|
||||
"$set": bson.M{
|
||||
"clarified": true,
|
||||
"updatedAt": time.Now(),
|
||||
},
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *PendingConfirmations) AttachMessage(ctx context.Context, requestID string, messageID string) error {
|
||||
requestID = strings.TrimSpace(requestID)
|
||||
messageID = strings.TrimSpace(messageID)
|
||||
if requestID == "" {
|
||||
return merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
if messageID == "" {
|
||||
return merrors.InvalidArgument("message_id is required", "message_id")
|
||||
}
|
||||
|
||||
filter := bson.M{
|
||||
fieldPendingRequestID: requestID,
|
||||
"$or": []bson.M{
|
||||
{fieldPendingMessageID: bson.M{"$exists": false}},
|
||||
{fieldPendingMessageID: ""},
|
||||
{fieldPendingMessageID: messageID},
|
||||
},
|
||||
}
|
||||
res, err := p.coll.UpdateOne(ctx, filter, bson.M{
|
||||
"$set": bson.M{
|
||||
fieldPendingMessageID: messageID,
|
||||
"updatedAt": time.Now(),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if res.MatchedCount == 0 {
|
||||
return merrors.NoData("pending confirmation not found")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PendingConfirmations) DeleteByRequestID(ctx context.Context, requestID string) error {
|
||||
requestID = strings.TrimSpace(requestID)
|
||||
if requestID == "" {
|
||||
return merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
_, err := p.coll.DeleteOne(ctx, bson.M{fieldPendingRequestID: requestID})
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *PendingConfirmations) ListExpired(ctx context.Context, now time.Time, limit int64) ([]*model.PendingConfirmation, error) {
|
||||
if limit <= 0 {
|
||||
limit = 100
|
||||
}
|
||||
filter := bson.M{
|
||||
fieldPendingExpiresAt: bson.M{"$lte": now},
|
||||
}
|
||||
opts := options.Find().SetLimit(limit).SetSort(bson.D{{Key: fieldPendingExpiresAt, Value: 1}})
|
||||
|
||||
cursor, err := p.coll.Find(ctx, filter, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer cursor.Close(ctx)
|
||||
|
||||
result := make([]*model.PendingConfirmation, 0)
|
||||
for cursor.Next(ctx) {
|
||||
var next model.PendingConfirmation
|
||||
if err := cursor.Decode(&next); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, &next)
|
||||
}
|
||||
if err := cursor.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
var _ storage.PendingConfirmationsStore = (*PendingConfirmations)(nil)
|
||||
@@ -2,6 +2,7 @@ package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/tech/sendico/gateway/tgsettle/storage/model"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
@@ -12,6 +13,7 @@ var ErrDuplicate = merrors.DataConflict("payment gateway storage: duplicate reco
|
||||
type Repository interface {
|
||||
Payments() PaymentsStore
|
||||
TelegramConfirmations() TelegramConfirmationsStore
|
||||
PendingConfirmations() PendingConfirmationsStore
|
||||
}
|
||||
|
||||
type PaymentsStore interface {
|
||||
@@ -22,3 +24,13 @@ type PaymentsStore interface {
|
||||
type TelegramConfirmationsStore interface {
|
||||
Upsert(ctx context.Context, record *model.TelegramConfirmation) error
|
||||
}
|
||||
|
||||
type PendingConfirmationsStore interface {
|
||||
Upsert(ctx context.Context, record *model.PendingConfirmation) error
|
||||
FindByRequestID(ctx context.Context, requestID string) (*model.PendingConfirmation, error)
|
||||
FindByMessageID(ctx context.Context, messageID string) (*model.PendingConfirmation, error)
|
||||
MarkClarified(ctx context.Context, requestID string) error
|
||||
AttachMessage(ctx context.Context, requestID string, messageID string) error
|
||||
DeleteByRequestID(ctx context.Context, requestID string) error
|
||||
ListExpired(ctx context.Context, now time.Time, limit int64) ([]*model.PendingConfirmation, error)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user