migrated raw mongo.Collection to repository.Repository + chain driver resolution fix #536
@@ -53,8 +53,7 @@ func (r *Registry) Driver(network pmodel.ChainNetwork) (driver.Driver, error) {
|
|||||||
|
|
||||||
func resolveDriver(logger mlogger.Logger, network pmodel.ChainNetwork) (driver.Driver, error) {
|
func resolveDriver(logger mlogger.Logger, network pmodel.ChainNetwork) (driver.Driver, error) {
|
||||||
switch network {
|
switch network {
|
||||||
case pmodel.ChainNetworkArbitrumOne:
|
case pmodel.ChainNetworkArbitrumOne, pmodel.ChainNetworkArbitrumSepolia:
|
||||||
case pmodel.ChainNetworkArbitrumSepolia:
|
|
||||||
return arbitrum.New(logger), nil
|
return arbitrum.New(logger), nil
|
||||||
case pmodel.ChainNetworkEthereumMainnet:
|
case pmodel.ChainNetworkEthereumMainnet:
|
||||||
return ethereum.New(logger), nil
|
return ethereum.New(logger), nil
|
||||||
|
|||||||
@@ -3,9 +3,9 @@ package model
|
|||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/tech/sendico/pkg/db/storable"
|
||||||
"github.com/tech/sendico/pkg/model"
|
"github.com/tech/sendico/pkg/model"
|
||||||
paymenttypes "github.com/tech/sendico/pkg/payments/types"
|
paymenttypes "github.com/tech/sendico/pkg/payments/types"
|
||||||
"go.mongodb.org/mongo-driver/v2/bson"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type PaymentStatus string
|
type PaymentStatus string
|
||||||
@@ -20,7 +20,7 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type PaymentRecord struct {
|
type PaymentRecord struct {
|
||||||
ID bson.ObjectID `bson:"_id,omitempty" json:"id"`
|
storable.Base `bson:",inline" json:",inline"`
|
||||||
OperationRef string `bson:"operationRef,omitempty" json:"operation_ref,omitempty"`
|
OperationRef string `bson:"operationRef,omitempty" json:"operation_ref,omitempty"`
|
||||||
IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"`
|
IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"`
|
||||||
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
|
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
|
||||||
@@ -33,15 +33,13 @@ type PaymentRecord struct {
|
|||||||
ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"`
|
ExecutedMoney *paymenttypes.Money `bson:"executedMoney,omitempty" json:"executed_money,omitempty"`
|
||||||
Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"`
|
Status PaymentStatus `bson:"status,omitempty" json:"status,omitempty"`
|
||||||
FailureReason string `bson:"failureReason,omitempty" json:"Failure_reason,omitempty"`
|
FailureReason string `bson:"failureReason,omitempty" json:"Failure_reason,omitempty"`
|
||||||
CreatedAt time.Time `bson:"createdAt,omitempty" json:"created_at,omitempty"`
|
|
||||||
UpdatedAt time.Time `bson:"updatedAt,omitempty" json:"updated_at,omitempty"`
|
|
||||||
ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"`
|
ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"`
|
||||||
ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"`
|
ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"`
|
||||||
ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"`
|
ExpiredAt time.Time `bson:"expiredAt,omitempty" json:"expired_at,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type TelegramConfirmation struct {
|
type TelegramConfirmation struct {
|
||||||
ID bson.ObjectID `bson:"_id,omitempty" json:"id"`
|
storable.Base `bson:",inline" json:",inline"`
|
||||||
RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"`
|
RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"`
|
||||||
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
|
PaymentIntentID string `bson:"paymentIntentId,omitempty" json:"payment_intent_id,omitempty"`
|
||||||
QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"`
|
QuoteRef string `bson:"quoteRef,omitempty" json:"quote_ref,omitempty"`
|
||||||
@@ -50,7 +48,7 @@ type TelegramConfirmation struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type PendingConfirmation struct {
|
type PendingConfirmation struct {
|
||||||
ID bson.ObjectID `bson:"_id,omitempty" json:"id"`
|
storable.Base `bson:",inline" json:",inline"`
|
||||||
RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"`
|
RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"`
|
||||||
MessageID string `bson:"messageId,omitempty" json:"message_id,omitempty"`
|
MessageID string `bson:"messageId,omitempty" json:"message_id,omitempty"`
|
||||||
TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"`
|
TargetChatID string `bson:"targetChatId,omitempty" json:"target_chat_id,omitempty"`
|
||||||
@@ -60,6 +58,4 @@ type PendingConfirmation struct {
|
|||||||
Rail string `bson:"rail,omitempty" json:"rail,omitempty"`
|
Rail string `bson:"rail,omitempty" json:"rail,omitempty"`
|
||||||
Clarified bool `bson:"clarified,omitempty" json:"clarified,omitempty"`
|
Clarified bool `bson:"clarified,omitempty" json:"clarified,omitempty"`
|
||||||
ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"`
|
ExpiresAt time.Time `bson:"expiresAt,omitempty" json:"expires_at,omitempty"`
|
||||||
CreatedAt time.Time `bson:"createdAt,omitempty" json:"created_at,omitempty"`
|
|
||||||
UpdatedAt time.Time `bson:"updatedAt,omitempty" json:"updated_at,omitempty"`
|
|
||||||
}
|
}
|
||||||
|
|||||||
19
api/gateway/tgsettle/storage/model/storable.go
Normal file
19
api/gateway/tgsettle/storage/model/storable.go
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
package model
|
||||||
|
|
||||||
|
const (
|
||||||
|
paymentsCollection = "payments"
|
||||||
|
telegramConfirmationsCollection = "telegram_confirmations"
|
||||||
|
pendingConfirmationsCollection = "pending_confirmations"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (*PaymentRecord) Collection() string {
|
||||||
|
return paymentsCollection
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*TelegramConfirmation) Collection() string {
|
||||||
|
return telegramConfirmationsCollection
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*PendingConfirmation) Collection() string {
|
||||||
|
return pendingConfirmationsCollection
|
||||||
|
}
|
||||||
@@ -14,7 +14,6 @@ import (
|
|||||||
"github.com/tech/sendico/pkg/mlogger"
|
"github.com/tech/sendico/pkg/mlogger"
|
||||||
"go.mongodb.org/mongo-driver/v2/bson"
|
"go.mongodb.org/mongo-driver/v2/bson"
|
||||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||||
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -25,7 +24,7 @@ const (
|
|||||||
|
|
||||||
type Payments struct {
|
type Payments struct {
|
||||||
logger mlogger.Logger
|
logger mlogger.Logger
|
||||||
coll *mongo.Collection
|
repo repository.Repository
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) {
|
func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) {
|
||||||
@@ -48,7 +47,7 @@ func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) {
|
|||||||
|
|
||||||
p := &Payments{
|
p := &Payments{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
coll: db.Collection(paymentsCollection),
|
repo: repo,
|
||||||
}
|
}
|
||||||
p.logger.Debug("Payments store initialised")
|
p.logger.Debug("Payments store initialised")
|
||||||
return p, nil
|
return p, nil
|
||||||
@@ -60,8 +59,8 @@ func (p *Payments) FindByIdempotencyKey(ctx context.Context, key string) (*model
|
|||||||
return nil, merrors.InvalidArgument("idempotency key is required", "idempotency_key")
|
return nil, merrors.InvalidArgument("idempotency key is required", "idempotency_key")
|
||||||
}
|
}
|
||||||
var result model.PaymentRecord
|
var result model.PaymentRecord
|
||||||
err := p.coll.FindOne(ctx, bson.M{fieldIdempotencyKey: key}).Decode(&result)
|
err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldIdempotencyKey, key), &result)
|
||||||
if err == mongo.ErrNoDocuments {
|
if errors.Is(err, merrors.ErrNoData) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -98,10 +97,26 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro
|
|||||||
}
|
}
|
||||||
record.UpdatedAt = now
|
record.UpdatedAt = now
|
||||||
record.ID = bson.NilObjectID
|
record.ID = bson.NilObjectID
|
||||||
update := bson.M{
|
|
||||||
"$set": record,
|
filter := repository.Filter(fieldIdempotencyKey, record.IdempotencyKey)
|
||||||
|
existing := &model.PaymentRecord{}
|
||||||
|
err := p.repo.FindOneByFilter(ctx, filter, existing)
|
||||||
|
switch {
|
||||||
|
case err == nil:
|
||||||
|
record.ID = existing.ID
|
||||||
|
err = p.repo.Update(ctx, record)
|
||||||
|
case errors.Is(err, merrors.ErrNoData):
|
||||||
|
record.ID = bson.NilObjectID
|
||||||
|
err = p.repo.Insert(ctx, record, filter)
|
||||||
|
if errors.Is(err, merrors.ErrDataConflict) {
|
||||||
|
if findErr := p.repo.FindOneByFilter(ctx, filter, existing); findErr != nil {
|
||||||
|
err = findErr
|
||||||
|
break
|
||||||
|
}
|
||||||
|
record.ID = existing.ID
|
||||||
|
err = p.repo.Update(ctx, record)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_, err := p.coll.UpdateOne(ctx, bson.M{fieldIdempotencyKey: record.IdempotencyKey}, update, options.UpdateOne().SetUpsert(true))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||||
p.logger.Warn("Failed to upsert payment record",
|
p.logger.Warn("Failed to upsert payment record",
|
||||||
|
|||||||
@@ -9,12 +9,12 @@ import (
|
|||||||
"github.com/tech/sendico/gateway/tgsettle/storage"
|
"github.com/tech/sendico/gateway/tgsettle/storage"
|
||||||
"github.com/tech/sendico/gateway/tgsettle/storage/model"
|
"github.com/tech/sendico/gateway/tgsettle/storage/model"
|
||||||
"github.com/tech/sendico/pkg/db/repository"
|
"github.com/tech/sendico/pkg/db/repository"
|
||||||
|
"github.com/tech/sendico/pkg/db/repository/builder"
|
||||||
ri "github.com/tech/sendico/pkg/db/repository/index"
|
ri "github.com/tech/sendico/pkg/db/repository/index"
|
||||||
"github.com/tech/sendico/pkg/merrors"
|
"github.com/tech/sendico/pkg/merrors"
|
||||||
"github.com/tech/sendico/pkg/mlogger"
|
"github.com/tech/sendico/pkg/mlogger"
|
||||||
"go.mongodb.org/mongo-driver/v2/bson"
|
"go.mongodb.org/mongo-driver/v2/bson"
|
||||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||||
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -27,7 +27,7 @@ const (
|
|||||||
|
|
||||||
type PendingConfirmations struct {
|
type PendingConfirmations struct {
|
||||||
logger mlogger.Logger
|
logger mlogger.Logger
|
||||||
coll *mongo.Collection
|
repo repository.Repository
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPendingConfirmations(logger mlogger.Logger, db *mongo.Database) (*PendingConfirmations, error) {
|
func NewPendingConfirmations(logger mlogger.Logger, db *mongo.Database) (*PendingConfirmations, error) {
|
||||||
@@ -62,7 +62,7 @@ func NewPendingConfirmations(logger mlogger.Logger, db *mongo.Database) (*Pendin
|
|||||||
|
|
||||||
p := &PendingConfirmations{
|
p := &PendingConfirmations{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
coll: db.Collection(pendingConfirmationsCollection),
|
repo: repo,
|
||||||
}
|
}
|
||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
@@ -93,27 +93,28 @@ func (p *PendingConfirmations) Upsert(ctx context.Context, record *model.Pending
|
|||||||
}
|
}
|
||||||
record.UpdatedAt = now
|
record.UpdatedAt = now
|
||||||
record.CreatedAt = createdAt
|
record.CreatedAt = createdAt
|
||||||
record.ID = bson.NilObjectID
|
filter := repository.Filter(fieldPendingRequestID, record.RequestID)
|
||||||
|
existing := &model.PendingConfirmation{}
|
||||||
|
|
||||||
// Explicit map avoids accidentally overriding immutable fields from stale callers.
|
err := p.repo.FindOneByFilter(ctx, filter, existing)
|
||||||
update := bson.M{
|
switch {
|
||||||
"$set": bson.M{
|
case err == nil:
|
||||||
"messageId": record.MessageID,
|
record.ID = existing.ID
|
||||||
"targetChatId": record.TargetChatID,
|
record.CreatedAt = existing.CreatedAt
|
||||||
"acceptedUserIds": record.AcceptedUserIDs,
|
err = p.repo.Update(ctx, record)
|
||||||
"requestedMoney": record.RequestedMoney,
|
case errors.Is(err, merrors.ErrNoData):
|
||||||
"sourceService": record.SourceService,
|
record.ID = bson.NilObjectID
|
||||||
"rail": record.Rail,
|
err = p.repo.Insert(ctx, record, filter)
|
||||||
"clarified": record.Clarified,
|
if errors.Is(err, merrors.ErrDataConflict) {
|
||||||
"expiresAt": record.ExpiresAt,
|
if findErr := p.repo.FindOneByFilter(ctx, filter, existing); findErr != nil {
|
||||||
"updatedAt": record.UpdatedAt,
|
err = findErr
|
||||||
},
|
break
|
||||||
"$setOnInsert": bson.M{
|
}
|
||||||
"createdAt": createdAt,
|
record.ID = existing.ID
|
||||||
},
|
record.CreatedAt = existing.CreatedAt
|
||||||
|
err = p.repo.Update(ctx, record)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := p.coll.UpdateOne(ctx, bson.M{fieldPendingRequestID: record.RequestID}, update, options.UpdateOne().SetUpsert(true))
|
|
||||||
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||||
p.logger.Warn("Failed to upsert pending confirmation", zap.Error(err), zap.String("request_id", record.RequestID))
|
p.logger.Warn("Failed to upsert pending confirmation", zap.Error(err), zap.String("request_id", record.RequestID))
|
||||||
}
|
}
|
||||||
@@ -126,8 +127,8 @@ func (p *PendingConfirmations) FindByRequestID(ctx context.Context, requestID st
|
|||||||
return nil, merrors.InvalidArgument("request_id is required", "request_id")
|
return nil, merrors.InvalidArgument("request_id is required", "request_id")
|
||||||
}
|
}
|
||||||
var result model.PendingConfirmation
|
var result model.PendingConfirmation
|
||||||
err := p.coll.FindOne(ctx, bson.M{fieldPendingRequestID: requestID}).Decode(&result)
|
err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldPendingRequestID, requestID), &result)
|
||||||
if err == mongo.ErrNoDocuments {
|
if errors.Is(err, merrors.ErrNoData) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -142,8 +143,8 @@ func (p *PendingConfirmations) FindByMessageID(ctx context.Context, messageID st
|
|||||||
return nil, merrors.InvalidArgument("message_id is required", "message_id")
|
return nil, merrors.InvalidArgument("message_id is required", "message_id")
|
||||||
}
|
}
|
||||||
var result model.PendingConfirmation
|
var result model.PendingConfirmation
|
||||||
err := p.coll.FindOne(ctx, bson.M{fieldPendingMessageID: messageID}).Decode(&result)
|
err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldPendingMessageID, messageID), &result)
|
||||||
if err == mongo.ErrNoDocuments {
|
if errors.Is(err, merrors.ErrNoData) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -157,12 +158,10 @@ func (p *PendingConfirmations) MarkClarified(ctx context.Context, requestID stri
|
|||||||
if requestID == "" {
|
if requestID == "" {
|
||||||
return merrors.InvalidArgument("request_id is required", "request_id")
|
return merrors.InvalidArgument("request_id is required", "request_id")
|
||||||
}
|
}
|
||||||
_, err := p.coll.UpdateOne(ctx, bson.M{fieldPendingRequestID: requestID}, bson.M{
|
patch := repository.Patch().
|
||||||
"$set": bson.M{
|
Set(repository.Field("clarified"), true).
|
||||||
"clarified": true,
|
Set(repository.Field("updatedAt"), time.Now())
|
||||||
"updatedAt": time.Now(),
|
_, err := p.repo.PatchMany(ctx, repository.Filter(fieldPendingRequestID, requestID), patch)
|
||||||
},
|
|
||||||
})
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -176,24 +175,21 @@ func (p *PendingConfirmations) AttachMessage(ctx context.Context, requestID stri
|
|||||||
return merrors.InvalidArgument("message_id is required", "message_id")
|
return merrors.InvalidArgument("message_id is required", "message_id")
|
||||||
}
|
}
|
||||||
|
|
||||||
filter := bson.M{
|
filter := repository.Filter(fieldPendingRequestID, requestID).And(
|
||||||
fieldPendingRequestID: requestID,
|
repository.Query().Or(
|
||||||
"$or": []bson.M{
|
repository.Exists(repository.Field(fieldPendingMessageID), false),
|
||||||
{fieldPendingMessageID: bson.M{"$exists": false}},
|
repository.Filter(fieldPendingMessageID, ""),
|
||||||
{fieldPendingMessageID: ""},
|
repository.Filter(fieldPendingMessageID, messageID),
|
||||||
{fieldPendingMessageID: messageID},
|
),
|
||||||
},
|
)
|
||||||
}
|
patch := repository.Patch().
|
||||||
res, err := p.coll.UpdateOne(ctx, filter, bson.M{
|
Set(repository.Field(fieldPendingMessageID), messageID).
|
||||||
"$set": bson.M{
|
Set(repository.Field("updatedAt"), time.Now())
|
||||||
fieldPendingMessageID: messageID,
|
updated, err := p.repo.PatchMany(ctx, filter, patch)
|
||||||
"updatedAt": time.Now(),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if res.MatchedCount == 0 {
|
if updated == 0 {
|
||||||
return merrors.NoData("pending confirmation not found")
|
return merrors.NoData("pending confirmation not found")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -204,34 +200,28 @@ func (p *PendingConfirmations) DeleteByRequestID(ctx context.Context, requestID
|
|||||||
if requestID == "" {
|
if requestID == "" {
|
||||||
return merrors.InvalidArgument("request_id is required", "request_id")
|
return merrors.InvalidArgument("request_id is required", "request_id")
|
||||||
}
|
}
|
||||||
_, err := p.coll.DeleteOne(ctx, bson.M{fieldPendingRequestID: requestID})
|
return p.repo.DeleteMany(ctx, repository.Filter(fieldPendingRequestID, requestID))
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PendingConfirmations) ListExpired(ctx context.Context, now time.Time, limit int64) ([]*model.PendingConfirmation, error) {
|
func (p *PendingConfirmations) ListExpired(ctx context.Context, now time.Time, limit int64) ([]*model.PendingConfirmation, error) {
|
||||||
if limit <= 0 {
|
if limit <= 0 {
|
||||||
limit = 100
|
limit = 100
|
||||||
}
|
}
|
||||||
filter := bson.M{
|
query := repository.Query().
|
||||||
fieldPendingExpiresAt: bson.M{"$lte": now},
|
Comparison(repository.Field(fieldPendingExpiresAt), builder.Lte, now).
|
||||||
}
|
Sort(repository.Field(fieldPendingExpiresAt), true).
|
||||||
opts := options.Find().SetLimit(limit).SetSort(bson.D{{Key: fieldPendingExpiresAt, Value: 1}})
|
Limit(&limit)
|
||||||
|
|
||||||
cursor, err := p.coll.Find(ctx, filter, opts)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer cursor.Close(ctx)
|
|
||||||
|
|
||||||
result := make([]*model.PendingConfirmation, 0)
|
result := make([]*model.PendingConfirmation, 0)
|
||||||
for cursor.Next(ctx) {
|
err := p.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error {
|
||||||
var next model.PendingConfirmation
|
next := &model.PendingConfirmation{}
|
||||||
if err := cursor.Decode(&next); err != nil {
|
if err := cur.Decode(next); err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
result = append(result, &next)
|
result = append(result, next)
|
||||||
}
|
return nil
|
||||||
if err := cursor.Err(); err != nil {
|
})
|
||||||
|
if err != nil && !errors.Is(err, merrors.ErrNoData) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
|
|||||||
@@ -14,7 +14,6 @@ import (
|
|||||||
"github.com/tech/sendico/pkg/mlogger"
|
"github.com/tech/sendico/pkg/mlogger"
|
||||||
"go.mongodb.org/mongo-driver/v2/bson"
|
"go.mongodb.org/mongo-driver/v2/bson"
|
||||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||||
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -25,7 +24,7 @@ const (
|
|||||||
|
|
||||||
type TelegramConfirmations struct {
|
type TelegramConfirmations struct {
|
||||||
logger mlogger.Logger
|
logger mlogger.Logger
|
||||||
coll *mongo.Collection
|
repo repository.Repository
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTelegramConfirmations(logger mlogger.Logger, db *mongo.Database) (*TelegramConfirmations, error) {
|
func NewTelegramConfirmations(logger mlogger.Logger, db *mongo.Database) (*TelegramConfirmations, error) {
|
||||||
@@ -48,7 +47,7 @@ func NewTelegramConfirmations(logger mlogger.Logger, db *mongo.Database) (*Teleg
|
|||||||
|
|
||||||
t := &TelegramConfirmations{
|
t := &TelegramConfirmations{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
coll: db.Collection(telegramCollection),
|
repo: repo,
|
||||||
}
|
}
|
||||||
t.logger.Debug("Telegram confirmations store initialised")
|
t.logger.Debug("Telegram confirmations store initialised")
|
||||||
return t, nil
|
return t, nil
|
||||||
@@ -67,10 +66,26 @@ func (t *TelegramConfirmations) Upsert(ctx context.Context, record *model.Telegr
|
|||||||
if record.ReceivedAt.IsZero() {
|
if record.ReceivedAt.IsZero() {
|
||||||
record.ReceivedAt = time.Now()
|
record.ReceivedAt = time.Now()
|
||||||
}
|
}
|
||||||
update := bson.M{
|
filter := repository.Filter(fieldRequestID, record.RequestID)
|
||||||
"$set": record,
|
existing := &model.TelegramConfirmation{}
|
||||||
|
|
||||||
|
err := t.repo.FindOneByFilter(ctx, filter, existing)
|
||||||
|
switch {
|
||||||
|
case err == nil:
|
||||||
|
record.ID = existing.ID
|
||||||
|
err = t.repo.Update(ctx, record)
|
||||||
|
case errors.Is(err, merrors.ErrNoData):
|
||||||
|
record.ID = bson.NilObjectID
|
||||||
|
err = t.repo.Insert(ctx, record, filter)
|
||||||
|
if errors.Is(err, merrors.ErrDataConflict) {
|
||||||
|
if findErr := t.repo.FindOneByFilter(ctx, filter, existing); findErr != nil {
|
||||||
|
err = findErr
|
||||||
|
break
|
||||||
|
}
|
||||||
|
record.ID = existing.ID
|
||||||
|
err = t.repo.Update(ctx, record)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_, err := t.coll.UpdateOne(ctx, bson.M{fieldRequestID: record.RequestID}, update, options.UpdateOne().SetUpsert(true))
|
|
||||||
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||||
fields := []zap.Field{zap.String("request_id", record.RequestID)}
|
fields := []zap.Field{zap.String("request_id", record.RequestID)}
|
||||||
if record.PaymentIntentID != "" {
|
if record.PaymentIntentID != "" {
|
||||||
|
|||||||
Reference in New Issue
Block a user