Merge remote-tracking branch 'origin/main' into SEND066

merge fresh main into SEND66
This commit is contained in:
Arseni
2026-03-06 17:18:39 +03:00
31 changed files with 2263 additions and 234 deletions

View File

@@ -11,7 +11,7 @@ require (
github.com/tech/sendico/pkg v0.1.0
go.mongodb.org/mongo-driver/v2 v2.5.0
go.uber.org/zap v1.27.1
google.golang.org/grpc v1.79.1
google.golang.org/grpc v1.79.2
google.golang.org/protobuf v1.36.11
gopkg.in/yaml.v3 v3.0.1
)

View File

@@ -210,8 +210,8 @@ gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171 h1:ggcbiqK8WWh6l1dnltU4BgWGIGo+EVYxCaAPih/zQXQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260226221140-a57be14db171/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY=
google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
google.golang.org/grpc v1.79.2 h1:fRMD94s2tITpyJGtBBn7MkMseNpOZU8ZxgC3MMBaXRU=
google.golang.org/grpc v1.79.2/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@@ -136,13 +136,22 @@ func (s *Service) SubmitOperation(ctx context.Context, req *connectorv1.SubmitOp
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(mapErrorCode(err), err.Error(), op, "")}}, nil
}
transfer := resp.GetTransfer()
operationID := strings.TrimSpace(transfer.GetOperationRef())
if operationID == "" {
s.logger.Warn("Submit operation transfer response missing operation_ref", append(logFields,
zap.String("transfer_ref", strings.TrimSpace(transfer.GetTransferRef())),
)...)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{
Error: connectorError(connectorv1.ErrorCode_TEMPORARY_UNAVAILABLE, "submit_operation: operation_ref is missing in transfer response", op, ""),
}}, nil
}
s.logger.Info("Submit operation transfer submitted", append(logFields,
zap.String("transfer_ref", strings.TrimSpace(transfer.GetTransferRef())),
zap.String("status", transfer.GetStatus().String()),
)...)
return &connectorv1.SubmitOperationResponse{
Receipt: &connectorv1.OperationReceipt{
OperationId: strings.TrimSpace(transfer.GetTransferRef()),
OperationId: operationID,
Status: transferStatusToOperation(transfer.GetStatus()),
ProviderRef: strings.TrimSpace(transfer.GetTransferRef()),
},
@@ -224,7 +233,7 @@ func transferToOperation(transfer *chainv1.Transfer) *connectorv1.Operation {
return nil
}
op := &connectorv1.Operation{
OperationId: strings.TrimSpace(transfer.GetTransferRef()),
OperationId: strings.TrimSpace(transfer.GetOperationRef()),
Type: connectorv1.OperationType_TRANSFER,
Status: transferStatusToOperation(transfer.GetStatus()),
Money: transfer.GetRequestedAmount(),

View File

@@ -0,0 +1,119 @@
package gateway
import (
"context"
"testing"
storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestSubmitOperation_UsesOperationRefAsOperationID(t *testing.T) {
svc, _, _ := newTestService(t)
svc.chatID = "1"
req := &connectorv1.SubmitOperationRequest{
Operation: &connectorv1.Operation{
Type: connectorv1.OperationType_TRANSFER,
IdempotencyKey: "idem-settlement-1",
OperationRef: "payment-1:hop_2_settlement_fx_convert",
IntentRef: "intent-1",
Money: &moneyv1.Money{Amount: "1.00", Currency: "USDT"},
From: &connectorv1.OperationParty{
Ref: &connectorv1.OperationParty_Account{Account: &connectorv1.AccountRef{
ConnectorId: tgsettleConnectorID,
AccountId: "wallet-src",
}},
},
To: &connectorv1.OperationParty{
Ref: &connectorv1.OperationParty_Account{Account: &connectorv1.AccountRef{
ConnectorId: tgsettleConnectorID,
AccountId: "wallet-dst",
}},
},
Params: structFromMap(map[string]interface{}{
"payment_ref": "payment-1",
"organization_ref": "org-1",
}),
},
}
resp, err := svc.SubmitOperation(context.Background(), req)
if err != nil {
t.Fatalf("SubmitOperation returned error: %v", err)
}
if resp.GetReceipt() == nil {
t.Fatal("expected receipt")
}
if got := resp.GetReceipt().GetError(); got != nil {
t.Fatalf("expected no connector error, got: %v", got)
}
if got, want := resp.GetReceipt().GetOperationId(), "payment-1:hop_2_settlement_fx_convert"; got != want {
t.Fatalf("operation_id mismatch: got=%q want=%q", got, want)
}
if got, want := resp.GetReceipt().GetProviderRef(), "idem-settlement-1"; got != want {
t.Fatalf("provider_ref mismatch: got=%q want=%q", got, want)
}
}
func TestGetOperation_UsesOperationRefIdentity(t *testing.T) {
svc, repo, _ := newTestService(t)
record := &storagemodel.PaymentRecord{
IdempotencyKey: "idem-settlement-2",
OperationRef: "payment-2:hop_2_settlement_fx_convert",
PaymentIntentID: "pi-2",
PaymentRef: "payment-2",
RequestedMoney: &paymenttypes.Money{Amount: "5.00", Currency: "USDT"},
Status: storagemodel.PaymentStatusSuccess,
}
if err := repo.payments.Upsert(context.Background(), record); err != nil {
t.Fatalf("failed to seed payment record: %v", err)
}
resp, err := svc.GetOperation(context.Background(), &connectorv1.GetOperationRequest{
OperationId: "payment-2:hop_2_settlement_fx_convert",
})
if err != nil {
t.Fatalf("GetOperation returned error: %v", err)
}
if resp.GetOperation() == nil {
t.Fatal("expected operation")
}
if got, want := resp.GetOperation().GetOperationId(), "payment-2:hop_2_settlement_fx_convert"; got != want {
t.Fatalf("operation_id mismatch: got=%q want=%q", got, want)
}
if got, want := resp.GetOperation().GetProviderRef(), "idem-settlement-2"; got != want {
t.Fatalf("provider_ref mismatch: got=%q want=%q", got, want)
}
}
func TestGetOperation_DoesNotResolveByIdempotencyKey(t *testing.T) {
svc, repo, _ := newTestService(t)
record := &storagemodel.PaymentRecord{
IdempotencyKey: "idem-settlement-3",
OperationRef: "payment-3:hop_2_settlement_fx_convert",
PaymentIntentID: "pi-3",
PaymentRef: "payment-3",
RequestedMoney: &paymenttypes.Money{Amount: "5.00", Currency: "USDT"},
Status: storagemodel.PaymentStatusSuccess,
}
if err := repo.payments.Upsert(context.Background(), record); err != nil {
t.Fatalf("failed to seed payment record: %v", err)
}
_, err := svc.GetOperation(context.Background(), &connectorv1.GetOperationRequest{
OperationId: "idem-settlement-3",
})
if err == nil {
t.Fatal("expected not found error")
}
if status.Code(err) != codes.NotFound {
t.Fatalf("unexpected error code: got=%s want=%s", status.Code(err), codes.NotFound)
}
}

View File

@@ -20,22 +20,25 @@ const (
)
type PaymentRecord struct {
storable.Base `bson:",inline" json:",inline"`
OperationRef string `bson:"operationRef,omitempty" json:"operation_ref,omitempty"`
IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"`
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"`
IntentRef string `bson:"intentRef,omitempty" json:"intent_ref,omitempty"`
PaymentRef string `bson:"paymentRef,omitempty" json:"payment_ref,omitempty"`
OutgoingLeg string `bson:"outgoingLeg,omitempty" json:"outgoing_leg,omitempty"`
TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"`
RequestedMoney *paymenttypes.Money `bson:"requestedMoney,omitempty" json:"requested_money,omitempty"`
ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"`
Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"`
FailureReason string `bson:"failureReason,omitempty" json:"Failure_reason,omitempty"`
ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"`
ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"`
ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"`
storable.Base `bson:",inline" json:",inline"`
OperationRef string `bson:"operationRef,omitempty" json:"operation_ref,omitempty"`
IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"`
ConfirmationRef string `bson:"confirmationRef,omitempty" json:"confirmation_ref,omitempty"`
ConfirmationMessageID string `bson:"confirmationMessageId,omitempty" json:"confirmation_message_id,omitempty"`
ConfirmationReplyMessageID string `bson:"confirmationReplyMessageId,omitempty" json:"confirmation_reply_message_id,omitempty"`
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"`
IntentRef string `bson:"intentRef,omitempty" json:"intent_ref,omitempty"`
PaymentRef string `bson:"paymentRef,omitempty" json:"payment_ref,omitempty"`
OutgoingLeg string `bson:"outgoingLeg,omitempty" json:"outgoing_leg,omitempty"`
TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"`
RequestedMoney *paymenttypes.Money `bson:"requestedMoney,omitempty" json:"requested_money,omitempty"`
ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"`
Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"`
FailureReason string `bson:"failureReason,omitempty" json:"Failure_reason,omitempty"`
ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"`
ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"`
ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"`
}
type TelegramConfirmation struct {

View File

@@ -103,14 +103,13 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro
return merrors.InvalidArgument("payment record is nil", "record")
}
record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey)
record.PaymentIntentID = strings.TrimSpace(record.PaymentIntentID)
record.QuoteRef = strings.TrimSpace(record.QuoteRef)
record.OutgoingLeg = strings.TrimSpace(record.OutgoingLeg)
record.TargetChatID = strings.TrimSpace(record.TargetChatID)
record.IntentRef = strings.TrimSpace(record.IntentRef)
record.OperationRef = strings.TrimSpace(record.OperationRef)
if record.PaymentIntentID == "" {
return merrors.InvalidArgument("intention reference is required", "payment_intent_ref")
if record.IntentRef == "" {
return merrors.InvalidArgument("intention reference is required", "intent_ref")
}
if record.IdempotencyKey == "" {
return merrors.InvalidArgument("idempotency key is required", "idempotency_key")
@@ -119,31 +118,36 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro
return merrors.InvalidArgument("intention reference key is required", "intent_ref")
}
filter := repository.Filter(fieldIdempotencyKey, record.IdempotencyKey)
err := p.repo.Insert(ctx, record, filter)
if errors.Is(err, merrors.ErrDataConflict) {
patch := repository.Patch().
Set(repository.Field(fieldOperationRef), record.OperationRef).
Set(repository.Field("paymentIntentId"), record.PaymentIntentID).
Set(repository.Field("quoteRef"), record.QuoteRef).
Set(repository.Field("intentRef"), record.IntentRef).
Set(repository.Field("paymentRef"), record.PaymentRef).
Set(repository.Field("outgoingLeg"), record.OutgoingLeg).
Set(repository.Field("targetChatId"), record.TargetChatID).
Set(repository.Field("requestedMoney"), record.RequestedMoney).
Set(repository.Field("executedMoney"), record.ExecutedMoney).
Set(repository.Field("status"), record.Status).
Set(repository.Field("failureReason"), record.FailureReason).
Set(repository.Field("executedAt"), record.ExecutedAt).
Set(repository.Field("expiresAt"), record.ExpiresAt).
Set(repository.Field("expiredAt"), record.ExpiredAt)
_, err = p.repo.PatchMany(ctx, filter, patch)
existing, err := p.FindByIdempotencyKey(ctx, record.IdempotencyKey)
if err != nil {
return err
}
if existing != nil {
record.ID = existing.ID
if record.CreatedAt.IsZero() {
record.CreatedAt = existing.CreatedAt
}
}
err = p.repo.Upsert(ctx, record)
if mongo.IsDuplicateKeyError(err) {
// Concurrent insert by idempotency key: resolve existing ID and retry replace-by-ID.
existing, lookupErr := p.FindByIdempotencyKey(ctx, record.IdempotencyKey)
if lookupErr != nil {
err = lookupErr
} else if existing != nil {
record.ID = existing.ID
if record.CreatedAt.IsZero() {
record.CreatedAt = existing.CreatedAt
}
err = p.repo.Upsert(ctx, record)
}
}
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
p.logger.Warn("Failed to upsert payment record",
zap.String("idempotency_key", record.IdempotencyKey),
zap.String("payment_intent_id", record.PaymentIntentID),
zap.String("intent_ref", record.IntentRef),
zap.String("quote_ref", record.QuoteRef),
zap.Error(err))
}

View File

@@ -0,0 +1,245 @@
package store
import (
"context"
"strings"
"testing"
"time"
"github.com/tech/sendico/gateway/tgsettle/storage/model"
"github.com/tech/sendico/pkg/db/repository"
"github.com/tech/sendico/pkg/db/storable"
"github.com/tech/sendico/pkg/merrors"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.uber.org/zap"
)
type fakePaymentsRepo struct {
repository.Repository
records map[string]*model.PaymentRecord
findErrByCall map[int]error
duplicateWhenZeroID bool
findCalls int
upsertCalls int
upsertIDs []bson.ObjectID
upsertIdempotencyKey []string
}
func (f *fakePaymentsRepo) FindOneByFilter(_ context.Context, query repository.FilterQuery, result storable.Storable) error {
f.findCalls++
if err, ok := f.findErrByCall[f.findCalls]; ok {
return err
}
rec, ok := result.(*model.PaymentRecord)
if !ok {
return merrors.InvalidDataType("expected *model.PaymentRecord")
}
doc := query.BuildQuery()
if key := stringField(doc, fieldIdempotencyKey); key != "" {
stored, ok := f.records[key]
if !ok {
return merrors.NoData("payment not found by filter")
}
*rec = *stored
return nil
}
if operationRef := stringField(doc, fieldOperationRef); operationRef != "" {
for _, stored := range f.records {
if strings.TrimSpace(stored.OperationRef) == operationRef {
*rec = *stored
return nil
}
}
return merrors.NoData("payment not found by operation ref")
}
return merrors.NoData("payment not found")
}
func (f *fakePaymentsRepo) Upsert(_ context.Context, obj storable.Storable) error {
f.upsertCalls++
rec, ok := obj.(*model.PaymentRecord)
if !ok {
return merrors.InvalidDataType("expected *model.PaymentRecord")
}
f.upsertIDs = append(f.upsertIDs, rec.ID)
f.upsertIdempotencyKey = append(f.upsertIdempotencyKey, rec.IdempotencyKey)
if f.duplicateWhenZeroID && rec.ID.IsZero() {
if _, exists := f.records[rec.IdempotencyKey]; exists {
return mongo.WriteException{
WriteErrors: mongo.WriteErrors{
{
Code: 11000,
Message: "E11000 duplicate key error collection: tgsettle_gateway.payments",
},
},
}
}
}
copyRec := *rec
if copyRec.ID.IsZero() {
copyRec.ID = bson.NewObjectID()
}
if copyRec.CreatedAt.IsZero() {
copyRec.CreatedAt = time.Now().UTC()
}
copyRec.UpdatedAt = time.Now().UTC()
if f.records == nil {
f.records = map[string]*model.PaymentRecord{}
}
f.records[copyRec.IdempotencyKey] = &copyRec
*rec = copyRec
return nil
}
func TestPaymentsUpsert_ReusesExistingIDFromIdempotencyLookup(t *testing.T) {
key := "idem-existing"
existingID := bson.NewObjectID()
existingCreatedAt := time.Date(2026, 3, 6, 10, 0, 0, 0, time.UTC)
repo := &fakePaymentsRepo{
records: map[string]*model.PaymentRecord{
key: {
Base: storable.Base{
ID: existingID,
CreatedAt: existingCreatedAt,
UpdatedAt: existingCreatedAt,
},
IdempotencyKey: key,
IntentRef: "pi-old",
},
},
duplicateWhenZeroID: true,
}
store := &Payments{logger: zap.NewNop(), repo: repo}
record := &model.PaymentRecord{
IdempotencyKey: key,
IntentRef: "pi-new",
QuoteRef: "quote-new",
}
if err := store.Upsert(context.Background(), record); err != nil {
t.Fatalf("upsert failed: %v", err)
}
if repo.upsertCalls != 1 {
t.Fatalf("expected one upsert call, got %d", repo.upsertCalls)
}
if len(repo.upsertIDs) != 1 || repo.upsertIDs[0] != existingID {
t.Fatalf("expected upsert to reuse existing id %s, got %+v", existingID.Hex(), repo.upsertIDs)
}
if record.ID != existingID {
t.Fatalf("record ID mismatch: got %s want %s", record.ID.Hex(), existingID.Hex())
}
}
func TestPaymentsUpsert_RetriesAfterDuplicateKeyRace(t *testing.T) {
key := "idem-race"
existingID := bson.NewObjectID()
repo := &fakePaymentsRepo{
records: map[string]*model.PaymentRecord{
key: {
Base: storable.Base{
ID: existingID,
CreatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC),
UpdatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC),
},
IdempotencyKey: key,
IntentRef: "pi-existing",
},
},
findErrByCall: map[int]error{
1: merrors.NoData("payment not found by filter"),
},
duplicateWhenZeroID: true,
}
store := &Payments{logger: zap.NewNop(), repo: repo}
record := &model.PaymentRecord{
IdempotencyKey: key,
IntentRef: "pi-new",
QuoteRef: "quote-new",
}
if err := store.Upsert(context.Background(), record); err != nil {
t.Fatalf("upsert failed: %v", err)
}
if repo.upsertCalls != 2 {
t.Fatalf("expected two upsert calls, got %d", repo.upsertCalls)
}
if len(repo.upsertIDs) != 2 {
t.Fatalf("expected two upsert IDs, got %d", len(repo.upsertIDs))
}
if !repo.upsertIDs[0].IsZero() {
t.Fatalf("expected first upsert to use zero id due stale read, got %s", repo.upsertIDs[0].Hex())
}
if repo.upsertIDs[1] != existingID {
t.Fatalf("expected retry to use existing id %s, got %s", existingID.Hex(), repo.upsertIDs[1].Hex())
}
}
func TestPaymentsUpsert_PropagatesNoSuchTransactionAfterDuplicate(t *testing.T) {
key := "idem-nosuchtx"
repo := &fakePaymentsRepo{
records: map[string]*model.PaymentRecord{
key: {
Base: storable.Base{
ID: bson.NewObjectID(),
CreatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC),
UpdatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC),
},
IdempotencyKey: key,
IntentRef: "pi-existing",
},
},
findErrByCall: map[int]error{
1: merrors.NoData("payment not found by filter"),
2: mongo.CommandError{
Code: 251,
Name: "NoSuchTransaction",
Message: "Transaction with { txnNumber: 2 } has been aborted.",
},
},
duplicateWhenZeroID: true,
}
store := &Payments{logger: zap.NewNop(), repo: repo}
record := &model.PaymentRecord{
IdempotencyKey: key,
IntentRef: "pi-new",
QuoteRef: "quote-new",
}
err := store.Upsert(context.Background(), record)
if err == nil {
t.Fatal("expected error, got nil")
}
if !strings.Contains(err.Error(), "NoSuchTransaction") {
t.Fatalf("expected NoSuchTransaction error, got %v", err)
}
if repo.upsertCalls != 1 {
t.Fatalf("expected one upsert attempt before lookup failure, got %d", repo.upsertCalls)
}
}
func stringField(doc bson.D, key string) string {
for _, entry := range doc {
if entry.Key != key {
continue
}
res, _ := entry.Value.(string)
return strings.TrimSpace(res)
}
return ""
}