better logging

This commit is contained in:
Stephan D
2026-01-18 13:11:22 +01:00
parent 66a3961cef
commit 3db1681a6e
8 changed files with 113 additions and 38 deletions

View File

@@ -2,6 +2,7 @@ package gateway
import (
"context"
"errors"
"os"
"strings"
"sync"
@@ -20,9 +21,9 @@ import (
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/mservice"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
"github.com/tech/sendico/pkg/server/grpcapp"
"go.uber.org/zap"
@@ -69,9 +70,10 @@ type Service struct {
}
func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Producer, broker mb.Broker, cfg Config) *Service {
if logger != nil {
logger = logger.Named("tgsettle_gateway")
if logger == nil {
logger = zap.NewNop()
}
logger = logger.Named("service")
svc := &Service{
logger: logger,
repo: repo,
@@ -126,7 +128,7 @@ func (s *Service) consumeProcessor(processor np.EnvelopeProcessor) {
}
s.consumers = append(s.consumers, consumer)
go func() {
if err := consumer.ConsumeMessages(processor.Process); err != nil {
if err := consumer.ConsumeMessages(processor.Process); err != nil && !errors.Is(err, context.Canceled) {
s.logger.Warn("Messaging consumer stopped", zap.Error(err), zap.String("event", processor.GetSubject().ToString()))
}
}()
@@ -211,7 +213,11 @@ func (s *Service) onIntent(ctx context.Context, intent *model.PaymentGatewayInte
return err
}
if existing != nil {
s.logger.Info("Payment gateway intent already executed", zap.String("idempotency_key", intent.IdempotencyKey))
s.logger.Info("Payment gateway intent already executed",
zap.String("idempotency_key", intent.IdempotencyKey),
zap.String("payment_intent_id", intent.PaymentIntentID),
zap.String("quote_ref", intent.QuoteRef),
zap.String("rail", intent.OutgoingLeg))
return nil
}
@@ -241,12 +247,19 @@ func (s *Service) onConfirmationResult(ctx context.Context, result *model.Confir
}
if result.RawReply != nil && s.repo != nil && s.repo.TelegramConfirmations() != nil {
_ = s.repo.TelegramConfirmations().Upsert(ctx, &storagemodel.TelegramConfirmation{
if err := s.repo.TelegramConfirmations().Upsert(ctx, &storagemodel.TelegramConfirmation{
RequestID: requestID,
PaymentIntentID: intent.PaymentIntentID,
QuoteRef: intent.QuoteRef,
RawReply: result.RawReply,
})
}); err != nil {
s.logger.Warn("Failed to store telegram confirmation", zap.Error(err), zap.String("request_id", requestID))
} else {
s.logger.Info("Stored telegram confirmation", zap.String("request_id", requestID),
zap.String("payment_intent_id", intent.PaymentIntentID),
zap.String("reply_text", result.RawReply.Text), zap.String("reply_user_id", result.RawReply.FromUserID),
zap.String("reply_user", result.RawReply.FromUsername))
}
}
if result.Status == model.ConfirmationStatusConfirmed || result.Status == model.ConfirmationStatusClarified {
@@ -305,7 +318,13 @@ func (s *Service) sendConfirmationRequest(request *model.ConfirmationRequest) er
}
env := confirmations.ConfirmationRequest(string(mservice.PaymentGateway), request)
if err := s.producer.SendMessage(env); err != nil {
s.logger.Warn("Failed to publish confirmation request", zap.Error(err), zap.String("request_id", request.RequestID))
s.logger.Warn("Failed to publish confirmation request",
zap.Error(err),
zap.String("request_id", request.RequestID),
zap.String("payment_intent_id", request.PaymentIntentID),
zap.String("quote_ref", request.QuoteRef),
zap.String("rail", request.Rail),
zap.Int32("timeout_seconds", request.TimeoutSeconds))
return err
}
return nil
@@ -326,7 +345,13 @@ func (s *Service) publishExecution(intent *model.PaymentGatewayIntent, result *m
}
env := paymentgateway.PaymentGatewayExecution(string(mservice.PaymentGateway), exec)
if err := s.producer.SendMessage(env); err != nil {
s.logger.Warn("Failed to publish gateway execution result", zap.Error(err), zap.String("request_id", result.RequestID))
s.logger.Warn("Failed to publish gateway execution result",
zap.Error(err),
zap.String("request_id", result.RequestID),
zap.String("idempotency_key", intent.IdempotencyKey),
zap.String("payment_intent_id", intent.PaymentIntentID),
zap.String("quote_ref", intent.QuoteRef),
zap.String("status", string(result.Status)))
}
}

View File

@@ -23,6 +23,9 @@ type Repository struct {
}
func New(logger mlogger.Logger, conn *db.MongoConnection) (*Repository, error) {
if logger == nil {
logger = zap.NewNop()
}
if conn == nil {
return nil, merrors.InvalidArgument("mongo connection is nil")
}
@@ -30,10 +33,19 @@ func New(logger mlogger.Logger, conn *db.MongoConnection) (*Repository, error) {
if client == nil {
return nil, merrors.Internal("mongo client is not initialised")
}
db := conn.Database()
if db == nil {
return nil, merrors.Internal("mongo database is not initialised")
}
dbName := db.Name()
logger = logger.Named("storage").Named("mongo")
if dbName != "" {
logger = logger.With(zap.String("database", dbName))
}
result := &Repository{
logger: logger.Named("storage").Named("mongo"),
logger: logger,
conn: conn,
db: conn.Database(),
db: db,
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@@ -43,12 +55,12 @@ func New(logger mlogger.Logger, conn *db.MongoConnection) (*Repository, error) {
}
paymentsStore, err := store.NewPayments(result.logger, result.db)
if err != nil {
result.logger.Error("Failed to initialise payments store", zap.Error(err))
result.logger.Error("Failed to initialise payments store", zap.Error(err), zap.String("store", "payments"))
return nil, err
}
tgStore, err := store.NewTelegramConfirmations(result.logger, result.db)
if err != nil {
result.logger.Error("Failed to initialise telegram confirmations store", zap.Error(err))
result.logger.Error("Failed to initialise telegram confirmations store", zap.Error(err), zap.String("store", "telegram_confirmations"))
return nil, err
}
result.payments = paymentsStore

View File

@@ -2,16 +2,18 @@ package store
import (
"context"
"errors"
"strings"
"time"
"github.com/tech/sendico/gateway/tgsettle/storage"
"github.com/tech/sendico/gateway/tgsettle/storage/model"
"github.com/tech/sendico/pkg/db/repository"
ri "github.com/tech/sendico/pkg/db/repository/index"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
@@ -29,18 +31,25 @@ func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) {
if db == nil {
return nil, merrors.InvalidArgument("mongo database is nil")
}
p := &Payments{
logger: logger.Named("payments"),
coll: db.Collection(paymentsCollection),
if logger == nil {
logger = zap.NewNop()
}
_, err := p.coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{{Key: fieldIdempotencyKey, Value: 1}},
Options: options.Index().SetUnique(true),
})
if err != nil {
p.logger.Error("Failed to create payments idempotency index", zap.Error(err))
logger = logger.Named("payments").With(zap.String("collection", paymentsCollection))
repo := repository.CreateMongoRepository(db, paymentsCollection)
if err := repo.CreateIndex(&ri.Definition{
Keys: []ri.Key{{Field: fieldIdempotencyKey, Sort: ri.Asc}},
Unique: true,
}); err != nil {
logger.Error("Failed to create payments idempotency index", zap.Error(err), zap.String("index_field", fieldIdempotencyKey))
return nil, err
}
p := &Payments{
logger: logger,
coll: db.Collection(paymentsCollection),
}
p.logger.Debug("Payments store initialised")
return p, nil
}
@@ -55,6 +64,9 @@ func (p *Payments) FindByIdempotencyKey(ctx context.Context, key string) (*model
return nil, nil
}
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
p.logger.Warn("Payment execution lookup failed", zap.String("idempotency_key", key), zap.Error(err))
}
return nil, err
}
return &result, nil
@@ -74,6 +86,13 @@ func (p *Payments) InsertExecution(ctx context.Context, exec *model.PaymentExecu
if mongo.IsDuplicateKeyError(err) {
return storage.ErrDuplicate
}
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
p.logger.Warn("Failed to insert payment execution",
zap.String("idempotency_key", exec.IdempotencyKey),
zap.String("payment_intent_id", exec.PaymentIntentID),
zap.String("quote_ref", exec.QuoteRef),
zap.Error(err))
}
return err
}
return nil

View File

@@ -2,11 +2,14 @@ package store
import (
"context"
"errors"
"strings"
"time"
"github.com/tech/sendico/gateway/tgsettle/storage"
"github.com/tech/sendico/gateway/tgsettle/storage/model"
"github.com/tech/sendico/pkg/db/repository"
ri "github.com/tech/sendico/pkg/db/repository/index"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"go.mongodb.org/mongo-driver/bson"
@@ -29,18 +32,25 @@ func NewTelegramConfirmations(logger mlogger.Logger, db *mongo.Database) (*Teleg
if db == nil {
return nil, merrors.InvalidArgument("mongo database is nil")
}
t := &TelegramConfirmations{
logger: logger.Named("telegram_confirmations"),
coll: db.Collection(telegramCollection),
if logger == nil {
logger = zap.NewNop()
}
_, err := t.coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{
Keys: bson.D{{Key: fieldRequestID, Value: 1}},
Options: options.Index().SetUnique(true),
})
if err != nil {
t.logger.Error("Failed to create telegram confirmations request_id index", zap.Error(err))
logger = logger.Named("telegram_confirmations").With(zap.String("collection", telegramCollection))
repo := repository.CreateMongoRepository(db, telegramCollection)
if err := repo.CreateIndex(&ri.Definition{
Keys: []ri.Key{{Field: fieldRequestID, Sort: ri.Asc}},
Unique: true,
}); err != nil {
logger.Error("Failed to create telegram confirmations request_id index", zap.Error(err), zap.String("index_field", fieldRequestID))
return nil, err
}
t := &TelegramConfirmations{
logger: logger,
coll: db.Collection(telegramCollection),
}
t.logger.Debug("Telegram confirmations store initialised")
return t, nil
}
@@ -61,6 +71,16 @@ func (t *TelegramConfirmations) Upsert(ctx context.Context, record *model.Telegr
"$set": record,
}
_, err := t.coll.UpdateOne(ctx, bson.M{fieldRequestID: record.RequestID}, update, options.Update().SetUpsert(true))
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
fields := []zap.Field{zap.String("request_id", record.RequestID)}
if record.PaymentIntentID != "" {
fields = append(fields, zap.String("payment_intent_id", record.PaymentIntentID))
}
if record.QuoteRef != "" {
fields = append(fields, zap.String("quote_ref", record.QuoteRef))
}
t.logger.Warn("Failed to upsert telegram confirmation", append(fields, zap.Error(err))...)
}
return err
}