extended logging #454

Merged
tech merged 1 commits from mntx-452 into main 2026-02-10 10:34:19 +00:00
6 changed files with 144 additions and 18 deletions

View File

@@ -11,7 +11,6 @@ import (
"github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mutil/mzap" "github.com/tech/sendico/pkg/mutil/mzap"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -85,17 +84,9 @@ func (p *Payouts) Upsert(ctx context.Context, record *model.CardPayout) error {
return merrors.InvalidArgument("operation ref is required", "operation_ref") return merrors.InvalidArgument("operation ref is required", "operation_ref")
} }
if record.ID == bson.NilObjectID { if err := p.repository.Upsert(ctx, record); err != nil {
if err := p.repository.Insert(ctx, record, nil); err != nil { p.logger.Warn("Failed to upsert payout record", zap.Error(err), mzap.ObjRef("payout_ref", record.ID),
p.logger.Warn("Failed to insert new record", zap.Error(err), zap.String("operation_Ref", record.OperationRef)) zap.String("operation_ref", record.OperationRef), zap.String("payment_ref", record.PayoutID))
return err
}
return nil
}
if err := p.repository.Update(ctx, record); err != nil {
p.logger.Warn("Failed to update existing record", zap.Error(err),
zap.String("operation_Ref", record.OperationRef), mzap.ObjRef("payout_ref", record.ID))
return err return err
} }
return nil return nil

View File

@@ -207,6 +207,16 @@ func (m *memoryOrganizationRepository) Update(_ context.Context, obj storable.St
return nil return nil
} }
func (m *memoryOrganizationRepository) Upsert(ctx context.Context, obj storable.Storable) error {
if err := m.Update(ctx, obj); err != nil {
if errors.Is(err, merrors.ErrNoData) {
return m.Insert(ctx, obj, nil)
}
return err
}
return nil
}
func (m *memoryOrganizationRepository) Patch(context.Context, bson.ObjectID, builder.Patch) error { func (m *memoryOrganizationRepository) Patch(context.Context, bson.ObjectID, builder.Patch) error {
return merrors.NotImplemented("Patch is not supported in memory repository") return merrors.NotImplemented("Patch is not supported in memory repository")
} }

View File

@@ -37,11 +37,18 @@ func (r *MongoRepository) Collection() string {
return r.collectionName return r.collectionName
} }
func (r *MongoRepository) Insert(ctx context.Context, obj storable.Storable, getFilter builder.Query) error { func prepareForWrite(obj storable.Storable) bson.ObjectID {
if (obj.GetID() == nil) || (obj.GetID().IsZero()) { id := obj.GetID()
if id == nil || id.IsZero() {
obj.SetID(bson.NewObjectID()) obj.SetID(bson.NewObjectID())
id = obj.GetID()
} }
obj.Update() obj.Update()
return *id
}
func (r *MongoRepository) Insert(ctx context.Context, obj storable.Storable, getFilter builder.Query) error {
prepareForWrite(obj)
_, err := r.collection.InsertOne(ctx, obj) _, err := r.collection.InsertOne(ctx, obj)
if mongo.IsDuplicateKeyError(err) { if mongo.IsDuplicateKeyError(err) {
if getFilter != nil { if getFilter != nil {
@@ -61,10 +68,7 @@ func (r *MongoRepository) InsertMany(ctx context.Context, objects []storable.Sto
docs := make([]interface{}, len(objects)) docs := make([]interface{}, len(objects))
for i, obj := range objects { for i, obj := range objects {
if (obj.GetID() == nil) || (obj.GetID().IsZero()) { prepareForWrite(obj)
obj.SetID(bson.NewObjectID())
}
obj.Update()
docs[i] = obj docs[i] = obj
} }
@@ -131,6 +135,12 @@ func (r *MongoRepository) Update(ctx context.Context, obj storable.Storable) err
return r.collection.FindOneAndReplace(ctx, idFilter(*obj.GetID()), obj).Err() return r.collection.FindOneAndReplace(ctx, idFilter(*obj.GetID()), obj).Err()
} }
func (r *MongoRepository) Upsert(ctx context.Context, obj storable.Storable) error {
id := prepareForWrite(obj)
_, err := r.collection.ReplaceOne(ctx, idFilter(id), obj, options.Replace().SetUpsert(true))
return err
}
func (r *MongoRepository) Patch(ctx context.Context, id bson.ObjectID, patch builder.Patch) error { func (r *MongoRepository) Patch(ctx context.Context, id bson.ObjectID, patch builder.Patch) error {
if id.IsZero() { if id.IsZero() {
return merrors.InvalidArgument("zero id provided while patching", "id") return merrors.InvalidArgument("zero id provided while patching", "id")

View File

@@ -0,0 +1,99 @@
//go:build integration
// +build integration
package repositoryimp_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tech/sendico/pkg/db/internal/mongo/repositoryimp"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/mongodb"
"github.com/testcontainers/testcontainers-go/wait"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
)
func TestMongoRepository_Upsert(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
mongoContainer, err := mongodb.Run(ctx,
"mongo:latest",
mongodb.WithUsername("root"),
mongodb.WithPassword("password"),
testcontainers.WithWaitStrategy(wait.ForLog("Waiting for connections")),
)
require.NoError(t, err, "failed to start MongoDB container")
defer terminate(ctx, t, mongoContainer)
mongoURI, err := mongoContainer.ConnectionString(ctx)
require.NoError(t, err, "failed to get MongoDB connection string")
clientOptions := options.Client().ApplyURI(mongoURI)
client, err := mongo.Connect(ctx, clientOptions)
require.NoError(t, err, "failed to connect to MongoDB")
defer disconnect(ctx, t, client)
db := client.Database("testdb")
repo := repositoryimp.NewMongoRepository(db, "testcollection")
t.Run("Upsert_InsertsWhenIDIsZero", func(t *testing.T) {
obj := &TestObject{Name: "inserted-via-upsert"}
require.True(t, obj.GetID().IsZero())
err := repo.Upsert(ctx, obj)
require.NoError(t, err)
assert.False(t, obj.GetID().IsZero())
assert.NotZero(t, obj.CreatedAt)
assert.NotZero(t, obj.UpdatedAt)
var stored TestObject
err = repo.Get(ctx, *obj.GetID(), &stored)
require.NoError(t, err)
assert.Equal(t, "inserted-via-upsert", stored.Name)
})
t.Run("Upsert_UpdatesExistingDocument", func(t *testing.T) {
obj := &TestObject{Name: "before-upsert"}
err := repo.Insert(ctx, obj, nil)
require.NoError(t, err)
createdAt := obj.CreatedAt
updatedAt := obj.UpdatedAt
time.Sleep(10 * time.Millisecond)
obj.Name = "after-upsert"
err = repo.Upsert(ctx, obj)
require.NoError(t, err)
assert.Equal(t, createdAt, obj.CreatedAt)
assert.True(t, obj.UpdatedAt.After(updatedAt))
var stored TestObject
err = repo.Get(ctx, *obj.GetID(), &stored)
require.NoError(t, err)
assert.Equal(t, "after-upsert", stored.Name)
assert.WithinDuration(t, createdAt, stored.CreatedAt, time.Second)
assert.True(t, stored.UpdatedAt.After(updatedAt))
})
t.Run("Upsert_InsertsWhenIDDoesNotExist", func(t *testing.T) {
obj := &TestObject{Name: "upsert-with-preassigned-id"}
obj.SetID(bson.NewObjectID())
err := repo.Upsert(ctx, obj)
require.NoError(t, err)
var stored TestObject
err = repo.Get(ctx, *obj.GetID(), &stored)
require.NoError(t, err)
assert.Equal(t, "upsert-with-preassigned-id", stored.Name)
assert.Equal(t, *obj.GetID(), stored.ID)
})
}

View File

@@ -147,6 +147,20 @@ func (m *memoryTokenRepository) Update(_ context.Context, obj storable.Storable)
return nil return nil
} }
func (m *memoryTokenRepository) Upsert(ctx context.Context, obj storable.Storable) error {
id := obj.GetID()
if id == nil || *id == bson.NilObjectID {
return m.Insert(ctx, obj, nil)
}
if err := m.Update(ctx, obj); err != nil {
if errors.Is(err, merrors.ErrNoData) {
return m.Insert(ctx, obj, nil)
}
return err
}
return nil
}
func (m *memoryTokenRepository) PatchMany(_ context.Context, filter builder.Query, patch builder.Patch) (int, error) { func (m *memoryTokenRepository) PatchMany(_ context.Context, filter builder.Query, patch builder.Patch) (int, error) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()

View File

@@ -28,6 +28,8 @@ type Repository interface {
FindOneByFilter(ctx context.Context, builder builder.Query, result storable.Storable) error FindOneByFilter(ctx context.Context, builder builder.Query, result storable.Storable) error
FindManyByFilter(ctx context.Context, builder builder.Query, decoder rd.DecodingFunc) error FindManyByFilter(ctx context.Context, builder builder.Query, decoder rd.DecodingFunc) error
Update(ctx context.Context, obj storable.Storable) error Update(ctx context.Context, obj storable.Storable) error
// Upsert replaces the document identified by obj ID, inserting it when no document exists.
Upsert(ctx context.Context, obj storable.Storable) error
// Patch applies partial updates defined by patch to the document identified by id. // Patch applies partial updates defined by patch to the document identified by id.
Patch(ctx context.Context, id bson.ObjectID, patch PatchDoc) error Patch(ctx context.Context, id bson.ObjectID, patch PatchDoc) error
// PatchMany applies partial updates defined by patch to all documents matching filter and returns the number of updated documents. // PatchMany applies partial updates defined by patch to all documents matching filter and returns the number of updated documents.