extended logging
This commit is contained in:
@@ -11,7 +11,6 @@ import (
|
|||||||
"github.com/tech/sendico/pkg/merrors"
|
"github.com/tech/sendico/pkg/merrors"
|
||||||
"github.com/tech/sendico/pkg/mlogger"
|
"github.com/tech/sendico/pkg/mlogger"
|
||||||
"github.com/tech/sendico/pkg/mutil/mzap"
|
"github.com/tech/sendico/pkg/mutil/mzap"
|
||||||
"go.mongodb.org/mongo-driver/v2/bson"
|
|
||||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
@@ -85,17 +84,9 @@ func (p *Payouts) Upsert(ctx context.Context, record *model.CardPayout) error {
|
|||||||
return merrors.InvalidArgument("operation ref is required", "operation_ref")
|
return merrors.InvalidArgument("operation ref is required", "operation_ref")
|
||||||
}
|
}
|
||||||
|
|
||||||
if record.ID == bson.NilObjectID {
|
if err := p.repository.Upsert(ctx, record); err != nil {
|
||||||
if err := p.repository.Insert(ctx, record, nil); err != nil {
|
p.logger.Warn("Failed to upsert payout record", zap.Error(err), mzap.ObjRef("payout_ref", record.ID),
|
||||||
p.logger.Warn("Failed to insert new record", zap.Error(err), zap.String("operation_Ref", record.OperationRef))
|
zap.String("operation_ref", record.OperationRef), zap.String("payment_ref", record.PayoutID))
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := p.repository.Update(ctx, record); err != nil {
|
|
||||||
p.logger.Warn("Failed to update existing record", zap.Error(err),
|
|
||||||
zap.String("operation_Ref", record.OperationRef), mzap.ObjRef("payout_ref", record.ID))
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -207,6 +207,16 @@ func (m *memoryOrganizationRepository) Update(_ context.Context, obj storable.St
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *memoryOrganizationRepository) Upsert(ctx context.Context, obj storable.Storable) error {
|
||||||
|
if err := m.Update(ctx, obj); err != nil {
|
||||||
|
if errors.Is(err, merrors.ErrNoData) {
|
||||||
|
return m.Insert(ctx, obj, nil)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *memoryOrganizationRepository) Patch(context.Context, bson.ObjectID, builder.Patch) error {
|
func (m *memoryOrganizationRepository) Patch(context.Context, bson.ObjectID, builder.Patch) error {
|
||||||
return merrors.NotImplemented("Patch is not supported in memory repository")
|
return merrors.NotImplemented("Patch is not supported in memory repository")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -37,11 +37,18 @@ func (r *MongoRepository) Collection() string {
|
|||||||
return r.collectionName
|
return r.collectionName
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *MongoRepository) Insert(ctx context.Context, obj storable.Storable, getFilter builder.Query) error {
|
func prepareForWrite(obj storable.Storable) bson.ObjectID {
|
||||||
if (obj.GetID() == nil) || (obj.GetID().IsZero()) {
|
id := obj.GetID()
|
||||||
|
if id == nil || id.IsZero() {
|
||||||
obj.SetID(bson.NewObjectID())
|
obj.SetID(bson.NewObjectID())
|
||||||
|
id = obj.GetID()
|
||||||
}
|
}
|
||||||
obj.Update()
|
obj.Update()
|
||||||
|
return *id
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *MongoRepository) Insert(ctx context.Context, obj storable.Storable, getFilter builder.Query) error {
|
||||||
|
prepareForWrite(obj)
|
||||||
_, err := r.collection.InsertOne(ctx, obj)
|
_, err := r.collection.InsertOne(ctx, obj)
|
||||||
if mongo.IsDuplicateKeyError(err) {
|
if mongo.IsDuplicateKeyError(err) {
|
||||||
if getFilter != nil {
|
if getFilter != nil {
|
||||||
@@ -61,10 +68,7 @@ func (r *MongoRepository) InsertMany(ctx context.Context, objects []storable.Sto
|
|||||||
|
|
||||||
docs := make([]interface{}, len(objects))
|
docs := make([]interface{}, len(objects))
|
||||||
for i, obj := range objects {
|
for i, obj := range objects {
|
||||||
if (obj.GetID() == nil) || (obj.GetID().IsZero()) {
|
prepareForWrite(obj)
|
||||||
obj.SetID(bson.NewObjectID())
|
|
||||||
}
|
|
||||||
obj.Update()
|
|
||||||
docs[i] = obj
|
docs[i] = obj
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -131,6 +135,12 @@ func (r *MongoRepository) Update(ctx context.Context, obj storable.Storable) err
|
|||||||
return r.collection.FindOneAndReplace(ctx, idFilter(*obj.GetID()), obj).Err()
|
return r.collection.FindOneAndReplace(ctx, idFilter(*obj.GetID()), obj).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *MongoRepository) Upsert(ctx context.Context, obj storable.Storable) error {
|
||||||
|
id := prepareForWrite(obj)
|
||||||
|
_, err := r.collection.ReplaceOne(ctx, idFilter(id), obj, options.Replace().SetUpsert(true))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (r *MongoRepository) Patch(ctx context.Context, id bson.ObjectID, patch builder.Patch) error {
|
func (r *MongoRepository) Patch(ctx context.Context, id bson.ObjectID, patch builder.Patch) error {
|
||||||
if id.IsZero() {
|
if id.IsZero() {
|
||||||
return merrors.InvalidArgument("zero id provided while patching", "id")
|
return merrors.InvalidArgument("zero id provided while patching", "id")
|
||||||
|
|||||||
@@ -0,0 +1,99 @@
|
|||||||
|
//go:build integration
|
||||||
|
// +build integration
|
||||||
|
|
||||||
|
package repositoryimp_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/tech/sendico/pkg/db/internal/mongo/repositoryimp"
|
||||||
|
"github.com/testcontainers/testcontainers-go"
|
||||||
|
"github.com/testcontainers/testcontainers-go/modules/mongodb"
|
||||||
|
"github.com/testcontainers/testcontainers-go/wait"
|
||||||
|
"go.mongodb.org/mongo-driver/v2/bson"
|
||||||
|
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||||
|
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMongoRepository_Upsert(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
mongoContainer, err := mongodb.Run(ctx,
|
||||||
|
"mongo:latest",
|
||||||
|
mongodb.WithUsername("root"),
|
||||||
|
mongodb.WithPassword("password"),
|
||||||
|
testcontainers.WithWaitStrategy(wait.ForLog("Waiting for connections")),
|
||||||
|
)
|
||||||
|
require.NoError(t, err, "failed to start MongoDB container")
|
||||||
|
defer terminate(ctx, t, mongoContainer)
|
||||||
|
|
||||||
|
mongoURI, err := mongoContainer.ConnectionString(ctx)
|
||||||
|
require.NoError(t, err, "failed to get MongoDB connection string")
|
||||||
|
|
||||||
|
clientOptions := options.Client().ApplyURI(mongoURI)
|
||||||
|
client, err := mongo.Connect(ctx, clientOptions)
|
||||||
|
require.NoError(t, err, "failed to connect to MongoDB")
|
||||||
|
defer disconnect(ctx, t, client)
|
||||||
|
|
||||||
|
db := client.Database("testdb")
|
||||||
|
repo := repositoryimp.NewMongoRepository(db, "testcollection")
|
||||||
|
|
||||||
|
t.Run("Upsert_InsertsWhenIDIsZero", func(t *testing.T) {
|
||||||
|
obj := &TestObject{Name: "inserted-via-upsert"}
|
||||||
|
require.True(t, obj.GetID().IsZero())
|
||||||
|
|
||||||
|
err := repo.Upsert(ctx, obj)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.False(t, obj.GetID().IsZero())
|
||||||
|
assert.NotZero(t, obj.CreatedAt)
|
||||||
|
assert.NotZero(t, obj.UpdatedAt)
|
||||||
|
|
||||||
|
var stored TestObject
|
||||||
|
err = repo.Get(ctx, *obj.GetID(), &stored)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "inserted-via-upsert", stored.Name)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Upsert_UpdatesExistingDocument", func(t *testing.T) {
|
||||||
|
obj := &TestObject{Name: "before-upsert"}
|
||||||
|
err := repo.Insert(ctx, obj, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
createdAt := obj.CreatedAt
|
||||||
|
updatedAt := obj.UpdatedAt
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
|
obj.Name = "after-upsert"
|
||||||
|
err = repo.Upsert(ctx, obj)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, createdAt, obj.CreatedAt)
|
||||||
|
assert.True(t, obj.UpdatedAt.After(updatedAt))
|
||||||
|
|
||||||
|
var stored TestObject
|
||||||
|
err = repo.Get(ctx, *obj.GetID(), &stored)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "after-upsert", stored.Name)
|
||||||
|
assert.WithinDuration(t, createdAt, stored.CreatedAt, time.Second)
|
||||||
|
assert.True(t, stored.UpdatedAt.After(updatedAt))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Upsert_InsertsWhenIDDoesNotExist", func(t *testing.T) {
|
||||||
|
obj := &TestObject{Name: "upsert-with-preassigned-id"}
|
||||||
|
obj.SetID(bson.NewObjectID())
|
||||||
|
|
||||||
|
err := repo.Upsert(ctx, obj)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var stored TestObject
|
||||||
|
err = repo.Get(ctx, *obj.GetID(), &stored)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "upsert-with-preassigned-id", stored.Name)
|
||||||
|
assert.Equal(t, *obj.GetID(), stored.ID)
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -147,6 +147,20 @@ func (m *memoryTokenRepository) Update(_ context.Context, obj storable.Storable)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *memoryTokenRepository) Upsert(ctx context.Context, obj storable.Storable) error {
|
||||||
|
id := obj.GetID()
|
||||||
|
if id == nil || *id == bson.NilObjectID {
|
||||||
|
return m.Insert(ctx, obj, nil)
|
||||||
|
}
|
||||||
|
if err := m.Update(ctx, obj); err != nil {
|
||||||
|
if errors.Is(err, merrors.ErrNoData) {
|
||||||
|
return m.Insert(ctx, obj, nil)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *memoryTokenRepository) PatchMany(_ context.Context, filter builder.Query, patch builder.Patch) (int, error) {
|
func (m *memoryTokenRepository) PatchMany(_ context.Context, filter builder.Query, patch builder.Patch) (int, error) {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
|
|||||||
@@ -28,6 +28,8 @@ type Repository interface {
|
|||||||
FindOneByFilter(ctx context.Context, builder builder.Query, result storable.Storable) error
|
FindOneByFilter(ctx context.Context, builder builder.Query, result storable.Storable) error
|
||||||
FindManyByFilter(ctx context.Context, builder builder.Query, decoder rd.DecodingFunc) error
|
FindManyByFilter(ctx context.Context, builder builder.Query, decoder rd.DecodingFunc) error
|
||||||
Update(ctx context.Context, obj storable.Storable) error
|
Update(ctx context.Context, obj storable.Storable) error
|
||||||
|
// Upsert replaces the document identified by obj ID, inserting it when no document exists.
|
||||||
|
Upsert(ctx context.Context, obj storable.Storable) error
|
||||||
// Patch applies partial updates defined by patch to the document identified by id.
|
// Patch applies partial updates defined by patch to the document identified by id.
|
||||||
Patch(ctx context.Context, id bson.ObjectID, patch PatchDoc) error
|
Patch(ctx context.Context, id bson.ObjectID, patch PatchDoc) error
|
||||||
// PatchMany applies partial updates defined by patch to all documents matching filter and returns the number of updated documents.
|
// PatchMany applies partial updates defined by patch to all documents matching filter and returns the number of updated documents.
|
||||||
|
|||||||
Reference in New Issue
Block a user