migrated raw mongo.Collection to repository.Repository + chain driver resolution fix

This commit is contained in:
Stephan D
2026-02-19 20:22:41 +01:00
parent b6cced6947
commit afd8d8d01e
6 changed files with 124 additions and 90 deletions

View File

@@ -53,8 +53,7 @@ func (r *Registry) Driver(network pmodel.ChainNetwork) (driver.Driver, error) {
func resolveDriver(logger mlogger.Logger, network pmodel.ChainNetwork) (driver.Driver, error) { func resolveDriver(logger mlogger.Logger, network pmodel.ChainNetwork) (driver.Driver, error) {
switch network { switch network {
case pmodel.ChainNetworkArbitrumOne: case pmodel.ChainNetworkArbitrumOne, pmodel.ChainNetworkArbitrumSepolia:
case pmodel.ChainNetworkArbitrumSepolia:
return arbitrum.New(logger), nil return arbitrum.New(logger), nil
case pmodel.ChainNetworkEthereumMainnet: case pmodel.ChainNetworkEthereumMainnet:
return ethereum.New(logger), nil return ethereum.New(logger), nil

View File

@@ -3,9 +3,9 @@ package model
import ( import (
"time" "time"
"github.com/tech/sendico/pkg/db/storable"
"github.com/tech/sendico/pkg/model" "github.com/tech/sendico/pkg/model"
paymenttypes "github.com/tech/sendico/pkg/payments/types" paymenttypes "github.com/tech/sendico/pkg/payments/types"
"go.mongodb.org/mongo-driver/v2/bson"
) )
type PaymentStatus string type PaymentStatus string
@@ -20,7 +20,7 @@ const (
) )
type PaymentRecord struct { type PaymentRecord struct {
ID bson.ObjectID `bson:"_id,omitempty" json:"id"` storable.Base `bson:",inline" json:",inline"`
OperationRef string `bson:"operationRef,omitempty" json:"operation_ref,omitempty"` OperationRef string `bson:"operationRef,omitempty" json:"operation_ref,omitempty"`
IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"` IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"`
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"` PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
@@ -33,15 +33,13 @@ type PaymentRecord struct {
ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"` ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"`
Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"` Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"`
FailureReason string `bson:"failureReason,omitempty" json:"Failure_reason,omitempty"` FailureReason string `bson:"failureReason,omitempty" json:"Failure_reason,omitempty"`
CreatedAt time.Time `bson:"createdAt,omitempty" json:"created_at,omitempty"`
UpdatedAt time.Time `bson:"updatedAt,omitempty" json:"updated_at,omitempty"`
ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"` ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"`
ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"` ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"`
ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"` ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"`
} }
type TelegramConfirmation struct { type TelegramConfirmation struct {
ID bson.ObjectID `bson:"_id,omitempty" json:"id"` storable.Base `bson:",inline" json:",inline"`
RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"` RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"`
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"` PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"` QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"`
@@ -50,7 +48,7 @@ type TelegramConfirmation struct {
} }
type PendingConfirmation struct { type PendingConfirmation struct {
ID bson.ObjectID `bson:"_id,omitempty" json:"id"` storable.Base `bson:",inline" json:",inline"`
RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"` RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"`
MessageID string `bson:"messageId,omitempty" json:"message_id,omitempty"` MessageID string `bson:"messageId,omitempty" json:"message_id,omitempty"`
TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"` TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"`
@@ -60,6 +58,4 @@ type PendingConfirmation struct {
Rail string `bson:"rail,omitempty" json:"rail,omitempty"` Rail string `bson:"rail,omitempty" json:"rail,omitempty"`
Clarified bool `bson:"clarified,omitempty" json:"clarified,omitempty"` Clarified bool `bson:"clarified,omitempty" json:"clarified,omitempty"`
ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"` ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"`
CreatedAt time.Time `bson:"createdAt,omitempty" json:"created_at,omitempty"`
UpdatedAt time.Time `bson:"updatedAt,omitempty" json:"updated_at,omitempty"`
} }

View File

@@ -0,0 +1,19 @@
package model
const (
paymentsCollection = "payments"
telegramConfirmationsCollection = "telegram_confirmations"
pendingConfirmationsCollection = "pending_confirmations"
)
func (*PaymentRecord) Collection() string {
return paymentsCollection
}
func (*TelegramConfirmation) Collection() string {
return telegramConfirmationsCollection
}
func (*PendingConfirmation) Collection() string {
return pendingConfirmationsCollection
}

View File

@@ -14,7 +14,6 @@ import (
"github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mlogger"
"go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -25,7 +24,7 @@ const (
type Payments struct { type Payments struct {
logger mlogger.Logger logger mlogger.Logger
coll *mongo.Collection repo repository.Repository
} }
func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) { func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) {
@@ -48,7 +47,7 @@ func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) {
p := &Payments{ p := &Payments{
logger: logger, logger: logger,
coll: db.Collection(paymentsCollection), repo: repo,
} }
p.logger.Debug("Payments store initialised") p.logger.Debug("Payments store initialised")
return p, nil return p, nil
@@ -60,8 +59,8 @@ func (p *Payments) FindByIdempotencyKey(ctx context.Context, key string) (*model
return nil, merrors.InvalidArgument("idempotency key is required", "idempotency_key") return nil, merrors.InvalidArgument("idempotency key is required", "idempotency_key")
} }
var result model.PaymentRecord var result model.PaymentRecord
err := p.coll.FindOne(ctx, bson.M{fieldIdempotencyKey: key}).Decode(&result) err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldIdempotencyKey, key), &result)
if err == mongo.ErrNoDocuments { if errors.Is(err, merrors.ErrNoData) {
return nil, nil return nil, nil
} }
if err != nil { if err != nil {
@@ -98,10 +97,26 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro
} }
record.UpdatedAt = now record.UpdatedAt = now
record.ID = bson.NilObjectID record.ID = bson.NilObjectID
update := bson.M{
"$set": record, filter := repository.Filter(fieldIdempotencyKey, record.IdempotencyKey)
existing := &model.PaymentRecord{}
err := p.repo.FindOneByFilter(ctx, filter, existing)
switch {
case err == nil:
record.ID = existing.ID
err = p.repo.Update(ctx, record)
case errors.Is(err, merrors.ErrNoData):
record.ID = bson.NilObjectID
err = p.repo.Insert(ctx, record, filter)
if errors.Is(err, merrors.ErrDataConflict) {
if findErr := p.repo.FindOneByFilter(ctx, filter, existing); findErr != nil {
err = findErr
break
}
record.ID = existing.ID
err = p.repo.Update(ctx, record)
}
} }
_, err := p.coll.UpdateOne(ctx, bson.M{fieldIdempotencyKey: record.IdempotencyKey}, update, options.UpdateOne().SetUpsert(true))
if err != nil { if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
p.logger.Warn("Failed to upsert payment record", p.logger.Warn("Failed to upsert payment record",

View File

@@ -9,12 +9,12 @@ import (
"github.com/tech/sendico/gateway/tgsettle/storage" "github.com/tech/sendico/gateway/tgsettle/storage"
"github.com/tech/sendico/gateway/tgsettle/storage/model" "github.com/tech/sendico/gateway/tgsettle/storage/model"
"github.com/tech/sendico/pkg/db/repository" "github.com/tech/sendico/pkg/db/repository"
"github.com/tech/sendico/pkg/db/repository/builder"
ri "github.com/tech/sendico/pkg/db/repository/index" ri "github.com/tech/sendico/pkg/db/repository/index"
"github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mlogger"
"go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -27,7 +27,7 @@ const (
type PendingConfirmations struct { type PendingConfirmations struct {
logger mlogger.Logger logger mlogger.Logger
coll *mongo.Collection repo repository.Repository
} }
func NewPendingConfirmations(logger mlogger.Logger, db *mongo.Database) (*PendingConfirmations, error) { func NewPendingConfirmations(logger mlogger.Logger, db *mongo.Database) (*PendingConfirmations, error) {
@@ -62,7 +62,7 @@ func NewPendingConfirmations(logger mlogger.Logger, db *mongo.Database) (*Pendin
p := &PendingConfirmations{ p := &PendingConfirmations{
logger: logger, logger: logger,
coll: db.Collection(pendingConfirmationsCollection), repo: repo,
} }
return p, nil return p, nil
} }
@@ -93,27 +93,28 @@ func (p *PendingConfirmations) Upsert(ctx context.Context, record *model.Pending
} }
record.UpdatedAt = now record.UpdatedAt = now
record.CreatedAt = createdAt record.CreatedAt = createdAt
record.ID = bson.NilObjectID filter := repository.Filter(fieldPendingRequestID, record.RequestID)
existing := &model.PendingConfirmation{}
// Explicit map avoids accidentally overriding immutable fields from stale callers. err := p.repo.FindOneByFilter(ctx, filter, existing)
update := bson.M{ switch {
"$set": bson.M{ case err == nil:
"messageId": record.MessageID, record.ID = existing.ID
"targetChatId": record.TargetChatID, record.CreatedAt = existing.CreatedAt
"acceptedUserIds": record.AcceptedUserIDs, err = p.repo.Update(ctx, record)
"requestedMoney": record.RequestedMoney, case errors.Is(err, merrors.ErrNoData):
"sourceService": record.SourceService, record.ID = bson.NilObjectID
"rail": record.Rail, err = p.repo.Insert(ctx, record, filter)
"clarified": record.Clarified, if errors.Is(err, merrors.ErrDataConflict) {
"expiresAt": record.ExpiresAt, if findErr := p.repo.FindOneByFilter(ctx, filter, existing); findErr != nil {
"updatedAt": record.UpdatedAt, err = findErr
}, break
"$setOnInsert": bson.M{ }
"createdAt": createdAt, record.ID = existing.ID
}, record.CreatedAt = existing.CreatedAt
err = p.repo.Update(ctx, record)
}
} }
_, err := p.coll.UpdateOne(ctx, bson.M{fieldPendingRequestID: record.RequestID}, update, options.UpdateOne().SetUpsert(true))
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
p.logger.Warn("Failed to upsert pending confirmation", zap.Error(err), zap.String("request_id", record.RequestID)) p.logger.Warn("Failed to upsert pending confirmation", zap.Error(err), zap.String("request_id", record.RequestID))
} }
@@ -126,8 +127,8 @@ func (p *PendingConfirmations) FindByRequestID(ctx context.Context, requestID st
return nil, merrors.InvalidArgument("request_id is required", "request_id") return nil, merrors.InvalidArgument("request_id is required", "request_id")
} }
var result model.PendingConfirmation var result model.PendingConfirmation
err := p.coll.FindOne(ctx, bson.M{fieldPendingRequestID: requestID}).Decode(&result) err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldPendingRequestID, requestID), &result)
if err == mongo.ErrNoDocuments { if errors.Is(err, merrors.ErrNoData) {
return nil, nil return nil, nil
} }
if err != nil { if err != nil {
@@ -142,8 +143,8 @@ func (p *PendingConfirmations) FindByMessageID(ctx context.Context, messageID st
return nil, merrors.InvalidArgument("message_id is required", "message_id") return nil, merrors.InvalidArgument("message_id is required", "message_id")
} }
var result model.PendingConfirmation var result model.PendingConfirmation
err := p.coll.FindOne(ctx, bson.M{fieldPendingMessageID: messageID}).Decode(&result) err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldPendingMessageID, messageID), &result)
if err == mongo.ErrNoDocuments { if errors.Is(err, merrors.ErrNoData) {
return nil, nil return nil, nil
} }
if err != nil { if err != nil {
@@ -157,12 +158,10 @@ func (p *PendingConfirmations) MarkClarified(ctx context.Context, requestID stri
if requestID == "" { if requestID == "" {
return merrors.InvalidArgument("request_id is required", "request_id") return merrors.InvalidArgument("request_id is required", "request_id")
} }
_, err := p.coll.UpdateOne(ctx, bson.M{fieldPendingRequestID: requestID}, bson.M{ patch := repository.Patch().
"$set": bson.M{ Set(repository.Field("clarified"), true).
"clarified": true, Set(repository.Field("updatedAt"), time.Now())
"updatedAt": time.Now(), _, err := p.repo.PatchMany(ctx, repository.Filter(fieldPendingRequestID, requestID), patch)
},
})
return err return err
} }
@@ -176,24 +175,21 @@ func (p *PendingConfirmations) AttachMessage(ctx context.Context, requestID stri
return merrors.InvalidArgument("message_id is required", "message_id") return merrors.InvalidArgument("message_id is required", "message_id")
} }
filter := bson.M{ filter := repository.Filter(fieldPendingRequestID, requestID).And(
fieldPendingRequestID: requestID, repository.Query().Or(
"$or": []bson.M{ repository.Exists(repository.Field(fieldPendingMessageID), false),
{fieldPendingMessageID: bson.M{"$exists": false}}, repository.Filter(fieldPendingMessageID, ""),
{fieldPendingMessageID: ""}, repository.Filter(fieldPendingMessageID, messageID),
{fieldPendingMessageID: messageID}, ),
}, )
} patch := repository.Patch().
res, err := p.coll.UpdateOne(ctx, filter, bson.M{ Set(repository.Field(fieldPendingMessageID), messageID).
"$set": bson.M{ Set(repository.Field("updatedAt"), time.Now())
fieldPendingMessageID: messageID, updated, err := p.repo.PatchMany(ctx, filter, patch)
"updatedAt": time.Now(),
},
})
if err != nil { if err != nil {
return err return err
} }
if res.MatchedCount == 0 { if updated == 0 {
return merrors.NoData("pending confirmation not found") return merrors.NoData("pending confirmation not found")
} }
return nil return nil
@@ -204,34 +200,28 @@ func (p *PendingConfirmations) DeleteByRequestID(ctx context.Context, requestID
if requestID == "" { if requestID == "" {
return merrors.InvalidArgument("request_id is required", "request_id") return merrors.InvalidArgument("request_id is required", "request_id")
} }
_, err := p.coll.DeleteOne(ctx, bson.M{fieldPendingRequestID: requestID}) return p.repo.DeleteMany(ctx, repository.Filter(fieldPendingRequestID, requestID))
return err
} }
func (p *PendingConfirmations) ListExpired(ctx context.Context, now time.Time, limit int64) ([]*model.PendingConfirmation, error) { func (p *PendingConfirmations) ListExpired(ctx context.Context, now time.Time, limit int64) ([]*model.PendingConfirmation, error) {
if limit <= 0 { if limit <= 0 {
limit = 100 limit = 100
} }
filter := bson.M{ query := repository.Query().
fieldPendingExpiresAt: bson.M{"$lte": now}, Comparison(repository.Field(fieldPendingExpiresAt), builder.Lte, now).
} Sort(repository.Field(fieldPendingExpiresAt), true).
opts := options.Find().SetLimit(limit).SetSort(bson.D{{Key: fieldPendingExpiresAt, Value: 1}}) Limit(&limit)
cursor, err := p.coll.Find(ctx, filter, opts)
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
result := make([]*model.PendingConfirmation, 0) result := make([]*model.PendingConfirmation, 0)
for cursor.Next(ctx) { err := p.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error {
var next model.PendingConfirmation next := &model.PendingConfirmation{}
if err := cursor.Decode(&next); err != nil { if err := cur.Decode(next); err != nil {
return nil, err return err
} }
result = append(result, &next) result = append(result, next)
} return nil
if err := cursor.Err(); err != nil { })
if err != nil && !errors.Is(err, merrors.ErrNoData) {
return nil, err return nil, err
} }
return result, nil return result, nil

View File

@@ -14,7 +14,6 @@ import (
"github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mlogger"
"go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -25,7 +24,7 @@ const (
type TelegramConfirmations struct { type TelegramConfirmations struct {
logger mlogger.Logger logger mlogger.Logger
coll *mongo.Collection repo repository.Repository
} }
func NewTelegramConfirmations(logger mlogger.Logger, db *mongo.Database) (*TelegramConfirmations, error) { func NewTelegramConfirmations(logger mlogger.Logger, db *mongo.Database) (*TelegramConfirmations, error) {
@@ -48,7 +47,7 @@ func NewTelegramConfirmations(logger mlogger.Logger, db *mongo.Database) (*Teleg
t := &TelegramConfirmations{ t := &TelegramConfirmations{
logger: logger, logger: logger,
coll: db.Collection(telegramCollection), repo: repo,
} }
t.logger.Debug("Telegram confirmations store initialised") t.logger.Debug("Telegram confirmations store initialised")
return t, nil return t, nil
@@ -67,10 +66,26 @@ func (t *TelegramConfirmations) Upsert(ctx context.Context, record *model.Telegr
if record.ReceivedAt.IsZero() { if record.ReceivedAt.IsZero() {
record.ReceivedAt = time.Now() record.ReceivedAt = time.Now()
} }
update := bson.M{ filter := repository.Filter(fieldRequestID, record.RequestID)
"$set": record, existing := &model.TelegramConfirmation{}
err := t.repo.FindOneByFilter(ctx, filter, existing)
switch {
case err == nil:
record.ID = existing.ID
err = t.repo.Update(ctx, record)
case errors.Is(err, merrors.ErrNoData):
record.ID = bson.NilObjectID
err = t.repo.Insert(ctx, record, filter)
if errors.Is(err, merrors.ErrDataConflict) {
if findErr := t.repo.FindOneByFilter(ctx, filter, existing); findErr != nil {
err = findErr
break
}
record.ID = existing.ID
err = t.repo.Update(ctx, record)
}
} }
_, err := t.coll.UpdateOne(ctx, bson.M{fieldRequestID: record.RequestID}, update, options.UpdateOne().SetUpsert(true))
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
fields := []zap.Field{zap.String("request_id", record.RequestID)} fields := []zap.Field{zap.String("request_id", record.RequestID)}
if record.PaymentIntentID != "" { if record.PaymentIntentID != "" {