fixed tgsettle logging

This commit is contained in:
Stephan D
2026-01-19 19:56:11 +01:00
parent d9d2fded8a
commit 5737ebf89a
4 changed files with 153 additions and 25 deletions

View File

@@ -10,6 +10,7 @@ import (
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
"go.uber.org/zap"
)
const tgsettleConnectorID = "tgsettle"
@@ -44,61 +45,94 @@ func (s *Service) GetBalance(_ context.Context, _ *connectorv1.GetBalanceRequest
func (s *Service) SubmitOperation(ctx context.Context, req *connectorv1.SubmitOperationRequest) (*connectorv1.SubmitOperationResponse, error) {
if req == nil || req.GetOperation() == nil {
s.logger.Warn("Submit operation rejected", zap.String("reason", "operation is required"))
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "submit_operation: operation is required", nil, "")}}, nil
}
op := req.GetOperation()
if strings.TrimSpace(op.GetIdempotencyKey()) == "" {
s.logger.Warn("Submit operation rejected", append(operationLogFields(op), zap.String("reason", "idempotency_key is required"))...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "submit_operation: idempotency_key is required", op, "")}}, nil
}
if op.GetType() != connectorv1.OperationType_TRANSFER {
s.logger.Warn("Submit operation rejected", append(operationLogFields(op), zap.String("reason", "unsupported operation type"))...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_UNSUPPORTED_OPERATION, "submit_operation: unsupported operation type", op, "")}}, nil
}
reader := params.New(op.GetParams())
paymentIntentID := strings.TrimSpace(reader.String("payment_intent_id"))
if paymentIntentID == "" {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "submit_operation: payment_intent_id is required", op, "")}}, nil
}
source := operationAccountID(op.GetFrom())
if source == "" {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "transfer: from.account is required", op, "")}}, nil
}
dest, err := transferDestinationFromOperation(op)
if err != nil {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, err.Error(), op, "")}}, nil
}
amount := op.GetMoney()
if amount == nil {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "transfer: money is required", op, "")}}, nil
}
metadata := reader.StringMap("metadata")
if metadata == nil {
metadata = map[string]string{}
}
paymentIntentID := strings.TrimSpace(reader.String("payment_intent_id"))
if paymentIntentID == "" {
paymentIntentID = strings.TrimSpace(reader.String("client_reference"))
}
if paymentIntentID == "" {
paymentIntentID = strings.TrimSpace(metadata[metadataPaymentIntentID])
}
if paymentIntentID == "" {
s.logger.Warn("Submit operation rejected", append(operationLogFields(op), zap.String("reason", "payment_intent_id is required"))...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "submit_operation: payment_intent_id is required", op, "")}}, nil
}
source := operationAccountID(op.GetFrom())
if source == "" {
s.logger.Warn("Submit operation rejected", append(operationLogFields(op), zap.String("reason", "from.account is required"))...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "transfer: from.account is required", op, "")}}, nil
}
dest, err := transferDestinationFromOperation(op)
if err != nil {
s.logger.Warn("Submit operation rejected", append(operationLogFields(op), zap.Error(err))...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, err.Error(), op, "")}}, nil
}
amount := op.GetMoney()
if amount == nil {
s.logger.Warn("Submit operation rejected", append(operationLogFields(op), zap.String("reason", "money is required"))...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "transfer: money is required", op, "")}}, nil
}
metadata[metadataPaymentIntentID] = paymentIntentID
if quoteRef := strings.TrimSpace(reader.String("quote_ref")); quoteRef != "" {
quoteRef := strings.TrimSpace(reader.String("quote_ref"))
if quoteRef != "" {
metadata[metadataQuoteRef] = quoteRef
}
if targetChatID := strings.TrimSpace(reader.String("target_chat_id")); targetChatID != "" {
targetChatID := strings.TrimSpace(reader.String("target_chat_id"))
if targetChatID != "" {
metadata[metadataTargetChatID] = targetChatID
}
if outgoingLeg := strings.TrimSpace(reader.String("outgoing_leg")); outgoingLeg != "" {
outgoingLeg := strings.TrimSpace(reader.String("outgoing_leg"))
if outgoingLeg != "" {
metadata[metadataOutgoingLeg] = outgoingLeg
}
normalizedAmount := normalizeMoneyForTransfer(amount)
logFields := append(operationLogFields(op),
zap.String("payment_intent_id", paymentIntentID),
zap.String("organization_ref", strings.TrimSpace(reader.String("organization_ref"))),
zap.String("source_wallet_ref", source),
zap.String("amount", strings.TrimSpace(normalizedAmount.GetAmount())),
zap.String("currency", strings.TrimSpace(normalizedAmount.GetCurrency())),
zap.String("quote_ref", quoteRef),
zap.String("outgoing_leg", outgoingLeg),
)
logFields = append(logFields, transferDestinationLogFields(dest)...)
resp, err := s.SubmitTransfer(ctx, &chainv1.SubmitTransferRequest{
IdempotencyKey: strings.TrimSpace(op.GetIdempotencyKey()),
OrganizationRef: strings.TrimSpace(reader.String("organization_ref")),
SourceWalletRef: source,
Destination: dest,
Amount: normalizeMoneyForTransfer(amount),
Amount: normalizedAmount,
Metadata: metadata,
ClientReference: paymentIntentID,
})
if err != nil {
s.logger.Warn("Submit operation transfer failed", append(logFields, zap.Error(err))...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(mapErrorCode(err), err.Error(), op, "")}}, nil
}
transfer := resp.GetTransfer()
s.logger.Info("Submit operation transfer submitted", append(logFields,
zap.String("transfer_ref", strings.TrimSpace(transfer.GetTransferRef())),
zap.String("status", transfer.GetStatus().String()),
)...)
return &connectorv1.SubmitOperationResponse{
Receipt: &connectorv1.OperationReceipt{
OperationId: strings.TrimSpace(transfer.GetTransferRef()),
@@ -110,10 +144,13 @@ func (s *Service) SubmitOperation(ctx context.Context, req *connectorv1.SubmitOp
func (s *Service) GetOperation(ctx context.Context, req *connectorv1.GetOperationRequest) (*connectorv1.GetOperationResponse, error) {
if req == nil || strings.TrimSpace(req.GetOperationId()) == "" {
s.logger.Warn("Get operation rejected", zap.String("reason", "operation_id is required"))
return nil, merrors.InvalidArgument("get_operation: operation_id is required")
}
resp, err := s.GetTransfer(ctx, &chainv1.GetTransferRequest{TransferRef: strings.TrimSpace(req.GetOperationId())})
operationID := strings.TrimSpace(req.GetOperationId())
resp, err := s.GetTransfer(ctx, &chainv1.GetTransferRequest{TransferRef: operationID})
if err != nil {
s.logger.Warn("Get operation failed", zap.String("operation_id", operationID), zap.Error(err))
return nil, err
}
return &connectorv1.GetOperationResponse{Operation: transferToOperation(resp.GetTransfer())}, nil
@@ -223,6 +260,39 @@ func operationAccountID(party *connectorv1.OperationParty) string {
return ""
}
func operationLogFields(op *connectorv1.Operation) []zap.Field {
if op == nil {
return nil
}
return []zap.Field{
zap.String("operation_id", strings.TrimSpace(op.GetOperationId())),
zap.String("idempotency_key", strings.TrimSpace(op.GetIdempotencyKey())),
zap.String("correlation_id", strings.TrimSpace(op.GetCorrelationId())),
zap.String("parent_intent_id", strings.TrimSpace(op.GetParentIntentId())),
zap.String("operation_type", op.GetType().String()),
}
}
func transferDestinationLogFields(dest *chainv1.TransferDestination) []zap.Field {
if dest == nil {
return nil
}
switch d := dest.GetDestination().(type) {
case *chainv1.TransferDestination_ManagedWalletRef:
return []zap.Field{
zap.String("destination_type", "managed_wallet"),
zap.String("destination_ref", strings.TrimSpace(d.ManagedWalletRef)),
}
case *chainv1.TransferDestination_ExternalAddress:
return []zap.Field{
zap.String("destination_type", "external_address"),
zap.String("destination_ref", strings.TrimSpace(d.ExternalAddress)),
}
default:
return []zap.Field{zap.String("destination_type", "unknown")}
}
}
func connectorError(code connectorv1.ErrorCode, message string, op *connectorv1.Operation, accountID string) *connectorv1.ConnectorError {
err := &connectorv1.ConnectorError{
Code: code,

View File

@@ -139,75 +139,115 @@ func (s *Service) consumeProcessor(processor np.EnvelopeProcessor) {
func (s *Service) SubmitTransfer(ctx context.Context, req *chainv1.SubmitTransferRequest) (*chainv1.SubmitTransferResponse, error) {
if req == nil {
s.logger.Warn("Submit transfer rejected", zap.String("reason", "request is required"))
return nil, merrors.InvalidArgument("submit_transfer: request is required")
}
idempotencyKey := strings.TrimSpace(req.GetIdempotencyKey())
if idempotencyKey == "" {
s.logger.Warn("Submit transfer rejected", zap.String("reason", "idempotency_key is required"))
return nil, merrors.InvalidArgument("submit_transfer: idempotency_key is required")
}
amount := req.GetAmount()
if amount == nil || strings.TrimSpace(amount.GetAmount()) == "" || strings.TrimSpace(amount.GetCurrency()) == "" {
s.logger.Warn("Submit transfer rejected", zap.String("reason", "amount is required"), zap.String("idempotency_key", idempotencyKey))
return nil, merrors.InvalidArgument("submit_transfer: amount is required")
}
intent, err := intentFromSubmitTransfer(req, s.rail, s.chatID)
if err != nil {
s.logger.Warn("Submit transfer rejected", zap.Error(err), zap.String("idempotency_key", idempotencyKey))
return nil, err
}
logFields := []zap.Field{
zap.String("idempotency_key", intent.IdempotencyKey),
zap.String("payment_intent_id", intent.PaymentIntentID),
zap.String("quote_ref", intent.QuoteRef),
zap.String("rail", intent.OutgoingLeg),
zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())),
zap.String("source_wallet_ref", strings.TrimSpace(req.GetSourceWalletRef())),
}
if intent.RequestedMoney != nil {
logFields = append(logFields,
zap.String("amount", strings.TrimSpace(intent.RequestedMoney.Amount)),
zap.String("currency", strings.TrimSpace(intent.RequestedMoney.Currency)),
)
}
logFields = append(logFields, transferDestinationLogFields(req.GetDestination())...)
if s.repo == nil || s.repo.Payments() == nil {
s.logger.Warn("Payment gateway storage unavailable", logFields...)
return nil, merrors.Internal("payment gateway storage unavailable")
}
existing, err := s.repo.Payments().FindByIdempotencyKey(ctx, idempotencyKey)
if err != nil {
s.logger.Warn("Submit transfer lookup failed", append(logFields, zap.Error(err))...)
return nil, err
}
if existing != nil {
s.logger.Info("Submit transfer idempotent hit", append(logFields, zap.String("status", strings.TrimSpace(existing.Status)))...)
return &chainv1.SubmitTransferResponse{Transfer: transferFromExecution(existing, req)}, nil
}
if err := s.onIntent(ctx, intent); err != nil {
s.logger.Warn("Submit transfer intent handling failed", append(logFields, zap.Error(err))...)
return nil, err
}
s.logger.Info("Submit transfer accepted", logFields...)
return &chainv1.SubmitTransferResponse{Transfer: transferFromRequest(req)}, nil
}
func (s *Service) GetTransfer(ctx context.Context, req *chainv1.GetTransferRequest) (*chainv1.GetTransferResponse, error) {
if req == nil {
s.logger.Warn("Get transfer rejected", zap.String("reason", "request is required"))
return nil, merrors.InvalidArgument("get_transfer: request is required")
}
transferRef := strings.TrimSpace(req.GetTransferRef())
if transferRef == "" {
s.logger.Warn("Get transfer rejected", zap.String("reason", "transfer_ref is required"))
return nil, merrors.InvalidArgument("get_transfer: transfer_ref is required")
}
logFields := []zap.Field{zap.String("transfer_ref", transferRef)}
if s.repo == nil || s.repo.Payments() == nil {
s.logger.Warn("Payment gateway storage unavailable", logFields...)
return nil, merrors.Internal("payment gateway storage unavailable")
}
existing, err := s.repo.Payments().FindByIdempotencyKey(ctx, transferRef)
if err != nil {
s.logger.Warn("Get transfer lookup failed", append(logFields, zap.Error(err))...)
return nil, err
}
if existing != nil {
s.logger.Info("Get transfer resolved from execution", append(logFields,
zap.String("payment_intent_id", strings.TrimSpace(existing.PaymentIntentID)),
zap.String("status", strings.TrimSpace(existing.Status)),
)...)
return &chainv1.GetTransferResponse{Transfer: transferFromExecution(existing, nil)}, nil
}
if s.hasPending(transferRef) {
s.logger.Info("Get transfer pending", logFields...)
return &chainv1.GetTransferResponse{Transfer: transferPending(transferRef)}, nil
}
s.logger.Warn("Get transfer not found", logFields...)
return nil, status.Error(codes.NotFound, "transfer not found")
}
func (s *Service) onIntent(ctx context.Context, intent *model.PaymentGatewayIntent) error {
if intent == nil {
s.logger.Warn("Payment gateway intent rejected", zap.String("reason", "intent is nil"))
return merrors.InvalidArgument("payment gateway intent is nil", "intent")
}
intent = normalizeIntent(intent)
if intent.IdempotencyKey == "" {
s.logger.Warn("Payment gateway intent rejected", zap.String("reason", "idempotency_key is required"))
return merrors.InvalidArgument("idempotency_key is required", "idempotency_key")
}
if intent.PaymentIntentID == "" {
s.logger.Warn("Payment gateway intent rejected", zap.String("reason", "payment_intent_id is required"), zap.String("idempotency_key", intent.IdempotencyKey))
return merrors.InvalidArgument("payment_intent_id is required", "payment_intent_id")
}
if intent.RequestedMoney == nil || strings.TrimSpace(intent.RequestedMoney.Amount) == "" || strings.TrimSpace(intent.RequestedMoney.Currency) == "" {
s.logger.Warn("Payment gateway intent rejected", zap.String("reason", "requested_money is required"), zap.String("idempotency_key", intent.IdempotencyKey))
return merrors.InvalidArgument("requested_money is required", "requested_money")
}
if s.repo == nil || s.repo.Payments() == nil {
s.logger.Warn("Payment gateway storage unavailable", zap.String("idempotency_key", intent.IdempotencyKey))
return merrors.Internal("payment gateway storage unavailable")
}
@@ -226,6 +266,7 @@ func (s *Service) onIntent(ctx context.Context, intent *model.PaymentGatewayInte
confirmReq, err := s.buildConfirmationRequest(intent)
if err != nil {
s.logger.Warn("Failed to build confirmation request", zap.Error(err), zap.String("idempotency_key", intent.IdempotencyKey), zap.String("payment_intent_id", intent.PaymentIntentID))
return err
}
if err := s.sendConfirmationRequest(confirmReq); err != nil {
@@ -237,10 +278,12 @@ func (s *Service) onIntent(ctx context.Context, intent *model.PaymentGatewayInte
func (s *Service) onConfirmationResult(ctx context.Context, result *model.ConfirmationResult) error {
if result == nil {
s.logger.Warn("Confirmation result rejected", zap.String("reason", "result is nil"))
return merrors.InvalidArgument("confirmation result is nil", "result")
}
requestID := strings.TrimSpace(result.RequestID)
if requestID == "" {
s.logger.Warn("Confirmation result rejected", zap.String("reason", "request_id is required"))
return merrors.InvalidArgument("confirmation request_id is required", "request_id")
}
intent := s.lookupIntent(requestID)
@@ -314,9 +357,11 @@ func (s *Service) buildConfirmationRequest(intent *model.PaymentGatewayIntent) (
func (s *Service) sendConfirmationRequest(request *model.ConfirmationRequest) error {
if request == nil {
s.logger.Warn("Confirmation request rejected", zap.String("reason", "request is nil"))
return merrors.InvalidArgument("confirmation request is nil", "request")
}
if s.producer == nil {
s.logger.Warn("Messaging producer not configured")
return merrors.Internal("messaging producer is not configured")
}
env := confirmations.ConfirmationRequest(string(mservice.PaymentGateway), request)
@@ -330,6 +375,12 @@ func (s *Service) sendConfirmationRequest(request *model.ConfirmationRequest) er
zap.Int32("timeout_seconds", request.TimeoutSeconds))
return err
}
s.logger.Info("Published confirmation request",
zap.String("request_id", request.RequestID),
zap.String("payment_intent_id", request.PaymentIntentID),
zap.String("quote_ref", request.QuoteRef),
zap.String("rail", request.Rail),
zap.Int32("timeout_seconds", request.TimeoutSeconds))
return nil
}
@@ -355,7 +406,14 @@ func (s *Service) publishExecution(intent *model.PaymentGatewayIntent, result *m
zap.String("payment_intent_id", intent.PaymentIntentID),
zap.String("quote_ref", intent.QuoteRef),
zap.String("status", string(result.Status)))
return
}
s.logger.Info("Published gateway execution result",
zap.String("request_id", result.RequestID),
zap.String("idempotency_key", intent.IdempotencyKey),
zap.String("payment_intent_id", intent.PaymentIntentID),
zap.String("quote_ref", intent.QuoteRef),
zap.String("status", string(result.Status)))
}
func (s *Service) trackIntent(requestID string, intent *model.PaymentGatewayIntent) {