package gateway import ( "context" "errors" "regexp" "strings" "time" storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model" "github.com/tech/sendico/pkg/merrors" confirmations "github.com/tech/sendico/pkg/messaging/notifications/confirmations" tnotifications "github.com/tech/sendico/pkg/messaging/notifications/telegram" "github.com/tech/sendico/pkg/model" "github.com/tech/sendico/pkg/mservice" paymenttypes "github.com/tech/sendico/pkg/payments/types" "go.uber.org/zap" ) var amountPattern = regexp.MustCompile(`^[0-9]+(\.[0-9]+)?$`) var currencyPattern = regexp.MustCompile(`^[A-Za-z]{3,10}$`) func (s *Service) startConfirmationTimeoutWatcher() { if s == nil || s.repo == nil || s.repo.PendingConfirmations() == nil { return } if s.timeoutCancel != nil { return } ctx, cancel := context.WithCancel(context.Background()) s.timeoutCtx = ctx s.timeoutCancel = cancel s.timeoutWG.Add(1) go func() { defer s.timeoutWG.Done() ticker := time.NewTicker(defaultConfirmationSweepInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: s.sweepExpiredConfirmations(ctx) } } }() } func (s *Service) sweepExpiredConfirmations(ctx context.Context) { if s == nil || s.repo == nil || s.repo.PendingConfirmations() == nil { return } expired, err := s.repo.PendingConfirmations().ListExpired(ctx, time.Now(), 100) if err != nil { s.logger.Warn("Failed to list expired pending confirmations", zap.Error(err)) return } for i := range expired { pending := &expired[i] if strings.TrimSpace(pending.RequestID) == "" { continue } result := &model.ConfirmationResult{ RequestID: pending.RequestID, Status: model.ConfirmationStatusTimeout, } if err := s.publishPendingConfirmationResult(pending, result); err != nil { s.logger.Warn("Failed to publish timeout confirmation result", zap.Error(err), zap.String("request_id", pending.RequestID)) continue } if err := s.clearPendingConfirmation(ctx, pending.RequestID); err != nil { s.logger.Warn("Failed to remove expired pending confirmation", zap.Error(err), zap.String("request_id", pending.RequestID)) } } } func (s *Service) persistPendingConfirmation(ctx context.Context, request *model.ConfirmationRequest) error { if request == nil { return merrors.InvalidArgument("confirmation request is nil", "request") } if s == nil || s.repo == nil || s.repo.PendingConfirmations() == nil { return merrors.Internal("pending confirmations store unavailable") } timeout := request.TimeoutSeconds if timeout <= 0 { timeout = int32(defaultConfirmationTimeoutSeconds) } pending := &storagemodel.PendingConfirmation{ RequestID: strings.TrimSpace(request.RequestID), TargetChatID: strings.TrimSpace(request.TargetChatID), AcceptedUserIDs: normalizeStringList(request.AcceptedUserIDs), RequestedMoney: request.RequestedMoney, SourceService: strings.TrimSpace(request.SourceService), Rail: strings.TrimSpace(request.Rail), ExpiresAt: time.Now().Add(time.Duration(timeout) * time.Second), } return s.repo.PendingConfirmations().Upsert(ctx, pending) } func (s *Service) clearPendingConfirmation(ctx context.Context, requestID string) error { if s == nil || s.repo == nil || s.repo.PendingConfirmations() == nil { return nil } requestID = strings.TrimSpace(requestID) if requestID == "" { return nil } return s.repo.PendingConfirmations().DeleteByRequestID(ctx, requestID) } func (s *Service) onConfirmationDispatch(ctx context.Context, dispatch *model.ConfirmationRequestDispatch) error { if dispatch == nil { return merrors.InvalidArgument("confirmation dispatch is nil", "dispatch") } if s == nil || s.repo == nil || s.repo.PendingConfirmations() == nil { return merrors.Internal("pending confirmations store unavailable") } requestID := strings.TrimSpace(dispatch.RequestID) messageID := strings.TrimSpace(dispatch.MessageID) if requestID == "" { return merrors.InvalidArgument("confirmation request_id is required", "request_id") } if messageID == "" { return merrors.InvalidArgument("confirmation message_id is required", "message_id") } if err := s.repo.PendingConfirmations().AttachMessage(ctx, requestID, messageID); err != nil { if errors.Is(err, merrors.ErrNoData) { s.logger.Info("Confirmation dispatch ignored: pending request not found", zap.String("request_id", requestID), zap.String("message_id", messageID)) return nil } s.logger.Warn("Failed to attach confirmation message id", zap.Error(err), zap.String("request_id", requestID), zap.String("message_id", messageID)) return err } s.logger.Info("Pending confirmation message attached", zap.String("request_id", requestID), zap.String("message_id", messageID)) return nil } func (s *Service) onTelegramUpdate(ctx context.Context, update *model.TelegramWebhookUpdate) error { if update == nil || update.Message == nil { return nil } if s == nil || s.repo == nil || s.repo.PendingConfirmations() == nil { return merrors.Internal("pending confirmations store unavailable") } message := update.Message replyToID := strings.TrimSpace(message.ReplyToMessageID) if replyToID == "" { s.handleTreasuryTelegramUpdate(ctx, update) return nil } replyFields := telegramReplyLogFields(update) pending, err := s.repo.PendingConfirmations().FindByMessageID(ctx, replyToID) if err != nil { return err } if pending == nil { if s.handleTreasuryTelegramUpdate(ctx, update) { return nil } s.logger.Warn("Telegram confirmation reply dropped", append(replyFields, zap.String("outcome", "dropped"), zap.String("reason", "no_pending_confirmation"), )...) return nil } replyFields = append(replyFields, zap.String("request_id", strings.TrimSpace(pending.RequestID)), zap.String("target_chat_id", strings.TrimSpace(pending.TargetChatID)), ) if !pending.ExpiresAt.IsZero() && time.Now().After(pending.ExpiresAt) { result := &model.ConfirmationResult{ RequestID: pending.RequestID, Status: model.ConfirmationStatusTimeout, } if err := s.publishPendingConfirmationResult(pending, result); err != nil { return err } if err := s.clearPendingConfirmation(ctx, pending.RequestID); err != nil { return err } s.logger.Info("Telegram confirmation reply processed", append(replyFields, zap.String("outcome", "processed"), zap.String("result_status", string(result.Status)), zap.String("reason", "expired_confirmation"), )...) return nil } if strings.TrimSpace(message.ChatID) != strings.TrimSpace(pending.TargetChatID) { s.logger.Warn("Telegram confirmation reply dropped", append(replyFields, zap.String("outcome", "dropped"), zap.String("reason", "chat_mismatch"), zap.String("expected_chat_id", strings.TrimSpace(pending.TargetChatID)), )...) return nil } if !isUserAllowed(message.FromUserID, pending.AcceptedUserIDs) { result := &model.ConfirmationResult{ RequestID: pending.RequestID, Status: model.ConfirmationStatusRejected, ParseError: "unauthorized_user", RawReply: message, } if err := s.publishPendingConfirmationResult(pending, result); err != nil { return err } if e := s.sendTelegramText(ctx, &model.TelegramTextRequest{ RequestID: pending.RequestID, ChatID: pending.TargetChatID, ReplyToMessageID: message.MessageID, Text: "Only approved users can confirm this payment.", }); e != nil { s.logger.Warn("Failed to create telegram text", append(replyFields, zap.Error(err))...) } if err := s.clearPendingConfirmation(ctx, pending.RequestID); err != nil { return err } s.logger.Info("Telegram confirmation reply processed", append(replyFields, zap.String("outcome", "processed"), zap.String("result_status", string(result.Status)), zap.String("reason", "unauthorized_user"), )...) return nil } money, reason, err := parseConfirmationReply(message.Text) if err != nil { if markErr := s.repo.PendingConfirmations().MarkClarified(ctx, pending.RequestID); markErr != nil { s.logger.Warn("Failed to mark confirmation as clarified", zap.Error(markErr), zap.String("request_id", pending.RequestID)) } if e := s.sendTelegramText(ctx, &model.TelegramTextRequest{ RequestID: pending.RequestID, ChatID: pending.TargetChatID, ReplyToMessageID: message.MessageID, Text: clarificationMessage(reason), }); e != nil { s.logger.Warn("Failed to create telegram text", append(replyFields, zap.Error(err))...) } s.logger.Warn("Telegram confirmation reply dropped", append(replyFields, zap.String("outcome", "dropped"), zap.String("reason", "invalid_reply_format"), zap.String("parse_reason", reason), )...) return nil } status := model.ConfirmationStatusConfirmed if pending.Clarified { status = model.ConfirmationStatusClarified } result := &model.ConfirmationResult{ RequestID: pending.RequestID, Money: money, RawReply: message, Status: status, } if err := s.publishPendingConfirmationResult(pending, result); err != nil { return err } if err := s.clearPendingConfirmation(ctx, pending.RequestID); err != nil { return err } s.logger.Info("Telegram confirmation reply processed", append(replyFields, zap.String("outcome", "processed"), zap.String("result_status", string(result.Status)), )...) return nil } func (s *Service) handleTreasuryTelegramUpdate(ctx context.Context, update *model.TelegramWebhookUpdate) bool { if s == nil || s.treasury == nil || update == nil || update.Message == nil { return false } return s.treasury.HandleUpdate(ctx, update) } func telegramReplyLogFields(update *model.TelegramWebhookUpdate) []zap.Field { if update == nil || update.Message == nil { return nil } message := update.Message return []zap.Field{ zap.Int64("update_id", update.UpdateID), zap.String("message_id", strings.TrimSpace(message.MessageID)), zap.String("reply_to_message_id", strings.TrimSpace(message.ReplyToMessageID)), zap.String("chat_id", strings.TrimSpace(message.ChatID)), zap.String("from_user_id", strings.TrimSpace(message.FromUserID)), } } func (s *Service) publishPendingConfirmationResult(pending *storagemodel.PendingConfirmation, result *model.ConfirmationResult) error { if pending == nil || result == nil { return merrors.InvalidArgument("pending confirmation context is required") } if s == nil || s.producer == nil { return merrors.Internal("messaging producer is not configured") } sourceService := strings.TrimSpace(pending.SourceService) if sourceService == "" { sourceService = string(mservice.PaymentGateway) } rail := strings.TrimSpace(pending.Rail) if rail == "" { rail = s.rail } env := confirmations.ConfirmationResult(string(mservice.PaymentGateway), result, sourceService, rail) if err := s.producer.SendMessage(env); err != nil { s.logger.Warn("Failed to publish confirmation result", zap.Error(err), zap.String("request_id", strings.TrimSpace(result.RequestID)), zap.String("status", string(result.Status)), zap.String("source_service", sourceService), zap.String("rail", rail)) return err } return nil } func (s *Service) sendTelegramText(_ context.Context, request *model.TelegramTextRequest) error { if request == nil { return merrors.InvalidArgument("telegram text request is nil", "request") } if s == nil || s.producer == nil { return merrors.Internal("messaging producer is not configured") } request.ChatID = strings.TrimSpace(request.ChatID) request.Text = strings.TrimSpace(request.Text) request.ReplyToMessageID = strings.TrimSpace(request.ReplyToMessageID) if request.ChatID == "" || request.Text == "" { return merrors.InvalidArgument("telegram chat_id and text are required", "chat_id", "text") } env := tnotifications.TelegramText(string(mservice.PaymentGateway), request) if err := s.producer.SendMessage(env); err != nil { s.logger.Warn("Failed to publish telegram text request", zap.Error(err), zap.String("request_id", request.RequestID), zap.String("chat_id", request.ChatID), zap.String("reply_to_message_id", request.ReplyToMessageID)) return err } return nil } func isFinalConfirmationStatus(status model.ConfirmationStatus) bool { switch status { case model.ConfirmationStatusConfirmed, model.ConfirmationStatusRejected, model.ConfirmationStatusTimeout, model.ConfirmationStatusClarified: return true default: return false } } func isUserAllowed(userID string, allowed []string) bool { allowed = normalizeStringList(allowed) if len(allowed) == 0 { return true } userID = strings.TrimSpace(userID) if userID == "" { return false } for _, id := range allowed { if id == userID { return true } } return false } func parseConfirmationReply(text string) (*paymenttypes.Money, string, error) { text = strings.TrimSpace(text) if text == "" { return nil, "empty", merrors.InvalidArgument("empty reply") } parts := strings.Fields(text) if len(parts) < 2 { if len(parts) == 1 && amountPattern.MatchString(parts[0]) { return nil, "missing_currency", merrors.InvalidArgument("currency is required") } return nil, "missing_amount", merrors.InvalidArgument("amount is required") } if len(parts) > 2 { return nil, "format", merrors.InvalidArgument("reply format is invalid") } amount := parts[0] currency := parts[1] if !amountPattern.MatchString(amount) { return nil, "invalid_amount", merrors.InvalidArgument("amount format is invalid") } if !currencyPattern.MatchString(currency) { return nil, "invalid_currency", merrors.InvalidArgument("currency format is invalid") } return &paymenttypes.Money{ Amount: amount, Currency: strings.ToUpper(currency), }, "", nil } func clarificationMessage(reason string) string { switch reason { case "missing_currency": return "Currency code is required. Reply with \" \" (e.g., 12.34 USD)." case "missing_amount": return "Amount is required. Reply with \" \" (e.g., 12.34 USD)." case "invalid_amount": return "Amount must be a decimal number. Reply with \" \" (e.g., 12.34 USD)." case "invalid_currency": return "Currency must be a code like USD or EUR. Reply with \" \"." default: return "Reply with \" \" (e.g., 12.34 USD)." } } func normalizeStringList(values []string) []string { if len(values) == 0 { return nil } result := make([]string, 0, len(values)) seen := map[string]struct{}{} for _, value := range values { value = strings.TrimSpace(value) if value == "" { continue } if _, ok := seen[value]; ok { continue } seen[value] = struct{}{} result = append(result, value) } if len(result) == 0 { return nil } return result }