Merge pull request 'migrated raw mongo.Collection to repository.Repository + chain driver resolution fix' (#536) from tg-535 into main
All checks were successful
ci/woodpecker/push/gateway_tgsettle Pipeline was successful
ci/woodpecker/push/gateway_chain Pipeline was successful

Reviewed-on: #536
This commit was merged in pull request #536.
This commit is contained in:
2026-02-19 19:31:08 +00:00
6 changed files with 124 additions and 90 deletions

View File

@@ -53,8 +53,7 @@ func (r *Registry) Driver(network pmodel.ChainNetwork) (driver.Driver, error) {
func resolveDriver(logger mlogger.Logger, network pmodel.ChainNetwork) (driver.Driver, error) {
switch network {
case pmodel.ChainNetworkArbitrumOne:
case pmodel.ChainNetworkArbitrumSepolia:
case pmodel.ChainNetworkArbitrumOne, pmodel.ChainNetworkArbitrumSepolia:
return arbitrum.New(logger), nil
case pmodel.ChainNetworkEthereumMainnet:
return ethereum.New(logger), nil

View File

@@ -3,9 +3,9 @@ package model
import (
"time"
"github.com/tech/sendico/pkg/db/storable"
"github.com/tech/sendico/pkg/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.mongodb.org/mongo-driver/v2/bson"
)
type PaymentStatus string
@@ -20,7 +20,7 @@ const (
)
type PaymentRecord struct {
ID bson.ObjectID `bson:"_id,omitempty" json:"id"`
storable.Base `bson:",inline" json:",inline"`
OperationRef string `bson:"operationRef,omitempty" json:"operation_ref,omitempty"`
IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"`
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
@@ -33,15 +33,13 @@ type PaymentRecord struct {
ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"`
Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"`
FailureReason string `bson:"failureReason,omitempty" json:"Failure_reason,omitempty"`
CreatedAt time.Time `bson:"createdAt,omitempty" json:"created_at,omitempty"`
UpdatedAt time.Time `bson:"updatedAt,omitempty" json:"updated_at,omitempty"`
ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"`
ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"`
ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"`
}
type TelegramConfirmation struct {
ID bson.ObjectID `bson:"_id,omitempty" json:"id"`
storable.Base `bson:",inline" json:",inline"`
RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"`
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"`
@@ -50,7 +48,7 @@ type TelegramConfirmation struct {
}
type PendingConfirmation struct {
ID bson.ObjectID `bson:"_id,omitempty" json:"id"`
storable.Base `bson:",inline" json:",inline"`
RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"`
MessageID string `bson:"messageId,omitempty" json:"message_id,omitempty"`
TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"`
@@ -60,6 +58,4 @@ type PendingConfirmation struct {
Rail string `bson:"rail,omitempty" json:"rail,omitempty"`
Clarified bool `bson:"clarified,omitempty" json:"clarified,omitempty"`
ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"`
CreatedAt time.Time `bson:"createdAt,omitempty" json:"created_at,omitempty"`
UpdatedAt time.Time `bson:"updatedAt,omitempty" json:"updated_at,omitempty"`
}

View File

@@ -0,0 +1,19 @@
package model
const (
paymentsCollection = "payments"
telegramConfirmationsCollection = "telegram_confirmations"
pendingConfirmationsCollection = "pending_confirmations"
)
func (*PaymentRecord) Collection() string {
return paymentsCollection
}
func (*TelegramConfirmation) Collection() string {
return telegramConfirmationsCollection
}
func (*PendingConfirmation) Collection() string {
return pendingConfirmationsCollection
}

View File

@@ -14,7 +14,6 @@ import (
"github.com/tech/sendico/pkg/mlogger"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.uber.org/zap"
)
@@ -25,7 +24,7 @@ const (
type Payments struct {
logger mlogger.Logger
coll *mongo.Collection
repo repository.Repository
}
func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) {
@@ -48,7 +47,7 @@ func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) {
p := &Payments{
logger: logger,
coll: db.Collection(paymentsCollection),
repo: repo,
}
p.logger.Debug("Payments store initialised")
return p, nil
@@ -60,8 +59,8 @@ func (p *Payments) FindByIdempotencyKey(ctx context.Context, key string) (*model
return nil, merrors.InvalidArgument("idempotency key is required", "idempotency_key")
}
var result model.PaymentRecord
err := p.coll.FindOne(ctx, bson.M{fieldIdempotencyKey: key}).Decode(&result)
if err == mongo.ErrNoDocuments {
err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldIdempotencyKey, key), &result)
if errors.Is(err, merrors.ErrNoData) {
return nil, nil
}
if err != nil {
@@ -98,10 +97,26 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro
}
record.UpdatedAt = now
record.ID = bson.NilObjectID
update := bson.M{
"$set": record,
filter := repository.Filter(fieldIdempotencyKey, record.IdempotencyKey)
existing := &model.PaymentRecord{}
err := p.repo.FindOneByFilter(ctx, filter, existing)
switch {
case err == nil:
record.ID = existing.ID
err = p.repo.Update(ctx, record)
case errors.Is(err, merrors.ErrNoData):
record.ID = bson.NilObjectID
err = p.repo.Insert(ctx, record, filter)
if errors.Is(err, merrors.ErrDataConflict) {
if findErr := p.repo.FindOneByFilter(ctx, filter, existing); findErr != nil {
err = findErr
break
}
record.ID = existing.ID
err = p.repo.Update(ctx, record)
}
}
_, err := p.coll.UpdateOne(ctx, bson.M{fieldIdempotencyKey: record.IdempotencyKey}, update, options.UpdateOne().SetUpsert(true))
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
p.logger.Warn("Failed to upsert payment record",

View File

@@ -9,12 +9,12 @@ import (
"github.com/tech/sendico/gateway/tgsettle/storage"
"github.com/tech/sendico/gateway/tgsettle/storage/model"
"github.com/tech/sendico/pkg/db/repository"
"github.com/tech/sendico/pkg/db/repository/builder"
ri "github.com/tech/sendico/pkg/db/repository/index"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.uber.org/zap"
)
@@ -27,7 +27,7 @@ const (
type PendingConfirmations struct {
logger mlogger.Logger
coll *mongo.Collection
repo repository.Repository
}
func NewPendingConfirmations(logger mlogger.Logger, db *mongo.Database) (*PendingConfirmations, error) {
@@ -62,7 +62,7 @@ func NewPendingConfirmations(logger mlogger.Logger, db *mongo.Database) (*Pendin
p := &PendingConfirmations{
logger: logger,
coll: db.Collection(pendingConfirmationsCollection),
repo: repo,
}
return p, nil
}
@@ -93,27 +93,28 @@ func (p *PendingConfirmations) Upsert(ctx context.Context, record *model.Pending
}
record.UpdatedAt = now
record.CreatedAt = createdAt
filter := repository.Filter(fieldPendingRequestID, record.RequestID)
existing := &model.PendingConfirmation{}
err := p.repo.FindOneByFilter(ctx, filter, existing)
switch {
case err == nil:
record.ID = existing.ID
record.CreatedAt = existing.CreatedAt
err = p.repo.Update(ctx, record)
case errors.Is(err, merrors.ErrNoData):
record.ID = bson.NilObjectID
// Explicit map avoids accidentally overriding immutable fields from stale callers.
update := bson.M{
"$set": bson.M{
"messageId": record.MessageID,
"targetChatId": record.TargetChatID,
"acceptedUserIds": record.AcceptedUserIDs,
"requestedMoney": record.RequestedMoney,
"sourceService": record.SourceService,
"rail": record.Rail,
"clarified": record.Clarified,
"expiresAt": record.ExpiresAt,
"updatedAt": record.UpdatedAt,
},
"$setOnInsert": bson.M{
"createdAt": createdAt,
},
err = p.repo.Insert(ctx, record, filter)
if errors.Is(err, merrors.ErrDataConflict) {
if findErr := p.repo.FindOneByFilter(ctx, filter, existing); findErr != nil {
err = findErr
break
}
record.ID = existing.ID
record.CreatedAt = existing.CreatedAt
err = p.repo.Update(ctx, record)
}
}
_, err := p.coll.UpdateOne(ctx, bson.M{fieldPendingRequestID: record.RequestID}, update, options.UpdateOne().SetUpsert(true))
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
p.logger.Warn("Failed to upsert pending confirmation", zap.Error(err), zap.String("request_id", record.RequestID))
}
@@ -126,8 +127,8 @@ func (p *PendingConfirmations) FindByRequestID(ctx context.Context, requestID st
return nil, merrors.InvalidArgument("request_id is required", "request_id")
}
var result model.PendingConfirmation
err := p.coll.FindOne(ctx, bson.M{fieldPendingRequestID: requestID}).Decode(&result)
if err == mongo.ErrNoDocuments {
err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldPendingRequestID, requestID), &result)
if errors.Is(err, merrors.ErrNoData) {
return nil, nil
}
if err != nil {
@@ -142,8 +143,8 @@ func (p *PendingConfirmations) FindByMessageID(ctx context.Context, messageID st
return nil, merrors.InvalidArgument("message_id is required", "message_id")
}
var result model.PendingConfirmation
err := p.coll.FindOne(ctx, bson.M{fieldPendingMessageID: messageID}).Decode(&result)
if err == mongo.ErrNoDocuments {
err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldPendingMessageID, messageID), &result)
if errors.Is(err, merrors.ErrNoData) {
return nil, nil
}
if err != nil {
@@ -157,12 +158,10 @@ func (p *PendingConfirmations) MarkClarified(ctx context.Context, requestID stri
if requestID == "" {
return merrors.InvalidArgument("request_id is required", "request_id")
}
_, err := p.coll.UpdateOne(ctx, bson.M{fieldPendingRequestID: requestID}, bson.M{
"$set": bson.M{
"clarified": true,
"updatedAt": time.Now(),
},
})
patch := repository.Patch().
Set(repository.Field("clarified"), true).
Set(repository.Field("updatedAt"), time.Now())
_, err := p.repo.PatchMany(ctx, repository.Filter(fieldPendingRequestID, requestID), patch)
return err
}
@@ -176,24 +175,21 @@ func (p *PendingConfirmations) AttachMessage(ctx context.Context, requestID stri
return merrors.InvalidArgument("message_id is required", "message_id")
}
filter := bson.M{
fieldPendingRequestID: requestID,
"$or": []bson.M{
{fieldPendingMessageID: bson.M{"$exists": false}},
{fieldPendingMessageID: ""},
{fieldPendingMessageID: messageID},
},
}
res, err := p.coll.UpdateOne(ctx, filter, bson.M{
"$set": bson.M{
fieldPendingMessageID: messageID,
"updatedAt": time.Now(),
},
})
filter := repository.Filter(fieldPendingRequestID, requestID).And(
repository.Query().Or(
repository.Exists(repository.Field(fieldPendingMessageID), false),
repository.Filter(fieldPendingMessageID, ""),
repository.Filter(fieldPendingMessageID, messageID),
),
)
patch := repository.Patch().
Set(repository.Field(fieldPendingMessageID), messageID).
Set(repository.Field("updatedAt"), time.Now())
updated, err := p.repo.PatchMany(ctx, filter, patch)
if err != nil {
return err
}
if res.MatchedCount == 0 {
if updated == 0 {
return merrors.NoData("pending confirmation not found")
}
return nil
@@ -204,34 +200,28 @@ func (p *PendingConfirmations) DeleteByRequestID(ctx context.Context, requestID
if requestID == "" {
return merrors.InvalidArgument("request_id is required", "request_id")
}
_, err := p.coll.DeleteOne(ctx, bson.M{fieldPendingRequestID: requestID})
return err
return p.repo.DeleteMany(ctx, repository.Filter(fieldPendingRequestID, requestID))
}
func (p *PendingConfirmations) ListExpired(ctx context.Context, now time.Time, limit int64) ([]*model.PendingConfirmation, error) {
if limit <= 0 {
limit = 100
}
filter := bson.M{
fieldPendingExpiresAt: bson.M{"$lte": now},
}
opts := options.Find().SetLimit(limit).SetSort(bson.D{{Key: fieldPendingExpiresAt, Value: 1}})
cursor, err := p.coll.Find(ctx, filter, opts)
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
query := repository.Query().
Comparison(repository.Field(fieldPendingExpiresAt), builder.Lte, now).
Sort(repository.Field(fieldPendingExpiresAt), true).
Limit(&limit)
result := make([]*model.PendingConfirmation, 0)
for cursor.Next(ctx) {
var next model.PendingConfirmation
if err := cursor.Decode(&next); err != nil {
return nil, err
err := p.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error {
next := &model.PendingConfirmation{}
if err := cur.Decode(next); err != nil {
return err
}
result = append(result, &next)
}
if err := cursor.Err(); err != nil {
result = append(result, next)
return nil
})
if err != nil && !errors.Is(err, merrors.ErrNoData) {
return nil, err
}
return result, nil

View File

@@ -14,7 +14,6 @@ import (
"github.com/tech/sendico/pkg/mlogger"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.uber.org/zap"
)
@@ -25,7 +24,7 @@ const (
type TelegramConfirmations struct {
logger mlogger.Logger
coll *mongo.Collection
repo repository.Repository
}
func NewTelegramConfirmations(logger mlogger.Logger, db *mongo.Database) (*TelegramConfirmations, error) {
@@ -48,7 +47,7 @@ func NewTelegramConfirmations(logger mlogger.Logger, db *mongo.Database) (*Teleg
t := &TelegramConfirmations{
logger: logger,
coll: db.Collection(telegramCollection),
repo: repo,
}
t.logger.Debug("Telegram confirmations store initialised")
return t, nil
@@ -67,10 +66,26 @@ func (t *TelegramConfirmations) Upsert(ctx context.Context, record *model.Telegr
if record.ReceivedAt.IsZero() {
record.ReceivedAt = time.Now()
}
update := bson.M{
"$set": record,
filter := repository.Filter(fieldRequestID, record.RequestID)
existing := &model.TelegramConfirmation{}
err := t.repo.FindOneByFilter(ctx, filter, existing)
switch {
case err == nil:
record.ID = existing.ID
err = t.repo.Update(ctx, record)
case errors.Is(err, merrors.ErrNoData):
record.ID = bson.NilObjectID
err = t.repo.Insert(ctx, record, filter)
if errors.Is(err, merrors.ErrDataConflict) {
if findErr := t.repo.FindOneByFilter(ctx, filter, existing); findErr != nil {
err = findErr
break
}
record.ID = existing.ID
err = t.repo.Update(ctx, record)
}
}
_, err := t.coll.UpdateOne(ctx, bson.M{fieldRequestID: record.RequestID}, update, options.UpdateOne().SetUpsert(true))
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
fields := []zap.Field{zap.String("request_id", record.RequestID)}
if record.PaymentIntentID != "" {