759 lines
27 KiB
Go
759 lines
27 KiB
Go
package gateway
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
gatewayoutbox "github.com/tech/sendico/gateway/common/outbox"
|
|
"github.com/tech/sendico/gateway/tgsettle/storage"
|
|
storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model"
|
|
"github.com/tech/sendico/pkg/api/routers"
|
|
"github.com/tech/sendico/pkg/discovery"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
msg "github.com/tech/sendico/pkg/messaging"
|
|
mb "github.com/tech/sendico/pkg/messaging/broker"
|
|
cons "github.com/tech/sendico/pkg/messaging/consumer"
|
|
confirmations "github.com/tech/sendico/pkg/messaging/notifications/confirmations"
|
|
np "github.com/tech/sendico/pkg/messaging/notifications/processor"
|
|
tnotifications "github.com/tech/sendico/pkg/messaging/notifications/telegram"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"github.com/tech/sendico/pkg/model"
|
|
pmodel "github.com/tech/sendico/pkg/model"
|
|
"github.com/tech/sendico/pkg/mservice"
|
|
paymenttypes "github.com/tech/sendico/pkg/payments/types"
|
|
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
|
|
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
|
|
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
|
|
"github.com/tech/sendico/pkg/server/grpcapp"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
)
|
|
|
|
const (
|
|
defaultConfirmationTimeoutSeconds = 345600
|
|
defaultTelegramSuccessReaction = "\U0001FAE1"
|
|
defaultConfirmationSweepInterval = 5 * time.Second
|
|
)
|
|
|
|
const (
|
|
metadataPaymentIntentID = "payment_intent_id"
|
|
metadataQuoteRef = "quote_ref"
|
|
metadataTargetChatID = "target_chat_id"
|
|
metadataOutgoingLeg = "outgoing_leg"
|
|
metadataSourceAmount = "source_amount"
|
|
metadataSourceCurrency = "source_currency"
|
|
)
|
|
|
|
type Config struct {
|
|
Rail string
|
|
TargetChatIDEnv string
|
|
TimeoutSeconds int32
|
|
AcceptedUserIDs []string
|
|
SuccessReaction string
|
|
InvokeURI string
|
|
MessagingSettings pmodel.SettingsT
|
|
}
|
|
|
|
type Service struct {
|
|
logger mlogger.Logger
|
|
repo storage.Repository
|
|
producer msg.Producer
|
|
broker mb.Broker
|
|
cfg Config
|
|
msgCfg pmodel.SettingsT
|
|
rail string
|
|
chatID string
|
|
announcer *discovery.Announcer
|
|
invokeURI string
|
|
successReaction string
|
|
outbox gatewayoutbox.ReliableRuntime
|
|
|
|
consumers []msg.Consumer
|
|
timeoutCtx context.Context
|
|
timeoutCancel context.CancelFunc
|
|
timeoutWG sync.WaitGroup
|
|
|
|
connectorv1.UnimplementedConnectorServiceServer
|
|
}
|
|
|
|
func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Producer, broker mb.Broker, cfg Config) *Service {
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
logger = logger.Named("service")
|
|
svc := &Service{
|
|
logger: logger,
|
|
repo: repo,
|
|
producer: producer,
|
|
broker: broker,
|
|
cfg: cfg,
|
|
msgCfg: cfg.MessagingSettings,
|
|
rail: discovery.NormalizeRail(cfg.Rail),
|
|
invokeURI: strings.TrimSpace(cfg.InvokeURI),
|
|
}
|
|
if svc.rail == "" {
|
|
svc.rail = strings.TrimSpace(cfg.Rail)
|
|
}
|
|
svc.chatID = strings.TrimSpace(readEnv(cfg.TargetChatIDEnv))
|
|
svc.successReaction = strings.TrimSpace(cfg.SuccessReaction)
|
|
if svc.successReaction == "" {
|
|
svc.successReaction = defaultTelegramSuccessReaction
|
|
}
|
|
if err := svc.startOutboxReliableProducer(); err != nil {
|
|
svc.logger.Warn("Failed to initialise outbox reliable producer", zap.Error(err))
|
|
}
|
|
svc.startConsumers()
|
|
svc.startAnnouncer()
|
|
svc.startConfirmationTimeoutWatcher()
|
|
return svc
|
|
}
|
|
|
|
func (s *Service) Register(router routers.GRPC) error {
|
|
return router.Register(func(reg grpc.ServiceRegistrar) {
|
|
connectorv1.RegisterConnectorServiceServer(reg, s)
|
|
})
|
|
}
|
|
|
|
func (s *Service) Shutdown() {
|
|
if s == nil {
|
|
return
|
|
}
|
|
s.outbox.Stop()
|
|
if s.announcer != nil {
|
|
s.announcer.Stop()
|
|
}
|
|
for _, consumer := range s.consumers {
|
|
if consumer != nil {
|
|
consumer.Close()
|
|
}
|
|
}
|
|
if s.timeoutCancel != nil {
|
|
s.timeoutCancel()
|
|
}
|
|
s.timeoutWG.Wait()
|
|
}
|
|
|
|
func (s *Service) startConsumers() {
|
|
if s == nil || s.broker == nil {
|
|
if s != nil && s.logger != nil {
|
|
s.logger.Warn("Messaging broker not configured; confirmation flow disabled")
|
|
}
|
|
return
|
|
}
|
|
resultProcessor := confirmations.NewConfirmationResultProcessor(s.logger, string(mservice.PaymentGateway), s.rail, s.onConfirmationResult)
|
|
s.consumeProcessor(resultProcessor)
|
|
dispatchProcessor := confirmations.NewConfirmationDispatchProcessor(s.logger, string(mservice.PaymentGateway), s.rail, s.onConfirmationDispatch)
|
|
s.consumeProcessor(dispatchProcessor)
|
|
updateProcessor := tnotifications.NewTelegramUpdateProcessor(s.logger, s.onTelegramUpdate)
|
|
s.consumeProcessor(updateProcessor)
|
|
}
|
|
|
|
func (s *Service) consumeProcessor(processor np.EnvelopeProcessor) {
|
|
consumer, err := cons.NewConsumer(s.logger, s.broker, processor.GetSubject())
|
|
if err != nil {
|
|
s.logger.Warn("Failed to create messaging consumer", zap.Error(err), zap.String("event", processor.GetSubject().ToString()))
|
|
return
|
|
}
|
|
s.consumers = append(s.consumers, consumer)
|
|
go func() {
|
|
if err := consumer.ConsumeMessages(processor.Process); err != nil && !errors.Is(err, context.Canceled) {
|
|
s.logger.Warn("Messaging consumer stopped", zap.Error(err), zap.String("event", processor.GetSubject().ToString()))
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (s *Service) SubmitTransfer(ctx context.Context, req *chainv1.SubmitTransferRequest) (*chainv1.SubmitTransferResponse, error) {
|
|
if req == nil {
|
|
s.logger.Warn("Submit transfer rejected", zap.String("reason", "request is required"))
|
|
return nil, merrors.InvalidArgument("submit_transfer: request is required")
|
|
}
|
|
idempotencyKey := strings.TrimSpace(req.GetIdempotencyKey())
|
|
if idempotencyKey == "" {
|
|
s.logger.Warn("Submit transfer rejected", zap.String("reason", "idempotency_key is required"))
|
|
return nil, merrors.InvalidArgument("submit_transfer: idempotency_key is required")
|
|
}
|
|
amount := req.GetAmount()
|
|
if amount == nil || strings.TrimSpace(amount.GetAmount()) == "" || strings.TrimSpace(amount.GetCurrency()) == "" {
|
|
s.logger.Warn("Submit transfer rejected", zap.String("reason", "amount is required"), zap.String("idempotency_key", idempotencyKey))
|
|
return nil, merrors.InvalidArgument("submit_transfer: amount is required")
|
|
}
|
|
intent, err := intentFromSubmitTransfer(req, s.rail, s.chatID)
|
|
if err != nil {
|
|
s.logger.Warn("Submit transfer rejected", zap.Error(err), zap.String("idempotency_key", idempotencyKey))
|
|
return nil, err
|
|
}
|
|
logFields := []zap.Field{
|
|
zap.String("idempotency_key", intent.IdempotencyKey),
|
|
zap.String("payment_intent_id", intent.PaymentIntentID),
|
|
zap.String("quote_ref", intent.QuoteRef),
|
|
zap.String("rail", intent.OutgoingLeg),
|
|
zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())),
|
|
zap.String("source_wallet_ref", strings.TrimSpace(req.GetSourceWalletRef())),
|
|
zap.String("operation_ref", strings.TrimSpace(req.GetOperationRef())),
|
|
zap.String("intent_ref", strings.TrimSpace(req.GetIntentRef())),
|
|
}
|
|
if intent.RequestedMoney != nil {
|
|
logFields = append(logFields,
|
|
zap.String("amount", strings.TrimSpace(intent.RequestedMoney.Amount)),
|
|
zap.String("currency", strings.TrimSpace(intent.RequestedMoney.Currency)),
|
|
)
|
|
}
|
|
logFields = append(logFields, transferDestinationLogFields(req.GetDestination())...)
|
|
if s.repo == nil || s.repo.Payments() == nil {
|
|
s.logger.Warn("Payment gateway storage unavailable", logFields...)
|
|
return nil, merrors.Internal("payment gateway storage unavailable")
|
|
}
|
|
existing, err := s.repo.Payments().FindByIdempotencyKey(ctx, idempotencyKey)
|
|
if err != nil {
|
|
s.logger.Warn("Submit transfer lookup failed", append(logFields, zap.Error(err))...)
|
|
return nil, err
|
|
}
|
|
if existing != nil {
|
|
s.logger.Info("Submit transfer idempotent hit", append(logFields, zap.String("status", string(existing.Status)))...)
|
|
return &chainv1.SubmitTransferResponse{Transfer: transferFromPayment(existing, req)}, nil
|
|
}
|
|
if err := s.onIntent(ctx, intent); err != nil {
|
|
s.logger.Warn("Submit transfer intent handling failed", append(logFields, zap.Error(err))...)
|
|
return nil, err
|
|
}
|
|
s.logger.Info("Submit transfer accepted", logFields...)
|
|
return &chainv1.SubmitTransferResponse{Transfer: transferFromRequest(req)}, nil
|
|
}
|
|
|
|
func (s *Service) GetTransfer(ctx context.Context, req *chainv1.GetTransferRequest) (*chainv1.GetTransferResponse, error) {
|
|
if req == nil {
|
|
s.logger.Warn("Get transfer rejected", zap.String("reason", "request is required"))
|
|
return nil, merrors.InvalidArgument("get_transfer: request is required")
|
|
}
|
|
transferRef := strings.TrimSpace(req.GetTransferRef())
|
|
if transferRef == "" {
|
|
s.logger.Warn("Get transfer rejected", zap.String("reason", "transfer_ref is required"))
|
|
return nil, merrors.InvalidArgument("get_transfer: transfer_ref is required")
|
|
}
|
|
logFields := []zap.Field{zap.String("transfer_ref", transferRef)}
|
|
if s.repo == nil || s.repo.Payments() == nil {
|
|
s.logger.Warn("Payment gateway storage unavailable", logFields...)
|
|
return nil, merrors.Internal("payment gateway storage unavailable")
|
|
}
|
|
existing, err := s.repo.Payments().FindByIdempotencyKey(ctx, transferRef)
|
|
if err != nil {
|
|
s.logger.Warn("Get transfer lookup failed", append(logFields, zap.Error(err))...)
|
|
return nil, err
|
|
}
|
|
if existing != nil {
|
|
s.logger.Info("Get transfer resolved from execution", append(logFields,
|
|
zap.String("payment_intent_id", strings.TrimSpace(existing.PaymentIntentID)),
|
|
zap.String("status", string(existing.Status)),
|
|
)...)
|
|
return &chainv1.GetTransferResponse{Transfer: transferFromPayment(existing, nil)}, nil
|
|
}
|
|
s.logger.Warn("Get transfer not found", logFields...)
|
|
return nil, status.Error(codes.NotFound, "transfer not found")
|
|
}
|
|
|
|
func (s *Service) onIntent(ctx context.Context, intent *model.PaymentGatewayIntent) error {
|
|
if intent == nil {
|
|
s.logger.Warn("Payment gateway intent rejected", zap.String("reason", "intent is nil"))
|
|
return merrors.InvalidArgument("payment gateway intent is nil", "intent")
|
|
}
|
|
intent = normalizeIntent(intent)
|
|
if intent.IdempotencyKey == "" {
|
|
s.logger.Warn("Payment gateway intent rejected", zap.String("reason", "idempotency_key is required"))
|
|
return merrors.InvalidArgument("idempotency_key is required", "idempotency_key")
|
|
}
|
|
if intent.PaymentIntentID == "" {
|
|
s.logger.Warn("Payment gateway intent rejected", zap.String("reason", "payment_intent_id is required"), zap.String("idempotency_key", intent.IdempotencyKey))
|
|
return merrors.InvalidArgument("payment_intent_id is required", "payment_intent_id")
|
|
}
|
|
if intent.IntentRef == "" {
|
|
s.logger.Warn("Payment gateway intent rejected", zap.String("reason", "payment_intent_ref is required"), zap.String("idempotency_key", intent.IdempotencyKey))
|
|
return merrors.InvalidArgument("payment_intent_ref is required", "payment_intent_ref")
|
|
}
|
|
if intent.RequestedMoney == nil || strings.TrimSpace(intent.RequestedMoney.Amount) == "" || strings.TrimSpace(intent.RequestedMoney.Currency) == "" {
|
|
s.logger.Warn("Payment gateway intent rejected", zap.String("reason", "requested_money is required"), zap.String("idempotency_key", intent.IdempotencyKey))
|
|
return merrors.InvalidArgument("requested_money is required", "requested_money")
|
|
}
|
|
if intent.OperationRef == "" {
|
|
s.logger.Warn("Payment gateway intent rejected", zap.String("reason", "operation_ref is required"))
|
|
return merrors.InvalidArgument("operation_ref is required", "operation_ref")
|
|
}
|
|
if s.repo == nil || s.repo.Payments() == nil {
|
|
s.logger.Warn("Payment gateway storage unavailable", zap.String("idempotency_key", intent.IdempotencyKey))
|
|
return merrors.Internal("payment gateway storage unavailable")
|
|
}
|
|
|
|
confirmReq, err := s.buildConfirmationRequest(intent)
|
|
if err != nil {
|
|
s.logger.Warn("Failed to build confirmation request", zap.Error(err), zap.String("idempotency_key", intent.IdempotencyKey), zap.String("payment_intent_id", intent.PaymentIntentID))
|
|
return err
|
|
}
|
|
|
|
existing, err := s.repo.Payments().FindByIdempotencyKey(ctx, confirmReq.RequestID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if existing != nil {
|
|
s.logger.Info("Payment gateway intent already recorded",
|
|
zap.String("idempotency_key", confirmReq.RequestID),
|
|
zap.String("payment_intent_id", confirmReq.PaymentIntentID),
|
|
zap.String("quote_ref", confirmReq.QuoteRef),
|
|
zap.String("rail", confirmReq.Rail),
|
|
zap.String("status", string(existing.Status)),
|
|
zap.String("operation_ref", confirmReq.OperationRef),
|
|
)
|
|
return nil
|
|
}
|
|
|
|
record := paymentRecordFromIntent(intent, confirmReq)
|
|
if err := s.updateTransferStatus(ctx, record); err != nil {
|
|
s.logger.Warn("Failed to persist payment record", zap.Error(err),
|
|
zap.String("idempotency_key", confirmReq.RequestID), zap.String("intent_ref", record.IntentRef))
|
|
return err
|
|
}
|
|
if err := s.persistPendingConfirmation(ctx, confirmReq); err != nil {
|
|
s.logger.Warn("Failed to persist pending confirmation", zap.Error(err), zap.String("request_id", confirmReq.RequestID))
|
|
return err
|
|
}
|
|
if err := s.sendConfirmationRequest(confirmReq); err != nil {
|
|
s.logger.Warn("Failed to publish confirmation request", zap.Error(err), zap.String("idempotency_key", confirmReq.RequestID))
|
|
_ = s.clearPendingConfirmation(ctx, confirmReq.RequestID)
|
|
// If the confirmation request was not sent, we keep the record in waiting
|
|
// (or it can be marked as failed — depending on your semantics).
|
|
// Here, failed is chosen to avoid it hanging indefinitely.
|
|
record.Status = storagemodel.PaymentStatusFailed
|
|
record.UpdatedAt = time.Now()
|
|
if e := s.updateTransferStatus(ctx, record); e != nil {
|
|
s.logger.Warn("Failed to update payment status change", zap.Error(e), zap.String("idempotency_key", confirmReq.RequestID))
|
|
}
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) onConfirmationResult(ctx context.Context, result *model.ConfirmationResult) error {
|
|
if result == nil {
|
|
return merrors.InvalidArgument("confirmation result is nil", "result")
|
|
}
|
|
|
|
requestID := strings.TrimSpace(result.RequestID)
|
|
if requestID == "" {
|
|
return merrors.InvalidArgument("confirmation request_id is required", "request_id")
|
|
}
|
|
|
|
record, err := s.loadPayment(ctx, requestID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if record == nil {
|
|
return nil
|
|
}
|
|
|
|
// Store raw reply for audit/debug purposes. This does NOT affect payment state.
|
|
if result.RawReply != nil && s.repo != nil && s.repo.TelegramConfirmations() != nil {
|
|
if e := s.repo.TelegramConfirmations().Upsert(ctx, &storagemodel.TelegramConfirmation{
|
|
RequestID: requestID,
|
|
PaymentIntentID: record.PaymentIntentID,
|
|
QuoteRef: record.QuoteRef,
|
|
RawReply: result.RawReply,
|
|
}); e != nil {
|
|
s.logger.Warn("Failed to store confirmation error", zap.Error(e),
|
|
zap.String("request_id", requestID),
|
|
zap.String("status", string(result.Status)))
|
|
}
|
|
}
|
|
|
|
// If the payment is already finalized — ignore the result.
|
|
switch record.Status {
|
|
case storagemodel.PaymentStatusSuccess,
|
|
storagemodel.PaymentStatusFailed,
|
|
storagemodel.PaymentStatusCancelled:
|
|
return nil
|
|
}
|
|
|
|
now := time.Now()
|
|
|
|
switch result.Status {
|
|
|
|
// FINAL: confirmation succeeded
|
|
case model.ConfirmationStatusConfirmed:
|
|
record.Status = storagemodel.PaymentStatusSuccess
|
|
record.ExecutedMoney = result.Money
|
|
if record.ExecutedAt.IsZero() {
|
|
record.ExecutedAt = now
|
|
}
|
|
record.UpdatedAt = now
|
|
|
|
// FINAL: confirmation rejected or timed out
|
|
case model.ConfirmationStatusRejected,
|
|
model.ConfirmationStatusTimeout:
|
|
record.Status = storagemodel.PaymentStatusFailed
|
|
record.UpdatedAt = now
|
|
|
|
// NOT FINAL: do absolutely nothing
|
|
case model.ConfirmationStatusClarified:
|
|
s.logger.Debug("Confirmation clarified — no state change",
|
|
zap.String("request_id", requestID))
|
|
return nil
|
|
|
|
default:
|
|
s.logger.Debug("Non-final confirmation status — ignored",
|
|
zap.String("request_id", requestID),
|
|
zap.String("status", string(result.Status)))
|
|
return nil
|
|
}
|
|
|
|
// The ONLY place where state is persisted and rail event may be emitted
|
|
if err := s.updateTransferStatus(ctx, record); err != nil {
|
|
return err
|
|
}
|
|
|
|
if isFinalConfirmationStatus(result.Status) {
|
|
_ = s.clearPendingConfirmation(ctx, requestID)
|
|
}
|
|
|
|
s.publishTelegramReaction(result)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) buildConfirmationRequest(intent *model.PaymentGatewayIntent) (*model.ConfirmationRequest, error) {
|
|
targetChatID := s.chatID
|
|
if targetChatID == "" {
|
|
return nil, merrors.InvalidArgument("target_chat_id is required", "target_chat_id")
|
|
}
|
|
rail := strings.TrimSpace(intent.OutgoingLeg)
|
|
if rail == "" {
|
|
rail = s.rail
|
|
}
|
|
timeout := s.cfg.TimeoutSeconds
|
|
if timeout <= 0 {
|
|
timeout = int32(defaultConfirmationTimeoutSeconds)
|
|
}
|
|
return &model.ConfirmationRequest{
|
|
RequestID: intent.IdempotencyKey,
|
|
TargetChatID: targetChatID,
|
|
RequestedMoney: intent.RequestedMoney,
|
|
PaymentIntentID: intent.PaymentIntentID,
|
|
QuoteRef: intent.QuoteRef,
|
|
AcceptedUserIDs: s.cfg.AcceptedUserIDs,
|
|
TimeoutSeconds: timeout,
|
|
SourceService: string(mservice.PaymentGateway),
|
|
Rail: rail,
|
|
OperationRef: intent.OperationRef,
|
|
IntentRef: intent.IntentRef,
|
|
PaymentRef: intent.PaymentRef,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) sendConfirmationRequest(request *model.ConfirmationRequest) error {
|
|
if request == nil {
|
|
s.logger.Warn("Confirmation request rejected", zap.String("reason", "request is nil"))
|
|
return merrors.InvalidArgument("confirmation request is nil", "request")
|
|
}
|
|
if s.producer == nil {
|
|
s.logger.Warn("Messaging producer not configured")
|
|
return merrors.Internal("messaging producer is not configured")
|
|
}
|
|
env := confirmations.ConfirmationRequest(string(mservice.PaymentGateway), request)
|
|
if err := s.producer.SendMessage(env); err != nil {
|
|
s.logger.Warn("Failed to publish confirmation request",
|
|
zap.Error(err),
|
|
zap.String("request_id", request.RequestID),
|
|
zap.String("payment_intent_id", request.PaymentIntentID),
|
|
zap.String("quote_ref", request.QuoteRef),
|
|
zap.String("rail", request.Rail),
|
|
zap.Int32("timeout_seconds", request.TimeoutSeconds))
|
|
return err
|
|
}
|
|
s.logger.Info("Published confirmation request",
|
|
zap.String("request_id", request.RequestID),
|
|
zap.String("payment_intent_id", request.PaymentIntentID),
|
|
zap.String("quote_ref", request.QuoteRef),
|
|
zap.String("rail", request.Rail),
|
|
zap.Int32("timeout_seconds", request.TimeoutSeconds))
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) publishTelegramReaction(result *model.ConfirmationResult) {
|
|
if s == nil || s.producer == nil || result == nil || result.RawReply == nil {
|
|
return
|
|
}
|
|
if result.Status != model.ConfirmationStatusConfirmed && result.Status != model.ConfirmationStatusClarified {
|
|
return
|
|
}
|
|
request := &model.TelegramReactionRequest{
|
|
RequestID: strings.TrimSpace(result.RequestID),
|
|
ChatID: strings.TrimSpace(result.RawReply.ChatID),
|
|
MessageID: strings.TrimSpace(result.RawReply.MessageID),
|
|
Emoji: s.successReaction,
|
|
}
|
|
if request.ChatID == "" || request.MessageID == "" || request.Emoji == "" {
|
|
return
|
|
}
|
|
env := tnotifications.TelegramReaction(string(mservice.PaymentGateway), request)
|
|
if err := s.producer.SendMessage(env); err != nil {
|
|
s.logger.Warn("Failed to publish telegram reaction",
|
|
zap.Error(err),
|
|
zap.String("request_id", request.RequestID),
|
|
zap.String("chat_id", request.ChatID),
|
|
zap.String("message_id", request.MessageID),
|
|
zap.String("emoji", request.Emoji))
|
|
return
|
|
}
|
|
s.logger.Info("Published telegram reaction",
|
|
zap.String("request_id", request.RequestID),
|
|
zap.String("chat_id", request.ChatID),
|
|
zap.String("message_id", request.MessageID),
|
|
zap.String("emoji", request.Emoji))
|
|
}
|
|
|
|
func (s *Service) loadPayment(ctx context.Context, requestID string) (*storagemodel.PaymentRecord, error) {
|
|
if s == nil || s.repo == nil || s.repo.Payments() == nil {
|
|
return nil, merrors.Internal("payment gateway storage unavailable")
|
|
}
|
|
requestID = strings.TrimSpace(requestID)
|
|
if requestID == "" {
|
|
return nil, merrors.InvalidArgument("request_id is required", "request_id")
|
|
}
|
|
return s.repo.Payments().FindByIdempotencyKey(ctx, requestID)
|
|
}
|
|
|
|
func (s *Service) startAnnouncer() {
|
|
if s == nil || s.producer == nil {
|
|
return
|
|
}
|
|
rail := discovery.RailProviderSettlement
|
|
caps := discovery.ProviderSettlementRailGatewayOperations()
|
|
announce := discovery.Announcement{
|
|
ID: discovery.StablePaymentGatewayID(rail),
|
|
Service: string(mservice.PaymentGateway),
|
|
Rail: rail,
|
|
Operations: caps,
|
|
InvokeURI: s.invokeURI,
|
|
}
|
|
s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.PaymentGateway), announce)
|
|
s.announcer.Start()
|
|
}
|
|
|
|
func normalizeIntent(intent *model.PaymentGatewayIntent) *model.PaymentGatewayIntent {
|
|
if intent == nil {
|
|
return nil
|
|
}
|
|
cp := *intent
|
|
cp.PaymentIntentID = strings.TrimSpace(cp.PaymentIntentID)
|
|
cp.IdempotencyKey = strings.TrimSpace(cp.IdempotencyKey)
|
|
cp.OutgoingLeg = strings.TrimSpace(cp.OutgoingLeg)
|
|
cp.QuoteRef = strings.TrimSpace(cp.QuoteRef)
|
|
if cp.RequestedMoney != nil {
|
|
cp.RequestedMoney.Amount = strings.TrimSpace(cp.RequestedMoney.Amount)
|
|
cp.RequestedMoney.Currency = strings.TrimSpace(cp.RequestedMoney.Currency)
|
|
}
|
|
cp.IntentRef = strings.TrimSpace(cp.IntentRef)
|
|
cp.OperationRef = strings.TrimSpace(cp.OperationRef)
|
|
return &cp
|
|
}
|
|
|
|
func paymentRecordFromIntent(intent *model.PaymentGatewayIntent, confirmReq *model.ConfirmationRequest) *storagemodel.PaymentRecord {
|
|
record := &storagemodel.PaymentRecord{
|
|
Status: storagemodel.PaymentStatusWaiting,
|
|
}
|
|
if intent != nil {
|
|
record.IdempotencyKey = strings.TrimSpace(intent.IdempotencyKey)
|
|
record.PaymentIntentID = strings.TrimSpace(intent.PaymentIntentID)
|
|
record.QuoteRef = strings.TrimSpace(intent.QuoteRef)
|
|
record.OutgoingLeg = strings.TrimSpace(intent.OutgoingLeg)
|
|
record.RequestedMoney = intent.RequestedMoney
|
|
record.IntentRef = intent.IntentRef
|
|
record.OperationRef = intent.OperationRef
|
|
record.PaymentRef = intent.PaymentRef
|
|
}
|
|
if confirmReq != nil {
|
|
record.IdempotencyKey = strings.TrimSpace(confirmReq.RequestID)
|
|
record.PaymentIntentID = strings.TrimSpace(confirmReq.PaymentIntentID)
|
|
record.QuoteRef = strings.TrimSpace(confirmReq.QuoteRef)
|
|
record.OutgoingLeg = strings.TrimSpace(confirmReq.Rail)
|
|
record.RequestedMoney = confirmReq.RequestedMoney
|
|
record.IntentRef = strings.TrimSpace(confirmReq.IntentRef)
|
|
record.OperationRef = strings.TrimSpace(confirmReq.OperationRef)
|
|
record.PaymentRef = confirmReq.PaymentRef
|
|
// ExpiresAt is not used to derive an "expired" status — it can be kept for informational purposes only.
|
|
if confirmReq.TimeoutSeconds > 0 {
|
|
record.ExpiresAt = time.Now().Add(time.Duration(confirmReq.TimeoutSeconds) * time.Second)
|
|
}
|
|
}
|
|
return record
|
|
}
|
|
|
|
func intentFromSubmitTransfer(req *chainv1.SubmitTransferRequest, defaultRail, defaultChatID string) (*model.PaymentGatewayIntent, error) {
|
|
if req == nil {
|
|
return nil, merrors.InvalidArgument("submit_transfer: request is required")
|
|
}
|
|
idempotencyKey := strings.TrimSpace(req.GetIdempotencyKey())
|
|
if idempotencyKey == "" {
|
|
return nil, merrors.InvalidArgument("submit_transfer: idempotency_key is required")
|
|
}
|
|
intentRef := strings.TrimSpace(req.GetIntentRef())
|
|
if intentRef == "" {
|
|
return nil, merrors.InvalidArgument("submit_transfer: intent_ref is required")
|
|
}
|
|
amount := req.GetAmount()
|
|
if amount == nil {
|
|
return nil, merrors.InvalidArgument("submit_transfer: amount is required")
|
|
}
|
|
metadata := req.GetMetadata()
|
|
requestedMoney := &paymenttypes.Money{
|
|
Amount: strings.TrimSpace(amount.GetAmount()),
|
|
Currency: strings.TrimSpace(amount.GetCurrency()),
|
|
}
|
|
if requestedMoney.Amount == "" || requestedMoney.Currency == "" {
|
|
return nil, merrors.InvalidArgument("submit_transfer: amount is required")
|
|
}
|
|
sourceAmount := strings.TrimSpace(metadata[metadataSourceAmount])
|
|
sourceCurrency := strings.TrimSpace(metadata[metadataSourceCurrency])
|
|
if sourceAmount != "" && sourceCurrency != "" {
|
|
requestedMoney = &paymenttypes.Money{
|
|
Amount: sourceAmount,
|
|
Currency: sourceCurrency,
|
|
}
|
|
}
|
|
paymentIntentID := strings.TrimSpace(req.GetIntentRef())
|
|
if paymentIntentID == "" {
|
|
paymentIntentID = strings.TrimSpace(metadata[metadataPaymentIntentID])
|
|
}
|
|
if paymentIntentID == "" {
|
|
return nil, merrors.InvalidArgument("submit_transfer: payment_intent_id is required")
|
|
}
|
|
paymentRef := strings.TrimSpace(req.PaymentRef)
|
|
if paymentRef == "" {
|
|
return nil, merrors.InvalidArgument("submit_transfer: payment_ref is required")
|
|
}
|
|
operationRef := strings.TrimSpace(req.OperationRef)
|
|
if operationRef == "" {
|
|
return nil, merrors.InvalidArgument("submit_transfer: operation_ref is required")
|
|
}
|
|
quoteRef := strings.TrimSpace(metadata[metadataQuoteRef])
|
|
targetChatID := strings.TrimSpace(metadata[metadataTargetChatID])
|
|
outgoingLeg := strings.TrimSpace(metadata[metadataOutgoingLeg])
|
|
if outgoingLeg == "" {
|
|
outgoingLeg = strings.TrimSpace(defaultRail)
|
|
}
|
|
if targetChatID == "" {
|
|
targetChatID = strings.TrimSpace(defaultChatID)
|
|
}
|
|
return &model.PaymentGatewayIntent{
|
|
PaymentRef: paymentRef,
|
|
PaymentIntentID: paymentIntentID,
|
|
IdempotencyKey: idempotencyKey,
|
|
OutgoingLeg: outgoingLeg,
|
|
QuoteRef: quoteRef,
|
|
RequestedMoney: requestedMoney,
|
|
IntentRef: intentRef,
|
|
OperationRef: operationRef,
|
|
}, nil
|
|
}
|
|
|
|
func transferFromRequest(req *chainv1.SubmitTransferRequest) *chainv1.Transfer {
|
|
if req == nil {
|
|
return nil
|
|
}
|
|
return &chainv1.Transfer{
|
|
TransferRef: strings.TrimSpace(req.GetIdempotencyKey()),
|
|
IdempotencyKey: strings.TrimSpace(req.GetIdempotencyKey()),
|
|
OrganizationRef: strings.TrimSpace(req.GetOrganizationRef()),
|
|
SourceWalletRef: strings.TrimSpace(req.GetSourceWalletRef()),
|
|
Destination: req.GetDestination(),
|
|
RequestedAmount: req.GetAmount(),
|
|
Status: chainv1.TransferStatus_TRANSFER_CREATED,
|
|
}
|
|
}
|
|
|
|
func transferFromPayment(record *storagemodel.PaymentRecord, req *chainv1.SubmitTransferRequest) *chainv1.Transfer {
|
|
if record == nil {
|
|
return nil
|
|
}
|
|
|
|
var requested *moneyv1.Money
|
|
if req != nil && req.GetAmount() != nil {
|
|
requested = req.GetAmount()
|
|
} else {
|
|
requested = moneyFromPayment(record.RequestedMoney)
|
|
}
|
|
net := moneyFromPayment(record.RequestedMoney)
|
|
|
|
var status chainv1.TransferStatus
|
|
|
|
switch record.Status {
|
|
case storagemodel.PaymentStatusSuccess:
|
|
status = chainv1.TransferStatus_TRANSFER_SUCCESS
|
|
case storagemodel.PaymentStatusCancelled:
|
|
status = chainv1.TransferStatus_TRANSFER_CANCELLED
|
|
case storagemodel.PaymentStatusFailed:
|
|
status = chainv1.TransferStatus_TRANSFER_FAILED
|
|
case storagemodel.PaymentStatusProcessing:
|
|
status = chainv1.TransferStatus_TRANSFER_PROCESSING
|
|
case storagemodel.PaymentStatusWaiting:
|
|
status = chainv1.TransferStatus_TRANSFER_WAITING
|
|
default:
|
|
status = chainv1.TransferStatus_TRANSFER_CREATED
|
|
}
|
|
|
|
transfer := &chainv1.Transfer{
|
|
TransferRef: strings.TrimSpace(record.IdempotencyKey),
|
|
IdempotencyKey: strings.TrimSpace(record.IdempotencyKey),
|
|
RequestedAmount: requested,
|
|
NetAmount: net,
|
|
Status: status,
|
|
}
|
|
|
|
if req != nil {
|
|
transfer.OrganizationRef = strings.TrimSpace(req.GetOrganizationRef())
|
|
transfer.SourceWalletRef = strings.TrimSpace(req.GetSourceWalletRef())
|
|
transfer.Destination = req.GetDestination()
|
|
}
|
|
|
|
if !record.ExecutedAt.IsZero() {
|
|
ts := timestamppb.New(record.ExecutedAt)
|
|
transfer.CreatedAt = ts
|
|
transfer.UpdatedAt = ts
|
|
} else if !record.UpdatedAt.IsZero() {
|
|
ts := timestamppb.New(record.UpdatedAt)
|
|
transfer.UpdatedAt = ts
|
|
if !record.CreatedAt.IsZero() {
|
|
transfer.CreatedAt = timestamppb.New(record.CreatedAt)
|
|
}
|
|
}
|
|
|
|
return transfer
|
|
}
|
|
|
|
func moneyFromPayment(m *paymenttypes.Money) *moneyv1.Money {
|
|
if m == nil {
|
|
return nil
|
|
}
|
|
currency := strings.TrimSpace(m.Currency)
|
|
amount := strings.TrimSpace(m.Amount)
|
|
if currency == "" || amount == "" {
|
|
return nil
|
|
}
|
|
return &moneyv1.Money{
|
|
Currency: currency,
|
|
Amount: amount,
|
|
}
|
|
}
|
|
|
|
func readEnv(env string) string {
|
|
if strings.TrimSpace(env) == "" {
|
|
return ""
|
|
}
|
|
return strings.TrimSpace(os.Getenv(env))
|
|
}
|
|
|
|
var _ grpcapp.Service = (*Service)(nil)
|