Merge pull request 'extended logging' (#454) from mntx-452 into main
All checks were successful
ci/woodpecker/push/billing_documents Pipeline was successful
ci/woodpecker/push/discovery Pipeline was successful
ci/woodpecker/push/bff Pipeline was successful
ci/woodpecker/push/billing_fees Pipeline was successful
ci/woodpecker/push/fx_ingestor Pipeline was successful
ci/woodpecker/push/fx_oracle Pipeline was successful
ci/woodpecker/push/frontend Pipeline was successful
ci/woodpecker/push/gateway_chain Pipeline was successful
ci/woodpecker/push/gateway_mntx Pipeline was successful
ci/woodpecker/push/gateway_tgsettle Pipeline was successful
ci/woodpecker/push/gateway_tron Pipeline was successful
ci/woodpecker/push/ledger Pipeline was successful
ci/woodpecker/push/notification Pipeline was successful
ci/woodpecker/push/payments_orchestrator Pipeline was successful

Reviewed-on: #454
This commit was merged in pull request #454.
This commit is contained in:
2026-02-10 10:34:18 +00:00
6 changed files with 144 additions and 18 deletions

View File

@@ -11,7 +11,6 @@ import (
"github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mutil/mzap" "github.com/tech/sendico/pkg/mutil/mzap"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -85,17 +84,9 @@ func (p *Payouts) Upsert(ctx context.Context, record *model.CardPayout) error {
return merrors.InvalidArgument("operation ref is required", "operation_ref") return merrors.InvalidArgument("operation ref is required", "operation_ref")
} }
if record.ID == bson.NilObjectID { if err := p.repository.Upsert(ctx, record); err != nil {
if err := p.repository.Insert(ctx, record, nil); err != nil { p.logger.Warn("Failed to upsert payout record", zap.Error(err), mzap.ObjRef("payout_ref", record.ID),
p.logger.Warn("Failed to insert new record", zap.Error(err), zap.String("operation_Ref", record.OperationRef)) zap.String("operation_ref", record.OperationRef), zap.String("payment_ref", record.PayoutID))
return err
}
return nil
}
if err := p.repository.Update(ctx, record); err != nil {
p.logger.Warn("Failed to update existing record", zap.Error(err),
zap.String("operation_Ref", record.OperationRef), mzap.ObjRef("payout_ref", record.ID))
return err return err
} }
return nil return nil

View File

@@ -207,6 +207,16 @@ func (m *memoryOrganizationRepository) Update(_ context.Context, obj storable.St
return nil return nil
} }
func (m *memoryOrganizationRepository) Upsert(ctx context.Context, obj storable.Storable) error {
if err := m.Update(ctx, obj); err != nil {
if errors.Is(err, merrors.ErrNoData) {
return m.Insert(ctx, obj, nil)
}
return err
}
return nil
}
func (m *memoryOrganizationRepository) Patch(context.Context, bson.ObjectID, builder.Patch) error { func (m *memoryOrganizationRepository) Patch(context.Context, bson.ObjectID, builder.Patch) error {
return merrors.NotImplemented("Patch is not supported in memory repository") return merrors.NotImplemented("Patch is not supported in memory repository")
} }

View File

@@ -37,11 +37,18 @@ func (r *MongoRepository) Collection() string {
return r.collectionName return r.collectionName
} }
func (r *MongoRepository) Insert(ctx context.Context, obj storable.Storable, getFilter builder.Query) error { func prepareForWrite(obj storable.Storable) bson.ObjectID {
if (obj.GetID() == nil) || (obj.GetID().IsZero()) { id := obj.GetID()
if id == nil || id.IsZero() {
obj.SetID(bson.NewObjectID()) obj.SetID(bson.NewObjectID())
id = obj.GetID()
} }
obj.Update() obj.Update()
return *id
}
func (r *MongoRepository) Insert(ctx context.Context, obj storable.Storable, getFilter builder.Query) error {
prepareForWrite(obj)
_, err := r.collection.InsertOne(ctx, obj) _, err := r.collection.InsertOne(ctx, obj)
if mongo.IsDuplicateKeyError(err) { if mongo.IsDuplicateKeyError(err) {
if getFilter != nil { if getFilter != nil {
@@ -61,10 +68,7 @@ func (r *MongoRepository) InsertMany(ctx context.Context, objects []storable.Sto
docs := make([]interface{}, len(objects)) docs := make([]interface{}, len(objects))
for i, obj := range objects { for i, obj := range objects {
if (obj.GetID() == nil) || (obj.GetID().IsZero()) { prepareForWrite(obj)
obj.SetID(bson.NewObjectID())
}
obj.Update()
docs[i] = obj docs[i] = obj
} }
@@ -131,6 +135,12 @@ func (r *MongoRepository) Update(ctx context.Context, obj storable.Storable) err
return r.collection.FindOneAndReplace(ctx, idFilter(*obj.GetID()), obj).Err() return r.collection.FindOneAndReplace(ctx, idFilter(*obj.GetID()), obj).Err()
} }
func (r *MongoRepository) Upsert(ctx context.Context, obj storable.Storable) error {
id := prepareForWrite(obj)
_, err := r.collection.ReplaceOne(ctx, idFilter(id), obj, options.Replace().SetUpsert(true))
return err
}
func (r *MongoRepository) Patch(ctx context.Context, id bson.ObjectID, patch builder.Patch) error { func (r *MongoRepository) Patch(ctx context.Context, id bson.ObjectID, patch builder.Patch) error {
if id.IsZero() { if id.IsZero() {
return merrors.InvalidArgument("zero id provided while patching", "id") return merrors.InvalidArgument("zero id provided while patching", "id")

View File

@@ -0,0 +1,99 @@
//go:build integration
// +build integration
package repositoryimp_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tech/sendico/pkg/db/internal/mongo/repositoryimp"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/mongodb"
"github.com/testcontainers/testcontainers-go/wait"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
)
func TestMongoRepository_Upsert(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
mongoContainer, err := mongodb.Run(ctx,
"mongo:latest",
mongodb.WithUsername("root"),
mongodb.WithPassword("password"),
testcontainers.WithWaitStrategy(wait.ForLog("Waiting for connections")),
)
require.NoError(t, err, "failed to start MongoDB container")
defer terminate(ctx, t, mongoContainer)
mongoURI, err := mongoContainer.ConnectionString(ctx)
require.NoError(t, err, "failed to get MongoDB connection string")
clientOptions := options.Client().ApplyURI(mongoURI)
client, err := mongo.Connect(ctx, clientOptions)
require.NoError(t, err, "failed to connect to MongoDB")
defer disconnect(ctx, t, client)
db := client.Database("testdb")
repo := repositoryimp.NewMongoRepository(db, "testcollection")
t.Run("Upsert_InsertsWhenIDIsZero", func(t *testing.T) {
obj := &TestObject{Name: "inserted-via-upsert"}
require.True(t, obj.GetID().IsZero())
err := repo.Upsert(ctx, obj)
require.NoError(t, err)
assert.False(t, obj.GetID().IsZero())
assert.NotZero(t, obj.CreatedAt)
assert.NotZero(t, obj.UpdatedAt)
var stored TestObject
err = repo.Get(ctx, *obj.GetID(), &stored)
require.NoError(t, err)
assert.Equal(t, "inserted-via-upsert", stored.Name)
})
t.Run("Upsert_UpdatesExistingDocument", func(t *testing.T) {
obj := &TestObject{Name: "before-upsert"}
err := repo.Insert(ctx, obj, nil)
require.NoError(t, err)
createdAt := obj.CreatedAt
updatedAt := obj.UpdatedAt
time.Sleep(10 * time.Millisecond)
obj.Name = "after-upsert"
err = repo.Upsert(ctx, obj)
require.NoError(t, err)
assert.Equal(t, createdAt, obj.CreatedAt)
assert.True(t, obj.UpdatedAt.After(updatedAt))
var stored TestObject
err = repo.Get(ctx, *obj.GetID(), &stored)
require.NoError(t, err)
assert.Equal(t, "after-upsert", stored.Name)
assert.WithinDuration(t, createdAt, stored.CreatedAt, time.Second)
assert.True(t, stored.UpdatedAt.After(updatedAt))
})
t.Run("Upsert_InsertsWhenIDDoesNotExist", func(t *testing.T) {
obj := &TestObject{Name: "upsert-with-preassigned-id"}
obj.SetID(bson.NewObjectID())
err := repo.Upsert(ctx, obj)
require.NoError(t, err)
var stored TestObject
err = repo.Get(ctx, *obj.GetID(), &stored)
require.NoError(t, err)
assert.Equal(t, "upsert-with-preassigned-id", stored.Name)
assert.Equal(t, *obj.GetID(), stored.ID)
})
}

View File

@@ -147,6 +147,20 @@ func (m *memoryTokenRepository) Update(_ context.Context, obj storable.Storable)
return nil return nil
} }
func (m *memoryTokenRepository) Upsert(ctx context.Context, obj storable.Storable) error {
id := obj.GetID()
if id == nil || *id == bson.NilObjectID {
return m.Insert(ctx, obj, nil)
}
if err := m.Update(ctx, obj); err != nil {
if errors.Is(err, merrors.ErrNoData) {
return m.Insert(ctx, obj, nil)
}
return err
}
return nil
}
func (m *memoryTokenRepository) PatchMany(_ context.Context, filter builder.Query, patch builder.Patch) (int, error) { func (m *memoryTokenRepository) PatchMany(_ context.Context, filter builder.Query, patch builder.Patch) (int, error) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()

View File

@@ -28,6 +28,8 @@ type Repository interface {
FindOneByFilter(ctx context.Context, builder builder.Query, result storable.Storable) error FindOneByFilter(ctx context.Context, builder builder.Query, result storable.Storable) error
FindManyByFilter(ctx context.Context, builder builder.Query, decoder rd.DecodingFunc) error FindManyByFilter(ctx context.Context, builder builder.Query, decoder rd.DecodingFunc) error
Update(ctx context.Context, obj storable.Storable) error Update(ctx context.Context, obj storable.Storable) error
// Upsert replaces the document identified by obj ID, inserting it when no document exists.
Upsert(ctx context.Context, obj storable.Storable) error
// Patch applies partial updates defined by patch to the document identified by id. // Patch applies partial updates defined by patch to the document identified by id.
Patch(ctx context.Context, id bson.ObjectID, patch PatchDoc) error Patch(ctx context.Context, id bson.ObjectID, patch PatchDoc) error
// PatchMany applies partial updates defined by patch to all documents matching filter and returns the number of updated documents. // PatchMany applies partial updates defined by patch to all documents matching filter and returns the number of updated documents.