From 4295456f631e1bfaa88428eeb45e7b291eb93230 Mon Sep 17 00:00:00 2001 From: Stephan D Date: Fri, 6 Mar 2026 13:50:13 +0100 Subject: [PATCH] fixed tgsettle upsert logic --- api/gateway/tgsettle/go.mod | 2 +- api/gateway/tgsettle/go.sum | 4 +- .../tgsettle/storage/mongo/store/payments.go | 43 +-- .../storage/mongo/store/payments_test.go | 251 ++++++++++++++++++ 4 files changed, 278 insertions(+), 22 deletions(-) create mode 100644 api/gateway/tgsettle/storage/mongo/store/payments_test.go diff --git a/api/gateway/tgsettle/go.mod b/api/gateway/tgsettle/go.mod index 37efef2d..cab2452f 100644 --- a/api/gateway/tgsettle/go.mod +++ b/api/gateway/tgsettle/go.mod @@ -11,7 +11,7 @@ require ( github.com/tech/sendico/pkg v0.1.0 go.mongodb.org/mongo-driver/v2 v2.5.0 go.uber.org/zap v1.27.1 - google.golang.org/grpc v1.79.1 + google.golang.org/grpc v1.79.2 google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/api/gateway/tgsettle/go.sum b/api/gateway/tgsettle/go.sum index 5261ce70..f8c3c7eb 100644 --- a/api/gateway/tgsettle/go.sum +++ b/api/gateway/tgsettle/go.sum @@ -210,8 +210,8 @@ gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 h1:ggcbiqK8WWh6l1dnltU4BgWGIGo+EVYxCaAPih/zQXQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= -google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY= -google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= +google.golang.org/grpc v1.79.2 h1:fRMD94s2tITpyJGtBBn7MkMseNpOZU8ZxgC3MMBaXRU= +google.golang.org/grpc v1.79.2/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/api/gateway/tgsettle/storage/mongo/store/payments.go b/api/gateway/tgsettle/storage/mongo/store/payments.go index 8b3c0fbe..11759af5 100644 --- a/api/gateway/tgsettle/storage/mongo/store/payments.go +++ b/api/gateway/tgsettle/storage/mongo/store/payments.go @@ -119,25 +119,30 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro return merrors.InvalidArgument("intention reference key is required", "intent_ref") } - filter := repository.Filter(fieldIdempotencyKey, record.IdempotencyKey) - err := p.repo.Insert(ctx, record, filter) - if errors.Is(err, merrors.ErrDataConflict) { - patch := repository.Patch(). - Set(repository.Field(fieldOperationRef), record.OperationRef). - Set(repository.Field("paymentIntentId"), record.PaymentIntentID). - Set(repository.Field("quoteRef"), record.QuoteRef). - Set(repository.Field("intentRef"), record.IntentRef). - Set(repository.Field("paymentRef"), record.PaymentRef). - Set(repository.Field("outgoingLeg"), record.OutgoingLeg). - Set(repository.Field("targetChatId"), record.TargetChatID). - Set(repository.Field("requestedMoney"), record.RequestedMoney). - Set(repository.Field("executedMoney"), record.ExecutedMoney). - Set(repository.Field("status"), record.Status). - Set(repository.Field("failureReason"), record.FailureReason). - Set(repository.Field("executedAt"), record.ExecutedAt). - Set(repository.Field("expiresAt"), record.ExpiresAt). - Set(repository.Field("expiredAt"), record.ExpiredAt) - _, err = p.repo.PatchMany(ctx, filter, patch) + existing, err := p.FindByIdempotencyKey(ctx, record.IdempotencyKey) + if err != nil { + return err + } + if existing != nil { + record.ID = existing.ID + if record.CreatedAt.IsZero() { + record.CreatedAt = existing.CreatedAt + } + } + + err = p.repo.Upsert(ctx, record) + if mongo.IsDuplicateKeyError(err) { + // Concurrent insert by idempotency key: resolve existing ID and retry replace-by-ID. + existing, lookupErr := p.FindByIdempotencyKey(ctx, record.IdempotencyKey) + if lookupErr != nil { + err = lookupErr + } else if existing != nil { + record.ID = existing.ID + if record.CreatedAt.IsZero() { + record.CreatedAt = existing.CreatedAt + } + err = p.repo.Upsert(ctx, record) + } } if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { diff --git a/api/gateway/tgsettle/storage/mongo/store/payments_test.go b/api/gateway/tgsettle/storage/mongo/store/payments_test.go new file mode 100644 index 00000000..6097c434 --- /dev/null +++ b/api/gateway/tgsettle/storage/mongo/store/payments_test.go @@ -0,0 +1,251 @@ +package store + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/tech/sendico/gateway/tgsettle/storage/model" + "github.com/tech/sendico/pkg/db/repository" + "github.com/tech/sendico/pkg/db/storable" + "github.com/tech/sendico/pkg/merrors" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.uber.org/zap" +) + +type fakePaymentsRepo struct { + repository.Repository + + records map[string]*model.PaymentRecord + findErrByCall map[int]error + duplicateWhenZeroID bool + findCalls int + upsertCalls int + upsertIDs []bson.ObjectID + upsertIdempotencyKey []string +} + +func (f *fakePaymentsRepo) FindOneByFilter(_ context.Context, query repository.FilterQuery, result storable.Storable) error { + f.findCalls++ + if err, ok := f.findErrByCall[f.findCalls]; ok { + return err + } + + rec, ok := result.(*model.PaymentRecord) + if !ok { + return merrors.InvalidDataType("expected *model.PaymentRecord") + } + + doc := query.BuildQuery() + if key := stringField(doc, fieldIdempotencyKey); key != "" { + stored, ok := f.records[key] + if !ok { + return merrors.NoData("payment not found by filter") + } + *rec = *stored + return nil + } + if operationRef := stringField(doc, fieldOperationRef); operationRef != "" { + for _, stored := range f.records { + if strings.TrimSpace(stored.OperationRef) == operationRef { + *rec = *stored + return nil + } + } + return merrors.NoData("payment not found by operation ref") + } + + return merrors.NoData("payment not found") +} + +func (f *fakePaymentsRepo) Upsert(_ context.Context, obj storable.Storable) error { + f.upsertCalls++ + + rec, ok := obj.(*model.PaymentRecord) + if !ok { + return merrors.InvalidDataType("expected *model.PaymentRecord") + } + f.upsertIDs = append(f.upsertIDs, rec.ID) + f.upsertIdempotencyKey = append(f.upsertIdempotencyKey, rec.IdempotencyKey) + + if f.duplicateWhenZeroID && rec.ID.IsZero() { + if _, exists := f.records[rec.IdempotencyKey]; exists { + return mongo.WriteException{ + WriteErrors: mongo.WriteErrors{ + { + Code: 11000, + Message: "E11000 duplicate key error collection: tgsettle_gateway.payments", + }, + }, + } + } + } + + copyRec := *rec + if copyRec.ID.IsZero() { + copyRec.ID = bson.NewObjectID() + } + if copyRec.CreatedAt.IsZero() { + copyRec.CreatedAt = time.Now().UTC() + } + copyRec.UpdatedAt = time.Now().UTC() + if f.records == nil { + f.records = map[string]*model.PaymentRecord{} + } + f.records[copyRec.IdempotencyKey] = ©Rec + *rec = copyRec + return nil +} + +func TestPaymentsUpsert_ReusesExistingIDFromIdempotencyLookup(t *testing.T) { + key := "idem-existing" + existingID := bson.NewObjectID() + existingCreatedAt := time.Date(2026, 3, 6, 10, 0, 0, 0, time.UTC) + + repo := &fakePaymentsRepo{ + records: map[string]*model.PaymentRecord{ + key: { + Base: storable.Base{ + ID: existingID, + CreatedAt: existingCreatedAt, + UpdatedAt: existingCreatedAt, + }, + IdempotencyKey: key, + PaymentIntentID: "pi-old", + IntentRef: "intent-old", + }, + }, + duplicateWhenZeroID: true, + } + store := &Payments{logger: zap.NewNop(), repo: repo} + + record := &model.PaymentRecord{ + IdempotencyKey: key, + PaymentIntentID: "pi-new", + QuoteRef: "quote-new", + IntentRef: "intent-new", + } + + if err := store.Upsert(context.Background(), record); err != nil { + t.Fatalf("upsert failed: %v", err) + } + + if repo.upsertCalls != 1 { + t.Fatalf("expected one upsert call, got %d", repo.upsertCalls) + } + if len(repo.upsertIDs) != 1 || repo.upsertIDs[0] != existingID { + t.Fatalf("expected upsert to reuse existing id %s, got %+v", existingID.Hex(), repo.upsertIDs) + } + if record.ID != existingID { + t.Fatalf("record ID mismatch: got %s want %s", record.ID.Hex(), existingID.Hex()) + } +} + +func TestPaymentsUpsert_RetriesAfterDuplicateKeyRace(t *testing.T) { + key := "idem-race" + existingID := bson.NewObjectID() + + repo := &fakePaymentsRepo{ + records: map[string]*model.PaymentRecord{ + key: { + Base: storable.Base{ + ID: existingID, + CreatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC), + UpdatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC), + }, + IdempotencyKey: key, + PaymentIntentID: "pi-existing", + IntentRef: "intent-existing", + }, + }, + findErrByCall: map[int]error{ + 1: merrors.NoData("payment not found by filter"), + }, + duplicateWhenZeroID: true, + } + store := &Payments{logger: zap.NewNop(), repo: repo} + + record := &model.PaymentRecord{ + IdempotencyKey: key, + PaymentIntentID: "pi-new", + QuoteRef: "quote-new", + IntentRef: "intent-new", + } + + if err := store.Upsert(context.Background(), record); err != nil { + t.Fatalf("upsert failed: %v", err) + } + + if repo.upsertCalls != 2 { + t.Fatalf("expected two upsert calls, got %d", repo.upsertCalls) + } + if len(repo.upsertIDs) != 2 { + t.Fatalf("expected two upsert IDs, got %d", len(repo.upsertIDs)) + } + if !repo.upsertIDs[0].IsZero() { + t.Fatalf("expected first upsert to use zero id due stale read, got %s", repo.upsertIDs[0].Hex()) + } + if repo.upsertIDs[1] != existingID { + t.Fatalf("expected retry to use existing id %s, got %s", existingID.Hex(), repo.upsertIDs[1].Hex()) + } +} + +func TestPaymentsUpsert_PropagatesNoSuchTransactionAfterDuplicate(t *testing.T) { + key := "idem-nosuchtx" + + repo := &fakePaymentsRepo{ + records: map[string]*model.PaymentRecord{ + key: { + Base: storable.Base{ + ID: bson.NewObjectID(), + CreatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC), + UpdatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC), + }, + IdempotencyKey: key, + PaymentIntentID: "pi-existing", + IntentRef: "intent-existing", + }, + }, + findErrByCall: map[int]error{ + 1: merrors.NoData("payment not found by filter"), + 2: mongo.CommandError{ + Code: 251, + Name: "NoSuchTransaction", + Message: "Transaction with { txnNumber: 2 } has been aborted.", + }, + }, + duplicateWhenZeroID: true, + } + store := &Payments{logger: zap.NewNop(), repo: repo} + + record := &model.PaymentRecord{ + IdempotencyKey: key, + PaymentIntentID: "pi-new", + QuoteRef: "quote-new", + IntentRef: "intent-new", + } + + err := store.Upsert(context.Background(), record) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "NoSuchTransaction") { + t.Fatalf("expected NoSuchTransaction error, got %v", err) + } + if repo.upsertCalls != 1 { + t.Fatalf("expected one upsert attempt before lookup failure, got %d", repo.upsertCalls) + } +} + +func stringField(doc bson.D, key string) string { + for _, entry := range doc { + if entry.Key != key { + continue + } + res, _ := entry.Value.(string) + return strings.TrimSpace(res) + } + return "" +}