refactored payment orchestration

This commit is contained in:
Stephan D
2026-02-03 00:40:46 +01:00
parent 05d998e0f7
commit 5e87e2f2f9
184 changed files with 3920 additions and 2219 deletions

View File

@@ -16,7 +16,6 @@ import (
mb "github.com/tech/sendico/pkg/messaging/broker"
cons "github.com/tech/sendico/pkg/messaging/consumer"
confirmations "github.com/tech/sendico/pkg/messaging/notifications/confirmations"
paymentgateway "github.com/tech/sendico/pkg/messaging/notifications/paymentgateway"
np "github.com/tech/sendico/pkg/messaging/notifications/processor"
tnotifications "github.com/tech/sendico/pkg/messaging/notifications/telegram"
"github.com/tech/sendico/pkg/mlogger"
@@ -188,12 +187,7 @@ func (s *Service) SubmitTransfer(ctx context.Context, req *chainv1.SubmitTransfe
return nil, err
}
if existing != nil {
existing, err = s.expirePaymentIfNeeded(ctx, existing)
if err != nil {
s.logger.Warn("Submit transfer status refresh failed", append(logFields, zap.Error(err))...)
return nil, err
}
s.logger.Info("Submit transfer idempotent hit", append(logFields, zap.String("status", string(paymentStatus(existing))))...)
s.logger.Info("Submit transfer idempotent hit", append(logFields, zap.String("status", string(existing.Status)))...)
return &chainv1.SubmitTransferResponse{Transfer: transferFromPayment(existing, req)}, nil
}
if err := s.onIntent(ctx, intent); err != nil {
@@ -225,14 +219,9 @@ func (s *Service) GetTransfer(ctx context.Context, req *chainv1.GetTransferReque
return nil, err
}
if existing != nil {
existing, err = s.expirePaymentIfNeeded(ctx, existing)
if err != nil {
s.logger.Warn("Get transfer status refresh failed", append(logFields, zap.Error(err))...)
return nil, err
}
s.logger.Info("Get transfer resolved from execution", append(logFields,
zap.String("payment_intent_id", strings.TrimSpace(existing.PaymentIntentID)),
zap.String("status", string(paymentStatus(existing))),
zap.String("status", string(existing.Status)),
)...)
return &chainv1.GetTransferResponse{Transfer: transferFromPayment(existing, nil)}, nil
}
@@ -274,27 +263,30 @@ func (s *Service) onIntent(ctx context.Context, intent *model.PaymentGatewayInte
return err
}
if existing != nil {
existing, err = s.expirePaymentIfNeeded(ctx, existing)
if err != nil {
return err
}
s.logger.Info("Payment gateway intent already recorded",
zap.String("idempotency_key", confirmReq.RequestID),
zap.String("payment_intent_id", confirmReq.PaymentIntentID),
zap.String("quote_ref", confirmReq.QuoteRef),
zap.String("rail", confirmReq.Rail),
zap.String("status", string(paymentStatus(existing))))
zap.String("status", string(existing.Status)))
return nil
}
record := paymentRecordFromIntent(intent, confirmReq)
if err := s.repo.Payments().Upsert(ctx, record); err != nil {
s.logger.Warn("Failed to persist pending payment", zap.Error(err), zap.String("idempotency_key", confirmReq.RequestID))
if err := s.updateTransferStatus(ctx, record); err != nil {
s.logger.Warn("Failed to persist payment record", zap.Error(err), zap.String("idempotency_key", confirmReq.RequestID))
return err
}
if err := s.sendConfirmationRequest(confirmReq); err != nil {
s.logger.Warn("Failed to publish confirmation request", zap.Error(err), zap.String("idempotency_key", confirmReq.RequestID))
s.markPaymentExpired(ctx, record, time.Now())
// If the confirmation request was not sent, we keep the record in waiting
// (or it can be marked as failed — depending on your semantics).
// Here, failed is chosen to avoid it hanging indefinitely.
record.Status = storagemodel.PaymentStatusFailed
record.UpdatedAt = time.Now()
if e := s.updateTransferStatus(ctx, record); e != nil {
s.logger.Warn("Failed to update payment status change", zap.Error(e), zap.String("idempotency_key", confirmReq.RequestID))
}
return err
}
return nil
@@ -302,65 +294,88 @@ func (s *Service) onIntent(ctx context.Context, intent *model.PaymentGatewayInte
func (s *Service) onConfirmationResult(ctx context.Context, result *model.ConfirmationResult) error {
if result == nil {
s.logger.Warn("Confirmation result rejected", zap.String("reason", "result is nil"))
return merrors.InvalidArgument("confirmation result is nil", "result")
}
requestID := strings.TrimSpace(result.RequestID)
if requestID == "" {
s.logger.Warn("Confirmation result rejected", zap.String("reason", "request_id is required"))
return merrors.InvalidArgument("confirmation request_id is required", "request_id")
}
record, err := s.loadPayment(ctx, requestID)
if err != nil {
s.logger.Warn("Confirmation result lookup failed", zap.Error(err), zap.String("request_id", requestID))
return err
}
if record == nil {
s.logger.Warn("Confirmation result ignored: payment not found", zap.String("request_id", requestID))
return nil
}
// Store raw reply for audit/debug purposes. This does NOT affect payment state.
if result.RawReply != nil && s.repo != nil && s.repo.TelegramConfirmations() != nil {
if err := s.repo.TelegramConfirmations().Upsert(ctx, &storagemodel.TelegramConfirmation{
if e := s.repo.TelegramConfirmations().Upsert(ctx, &storagemodel.TelegramConfirmation{
RequestID: requestID,
PaymentIntentID: record.PaymentIntentID,
QuoteRef: record.QuoteRef,
RawReply: result.RawReply,
}); err != nil {
s.logger.Warn("Failed to store telegram confirmation", zap.Error(err), zap.String("request_id", requestID))
} else {
s.logger.Info("Stored telegram confirmation", zap.String("request_id", requestID),
zap.String("payment_intent_id", record.PaymentIntentID),
zap.String("reply_text", result.RawReply.Text), zap.String("reply_user_id", result.RawReply.FromUserID),
zap.String("reply_user", result.RawReply.FromUsername))
}); e != nil {
s.logger.Warn("Failed to store confirmation error", zap.Error(e),
zap.String("request_id", requestID),
zap.String("status", string(result.Status)))
}
}
nextStatus := paymentStatusFromResult(result)
currentStatus := paymentStatus(record)
if currentStatus == storagemodel.PaymentStatusExecuted || currentStatus == storagemodel.PaymentStatusExpired {
s.logger.Info("Confirmation result ignored: payment already finalized",
zap.String("request_id", requestID),
zap.String("status", string(currentStatus)))
// If the payment is already finalized — ignore the result.
switch record.Status {
case storagemodel.PaymentStatusSuccess,
storagemodel.PaymentStatusFailed,
storagemodel.PaymentStatusCancelled:
return nil
}
s.applyPaymentResult(record, nextStatus, result)
if err := s.repo.Payments().Upsert(ctx, record); err != nil {
s.logger.Warn("Failed to persist payment status", zap.Error(err), zap.String("request_id", requestID))
now := time.Now()
switch result.Status {
// FINAL: confirmation succeeded
case model.ConfirmationStatusConfirmed:
record.Status = storagemodel.PaymentStatusSuccess
record.ExecutedMoney = result.Money
if record.ExecutedAt.IsZero() {
record.ExecutedAt = now
}
record.UpdatedAt = now
// FINAL: confirmation rejected or timed out
case model.ConfirmationStatusRejected,
model.ConfirmationStatusTimeout:
record.Status = storagemodel.PaymentStatusFailed
record.UpdatedAt = now
// NOT FINAL: do absolutely nothing
case model.ConfirmationStatusClarified:
s.logger.Debug("Confirmation clarified — no state change",
zap.String("request_id", requestID))
return nil
default:
s.logger.Debug("Non-final confirmation status — ignored",
zap.String("request_id", requestID),
zap.String("status", string(result.Status)))
return nil
}
// The ONLY place where state is persisted and rail event may be emitted
if err := s.updateTransferStatus(ctx, record); err != nil {
return err
}
intent := intentFromPayment(record)
s.publishExecution(intent, result)
s.publishTelegramReaction(result)
return nil
}
func (s *Service) buildConfirmationRequest(intent *model.PaymentGatewayIntent) (*model.ConfirmationRequest, error) {
targetChatID := strings.TrimSpace(intent.TargetChatID)
if targetChatID == "" {
targetChatID = s.chatID
}
targetChatID := s.chatID
if targetChatID == "" {
return nil, merrors.InvalidArgument("target_chat_id is required", "target_chat_id")
}
@@ -414,38 +429,6 @@ func (s *Service) sendConfirmationRequest(request *model.ConfirmationRequest) er
return nil
}
func (s *Service) publishExecution(intent *model.PaymentGatewayIntent, result *model.ConfirmationResult) {
if s == nil || intent == nil || result == nil || s.producer == nil {
return
}
exec := &model.PaymentGatewayExecution{
PaymentIntentID: intent.PaymentIntentID,
IdempotencyKey: intent.IdempotencyKey,
QuoteRef: intent.QuoteRef,
ExecutedMoney: result.Money,
Status: result.Status,
RequestID: result.RequestID,
RawReply: result.RawReply,
}
env := paymentgateway.PaymentGatewayExecution(string(mservice.PaymentGateway), exec)
if err := s.producer.SendMessage(env); err != nil {
s.logger.Warn("Failed to publish gateway execution result",
zap.Error(err),
zap.String("request_id", result.RequestID),
zap.String("idempotency_key", intent.IdempotencyKey),
zap.String("payment_intent_id", intent.PaymentIntentID),
zap.String("quote_ref", intent.QuoteRef),
zap.String("status", string(result.Status)))
return
}
s.logger.Info("Published gateway execution result",
zap.String("request_id", result.RequestID),
zap.String("idempotency_key", intent.IdempotencyKey),
zap.String("payment_intent_id", intent.PaymentIntentID),
zap.String("quote_ref", intent.QuoteRef),
zap.String("status", string(result.Status)))
}
func (s *Service) publishTelegramReaction(result *model.ConfirmationResult) {
if s == nil || s.producer == nil || result == nil || result.RawReply == nil {
return
@@ -490,46 +473,6 @@ func (s *Service) loadPayment(ctx context.Context, requestID string) (*storagemo
return s.repo.Payments().FindByIdempotencyKey(ctx, requestID)
}
func (s *Service) expirePaymentIfNeeded(ctx context.Context, record *storagemodel.PaymentRecord) (*storagemodel.PaymentRecord, error) {
if record == nil {
return nil, nil
}
status := paymentStatus(record)
if status != storagemodel.PaymentStatusPending {
return record, nil
}
if record.ExpiresAt.IsZero() {
return record, nil
}
if time.Now().Before(record.ExpiresAt) {
return record, nil
}
record.Status = storagemodel.PaymentStatusExpired
if record.ExpiredAt.IsZero() {
record.ExpiredAt = time.Now()
}
if s != nil && s.repo != nil && s.repo.Payments() != nil {
if err := s.repo.Payments().Upsert(ctx, record); err != nil {
return record, err
}
}
return record, nil
}
func (s *Service) markPaymentExpired(ctx context.Context, record *storagemodel.PaymentRecord, when time.Time) {
if record == nil || s == nil || s.repo == nil || s.repo.Payments() == nil {
return
}
if when.IsZero() {
when = time.Now()
}
record.Status = storagemodel.PaymentStatusExpired
record.ExpiredAt = when
if err := s.repo.Payments().Upsert(ctx, record); err != nil {
s.logger.Warn("Failed to mark payment as expired", zap.Error(err), zap.String("request_id", record.IdempotencyKey))
}
}
func (s *Service) startAnnouncer() {
if s == nil || s.producer == nil {
return
@@ -557,78 +500,37 @@ func normalizeIntent(intent *model.PaymentGatewayIntent) *model.PaymentGatewayIn
cp.IdempotencyKey = strings.TrimSpace(cp.IdempotencyKey)
cp.OutgoingLeg = strings.TrimSpace(cp.OutgoingLeg)
cp.QuoteRef = strings.TrimSpace(cp.QuoteRef)
cp.TargetChatID = strings.TrimSpace(cp.TargetChatID)
if cp.RequestedMoney != nil {
cp.RequestedMoney.Amount = strings.TrimSpace(cp.RequestedMoney.Amount)
cp.RequestedMoney.Currency = strings.TrimSpace(cp.RequestedMoney.Currency)
}
cp.IntentRef = strings.TrimSpace(cp.IntentRef)
cp.OperationRef = strings.TrimSpace(cp.OperationRef)
return &cp
}
func paymentStatus(record *storagemodel.PaymentRecord) storagemodel.PaymentStatus {
if record == nil {
return storagemodel.PaymentStatusPending
}
if record.Status != "" {
return record.Status
}
if record.ExecutedMoney != nil || !record.ExecutedAt.IsZero() {
return storagemodel.PaymentStatusExecuted
}
return storagemodel.PaymentStatusPending
}
func paymentStatusFromResult(result *model.ConfirmationResult) storagemodel.PaymentStatus {
if result == nil {
return storagemodel.PaymentStatusPending
}
switch result.Status {
case model.ConfirmationStatusConfirmed, model.ConfirmationStatusClarified:
return storagemodel.PaymentStatusExecuted
case model.ConfirmationStatusTimeout, model.ConfirmationStatusRejected:
return storagemodel.PaymentStatusExpired
default:
return storagemodel.PaymentStatusPending
}
}
func (s *Service) applyPaymentResult(record *storagemodel.PaymentRecord, status storagemodel.PaymentStatus, result *model.ConfirmationResult) {
if record == nil {
return
}
record.Status = status
switch status {
case storagemodel.PaymentStatusExecuted:
record.ExecutedMoney = result.Money
if record.ExecutedAt.IsZero() {
record.ExecutedAt = time.Now()
}
case storagemodel.PaymentStatusExpired:
if record.ExpiredAt.IsZero() {
record.ExpiredAt = time.Now()
}
}
}
func paymentRecordFromIntent(intent *model.PaymentGatewayIntent, confirmReq *model.ConfirmationRequest) *storagemodel.PaymentRecord {
record := &storagemodel.PaymentRecord{
Status: storagemodel.PaymentStatusPending,
Status: storagemodel.PaymentStatusWaiting,
}
if intent != nil {
record.IdempotencyKey = strings.TrimSpace(intent.IdempotencyKey)
record.PaymentIntentID = strings.TrimSpace(intent.PaymentIntentID)
record.QuoteRef = strings.TrimSpace(intent.QuoteRef)
record.OutgoingLeg = strings.TrimSpace(intent.OutgoingLeg)
record.TargetChatID = strings.TrimSpace(intent.TargetChatID)
record.RequestedMoney = intent.RequestedMoney
record.IntentRef = intent.IntentRef
record.OperationRef = intent.OperationRef
}
if confirmReq != nil {
record.IdempotencyKey = strings.TrimSpace(confirmReq.RequestID)
record.PaymentIntentID = strings.TrimSpace(confirmReq.PaymentIntentID)
record.QuoteRef = strings.TrimSpace(confirmReq.QuoteRef)
record.OutgoingLeg = strings.TrimSpace(confirmReq.Rail)
record.TargetChatID = strings.TrimSpace(confirmReq.TargetChatID)
record.RequestedMoney = confirmReq.RequestedMoney
record.IntentRef = strings.TrimSpace(confirmReq.IntentRef)
record.OperationRef = strings.TrimSpace(confirmReq.OperationRef)
// ExpiresAt is not used to derive an "expired" status — it can be kept for informational purposes only.
if confirmReq.TimeoutSeconds > 0 {
record.ExpiresAt = time.Now().Add(time.Duration(confirmReq.TimeoutSeconds) * time.Second)
}
@@ -641,12 +543,14 @@ func intentFromPayment(record *storagemodel.PaymentRecord) *model.PaymentGateway
return nil
}
return &model.PaymentGatewayIntent{
PaymentRef: strings.TrimSpace(record.PaymentRef),
PaymentIntentID: strings.TrimSpace(record.PaymentIntentID),
IdempotencyKey: strings.TrimSpace(record.IdempotencyKey),
OutgoingLeg: strings.TrimSpace(record.OutgoingLeg),
QuoteRef: strings.TrimSpace(record.QuoteRef),
IntentRef: strings.TrimSpace(record.IntentRef),
OperationRef: strings.TrimSpace(record.OperationRef),
RequestedMoney: record.RequestedMoney,
TargetChatID: strings.TrimSpace(record.TargetChatID),
}
}
@@ -678,13 +582,17 @@ func intentFromSubmitTransfer(req *chainv1.SubmitTransferRequest, defaultRail, d
Currency: sourceCurrency,
}
}
paymentIntentID := strings.TrimSpace(req.GetClientReference())
paymentIntentID := strings.TrimSpace(req.GetIntentRef())
if paymentIntentID == "" {
paymentIntentID = strings.TrimSpace(metadata[metadataPaymentIntentID])
}
if paymentIntentID == "" {
return nil, merrors.InvalidArgument("submit_transfer: payment_intent_id is required")
}
paymentRef := strings.TrimSpace(req.PaymentRef)
if paymentRef == "" {
return nil, merrors.InvalidArgument("submit_transfer: payment_ref is required")
}
quoteRef := strings.TrimSpace(metadata[metadataQuoteRef])
targetChatID := strings.TrimSpace(metadata[metadataTargetChatID])
outgoingLeg := strings.TrimSpace(metadata[metadataOutgoingLeg])
@@ -695,12 +603,12 @@ func intentFromSubmitTransfer(req *chainv1.SubmitTransferRequest, defaultRail, d
targetChatID = strings.TrimSpace(defaultChatID)
}
return &model.PaymentGatewayIntent{
PaymentRef: paymentRef,
PaymentIntentID: paymentIntentID,
IdempotencyKey: idempotencyKey,
OutgoingLeg: outgoingLeg,
QuoteRef: quoteRef,
RequestedMoney: requestedMoney,
TargetChatID: targetChatID,
}, nil
}
@@ -708,15 +616,14 @@ func transferFromRequest(req *chainv1.SubmitTransferRequest) *chainv1.Transfer {
if req == nil {
return nil
}
amount := req.GetAmount()
return &chainv1.Transfer{
TransferRef: strings.TrimSpace(req.GetIdempotencyKey()),
IdempotencyKey: strings.TrimSpace(req.GetIdempotencyKey()),
OrganizationRef: strings.TrimSpace(req.GetOrganizationRef()),
SourceWalletRef: strings.TrimSpace(req.GetSourceWalletRef()),
Destination: req.GetDestination(),
RequestedAmount: amount,
Status: chainv1.TransferStatus_TRANSFER_SUBMITTED,
RequestedAmount: req.GetAmount(),
Status: chainv1.TransferStatus_TRANSFER_CREATED,
}
}
@@ -724,20 +631,32 @@ func transferFromPayment(record *storagemodel.PaymentRecord, req *chainv1.Submit
if record == nil {
return nil
}
var requested *moneyv1.Money
if req != nil && req.GetAmount() != nil {
requested = req.GetAmount()
} else {
requested = moneyFromPayment(record.RequestedMoney)
}
net := moneyFromPayment(record.ExecutedMoney)
status := chainv1.TransferStatus_TRANSFER_SUBMITTED
switch paymentStatus(record) {
case storagemodel.PaymentStatusExecuted:
status = chainv1.TransferStatus_TRANSFER_CONFIRMED
case storagemodel.PaymentStatusExpired:
net := moneyFromPayment(record.RequestedMoney)
var status chainv1.TransferStatus
switch record.Status {
case storagemodel.PaymentStatusSuccess:
status = chainv1.TransferStatus_TRANSFER_SUCCESS
case storagemodel.PaymentStatusCancelled:
status = chainv1.TransferStatus_TRANSFER_CANCELLED
case storagemodel.PaymentStatusFailed:
status = chainv1.TransferStatus_TRANSFER_FAILED
case storagemodel.PaymentStatusProcessing:
status = chainv1.TransferStatus_TRANSFER_PROCESSING
case storagemodel.PaymentStatusWaiting:
status = chainv1.TransferStatus_TRANSFER_WAITING
default:
status = chainv1.TransferStatus_TRANSFER_CREATED
}
transfer := &chainv1.Transfer{
TransferRef: strings.TrimSpace(record.IdempotencyKey),
IdempotencyKey: strings.TrimSpace(record.IdempotencyKey),
@@ -745,11 +664,13 @@ func transferFromPayment(record *storagemodel.PaymentRecord, req *chainv1.Submit
NetAmount: net,
Status: status,
}
if req != nil {
transfer.OrganizationRef = strings.TrimSpace(req.GetOrganizationRef())
transfer.SourceWalletRef = strings.TrimSpace(req.GetSourceWalletRef())
transfer.Destination = req.GetDestination()
}
if !record.ExecutedAt.IsZero() {
ts := timestamppb.New(record.ExecutedAt)
transfer.CreatedAt = ts
@@ -761,6 +682,7 @@ func transferFromPayment(record *storagemodel.PaymentRecord, req *chainv1.Submit
transfer.CreatedAt = timestamppb.New(record.CreatedAt)
}
}
return transfer
}
@@ -787,3 +709,27 @@ func readEnv(env string) string {
}
var _ grpcapp.Service = (*Service)(nil)
func statusFromConfirmationResult(r *model.ConfirmationResult) storagemodel.PaymentStatus {
if r == nil {
return storagemodel.PaymentStatusWaiting
}
switch r.Status {
case model.ConfirmationStatusConfirmed:
return storagemodel.PaymentStatusProcessing
case model.ConfirmationStatusClarified:
return storagemodel.PaymentStatusWaiting
case model.ConfirmationStatusRejected:
return storagemodel.PaymentStatusFailed
case model.ConfirmationStatusTimeout:
return storagemodel.PaymentStatusFailed
default:
return storagemodel.PaymentStatusFailed
}
}