Merge pull request 'fixed tgsettle upsert logic' (#686) from tg-685 into main
All checks were successful
ci/woodpecker/push/gateway_tgsettle Pipeline was successful
All checks were successful
ci/woodpecker/push/gateway_tgsettle Pipeline was successful
Reviewed-on: #686
This commit was merged in pull request #686.
This commit is contained in:
@@ -11,7 +11,7 @@ require (
|
||||
github.com/tech/sendico/pkg v0.1.0
|
||||
go.mongodb.org/mongo-driver/v2 v2.5.0
|
||||
go.uber.org/zap v1.27.1
|
||||
google.golang.org/grpc v1.79.1
|
||||
google.golang.org/grpc v1.79.2
|
||||
google.golang.org/protobuf v1.36.11
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
@@ -210,8 +210,8 @@ gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
|
||||
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 h1:ggcbiqK8WWh6l1dnltU4BgWGIGo+EVYxCaAPih/zQXQ=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
|
||||
google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY=
|
||||
google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
|
||||
google.golang.org/grpc v1.79.2 h1:fRMD94s2tITpyJGtBBn7MkMseNpOZU8ZxgC3MMBaXRU=
|
||||
google.golang.org/grpc v1.79.2/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
|
||||
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
|
||||
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
||||
@@ -119,25 +119,30 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro
|
||||
return merrors.InvalidArgument("intention reference key is required", "intent_ref")
|
||||
}
|
||||
|
||||
filter := repository.Filter(fieldIdempotencyKey, record.IdempotencyKey)
|
||||
err := p.repo.Insert(ctx, record, filter)
|
||||
if errors.Is(err, merrors.ErrDataConflict) {
|
||||
patch := repository.Patch().
|
||||
Set(repository.Field(fieldOperationRef), record.OperationRef).
|
||||
Set(repository.Field("paymentIntentId"), record.PaymentIntentID).
|
||||
Set(repository.Field("quoteRef"), record.QuoteRef).
|
||||
Set(repository.Field("intentRef"), record.IntentRef).
|
||||
Set(repository.Field("paymentRef"), record.PaymentRef).
|
||||
Set(repository.Field("outgoingLeg"), record.OutgoingLeg).
|
||||
Set(repository.Field("targetChatId"), record.TargetChatID).
|
||||
Set(repository.Field("requestedMoney"), record.RequestedMoney).
|
||||
Set(repository.Field("executedMoney"), record.ExecutedMoney).
|
||||
Set(repository.Field("status"), record.Status).
|
||||
Set(repository.Field("failureReason"), record.FailureReason).
|
||||
Set(repository.Field("executedAt"), record.ExecutedAt).
|
||||
Set(repository.Field("expiresAt"), record.ExpiresAt).
|
||||
Set(repository.Field("expiredAt"), record.ExpiredAt)
|
||||
_, err = p.repo.PatchMany(ctx, filter, patch)
|
||||
existing, err := p.FindByIdempotencyKey(ctx, record.IdempotencyKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if existing != nil {
|
||||
record.ID = existing.ID
|
||||
if record.CreatedAt.IsZero() {
|
||||
record.CreatedAt = existing.CreatedAt
|
||||
}
|
||||
}
|
||||
|
||||
err = p.repo.Upsert(ctx, record)
|
||||
if mongo.IsDuplicateKeyError(err) {
|
||||
// Concurrent insert by idempotency key: resolve existing ID and retry replace-by-ID.
|
||||
existing, lookupErr := p.FindByIdempotencyKey(ctx, record.IdempotencyKey)
|
||||
if lookupErr != nil {
|
||||
err = lookupErr
|
||||
} else if existing != nil {
|
||||
record.ID = existing.ID
|
||||
if record.CreatedAt.IsZero() {
|
||||
record.CreatedAt = existing.CreatedAt
|
||||
}
|
||||
err = p.repo.Upsert(ctx, record)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
|
||||
251
api/gateway/tgsettle/storage/mongo/store/payments_test.go
Normal file
251
api/gateway/tgsettle/storage/mongo/store/payments_test.go
Normal file
@@ -0,0 +1,251 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/tech/sendico/gateway/tgsettle/storage/model"
|
||||
"github.com/tech/sendico/pkg/db/repository"
|
||||
"github.com/tech/sendico/pkg/db/storable"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"go.mongodb.org/mongo-driver/v2/bson"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type fakePaymentsRepo struct {
|
||||
repository.Repository
|
||||
|
||||
records map[string]*model.PaymentRecord
|
||||
findErrByCall map[int]error
|
||||
duplicateWhenZeroID bool
|
||||
findCalls int
|
||||
upsertCalls int
|
||||
upsertIDs []bson.ObjectID
|
||||
upsertIdempotencyKey []string
|
||||
}
|
||||
|
||||
func (f *fakePaymentsRepo) FindOneByFilter(_ context.Context, query repository.FilterQuery, result storable.Storable) error {
|
||||
f.findCalls++
|
||||
if err, ok := f.findErrByCall[f.findCalls]; ok {
|
||||
return err
|
||||
}
|
||||
|
||||
rec, ok := result.(*model.PaymentRecord)
|
||||
if !ok {
|
||||
return merrors.InvalidDataType("expected *model.PaymentRecord")
|
||||
}
|
||||
|
||||
doc := query.BuildQuery()
|
||||
if key := stringField(doc, fieldIdempotencyKey); key != "" {
|
||||
stored, ok := f.records[key]
|
||||
if !ok {
|
||||
return merrors.NoData("payment not found by filter")
|
||||
}
|
||||
*rec = *stored
|
||||
return nil
|
||||
}
|
||||
if operationRef := stringField(doc, fieldOperationRef); operationRef != "" {
|
||||
for _, stored := range f.records {
|
||||
if strings.TrimSpace(stored.OperationRef) == operationRef {
|
||||
*rec = *stored
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return merrors.NoData("payment not found by operation ref")
|
||||
}
|
||||
|
||||
return merrors.NoData("payment not found")
|
||||
}
|
||||
|
||||
func (f *fakePaymentsRepo) Upsert(_ context.Context, obj storable.Storable) error {
|
||||
f.upsertCalls++
|
||||
|
||||
rec, ok := obj.(*model.PaymentRecord)
|
||||
if !ok {
|
||||
return merrors.InvalidDataType("expected *model.PaymentRecord")
|
||||
}
|
||||
f.upsertIDs = append(f.upsertIDs, rec.ID)
|
||||
f.upsertIdempotencyKey = append(f.upsertIdempotencyKey, rec.IdempotencyKey)
|
||||
|
||||
if f.duplicateWhenZeroID && rec.ID.IsZero() {
|
||||
if _, exists := f.records[rec.IdempotencyKey]; exists {
|
||||
return mongo.WriteException{
|
||||
WriteErrors: mongo.WriteErrors{
|
||||
{
|
||||
Code: 11000,
|
||||
Message: "E11000 duplicate key error collection: tgsettle_gateway.payments",
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
copyRec := *rec
|
||||
if copyRec.ID.IsZero() {
|
||||
copyRec.ID = bson.NewObjectID()
|
||||
}
|
||||
if copyRec.CreatedAt.IsZero() {
|
||||
copyRec.CreatedAt = time.Now().UTC()
|
||||
}
|
||||
copyRec.UpdatedAt = time.Now().UTC()
|
||||
if f.records == nil {
|
||||
f.records = map[string]*model.PaymentRecord{}
|
||||
}
|
||||
f.records[copyRec.IdempotencyKey] = ©Rec
|
||||
*rec = copyRec
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestPaymentsUpsert_ReusesExistingIDFromIdempotencyLookup(t *testing.T) {
|
||||
key := "idem-existing"
|
||||
existingID := bson.NewObjectID()
|
||||
existingCreatedAt := time.Date(2026, 3, 6, 10, 0, 0, 0, time.UTC)
|
||||
|
||||
repo := &fakePaymentsRepo{
|
||||
records: map[string]*model.PaymentRecord{
|
||||
key: {
|
||||
Base: storable.Base{
|
||||
ID: existingID,
|
||||
CreatedAt: existingCreatedAt,
|
||||
UpdatedAt: existingCreatedAt,
|
||||
},
|
||||
IdempotencyKey: key,
|
||||
PaymentIntentID: "pi-old",
|
||||
IntentRef: "intent-old",
|
||||
},
|
||||
},
|
||||
duplicateWhenZeroID: true,
|
||||
}
|
||||
store := &Payments{logger: zap.NewNop(), repo: repo}
|
||||
|
||||
record := &model.PaymentRecord{
|
||||
IdempotencyKey: key,
|
||||
PaymentIntentID: "pi-new",
|
||||
QuoteRef: "quote-new",
|
||||
IntentRef: "intent-new",
|
||||
}
|
||||
|
||||
if err := store.Upsert(context.Background(), record); err != nil {
|
||||
t.Fatalf("upsert failed: %v", err)
|
||||
}
|
||||
|
||||
if repo.upsertCalls != 1 {
|
||||
t.Fatalf("expected one upsert call, got %d", repo.upsertCalls)
|
||||
}
|
||||
if len(repo.upsertIDs) != 1 || repo.upsertIDs[0] != existingID {
|
||||
t.Fatalf("expected upsert to reuse existing id %s, got %+v", existingID.Hex(), repo.upsertIDs)
|
||||
}
|
||||
if record.ID != existingID {
|
||||
t.Fatalf("record ID mismatch: got %s want %s", record.ID.Hex(), existingID.Hex())
|
||||
}
|
||||
}
|
||||
|
||||
func TestPaymentsUpsert_RetriesAfterDuplicateKeyRace(t *testing.T) {
|
||||
key := "idem-race"
|
||||
existingID := bson.NewObjectID()
|
||||
|
||||
repo := &fakePaymentsRepo{
|
||||
records: map[string]*model.PaymentRecord{
|
||||
key: {
|
||||
Base: storable.Base{
|
||||
ID: existingID,
|
||||
CreatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC),
|
||||
UpdatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC),
|
||||
},
|
||||
IdempotencyKey: key,
|
||||
PaymentIntentID: "pi-existing",
|
||||
IntentRef: "intent-existing",
|
||||
},
|
||||
},
|
||||
findErrByCall: map[int]error{
|
||||
1: merrors.NoData("payment not found by filter"),
|
||||
},
|
||||
duplicateWhenZeroID: true,
|
||||
}
|
||||
store := &Payments{logger: zap.NewNop(), repo: repo}
|
||||
|
||||
record := &model.PaymentRecord{
|
||||
IdempotencyKey: key,
|
||||
PaymentIntentID: "pi-new",
|
||||
QuoteRef: "quote-new",
|
||||
IntentRef: "intent-new",
|
||||
}
|
||||
|
||||
if err := store.Upsert(context.Background(), record); err != nil {
|
||||
t.Fatalf("upsert failed: %v", err)
|
||||
}
|
||||
|
||||
if repo.upsertCalls != 2 {
|
||||
t.Fatalf("expected two upsert calls, got %d", repo.upsertCalls)
|
||||
}
|
||||
if len(repo.upsertIDs) != 2 {
|
||||
t.Fatalf("expected two upsert IDs, got %d", len(repo.upsertIDs))
|
||||
}
|
||||
if !repo.upsertIDs[0].IsZero() {
|
||||
t.Fatalf("expected first upsert to use zero id due stale read, got %s", repo.upsertIDs[0].Hex())
|
||||
}
|
||||
if repo.upsertIDs[1] != existingID {
|
||||
t.Fatalf("expected retry to use existing id %s, got %s", existingID.Hex(), repo.upsertIDs[1].Hex())
|
||||
}
|
||||
}
|
||||
|
||||
func TestPaymentsUpsert_PropagatesNoSuchTransactionAfterDuplicate(t *testing.T) {
|
||||
key := "idem-nosuchtx"
|
||||
|
||||
repo := &fakePaymentsRepo{
|
||||
records: map[string]*model.PaymentRecord{
|
||||
key: {
|
||||
Base: storable.Base{
|
||||
ID: bson.NewObjectID(),
|
||||
CreatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC),
|
||||
UpdatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC),
|
||||
},
|
||||
IdempotencyKey: key,
|
||||
PaymentIntentID: "pi-existing",
|
||||
IntentRef: "intent-existing",
|
||||
},
|
||||
},
|
||||
findErrByCall: map[int]error{
|
||||
1: merrors.NoData("payment not found by filter"),
|
||||
2: mongo.CommandError{
|
||||
Code: 251,
|
||||
Name: "NoSuchTransaction",
|
||||
Message: "Transaction with { txnNumber: 2 } has been aborted.",
|
||||
},
|
||||
},
|
||||
duplicateWhenZeroID: true,
|
||||
}
|
||||
store := &Payments{logger: zap.NewNop(), repo: repo}
|
||||
|
||||
record := &model.PaymentRecord{
|
||||
IdempotencyKey: key,
|
||||
PaymentIntentID: "pi-new",
|
||||
QuoteRef: "quote-new",
|
||||
IntentRef: "intent-new",
|
||||
}
|
||||
|
||||
err := store.Upsert(context.Background(), record)
|
||||
if err == nil {
|
||||
t.Fatal("expected error, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "NoSuchTransaction") {
|
||||
t.Fatalf("expected NoSuchTransaction error, got %v", err)
|
||||
}
|
||||
if repo.upsertCalls != 1 {
|
||||
t.Fatalf("expected one upsert attempt before lookup failure, got %d", repo.upsertCalls)
|
||||
}
|
||||
}
|
||||
|
||||
func stringField(doc bson.D, key string) string {
|
||||
for _, entry := range doc {
|
||||
if entry.Key != key {
|
||||
continue
|
||||
}
|
||||
res, _ := entry.Value.(string)
|
||||
return strings.TrimSpace(res)
|
||||
}
|
||||
return ""
|
||||
}
|
||||
Reference in New Issue
Block a user