From afd8d8d01e76229be1ed61c998ebd71a04b531c8 Mon Sep 17 00:00:00 2001 From: Stephan D Date: Thu, 19 Feb 2026 20:22:41 +0100 Subject: [PATCH] migrated raw mongo.Collection to repository.Repository + chain driver resolution fix --- .../service/gateway/drivers/registry.go | 3 +- .../tgsettle/storage/model/execution.go | 12 +- .../tgsettle/storage/model/storable.go | 19 +++ .../tgsettle/storage/mongo/store/payments.go | 31 +++-- .../mongo/store/pending_confirmations.go | 122 ++++++++---------- .../mongo/store/telegram_confirmations.go | 27 +++- 6 files changed, 124 insertions(+), 90 deletions(-) create mode 100644 api/gateway/tgsettle/storage/model/storable.go diff --git a/api/gateway/chain/internal/service/gateway/drivers/registry.go b/api/gateway/chain/internal/service/gateway/drivers/registry.go index 78c8b45a..7686d0bb 100644 --- a/api/gateway/chain/internal/service/gateway/drivers/registry.go +++ b/api/gateway/chain/internal/service/gateway/drivers/registry.go @@ -53,8 +53,7 @@ func (r *Registry) Driver(network pmodel.ChainNetwork) (driver.Driver, error) { func resolveDriver(logger mlogger.Logger, network pmodel.ChainNetwork) (driver.Driver, error) { switch network { - case pmodel.ChainNetworkArbitrumOne: - case pmodel.ChainNetworkArbitrumSepolia: + case pmodel.ChainNetworkArbitrumOne, pmodel.ChainNetworkArbitrumSepolia: return arbitrum.New(logger), nil case pmodel.ChainNetworkEthereumMainnet: return ethereum.New(logger), nil diff --git a/api/gateway/tgsettle/storage/model/execution.go b/api/gateway/tgsettle/storage/model/execution.go index a94e52f6..fea1ffc2 100644 --- a/api/gateway/tgsettle/storage/model/execution.go +++ b/api/gateway/tgsettle/storage/model/execution.go @@ -3,9 +3,9 @@ package model import ( "time" + "github.com/tech/sendico/pkg/db/storable" "github.com/tech/sendico/pkg/model" paymenttypes "github.com/tech/sendico/pkg/payments/types" - "go.mongodb.org/mongo-driver/v2/bson" ) type PaymentStatus string @@ -20,7 +20,7 @@ const ( ) type PaymentRecord struct { - ID bson.ObjectID `bson:"_id,omitempty" json:"id"` + storable.Base `bson:",inline" json:",inline"` OperationRef string `bson:"operationRef,omitempty" json:"operation_ref,omitempty"` IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"` PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"` @@ -33,15 +33,13 @@ type PaymentRecord struct { ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"` Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"` FailureReason string `bson:"failureReason,omitempty" json:"Failure_reason,omitempty"` - CreatedAt time.Time `bson:"createdAt,omitempty" json:"created_at,omitempty"` - UpdatedAt time.Time `bson:"updatedAt,omitempty" json:"updated_at,omitempty"` ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"` ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"` ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"` } type TelegramConfirmation struct { - ID bson.ObjectID `bson:"_id,omitempty" json:"id"` + storable.Base `bson:",inline" json:",inline"` RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"` PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"` QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"` @@ -50,7 +48,7 @@ type TelegramConfirmation struct { } type PendingConfirmation struct { - ID bson.ObjectID `bson:"_id,omitempty" json:"id"` + storable.Base `bson:",inline" json:",inline"` RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"` MessageID string `bson:"messageId,omitempty" json:"message_id,omitempty"` TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"` @@ -60,6 +58,4 @@ type PendingConfirmation struct { Rail string `bson:"rail,omitempty" json:"rail,omitempty"` Clarified bool `bson:"clarified,omitempty" json:"clarified,omitempty"` ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"` - CreatedAt time.Time `bson:"createdAt,omitempty" json:"created_at,omitempty"` - UpdatedAt time.Time `bson:"updatedAt,omitempty" json:"updated_at,omitempty"` } diff --git a/api/gateway/tgsettle/storage/model/storable.go b/api/gateway/tgsettle/storage/model/storable.go new file mode 100644 index 00000000..00f9d451 --- /dev/null +++ b/api/gateway/tgsettle/storage/model/storable.go @@ -0,0 +1,19 @@ +package model + +const ( + paymentsCollection = "payments" + telegramConfirmationsCollection = "telegram_confirmations" + pendingConfirmationsCollection = "pending_confirmations" +) + +func (*PaymentRecord) Collection() string { + return paymentsCollection +} + +func (*TelegramConfirmation) Collection() string { + return telegramConfirmationsCollection +} + +func (*PendingConfirmation) Collection() string { + return pendingConfirmationsCollection +} diff --git a/api/gateway/tgsettle/storage/mongo/store/payments.go b/api/gateway/tgsettle/storage/mongo/store/payments.go index ab978271..c3a89505 100644 --- a/api/gateway/tgsettle/storage/mongo/store/payments.go +++ b/api/gateway/tgsettle/storage/mongo/store/payments.go @@ -14,7 +14,6 @@ import ( "github.com/tech/sendico/pkg/mlogger" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" - "go.mongodb.org/mongo-driver/v2/mongo/options" "go.uber.org/zap" ) @@ -25,7 +24,7 @@ const ( type Payments struct { logger mlogger.Logger - coll *mongo.Collection + repo repository.Repository } func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) { @@ -48,7 +47,7 @@ func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) { p := &Payments{ logger: logger, - coll: db.Collection(paymentsCollection), + repo: repo, } p.logger.Debug("Payments store initialised") return p, nil @@ -60,8 +59,8 @@ func (p *Payments) FindByIdempotencyKey(ctx context.Context, key string) (*model return nil, merrors.InvalidArgument("idempotency key is required", "idempotency_key") } var result model.PaymentRecord - err := p.coll.FindOne(ctx, bson.M{fieldIdempotencyKey: key}).Decode(&result) - if err == mongo.ErrNoDocuments { + err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldIdempotencyKey, key), &result) + if errors.Is(err, merrors.ErrNoData) { return nil, nil } if err != nil { @@ -98,10 +97,26 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro } record.UpdatedAt = now record.ID = bson.NilObjectID - update := bson.M{ - "$set": record, + + filter := repository.Filter(fieldIdempotencyKey, record.IdempotencyKey) + existing := &model.PaymentRecord{} + err := p.repo.FindOneByFilter(ctx, filter, existing) + switch { + case err == nil: + record.ID = existing.ID + err = p.repo.Update(ctx, record) + case errors.Is(err, merrors.ErrNoData): + record.ID = bson.NilObjectID + err = p.repo.Insert(ctx, record, filter) + if errors.Is(err, merrors.ErrDataConflict) { + if findErr := p.repo.FindOneByFilter(ctx, filter, existing); findErr != nil { + err = findErr + break + } + record.ID = existing.ID + err = p.repo.Update(ctx, record) + } } - _, err := p.coll.UpdateOne(ctx, bson.M{fieldIdempotencyKey: record.IdempotencyKey}, update, options.UpdateOne().SetUpsert(true)) if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { p.logger.Warn("Failed to upsert payment record", diff --git a/api/gateway/tgsettle/storage/mongo/store/pending_confirmations.go b/api/gateway/tgsettle/storage/mongo/store/pending_confirmations.go index 4103afdf..59918875 100644 --- a/api/gateway/tgsettle/storage/mongo/store/pending_confirmations.go +++ b/api/gateway/tgsettle/storage/mongo/store/pending_confirmations.go @@ -9,12 +9,12 @@ import ( "github.com/tech/sendico/gateway/tgsettle/storage" "github.com/tech/sendico/gateway/tgsettle/storage/model" "github.com/tech/sendico/pkg/db/repository" + "github.com/tech/sendico/pkg/db/repository/builder" ri "github.com/tech/sendico/pkg/db/repository/index" "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mlogger" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" - "go.mongodb.org/mongo-driver/v2/mongo/options" "go.uber.org/zap" ) @@ -27,7 +27,7 @@ const ( type PendingConfirmations struct { logger mlogger.Logger - coll *mongo.Collection + repo repository.Repository } func NewPendingConfirmations(logger mlogger.Logger, db *mongo.Database) (*PendingConfirmations, error) { @@ -62,7 +62,7 @@ func NewPendingConfirmations(logger mlogger.Logger, db *mongo.Database) (*Pendin p := &PendingConfirmations{ logger: logger, - coll: db.Collection(pendingConfirmationsCollection), + repo: repo, } return p, nil } @@ -93,27 +93,28 @@ func (p *PendingConfirmations) Upsert(ctx context.Context, record *model.Pending } record.UpdatedAt = now record.CreatedAt = createdAt - record.ID = bson.NilObjectID + filter := repository.Filter(fieldPendingRequestID, record.RequestID) + existing := &model.PendingConfirmation{} - // Explicit map avoids accidentally overriding immutable fields from stale callers. - update := bson.M{ - "$set": bson.M{ - "messageId": record.MessageID, - "targetChatId": record.TargetChatID, - "acceptedUserIds": record.AcceptedUserIDs, - "requestedMoney": record.RequestedMoney, - "sourceService": record.SourceService, - "rail": record.Rail, - "clarified": record.Clarified, - "expiresAt": record.ExpiresAt, - "updatedAt": record.UpdatedAt, - }, - "$setOnInsert": bson.M{ - "createdAt": createdAt, - }, + err := p.repo.FindOneByFilter(ctx, filter, existing) + switch { + case err == nil: + record.ID = existing.ID + record.CreatedAt = existing.CreatedAt + err = p.repo.Update(ctx, record) + case errors.Is(err, merrors.ErrNoData): + record.ID = bson.NilObjectID + err = p.repo.Insert(ctx, record, filter) + if errors.Is(err, merrors.ErrDataConflict) { + if findErr := p.repo.FindOneByFilter(ctx, filter, existing); findErr != nil { + err = findErr + break + } + record.ID = existing.ID + record.CreatedAt = existing.CreatedAt + err = p.repo.Update(ctx, record) + } } - - _, err := p.coll.UpdateOne(ctx, bson.M{fieldPendingRequestID: record.RequestID}, update, options.UpdateOne().SetUpsert(true)) if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { p.logger.Warn("Failed to upsert pending confirmation", zap.Error(err), zap.String("request_id", record.RequestID)) } @@ -126,8 +127,8 @@ func (p *PendingConfirmations) FindByRequestID(ctx context.Context, requestID st return nil, merrors.InvalidArgument("request_id is required", "request_id") } var result model.PendingConfirmation - err := p.coll.FindOne(ctx, bson.M{fieldPendingRequestID: requestID}).Decode(&result) - if err == mongo.ErrNoDocuments { + err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldPendingRequestID, requestID), &result) + if errors.Is(err, merrors.ErrNoData) { return nil, nil } if err != nil { @@ -142,8 +143,8 @@ func (p *PendingConfirmations) FindByMessageID(ctx context.Context, messageID st return nil, merrors.InvalidArgument("message_id is required", "message_id") } var result model.PendingConfirmation - err := p.coll.FindOne(ctx, bson.M{fieldPendingMessageID: messageID}).Decode(&result) - if err == mongo.ErrNoDocuments { + err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldPendingMessageID, messageID), &result) + if errors.Is(err, merrors.ErrNoData) { return nil, nil } if err != nil { @@ -157,12 +158,10 @@ func (p *PendingConfirmations) MarkClarified(ctx context.Context, requestID stri if requestID == "" { return merrors.InvalidArgument("request_id is required", "request_id") } - _, err := p.coll.UpdateOne(ctx, bson.M{fieldPendingRequestID: requestID}, bson.M{ - "$set": bson.M{ - "clarified": true, - "updatedAt": time.Now(), - }, - }) + patch := repository.Patch(). + Set(repository.Field("clarified"), true). + Set(repository.Field("updatedAt"), time.Now()) + _, err := p.repo.PatchMany(ctx, repository.Filter(fieldPendingRequestID, requestID), patch) return err } @@ -176,24 +175,21 @@ func (p *PendingConfirmations) AttachMessage(ctx context.Context, requestID stri return merrors.InvalidArgument("message_id is required", "message_id") } - filter := bson.M{ - fieldPendingRequestID: requestID, - "$or": []bson.M{ - {fieldPendingMessageID: bson.M{"$exists": false}}, - {fieldPendingMessageID: ""}, - {fieldPendingMessageID: messageID}, - }, - } - res, err := p.coll.UpdateOne(ctx, filter, bson.M{ - "$set": bson.M{ - fieldPendingMessageID: messageID, - "updatedAt": time.Now(), - }, - }) + filter := repository.Filter(fieldPendingRequestID, requestID).And( + repository.Query().Or( + repository.Exists(repository.Field(fieldPendingMessageID), false), + repository.Filter(fieldPendingMessageID, ""), + repository.Filter(fieldPendingMessageID, messageID), + ), + ) + patch := repository.Patch(). + Set(repository.Field(fieldPendingMessageID), messageID). + Set(repository.Field("updatedAt"), time.Now()) + updated, err := p.repo.PatchMany(ctx, filter, patch) if err != nil { return err } - if res.MatchedCount == 0 { + if updated == 0 { return merrors.NoData("pending confirmation not found") } return nil @@ -204,34 +200,28 @@ func (p *PendingConfirmations) DeleteByRequestID(ctx context.Context, requestID if requestID == "" { return merrors.InvalidArgument("request_id is required", "request_id") } - _, err := p.coll.DeleteOne(ctx, bson.M{fieldPendingRequestID: requestID}) - return err + return p.repo.DeleteMany(ctx, repository.Filter(fieldPendingRequestID, requestID)) } func (p *PendingConfirmations) ListExpired(ctx context.Context, now time.Time, limit int64) ([]*model.PendingConfirmation, error) { if limit <= 0 { limit = 100 } - filter := bson.M{ - fieldPendingExpiresAt: bson.M{"$lte": now}, - } - opts := options.Find().SetLimit(limit).SetSort(bson.D{{Key: fieldPendingExpiresAt, Value: 1}}) - - cursor, err := p.coll.Find(ctx, filter, opts) - if err != nil { - return nil, err - } - defer cursor.Close(ctx) + query := repository.Query(). + Comparison(repository.Field(fieldPendingExpiresAt), builder.Lte, now). + Sort(repository.Field(fieldPendingExpiresAt), true). + Limit(&limit) result := make([]*model.PendingConfirmation, 0) - for cursor.Next(ctx) { - var next model.PendingConfirmation - if err := cursor.Decode(&next); err != nil { - return nil, err + err := p.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error { + next := &model.PendingConfirmation{} + if err := cur.Decode(next); err != nil { + return err } - result = append(result, &next) - } - if err := cursor.Err(); err != nil { + result = append(result, next) + return nil + }) + if err != nil && !errors.Is(err, merrors.ErrNoData) { return nil, err } return result, nil diff --git a/api/gateway/tgsettle/storage/mongo/store/telegram_confirmations.go b/api/gateway/tgsettle/storage/mongo/store/telegram_confirmations.go index 8ada11f7..d3f0746b 100644 --- a/api/gateway/tgsettle/storage/mongo/store/telegram_confirmations.go +++ b/api/gateway/tgsettle/storage/mongo/store/telegram_confirmations.go @@ -14,7 +14,6 @@ import ( "github.com/tech/sendico/pkg/mlogger" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" - "go.mongodb.org/mongo-driver/v2/mongo/options" "go.uber.org/zap" ) @@ -25,7 +24,7 @@ const ( type TelegramConfirmations struct { logger mlogger.Logger - coll *mongo.Collection + repo repository.Repository } func NewTelegramConfirmations(logger mlogger.Logger, db *mongo.Database) (*TelegramConfirmations, error) { @@ -48,7 +47,7 @@ func NewTelegramConfirmations(logger mlogger.Logger, db *mongo.Database) (*Teleg t := &TelegramConfirmations{ logger: logger, - coll: db.Collection(telegramCollection), + repo: repo, } t.logger.Debug("Telegram confirmations store initialised") return t, nil @@ -67,10 +66,26 @@ func (t *TelegramConfirmations) Upsert(ctx context.Context, record *model.Telegr if record.ReceivedAt.IsZero() { record.ReceivedAt = time.Now() } - update := bson.M{ - "$set": record, + filter := repository.Filter(fieldRequestID, record.RequestID) + existing := &model.TelegramConfirmation{} + + err := t.repo.FindOneByFilter(ctx, filter, existing) + switch { + case err == nil: + record.ID = existing.ID + err = t.repo.Update(ctx, record) + case errors.Is(err, merrors.ErrNoData): + record.ID = bson.NilObjectID + err = t.repo.Insert(ctx, record, filter) + if errors.Is(err, merrors.ErrDataConflict) { + if findErr := t.repo.FindOneByFilter(ctx, filter, existing); findErr != nil { + err = findErr + break + } + record.ID = existing.ID + err = t.repo.Update(ctx, record) + } } - _, err := t.coll.UpdateOne(ctx, bson.M{fieldRequestID: record.RequestID}, update, options.UpdateOne().SetUpsert(true)) if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { fields := []zap.Field{zap.String("request_id", record.RequestID)} if record.PaymentIntentID != "" { -- 2.49.1