This commit is contained in:
Stephan D
2026-03-10 12:31:09 +01:00
parent d87e709f43
commit e77d1ab793
287 changed files with 2089 additions and 1550 deletions

View File

@@ -217,7 +217,7 @@ func (s *Service) onTelegramUpdate(ctx context.Context, update *model.TelegramWe
ReplyToMessageID: message.MessageID,
Text: "Only approved users can confirm this payment.",
}); e != nil {
s.logger.Warn("Failed to create telegram text", append(replyFields, zap.Error(err))...)
s.logger.Warn("Failed to create telegram text", append(replyFields, zap.Error(e))...)
}
if err := s.clearPendingConfirmation(ctx, pending.RequestID); err != nil {
return err
@@ -242,7 +242,7 @@ func (s *Service) onTelegramUpdate(ctx context.Context, update *model.TelegramWe
ReplyToMessageID: message.MessageID,
Text: clarificationMessage(reason),
}); e != nil {
s.logger.Warn("Failed to create telegram text", append(replyFields, zap.Error(err))...)
s.logger.Warn("Failed to create telegram text", append(replyFields, zap.Error(e))...)
}
s.logger.Warn("Telegram confirmation reply dropped",
append(replyFields,
@@ -307,13 +307,13 @@ func (s *Service) publishPendingConfirmationResult(pending *storagemodel.Pending
}
sourceService := strings.TrimSpace(pending.SourceService)
if sourceService == "" {
sourceService = string(mservice.PaymentGateway)
sourceService = mservice.PaymentGateway
}
rail := strings.TrimSpace(pending.Rail)
if rail == "" {
rail = s.rail
}
env := confirmations.ConfirmationResult(string(mservice.PaymentGateway), result, sourceService, rail)
env := confirmations.ConfirmationResult(mservice.PaymentGateway, result, sourceService, rail)
if err := s.producer.SendMessage(env); err != nil {
s.logger.Warn("Failed to publish confirmation result", zap.Error(err),
zap.String("request_id", strings.TrimSpace(result.RequestID)),
@@ -338,7 +338,7 @@ func (s *Service) sendTelegramText(_ context.Context, request *model.TelegramTex
if request.ChatID == "" || request.Text == "" {
return merrors.InvalidArgument("telegram chat_id and text are required", "chat_id", "text")
}
env := tnotifications.TelegramText(string(mservice.PaymentGateway), request)
env := tnotifications.TelegramText(mservice.PaymentGateway, request)
if err := s.producer.SendMessage(env); err != nil {
s.logger.Warn("Failed to publish telegram text request", zap.Error(err),
zap.String("request_id", request.RequestID),

View File

@@ -31,7 +31,7 @@ func (s *Service) GetCapabilities(_ context.Context, _ *connectorv1.GetCapabilit
}
func (s *Service) OpenAccount(_ context.Context, _ *connectorv1.OpenAccountRequest) (*connectorv1.OpenAccountResponse, error) {
return &connectorv1.OpenAccountResponse{Error: connectorError(connectorv1.ErrorCode_UNSUPPORTED_ACCOUNT_KIND, "open_account: unsupported", nil, "")}, nil
return &connectorv1.OpenAccountResponse{Error: connectorError(connectorv1.ErrorCode_UNSUPPORTED_ACCOUNT_KIND, "open_account: unsupported", nil)}, nil
}
func (s *Service) GetAccount(_ context.Context, _ *connectorv1.GetAccountRequest) (*connectorv1.GetAccountResponse, error) {
@@ -49,16 +49,16 @@ func (s *Service) GetBalance(_ context.Context, _ *connectorv1.GetBalanceRequest
func (s *Service) SubmitOperation(ctx context.Context, req *connectorv1.SubmitOperationRequest) (*connectorv1.SubmitOperationResponse, error) {
if req == nil || req.GetOperation() == nil {
s.logger.Warn("Submit operation rejected", zap.String("reason", "operation is required"))
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "submit_operation: operation is required", nil, "")}}, nil
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "submit_operation: operation is required", nil)}}, nil
}
op := req.GetOperation()
if strings.TrimSpace(op.GetIdempotencyKey()) == "" {
s.logger.Warn("Submit operation rejected", append(operationLogFields(op), zap.String("reason", "idempotency_key is required"))...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "submit_operation: idempotency_key is required", op, "")}}, nil
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "submit_operation: idempotency_key is required", op)}}, nil
}
if op.GetType() != connectorv1.OperationType_TRANSFER {
s.logger.Warn("Submit operation rejected", append(operationLogFields(op), zap.String("reason", "unsupported operation type"))...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_UNSUPPORTED_OPERATION, "submit_operation: unsupported operation type", op, "")}}, nil
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_UNSUPPORTED_OPERATION, "submit_operation: unsupported operation type", op)}}, nil
}
reader := params.New(op.GetParams())
metadata := reader.StringMap("metadata")
@@ -74,22 +74,22 @@ func (s *Service) SubmitOperation(ctx context.Context, req *connectorv1.SubmitOp
}
if paymentIntentID == "" {
s.logger.Warn("Submit operation rejected", append(operationLogFields(op), zap.String("reason", "payment_intent_id is required"))...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "submit_operation: payment_intent_id is required", op, "")}}, nil
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "submit_operation: payment_intent_id is required", op)}}, nil
}
source := operationAccountID(op.GetFrom())
if source == "" {
s.logger.Warn("Submit operation rejected", append(operationLogFields(op), zap.String("reason", "from.account is required"))...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "transfer: from.account is required", op, "")}}, nil
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "transfer: from.account is required", op)}}, nil
}
dest, err := transferDestinationFromOperation(op)
if err != nil {
s.logger.Warn("Submit operation rejected", append(operationLogFields(op), zap.Error(err))...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, err.Error(), op, "")}}, nil
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, err.Error(), op)}}, nil
}
amount := op.GetMoney()
if amount == nil {
s.logger.Warn("Submit operation rejected", append(operationLogFields(op), zap.String("reason", "money is required"))...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "transfer: money is required", op, "")}}, nil
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "transfer: money is required", op)}}, nil
}
metadata[metadataPaymentIntentID] = paymentIntentID
@@ -133,7 +133,7 @@ func (s *Service) SubmitOperation(ctx context.Context, req *connectorv1.SubmitOp
})
if err != nil {
s.logger.Warn("Submit operation transfer failed", append(logFields, zap.Error(err))...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(mapErrorCode(err), err.Error(), op, "")}}, nil
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(mapErrorCode(err), err.Error(), op)}}, nil
}
transfer := resp.GetTransfer()
operationID := strings.TrimSpace(transfer.GetOperationRef())
@@ -142,7 +142,7 @@ func (s *Service) SubmitOperation(ctx context.Context, req *connectorv1.SubmitOp
zap.String("transfer_ref", strings.TrimSpace(transfer.GetTransferRef())),
)...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{
Error: connectorError(connectorv1.ErrorCode_TEMPORARY_UNAVAILABLE, "submit_operation: operation_ref is missing in transfer response", op, ""),
Error: connectorError(connectorv1.ErrorCode_TEMPORARY_UNAVAILABLE, "submit_operation: operation_ref is missing in transfer response", op),
}}, nil
}
s.logger.Info("Submit operation transfer submitted", append(logFields,
@@ -361,11 +361,10 @@ func transferDestinationLogFields(dest *chainv1.TransferDestination) []zap.Field
}
}
func connectorError(code connectorv1.ErrorCode, message string, op *connectorv1.Operation, accountID string) *connectorv1.ConnectorError {
func connectorError(code connectorv1.ErrorCode, message string, op *connectorv1.Operation) *connectorv1.ConnectorError {
err := &connectorv1.ConnectorError{
Code: code,
Message: strings.TrimSpace(message),
AccountId: strings.TrimSpace(accountID),
Code: code,
Message: strings.TrimSpace(message),
}
if op != nil {
err.CorrelationId = strings.TrimSpace(op.GetCorrelationId())

View File

@@ -24,15 +24,15 @@ func (s *Service) outboxStore() gatewayoutbox.Store {
return provider.Outbox()
}
func (s *Service) startOutboxReliableProducer() error {
func (s *Service) startOutboxReliableProducer(_ context.Context) error {
if s == nil || s.repo == nil {
return nil
}
return s.outbox.Start(s.logger, s.producer, s.outboxStore(), s.msgCfg)
return s.outbox.Start(s.logger, s.producer, s.outboxStore(), s.msgCfg) //nolint:contextcheck // Reliable runtime start API does not accept context.
}
func (s *Service) sendWithOutbox(ctx context.Context, env me.Envelope) error {
if err := s.startOutboxReliableProducer(); err != nil {
if err := s.startOutboxReliableProducer(ctx); err != nil {
return err
}
return s.outbox.Send(ctx, env)

View File

@@ -24,7 +24,6 @@ import (
tnotifications "github.com/tech/sendico/pkg/messaging/notifications/telegram"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/model"
pmodel "github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/mservice"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
@@ -63,7 +62,7 @@ type Config struct {
AcceptedUserIDs []string
SuccessReaction string
InvokeURI string
MessagingSettings pmodel.SettingsT
MessagingSettings model.SettingsT
DiscoveryRegistry *discovery.Registry
Treasury TreasuryConfig
}
@@ -90,7 +89,7 @@ type Service struct {
producer msg.Producer
broker mb.Broker
cfg Config
msgCfg pmodel.SettingsT
msgCfg model.SettingsT
rail string
chatID string
announcer *discovery.Announcer
@@ -131,7 +130,7 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro
if svc.successReaction == "" {
svc.successReaction = defaultTelegramSuccessReaction
}
if err := svc.startOutboxReliableProducer(); err != nil {
if err := svc.startOutboxReliableProducer(context.Background()); err != nil {
svc.logger.Warn("Failed to initialise outbox reliable producer", zap.Error(err))
}
svc.startConsumers()
@@ -240,9 +239,9 @@ func (s *Service) startConsumers() {
}
return
}
resultProcessor := confirmations.NewConfirmationResultProcessor(s.logger, string(mservice.PaymentGateway), s.rail, s.onConfirmationResult)
resultProcessor := confirmations.NewConfirmationResultProcessor(s.logger, mservice.PaymentGateway, s.rail, s.onConfirmationResult)
s.consumeProcessor(resultProcessor)
dispatchProcessor := confirmations.NewConfirmationDispatchProcessor(s.logger, string(mservice.PaymentGateway), s.rail, s.onConfirmationDispatch)
dispatchProcessor := confirmations.NewConfirmationDispatchProcessor(s.logger, mservice.PaymentGateway, s.rail, s.onConfirmationDispatch)
s.consumeProcessor(dispatchProcessor)
updateProcessor := tnotifications.NewTelegramUpdateProcessor(s.logger, s.onTelegramUpdate)
s.consumeProcessor(updateProcessor)
@@ -277,7 +276,7 @@ func (s *Service) SubmitTransfer(ctx context.Context, req *chainv1.SubmitTransfe
s.logger.Warn("Submit transfer rejected", zap.String("reason", "amount is required"), zap.String("idempotency_key", idempotencyKey))
return nil, merrors.InvalidArgument("submit_transfer: amount is required")
}
intent, err := intentFromSubmitTransfer(req, s.rail, s.chatID)
intent, err := intentFromSubmitTransfer(req, s.rail)
if err != nil {
s.logger.Warn("Submit transfer rejected", zap.Error(err), zap.String("idempotency_key", idempotencyKey))
return nil, err
@@ -537,7 +536,7 @@ func (s *Service) buildConfirmationRequest(intent *model.PaymentGatewayIntent) (
QuoteRef: intent.QuoteRef,
AcceptedUserIDs: s.cfg.AcceptedUserIDs,
TimeoutSeconds: timeout,
SourceService: string(mservice.PaymentGateway),
SourceService: mservice.PaymentGateway,
Rail: rail,
OperationRef: intent.OperationRef,
IntentRef: intent.IntentRef,
@@ -554,7 +553,7 @@ func (s *Service) sendConfirmationRequest(request *model.ConfirmationRequest) er
s.logger.Warn("Messaging producer not configured")
return merrors.Internal("messaging producer is not configured")
}
env := confirmations.ConfirmationRequest(string(mservice.PaymentGateway), request)
env := confirmations.ConfirmationRequest(mservice.PaymentGateway, request)
if err := s.producer.SendMessage(env); err != nil {
s.logger.Warn("Failed to publish confirmation request",
zap.Error(err),
@@ -590,7 +589,7 @@ func (s *Service) publishTelegramReaction(result *model.ConfirmationResult) {
if request.ChatID == "" || request.MessageID == "" || request.Emoji == "" {
return
}
env := tnotifications.TelegramReaction(string(mservice.PaymentGateway), request)
env := tnotifications.TelegramReaction(mservice.PaymentGateway, request)
if err := s.producer.SendMessage(env); err != nil {
s.logger.Warn("Failed to publish telegram reaction",
zap.Error(err),
@@ -632,7 +631,7 @@ func (s *Service) startAnnouncer() {
Operations: caps,
InvokeURI: s.invokeURI,
}
s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.PaymentGateway), announce)
s.announcer = discovery.NewAnnouncer(s.logger, s.producer, mservice.PaymentGateway, announce)
s.announcer.Start()
}
@@ -685,7 +684,7 @@ func paymentRecordFromIntent(intent *model.PaymentGatewayIntent, confirmReq *mod
return record
}
func intentFromSubmitTransfer(req *chainv1.SubmitTransferRequest, defaultRail, defaultChatID string) (*model.PaymentGatewayIntent, error) {
func intentFromSubmitTransfer(req *chainv1.SubmitTransferRequest, defaultRail string) (*model.PaymentGatewayIntent, error) {
if req == nil {
return nil, merrors.InvalidArgument("submit_transfer: request is required")
}
@@ -733,14 +732,10 @@ func intentFromSubmitTransfer(req *chainv1.SubmitTransferRequest, defaultRail, d
return nil, merrors.InvalidArgument("submit_transfer: operation_ref is required")
}
quoteRef := strings.TrimSpace(metadata[metadataQuoteRef])
targetChatID := strings.TrimSpace(metadata[metadataTargetChatID])
outgoingLeg := normalizeRail(metadata[metadataOutgoingLeg])
if outgoingLeg == "" {
outgoingLeg = normalizeRail(defaultRail)
}
if targetChatID == "" {
targetChatID = strings.TrimSpace(defaultChatID)
}
return &model.PaymentGatewayIntent{
PaymentRef: paymentRef,
PaymentIntentID: paymentIntentID,

View File

@@ -32,6 +32,7 @@ func (f *fakePaymentsStore) FindByIdempotencyKey(_ context.Context, key string)
f.mu.Lock()
defer f.mu.Unlock()
if f.records == nil {
//nolint:nilnil // Test stub uses nil, nil to represent missing records.
return nil, nil
}
return f.records[key], nil
@@ -41,6 +42,7 @@ func (f *fakePaymentsStore) FindByOperationRef(_ context.Context, key string) (*
f.mu.Lock()
defer f.mu.Unlock()
if f.records == nil {
//nolint:nilnil // Test stub uses nil, nil to represent missing records.
return nil, nil
}
for _, record := range f.records {
@@ -48,6 +50,7 @@ func (f *fakePaymentsStore) FindByOperationRef(_ context.Context, key string) (*
return record, nil
}
}
//nolint:nilnil // Test stub uses nil, nil to represent missing records.
return nil, nil
}
@@ -124,6 +127,7 @@ func (f *fakePendingStore) FindByRequestID(_ context.Context, requestID string)
f.mu.Lock()
defer f.mu.Unlock()
if f.records == nil {
//nolint:nilnil // Test stub uses nil, nil to represent missing records.
return nil, nil
}
return f.records[requestID], nil
@@ -137,6 +141,7 @@ func (f *fakePendingStore) FindByMessageID(_ context.Context, messageID string)
return record, nil
}
}
//nolint:nilnil // Test stub uses nil, nil to represent missing records.
return nil, nil
}
@@ -197,6 +202,7 @@ func (f *fakeBroker) Publish(_ envelope.Envelope) error {
}
func (f *fakeBroker) Subscribe(event model.NotificationEvent) (<-chan envelope.Envelope, error) {
//nolint:nilnil // Test stub does not create a subscription channel.
return nil, nil
}
@@ -237,7 +243,7 @@ func newTestService(_ *testing.T) (*Service, *fakeRepo, *captureProducer) {
pending: &fakePendingStore{},
}
sigEnv := tnotifications.TelegramReaction(string(mservice.PaymentGateway), &model.TelegramReactionRequest{
sigEnv := tnotifications.TelegramReaction(mservice.PaymentGateway, &model.TelegramReactionRequest{
RequestID: "x",
ChatID: "1",
MessageID: "2",
@@ -407,7 +413,7 @@ func TestIntentFromSubmitTransfer_NormalizesOutgoingLeg(t *testing.T) {
Metadata: map[string]string{
metadataOutgoingLeg: "card",
},
}, "provider_settlement", "")
}, "provider_settlement")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

View File

@@ -53,7 +53,7 @@ func (s *Service) updateTransferStatus(ctx context.Context, record *model.Paymen
return nil, emitErr
}
}
return nil, nil
return struct{}{}, nil
})
if err != nil {
s.logger.Warn("Failed to update transfer status", zap.String("payment_ref", record.PaymentIntentID), zap.String("status", string(record.Status)), zap.Error(err))