extended logging #454
@@ -11,7 +11,6 @@ import (
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
"github.com/tech/sendico/pkg/mutil/mzap"
|
||||
"go.mongodb.org/mongo-driver/v2/bson"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@@ -85,17 +84,9 @@ func (p *Payouts) Upsert(ctx context.Context, record *model.CardPayout) error {
|
||||
return merrors.InvalidArgument("operation ref is required", "operation_ref")
|
||||
}
|
||||
|
||||
if record.ID == bson.NilObjectID {
|
||||
if err := p.repository.Insert(ctx, record, nil); err != nil {
|
||||
p.logger.Warn("Failed to insert new record", zap.Error(err), zap.String("operation_Ref", record.OperationRef))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := p.repository.Update(ctx, record); err != nil {
|
||||
p.logger.Warn("Failed to update existing record", zap.Error(err),
|
||||
zap.String("operation_Ref", record.OperationRef), mzap.ObjRef("payout_ref", record.ID))
|
||||
if err := p.repository.Upsert(ctx, record); err != nil {
|
||||
p.logger.Warn("Failed to upsert payout record", zap.Error(err), mzap.ObjRef("payout_ref", record.ID),
|
||||
zap.String("operation_ref", record.OperationRef), zap.String("payment_ref", record.PayoutID))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -207,6 +207,16 @@ func (m *memoryOrganizationRepository) Update(_ context.Context, obj storable.St
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryOrganizationRepository) Upsert(ctx context.Context, obj storable.Storable) error {
|
||||
if err := m.Update(ctx, obj); err != nil {
|
||||
if errors.Is(err, merrors.ErrNoData) {
|
||||
return m.Insert(ctx, obj, nil)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryOrganizationRepository) Patch(context.Context, bson.ObjectID, builder.Patch) error {
|
||||
return merrors.NotImplemented("Patch is not supported in memory repository")
|
||||
}
|
||||
|
||||
@@ -37,11 +37,18 @@ func (r *MongoRepository) Collection() string {
|
||||
return r.collectionName
|
||||
}
|
||||
|
||||
func (r *MongoRepository) Insert(ctx context.Context, obj storable.Storable, getFilter builder.Query) error {
|
||||
if (obj.GetID() == nil) || (obj.GetID().IsZero()) {
|
||||
func prepareForWrite(obj storable.Storable) bson.ObjectID {
|
||||
id := obj.GetID()
|
||||
if id == nil || id.IsZero() {
|
||||
obj.SetID(bson.NewObjectID())
|
||||
id = obj.GetID()
|
||||
}
|
||||
obj.Update()
|
||||
return *id
|
||||
}
|
||||
|
||||
func (r *MongoRepository) Insert(ctx context.Context, obj storable.Storable, getFilter builder.Query) error {
|
||||
prepareForWrite(obj)
|
||||
_, err := r.collection.InsertOne(ctx, obj)
|
||||
if mongo.IsDuplicateKeyError(err) {
|
||||
if getFilter != nil {
|
||||
@@ -61,10 +68,7 @@ func (r *MongoRepository) InsertMany(ctx context.Context, objects []storable.Sto
|
||||
|
||||
docs := make([]interface{}, len(objects))
|
||||
for i, obj := range objects {
|
||||
if (obj.GetID() == nil) || (obj.GetID().IsZero()) {
|
||||
obj.SetID(bson.NewObjectID())
|
||||
}
|
||||
obj.Update()
|
||||
prepareForWrite(obj)
|
||||
docs[i] = obj
|
||||
}
|
||||
|
||||
@@ -131,6 +135,12 @@ func (r *MongoRepository) Update(ctx context.Context, obj storable.Storable) err
|
||||
return r.collection.FindOneAndReplace(ctx, idFilter(*obj.GetID()), obj).Err()
|
||||
}
|
||||
|
||||
func (r *MongoRepository) Upsert(ctx context.Context, obj storable.Storable) error {
|
||||
id := prepareForWrite(obj)
|
||||
_, err := r.collection.ReplaceOne(ctx, idFilter(id), obj, options.Replace().SetUpsert(true))
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *MongoRepository) Patch(ctx context.Context, id bson.ObjectID, patch builder.Patch) error {
|
||||
if id.IsZero() {
|
||||
return merrors.InvalidArgument("zero id provided while patching", "id")
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
package repositoryimp_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/tech/sendico/pkg/db/internal/mongo/repositoryimp"
|
||||
"github.com/testcontainers/testcontainers-go"
|
||||
"github.com/testcontainers/testcontainers-go/modules/mongodb"
|
||||
"github.com/testcontainers/testcontainers-go/wait"
|
||||
"go.mongodb.org/mongo-driver/v2/bson"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
||||
)
|
||||
|
||||
func TestMongoRepository_Upsert(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
mongoContainer, err := mongodb.Run(ctx,
|
||||
"mongo:latest",
|
||||
mongodb.WithUsername("root"),
|
||||
mongodb.WithPassword("password"),
|
||||
testcontainers.WithWaitStrategy(wait.ForLog("Waiting for connections")),
|
||||
)
|
||||
require.NoError(t, err, "failed to start MongoDB container")
|
||||
defer terminate(ctx, t, mongoContainer)
|
||||
|
||||
mongoURI, err := mongoContainer.ConnectionString(ctx)
|
||||
require.NoError(t, err, "failed to get MongoDB connection string")
|
||||
|
||||
clientOptions := options.Client().ApplyURI(mongoURI)
|
||||
client, err := mongo.Connect(ctx, clientOptions)
|
||||
require.NoError(t, err, "failed to connect to MongoDB")
|
||||
defer disconnect(ctx, t, client)
|
||||
|
||||
db := client.Database("testdb")
|
||||
repo := repositoryimp.NewMongoRepository(db, "testcollection")
|
||||
|
||||
t.Run("Upsert_InsertsWhenIDIsZero", func(t *testing.T) {
|
||||
obj := &TestObject{Name: "inserted-via-upsert"}
|
||||
require.True(t, obj.GetID().IsZero())
|
||||
|
||||
err := repo.Upsert(ctx, obj)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, obj.GetID().IsZero())
|
||||
assert.NotZero(t, obj.CreatedAt)
|
||||
assert.NotZero(t, obj.UpdatedAt)
|
||||
|
||||
var stored TestObject
|
||||
err = repo.Get(ctx, *obj.GetID(), &stored)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "inserted-via-upsert", stored.Name)
|
||||
})
|
||||
|
||||
t.Run("Upsert_UpdatesExistingDocument", func(t *testing.T) {
|
||||
obj := &TestObject{Name: "before-upsert"}
|
||||
err := repo.Insert(ctx, obj, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
createdAt := obj.CreatedAt
|
||||
updatedAt := obj.UpdatedAt
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
obj.Name = "after-upsert"
|
||||
err = repo.Upsert(ctx, obj)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, createdAt, obj.CreatedAt)
|
||||
assert.True(t, obj.UpdatedAt.After(updatedAt))
|
||||
|
||||
var stored TestObject
|
||||
err = repo.Get(ctx, *obj.GetID(), &stored)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "after-upsert", stored.Name)
|
||||
assert.WithinDuration(t, createdAt, stored.CreatedAt, time.Second)
|
||||
assert.True(t, stored.UpdatedAt.After(updatedAt))
|
||||
})
|
||||
|
||||
t.Run("Upsert_InsertsWhenIDDoesNotExist", func(t *testing.T) {
|
||||
obj := &TestObject{Name: "upsert-with-preassigned-id"}
|
||||
obj.SetID(bson.NewObjectID())
|
||||
|
||||
err := repo.Upsert(ctx, obj)
|
||||
require.NoError(t, err)
|
||||
|
||||
var stored TestObject
|
||||
err = repo.Get(ctx, *obj.GetID(), &stored)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "upsert-with-preassigned-id", stored.Name)
|
||||
assert.Equal(t, *obj.GetID(), stored.ID)
|
||||
})
|
||||
}
|
||||
@@ -147,6 +147,20 @@ func (m *memoryTokenRepository) Update(_ context.Context, obj storable.Storable)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryTokenRepository) Upsert(ctx context.Context, obj storable.Storable) error {
|
||||
id := obj.GetID()
|
||||
if id == nil || *id == bson.NilObjectID {
|
||||
return m.Insert(ctx, obj, nil)
|
||||
}
|
||||
if err := m.Update(ctx, obj); err != nil {
|
||||
if errors.Is(err, merrors.ErrNoData) {
|
||||
return m.Insert(ctx, obj, nil)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memoryTokenRepository) PatchMany(_ context.Context, filter builder.Query, patch builder.Patch) (int, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
@@ -28,6 +28,8 @@ type Repository interface {
|
||||
FindOneByFilter(ctx context.Context, builder builder.Query, result storable.Storable) error
|
||||
FindManyByFilter(ctx context.Context, builder builder.Query, decoder rd.DecodingFunc) error
|
||||
Update(ctx context.Context, obj storable.Storable) error
|
||||
// Upsert replaces the document identified by obj ID, inserting it when no document exists.
|
||||
Upsert(ctx context.Context, obj storable.Storable) error
|
||||
// Patch applies partial updates defined by patch to the document identified by id.
|
||||
Patch(ctx context.Context, id bson.ObjectID, patch PatchDoc) error
|
||||
// PatchMany applies partial updates defined by patch to all documents matching filter and returns the number of updated documents.
|
||||
|
||||
Reference in New Issue
Block a user