Files
sendico/api/gateway/tgsettle/internal/service/gateway/service.go
2026-01-05 01:22:47 +01:00

542 lines
17 KiB
Go

package gateway
import (
"context"
"os"
"strings"
"sync"
"github.com/tech/sendico/gateway/tgsettle/storage"
storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model"
"github.com/tech/sendico/pkg/api/routers"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
msg "github.com/tech/sendico/pkg/messaging"
mb "github.com/tech/sendico/pkg/messaging/broker"
cons "github.com/tech/sendico/pkg/messaging/consumer"
confirmations "github.com/tech/sendico/pkg/messaging/notifications/confirmations"
paymentgateway "github.com/tech/sendico/pkg/messaging/notifications/paymentgateway"
np "github.com/tech/sendico/pkg/messaging/notifications/processor"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/mservice"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
"github.com/tech/sendico/pkg/server/grpcapp"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)
const (
defaultConfirmationTimeoutSeconds = 120
executedStatus = "executed"
)
const (
metadataPaymentIntentID = "payment_intent_id"
metadataQuoteRef = "quote_ref"
metadataTargetChatID = "target_chat_id"
metadataOutgoingLeg = "outgoing_leg"
)
type Config struct {
Rail string
TargetChatIDEnv string
TimeoutSeconds int32
AcceptedUserIDs []string
}
type Service struct {
logger mlogger.Logger
repo storage.Repository
producer msg.Producer
broker mb.Broker
cfg Config
rail string
chatID string
announcer *discovery.Announcer
mu sync.Mutex
pending map[string]*model.PaymentGatewayIntent
consumers []msg.Consumer
connectorv1.UnimplementedConnectorServiceServer
}
func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Producer, broker mb.Broker, cfg Config) *Service {
if logger != nil {
logger = logger.Named("tgsettle_gateway")
}
svc := &Service{
logger: logger,
repo: repo,
producer: producer,
broker: broker,
cfg: cfg,
rail: strings.TrimSpace(cfg.Rail),
pending: map[string]*model.PaymentGatewayIntent{},
}
svc.chatID = strings.TrimSpace(readEnv(cfg.TargetChatIDEnv))
svc.startConsumers()
svc.startAnnouncer()
return svc
}
func (s *Service) Register(router routers.GRPC) error {
return router.Register(func(reg grpc.ServiceRegistrar) {
connectorv1.RegisterConnectorServiceServer(reg, s)
})
}
func (s *Service) Shutdown() {
if s == nil {
return
}
if s.announcer != nil {
s.announcer.Stop()
}
for _, consumer := range s.consumers {
if consumer != nil {
consumer.Close()
}
}
}
func (s *Service) startConsumers() {
if s == nil || s.broker == nil {
if s != nil && s.logger != nil {
s.logger.Warn("Messaging broker not configured; confirmation flow disabled")
}
return
}
resultProcessor := confirmations.NewConfirmationResultProcessor(s.logger, string(mservice.PaymentGateway), s.rail, s.onConfirmationResult)
s.consumeProcessor(resultProcessor)
}
func (s *Service) consumeProcessor(processor np.EnvelopeProcessor) {
consumer, err := cons.NewConsumer(s.logger, s.broker, processor.GetSubject())
if err != nil {
s.logger.Warn("Failed to create messaging consumer", zap.Error(err), zap.String("event", processor.GetSubject().ToString()))
return
}
s.consumers = append(s.consumers, consumer)
go func() {
if err := consumer.ConsumeMessages(processor.Process); err != nil {
s.logger.Warn("Messaging consumer stopped", zap.Error(err), zap.String("event", processor.GetSubject().ToString()))
}
}()
}
func (s *Service) SubmitTransfer(ctx context.Context, req *chainv1.SubmitTransferRequest) (*chainv1.SubmitTransferResponse, error) {
if req == nil {
return nil, merrors.InvalidArgument("submit_transfer: request is required")
}
idempotencyKey := strings.TrimSpace(req.GetIdempotencyKey())
if idempotencyKey == "" {
return nil, merrors.InvalidArgument("submit_transfer: idempotency_key is required")
}
amount := req.GetAmount()
if amount == nil || strings.TrimSpace(amount.GetAmount()) == "" || strings.TrimSpace(amount.GetCurrency()) == "" {
return nil, merrors.InvalidArgument("submit_transfer: amount is required")
}
intent, err := intentFromSubmitTransfer(req, s.rail, s.chatID)
if err != nil {
return nil, err
}
if s.repo == nil || s.repo.Payments() == nil {
return nil, merrors.Internal("payment gateway storage unavailable")
}
existing, err := s.repo.Payments().FindByIdempotencyKey(ctx, idempotencyKey)
if err != nil {
return nil, err
}
if existing != nil {
return &chainv1.SubmitTransferResponse{Transfer: transferFromExecution(existing, req)}, nil
}
if err := s.onIntent(ctx, intent); err != nil {
return nil, err
}
return &chainv1.SubmitTransferResponse{Transfer: transferFromRequest(req)}, nil
}
func (s *Service) GetTransfer(ctx context.Context, req *chainv1.GetTransferRequest) (*chainv1.GetTransferResponse, error) {
if req == nil {
return nil, merrors.InvalidArgument("get_transfer: request is required")
}
transferRef := strings.TrimSpace(req.GetTransferRef())
if transferRef == "" {
return nil, merrors.InvalidArgument("get_transfer: transfer_ref is required")
}
if s.repo == nil || s.repo.Payments() == nil {
return nil, merrors.Internal("payment gateway storage unavailable")
}
existing, err := s.repo.Payments().FindByIdempotencyKey(ctx, transferRef)
if err != nil {
return nil, err
}
if existing != nil {
return &chainv1.GetTransferResponse{Transfer: transferFromExecution(existing, nil)}, nil
}
if s.hasPending(transferRef) {
return &chainv1.GetTransferResponse{Transfer: transferPending(transferRef)}, nil
}
return nil, status.Error(codes.NotFound, "transfer not found")
}
func (s *Service) onIntent(ctx context.Context, intent *model.PaymentGatewayIntent) error {
if intent == nil {
return merrors.InvalidArgument("payment gateway intent is nil", "intent")
}
intent = normalizeIntent(intent)
if intent.IdempotencyKey == "" {
return merrors.InvalidArgument("idempotency_key is required", "idempotency_key")
}
if intent.PaymentIntentID == "" {
return merrors.InvalidArgument("payment_intent_id is required", "payment_intent_id")
}
if intent.RequestedMoney == nil || strings.TrimSpace(intent.RequestedMoney.Amount) == "" || strings.TrimSpace(intent.RequestedMoney.Currency) == "" {
return merrors.InvalidArgument("requested_money is required", "requested_money")
}
if s.repo == nil || s.repo.Payments() == nil {
return merrors.Internal("payment gateway storage unavailable")
}
existing, err := s.repo.Payments().FindByIdempotencyKey(ctx, intent.IdempotencyKey)
if err != nil {
return err
}
if existing != nil {
s.logger.Info("Payment gateway intent already executed", zap.String("idempotency_key", intent.IdempotencyKey))
return nil
}
confirmReq, err := s.buildConfirmationRequest(intent)
if err != nil {
return err
}
if err := s.sendConfirmationRequest(confirmReq); err != nil {
return err
}
s.trackIntent(confirmReq.RequestID, intent)
return nil
}
func (s *Service) onConfirmationResult(ctx context.Context, result *model.ConfirmationResult) error {
if result == nil {
return merrors.InvalidArgument("confirmation result is nil", "result")
}
requestID := strings.TrimSpace(result.RequestID)
if requestID == "" {
return merrors.InvalidArgument("confirmation request_id is required", "request_id")
}
intent := s.lookupIntent(requestID)
if intent == nil {
s.logger.Warn("Confirmation result ignored: intent not found", zap.String("request_id", requestID))
return nil
}
if result.RawReply != nil && s.repo != nil && s.repo.TelegramConfirmations() != nil {
_ = s.repo.TelegramConfirmations().Upsert(ctx, &storagemodel.TelegramConfirmation{
RequestID: requestID,
PaymentIntentID: intent.PaymentIntentID,
QuoteRef: intent.QuoteRef,
RawReply: result.RawReply,
})
}
if result.Status == model.ConfirmationStatusConfirmed || result.Status == model.ConfirmationStatusClarified {
exec := &storagemodel.PaymentExecution{
IdempotencyKey: intent.IdempotencyKey,
PaymentIntentID: intent.PaymentIntentID,
ExecutedMoney: result.Money,
QuoteRef: intent.QuoteRef,
Status: executedStatus,
}
if err := s.repo.Payments().InsertExecution(ctx, exec); err != nil && err != storage.ErrDuplicate {
return err
}
}
s.publishExecution(intent, result)
s.removeIntent(requestID)
return nil
}
func (s *Service) buildConfirmationRequest(intent *model.PaymentGatewayIntent) (*model.ConfirmationRequest, error) {
targetChatID := strings.TrimSpace(intent.TargetChatID)
if targetChatID == "" {
targetChatID = s.chatID
}
if targetChatID == "" {
return nil, merrors.InvalidArgument("target_chat_id is required", "target_chat_id")
}
rail := strings.TrimSpace(intent.OutgoingLeg)
if rail == "" {
rail = s.rail
}
timeout := s.cfg.TimeoutSeconds
if timeout <= 0 {
timeout = int32(defaultConfirmationTimeoutSeconds)
}
return &model.ConfirmationRequest{
RequestID: intent.IdempotencyKey,
TargetChatID: targetChatID,
RequestedMoney: intent.RequestedMoney,
PaymentIntentID: intent.PaymentIntentID,
QuoteRef: intent.QuoteRef,
AcceptedUserIDs: s.cfg.AcceptedUserIDs,
TimeoutSeconds: timeout,
SourceService: string(mservice.PaymentGateway),
Rail: rail,
}, nil
}
func (s *Service) sendConfirmationRequest(request *model.ConfirmationRequest) error {
if request == nil {
return merrors.InvalidArgument("confirmation request is nil", "request")
}
if s.producer == nil {
return merrors.Internal("messaging producer is not configured")
}
env := confirmations.ConfirmationRequest(string(mservice.PaymentGateway), request)
if err := s.producer.SendMessage(env); err != nil {
s.logger.Warn("Failed to publish confirmation request", zap.Error(err), zap.String("request_id", request.RequestID))
return err
}
return nil
}
func (s *Service) publishExecution(intent *model.PaymentGatewayIntent, result *model.ConfirmationResult) {
if s == nil || intent == nil || result == nil || s.producer == nil {
return
}
exec := &model.PaymentGatewayExecution{
PaymentIntentID: intent.PaymentIntentID,
IdempotencyKey: intent.IdempotencyKey,
QuoteRef: intent.QuoteRef,
ExecutedMoney: result.Money,
Status: result.Status,
RequestID: result.RequestID,
RawReply: result.RawReply,
}
env := paymentgateway.PaymentGatewayExecution(string(mservice.PaymentGateway), exec)
if err := s.producer.SendMessage(env); err != nil {
s.logger.Warn("Failed to publish gateway execution result", zap.Error(err), zap.String("request_id", result.RequestID))
}
}
func (s *Service) trackIntent(requestID string, intent *model.PaymentGatewayIntent) {
if s == nil || intent == nil {
return
}
requestID = strings.TrimSpace(requestID)
if requestID == "" {
return
}
s.mu.Lock()
s.pending[requestID] = intent
s.mu.Unlock()
}
func (s *Service) lookupIntent(requestID string) *model.PaymentGatewayIntent {
requestID = strings.TrimSpace(requestID)
if requestID == "" {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
return s.pending[requestID]
}
func (s *Service) removeIntent(requestID string) {
requestID = strings.TrimSpace(requestID)
if requestID == "" {
return
}
s.mu.Lock()
delete(s.pending, requestID)
s.mu.Unlock()
}
func (s *Service) hasPending(requestID string) bool {
requestID = strings.TrimSpace(requestID)
if requestID == "" {
return false
}
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.pending[requestID]
return ok
}
func (s *Service) startAnnouncer() {
if s == nil || s.producer == nil {
return
}
caps := []string{"telegram_confirmation", "money_persistence", "observe.confirm", "payout.fiat"}
if s.rail != "" {
caps = append(caps, "confirmations."+strings.ToLower(string(mservice.PaymentGateway))+"."+strings.ToLower(s.rail))
}
announce := discovery.Announcement{
Service: string(mservice.PaymentGateway),
Rail: s.rail,
Operations: caps,
InvokeURI: discovery.DefaultInvokeURI(string(mservice.PaymentGateway)),
}
s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.PaymentGateway), announce)
s.announcer.Start()
}
func normalizeIntent(intent *model.PaymentGatewayIntent) *model.PaymentGatewayIntent {
if intent == nil {
return nil
}
cp := *intent
cp.PaymentIntentID = strings.TrimSpace(cp.PaymentIntentID)
cp.IdempotencyKey = strings.TrimSpace(cp.IdempotencyKey)
cp.OutgoingLeg = strings.TrimSpace(cp.OutgoingLeg)
cp.QuoteRef = strings.TrimSpace(cp.QuoteRef)
cp.TargetChatID = strings.TrimSpace(cp.TargetChatID)
if cp.RequestedMoney != nil {
cp.RequestedMoney.Amount = strings.TrimSpace(cp.RequestedMoney.Amount)
cp.RequestedMoney.Currency = strings.TrimSpace(cp.RequestedMoney.Currency)
}
return &cp
}
func intentFromSubmitTransfer(req *chainv1.SubmitTransferRequest, defaultRail, defaultChatID string) (*model.PaymentGatewayIntent, error) {
if req == nil {
return nil, merrors.InvalidArgument("submit_transfer: request is required")
}
idempotencyKey := strings.TrimSpace(req.GetIdempotencyKey())
if idempotencyKey == "" {
return nil, merrors.InvalidArgument("submit_transfer: idempotency_key is required")
}
amount := req.GetAmount()
if amount == nil {
return nil, merrors.InvalidArgument("submit_transfer: amount is required")
}
requestedMoney := &paymenttypes.Money{
Amount: strings.TrimSpace(amount.GetAmount()),
Currency: strings.TrimSpace(amount.GetCurrency()),
}
if requestedMoney.Amount == "" || requestedMoney.Currency == "" {
return nil, merrors.InvalidArgument("submit_transfer: amount is required")
}
metadata := req.GetMetadata()
paymentIntentID := strings.TrimSpace(req.GetClientReference())
if paymentIntentID == "" {
paymentIntentID = strings.TrimSpace(metadata[metadataPaymentIntentID])
}
if paymentIntentID == "" {
return nil, merrors.InvalidArgument("submit_transfer: payment_intent_id is required")
}
quoteRef := strings.TrimSpace(metadata[metadataQuoteRef])
targetChatID := strings.TrimSpace(metadata[metadataTargetChatID])
outgoingLeg := strings.TrimSpace(metadata[metadataOutgoingLeg])
if outgoingLeg == "" {
outgoingLeg = strings.TrimSpace(defaultRail)
}
if targetChatID == "" {
targetChatID = strings.TrimSpace(defaultChatID)
}
return &model.PaymentGatewayIntent{
PaymentIntentID: paymentIntentID,
IdempotencyKey: idempotencyKey,
OutgoingLeg: outgoingLeg,
QuoteRef: quoteRef,
RequestedMoney: requestedMoney,
TargetChatID: targetChatID,
}, nil
}
func transferFromRequest(req *chainv1.SubmitTransferRequest) *chainv1.Transfer {
if req == nil {
return nil
}
amount := req.GetAmount()
return &chainv1.Transfer{
TransferRef: strings.TrimSpace(req.GetIdempotencyKey()),
IdempotencyKey: strings.TrimSpace(req.GetIdempotencyKey()),
OrganizationRef: strings.TrimSpace(req.GetOrganizationRef()),
SourceWalletRef: strings.TrimSpace(req.GetSourceWalletRef()),
Destination: req.GetDestination(),
RequestedAmount: amount,
Status: chainv1.TransferStatus_TRANSFER_SUBMITTED,
}
}
func transferFromExecution(exec *storagemodel.PaymentExecution, req *chainv1.SubmitTransferRequest) *chainv1.Transfer {
if exec == nil {
return nil
}
var requested *moneyv1.Money
if req != nil && req.GetAmount() != nil {
requested = req.GetAmount()
}
net := moneyFromPayment(exec.ExecutedMoney)
status := chainv1.TransferStatus_TRANSFER_CONFIRMED
if strings.TrimSpace(exec.Status) != "" && !strings.EqualFold(exec.Status, executedStatus) {
status = chainv1.TransferStatus_TRANSFER_PENDING
}
transfer := &chainv1.Transfer{
TransferRef: strings.TrimSpace(exec.IdempotencyKey),
IdempotencyKey: strings.TrimSpace(exec.IdempotencyKey),
RequestedAmount: requested,
NetAmount: net,
Status: status,
}
if req != nil {
transfer.OrganizationRef = strings.TrimSpace(req.GetOrganizationRef())
transfer.SourceWalletRef = strings.TrimSpace(req.GetSourceWalletRef())
transfer.Destination = req.GetDestination()
}
if !exec.ExecutedAt.IsZero() {
ts := timestamppb.New(exec.ExecutedAt)
transfer.CreatedAt = ts
transfer.UpdatedAt = ts
}
return transfer
}
func transferPending(requestID string) *chainv1.Transfer {
ref := strings.TrimSpace(requestID)
if ref == "" {
return nil
}
return &chainv1.Transfer{
TransferRef: ref,
IdempotencyKey: ref,
Status: chainv1.TransferStatus_TRANSFER_SUBMITTED,
}
}
func moneyFromPayment(m *paymenttypes.Money) *moneyv1.Money {
if m == nil {
return nil
}
currency := strings.TrimSpace(m.Currency)
amount := strings.TrimSpace(m.Amount)
if currency == "" || amount == "" {
return nil
}
return &moneyv1.Money{
Currency: currency,
Amount: amount,
}
}
func readEnv(env string) string {
if strings.TrimSpace(env) == "" {
return ""
}
return strings.TrimSpace(os.Getenv(env))
}
var _ grpcapp.Service = (*Service)(nil)