From 461a340b082c89ed68c0950a93d9107efd6e3391 Mon Sep 17 00:00:00 2001 From: Stephan D Date: Tue, 10 Feb 2026 11:33:47 +0100 Subject: [PATCH] extended logging --- .../mntx/storage/mongo/store/payouts.go | 15 +-- .../mongo/organizationdb/setarchived_test.go | 10 ++ .../mongo/repositoryimp/repository.go | 22 +++-- .../repositoryimp/repository_upsert_test.go | 99 +++++++++++++++++++ .../verificationimp/verification_test.go | 14 +++ api/pkg/db/repository/repository.go | 2 + 6 files changed, 144 insertions(+), 18 deletions(-) create mode 100644 api/pkg/db/internal/mongo/repositoryimp/repository_upsert_test.go diff --git a/api/gateway/mntx/storage/mongo/store/payouts.go b/api/gateway/mntx/storage/mongo/store/payouts.go index a2f1b862..dc0d799e 100644 --- a/api/gateway/mntx/storage/mongo/store/payouts.go +++ b/api/gateway/mntx/storage/mongo/store/payouts.go @@ -11,7 +11,6 @@ import ( "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mutil/mzap" - "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.uber.org/zap" ) @@ -85,17 +84,9 @@ func (p *Payouts) Upsert(ctx context.Context, record *model.CardPayout) error { return merrors.InvalidArgument("operation ref is required", "operation_ref") } - if record.ID == bson.NilObjectID { - if err := p.repository.Insert(ctx, record, nil); err != nil { - p.logger.Warn("Failed to insert new record", zap.Error(err), zap.String("operation_Ref", record.OperationRef)) - return err - } - return nil - } - - if err := p.repository.Update(ctx, record); err != nil { - p.logger.Warn("Failed to update existing record", zap.Error(err), - zap.String("operation_Ref", record.OperationRef), mzap.ObjRef("payout_ref", record.ID)) + if err := p.repository.Upsert(ctx, record); err != nil { + p.logger.Warn("Failed to upsert payout record", zap.Error(err), mzap.ObjRef("payout_ref", record.ID), + zap.String("operation_ref", record.OperationRef), zap.String("payment_ref", record.PayoutID)) return err } return nil diff --git a/api/pkg/db/internal/mongo/organizationdb/setarchived_test.go b/api/pkg/db/internal/mongo/organizationdb/setarchived_test.go index fa895164..0fd593c0 100644 --- a/api/pkg/db/internal/mongo/organizationdb/setarchived_test.go +++ b/api/pkg/db/internal/mongo/organizationdb/setarchived_test.go @@ -207,6 +207,16 @@ func (m *memoryOrganizationRepository) Update(_ context.Context, obj storable.St return nil } +func (m *memoryOrganizationRepository) Upsert(ctx context.Context, obj storable.Storable) error { + if err := m.Update(ctx, obj); err != nil { + if errors.Is(err, merrors.ErrNoData) { + return m.Insert(ctx, obj, nil) + } + return err + } + return nil +} + func (m *memoryOrganizationRepository) Patch(context.Context, bson.ObjectID, builder.Patch) error { return merrors.NotImplemented("Patch is not supported in memory repository") } diff --git a/api/pkg/db/internal/mongo/repositoryimp/repository.go b/api/pkg/db/internal/mongo/repositoryimp/repository.go index 2042dad6..eba2470d 100644 --- a/api/pkg/db/internal/mongo/repositoryimp/repository.go +++ b/api/pkg/db/internal/mongo/repositoryimp/repository.go @@ -37,11 +37,18 @@ func (r *MongoRepository) Collection() string { return r.collectionName } -func (r *MongoRepository) Insert(ctx context.Context, obj storable.Storable, getFilter builder.Query) error { - if (obj.GetID() == nil) || (obj.GetID().IsZero()) { +func prepareForWrite(obj storable.Storable) bson.ObjectID { + id := obj.GetID() + if id == nil || id.IsZero() { obj.SetID(bson.NewObjectID()) + id = obj.GetID() } obj.Update() + return *id +} + +func (r *MongoRepository) Insert(ctx context.Context, obj storable.Storable, getFilter builder.Query) error { + prepareForWrite(obj) _, err := r.collection.InsertOne(ctx, obj) if mongo.IsDuplicateKeyError(err) { if getFilter != nil { @@ -61,10 +68,7 @@ func (r *MongoRepository) InsertMany(ctx context.Context, objects []storable.Sto docs := make([]interface{}, len(objects)) for i, obj := range objects { - if (obj.GetID() == nil) || (obj.GetID().IsZero()) { - obj.SetID(bson.NewObjectID()) - } - obj.Update() + prepareForWrite(obj) docs[i] = obj } @@ -131,6 +135,12 @@ func (r *MongoRepository) Update(ctx context.Context, obj storable.Storable) err return r.collection.FindOneAndReplace(ctx, idFilter(*obj.GetID()), obj).Err() } +func (r *MongoRepository) Upsert(ctx context.Context, obj storable.Storable) error { + id := prepareForWrite(obj) + _, err := r.collection.ReplaceOne(ctx, idFilter(id), obj, options.Replace().SetUpsert(true)) + return err +} + func (r *MongoRepository) Patch(ctx context.Context, id bson.ObjectID, patch builder.Patch) error { if id.IsZero() { return merrors.InvalidArgument("zero id provided while patching", "id") diff --git a/api/pkg/db/internal/mongo/repositoryimp/repository_upsert_test.go b/api/pkg/db/internal/mongo/repositoryimp/repository_upsert_test.go new file mode 100644 index 00000000..d77482db --- /dev/null +++ b/api/pkg/db/internal/mongo/repositoryimp/repository_upsert_test.go @@ -0,0 +1,99 @@ +//go:build integration +// +build integration + +package repositoryimp_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tech/sendico/pkg/db/internal/mongo/repositoryimp" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/modules/mongodb" + "github.com/testcontainers/testcontainers-go/wait" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" +) + +func TestMongoRepository_Upsert(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + mongoContainer, err := mongodb.Run(ctx, + "mongo:latest", + mongodb.WithUsername("root"), + mongodb.WithPassword("password"), + testcontainers.WithWaitStrategy(wait.ForLog("Waiting for connections")), + ) + require.NoError(t, err, "failed to start MongoDB container") + defer terminate(ctx, t, mongoContainer) + + mongoURI, err := mongoContainer.ConnectionString(ctx) + require.NoError(t, err, "failed to get MongoDB connection string") + + clientOptions := options.Client().ApplyURI(mongoURI) + client, err := mongo.Connect(ctx, clientOptions) + require.NoError(t, err, "failed to connect to MongoDB") + defer disconnect(ctx, t, client) + + db := client.Database("testdb") + repo := repositoryimp.NewMongoRepository(db, "testcollection") + + t.Run("Upsert_InsertsWhenIDIsZero", func(t *testing.T) { + obj := &TestObject{Name: "inserted-via-upsert"} + require.True(t, obj.GetID().IsZero()) + + err := repo.Upsert(ctx, obj) + require.NoError(t, err) + assert.False(t, obj.GetID().IsZero()) + assert.NotZero(t, obj.CreatedAt) + assert.NotZero(t, obj.UpdatedAt) + + var stored TestObject + err = repo.Get(ctx, *obj.GetID(), &stored) + require.NoError(t, err) + assert.Equal(t, "inserted-via-upsert", stored.Name) + }) + + t.Run("Upsert_UpdatesExistingDocument", func(t *testing.T) { + obj := &TestObject{Name: "before-upsert"} + err := repo.Insert(ctx, obj, nil) + require.NoError(t, err) + + createdAt := obj.CreatedAt + updatedAt := obj.UpdatedAt + time.Sleep(10 * time.Millisecond) + + obj.Name = "after-upsert" + err = repo.Upsert(ctx, obj) + require.NoError(t, err) + + assert.Equal(t, createdAt, obj.CreatedAt) + assert.True(t, obj.UpdatedAt.After(updatedAt)) + + var stored TestObject + err = repo.Get(ctx, *obj.GetID(), &stored) + require.NoError(t, err) + assert.Equal(t, "after-upsert", stored.Name) + assert.WithinDuration(t, createdAt, stored.CreatedAt, time.Second) + assert.True(t, stored.UpdatedAt.After(updatedAt)) + }) + + t.Run("Upsert_InsertsWhenIDDoesNotExist", func(t *testing.T) { + obj := &TestObject{Name: "upsert-with-preassigned-id"} + obj.SetID(bson.NewObjectID()) + + err := repo.Upsert(ctx, obj) + require.NoError(t, err) + + var stored TestObject + err = repo.Get(ctx, *obj.GetID(), &stored) + require.NoError(t, err) + assert.Equal(t, "upsert-with-preassigned-id", stored.Name) + assert.Equal(t, *obj.GetID(), stored.ID) + }) +} diff --git a/api/pkg/db/internal/mongo/verificationimp/verification_test.go b/api/pkg/db/internal/mongo/verificationimp/verification_test.go index e63fb239..e1312a88 100644 --- a/api/pkg/db/internal/mongo/verificationimp/verification_test.go +++ b/api/pkg/db/internal/mongo/verificationimp/verification_test.go @@ -147,6 +147,20 @@ func (m *memoryTokenRepository) Update(_ context.Context, obj storable.Storable) return nil } +func (m *memoryTokenRepository) Upsert(ctx context.Context, obj storable.Storable) error { + id := obj.GetID() + if id == nil || *id == bson.NilObjectID { + return m.Insert(ctx, obj, nil) + } + if err := m.Update(ctx, obj); err != nil { + if errors.Is(err, merrors.ErrNoData) { + return m.Insert(ctx, obj, nil) + } + return err + } + return nil +} + func (m *memoryTokenRepository) PatchMany(_ context.Context, filter builder.Query, patch builder.Patch) (int, error) { m.mu.Lock() defer m.mu.Unlock() diff --git a/api/pkg/db/repository/repository.go b/api/pkg/db/repository/repository.go index a3e7e739..dae68ad9 100644 --- a/api/pkg/db/repository/repository.go +++ b/api/pkg/db/repository/repository.go @@ -28,6 +28,8 @@ type Repository interface { FindOneByFilter(ctx context.Context, builder builder.Query, result storable.Storable) error FindManyByFilter(ctx context.Context, builder builder.Query, decoder rd.DecodingFunc) error Update(ctx context.Context, obj storable.Storable) error + // Upsert replaces the document identified by obj ID, inserting it when no document exists. + Upsert(ctx context.Context, obj storable.Storable) error // Patch applies partial updates defined by patch to the document identified by id. Patch(ctx context.Context, id bson.ObjectID, patch PatchDoc) error // PatchMany applies partial updates defined by patch to all documents matching filter and returns the number of updated documents.