package store import ( "context" "errors" "strings" "time" "github.com/tech/sendico/gateway/chain/storage" "github.com/tech/sendico/gateway/chain/storage/model" "github.com/tech/sendico/pkg/db/repository" "github.com/tech/sendico/pkg/db/repository/builder" ri "github.com/tech/sendico/pkg/db/repository/index" "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mservice" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.uber.org/zap" ) const ( defaultTransferPageSize int64 = 50 maxTransferPageSize int64 = 200 ) type Transfers struct { logger mlogger.Logger repo repository.Repository } // NewTransfers constructs a Mongo-backed transfers store. func NewTransfers(logger mlogger.Logger, db *mongo.Database) (*Transfers, error) { if db == nil { return nil, merrors.InvalidArgument("mongo database is nil") } repo := repository.CreateMongoRepository(db, mservice.ChainTransfers) indexes := []*ri.Definition{ { Keys: []ri.Key{{Field: "transferRef", Sort: ri.Asc}}, Unique: true, }, { Keys: []ri.Key{{Field: "idempotencyKey", Sort: ri.Asc}}, Unique: true, }, { Keys: []ri.Key{{Field: "sourceWalletRef", Sort: ri.Asc}, {Field: "status", Sort: ri.Asc}}, }, { Keys: []ri.Key{{Field: "destination.managedWalletRef", Sort: ri.Asc}}, }, } for _, def := range indexes { if err := repo.CreateIndex(def); err != nil { logger.Error("Failed to ensure transfer index", zap.Error(err), zap.String("collection", repo.Collection())) return nil, err } } childLogger := logger.Named("transfers") childLogger.Debug("Transfers store initialised") return &Transfers{ logger: childLogger, repo: repo, }, nil } func (t *Transfers) Create(ctx context.Context, transfer *model.Transfer) (*model.Transfer, error) { if transfer == nil { return nil, merrors.InvalidArgument("transfersStore: nil transfer") } transfer.Normalize() if strings.TrimSpace(transfer.TransferRef) == "" { return nil, merrors.InvalidArgument("transfersStore: empty transferRef") } if strings.TrimSpace(transfer.IdempotencyKey) == "" { return nil, merrors.InvalidArgument("transfersStore: empty idempotencyKey") } if transfer.Status == "" { transfer.Status = model.TransferStatusPending } if transfer.LastStatusAt.IsZero() { transfer.LastStatusAt = time.Now().UTC() } if strings.TrimSpace(transfer.IdempotencyKey) == "" { return nil, merrors.InvalidArgument("transfersStore: empty idempotencyKey") } if err := t.repo.Insert(ctx, transfer, repository.Filter("idempotencyKey", transfer.IdempotencyKey)); err != nil { if errors.Is(err, merrors.ErrDataConflict) { t.logger.Debug("Transfer already exists", zap.String("transfer_ref", transfer.TransferRef), zap.String("idempotency_key", transfer.IdempotencyKey)) return transfer, nil } return nil, err } t.logger.Debug("Transfer created", zap.String("transfer_ref", transfer.TransferRef)) return transfer, nil } func (t *Transfers) Get(ctx context.Context, transferRef string) (*model.Transfer, error) { transferRef = strings.TrimSpace(transferRef) if transferRef == "" { return nil, merrors.InvalidArgument("transfersStore: empty transferRef") } transfer := &model.Transfer{} if err := t.repo.FindOneByFilter(ctx, repository.Filter("transferRef", transferRef), transfer); err != nil { return nil, err } return transfer, nil } func (t *Transfers) List(ctx context.Context, filter model.TransferFilter) (*model.TransferList, error) { query := repository.Query() if src := strings.TrimSpace(filter.SourceWalletRef); src != "" { query = query.Filter(repository.Field("sourceWalletRef"), src) } if dst := strings.TrimSpace(filter.DestinationWalletRef); dst != "" { query = query.Filter(repository.Field("destination.managedWalletRef"), dst) } if status := strings.TrimSpace(string(filter.Status)); status != "" { query = query.Filter(repository.Field("status"), status) } if cursor := strings.TrimSpace(filter.Cursor); cursor != "" { if oid, err := primitive.ObjectIDFromHex(cursor); err == nil { query = query.Comparison(repository.IDField(), builder.Gt, oid) } else { t.logger.Warn("Ignoring invalid transfer cursor", zap.String("cursor", cursor), zap.Error(err)) } } limit := sanitizeTransferLimit(filter.Limit) fetchLimit := limit + 1 query = query.Sort(repository.IDField(), true).Limit(&fetchLimit) transfers := make([]*model.Transfer, 0, fetchLimit) decoder := func(cur *mongo.Cursor) error { item := &model.Transfer{} if err := cur.Decode(item); err != nil { return err } transfers = append(transfers, item) return nil } if err := t.repo.FindManyByFilter(ctx, query, decoder); err != nil && !errors.Is(err, merrors.ErrNoData) { return nil, err } nextCursor := "" if int64(len(transfers)) == fetchLimit { last := transfers[len(transfers)-1] nextCursor = last.ID.Hex() transfers = transfers[:len(transfers)-1] } return &model.TransferList{ Items: transfers, NextCursor: nextCursor, }, nil } func (t *Transfers) UpdateStatus(ctx context.Context, transferRef string, status model.TransferStatus, failureReason string, txHash string) (*model.Transfer, error) { transferRef = strings.TrimSpace(transferRef) if transferRef == "" { return nil, merrors.InvalidArgument("transfersStore: empty transferRef") } transfer := &model.Transfer{} if err := t.repo.FindOneByFilter(ctx, repository.Filter("transferRef", transferRef), transfer); err != nil { return nil, err } transfer.Status = status if status == model.TransferStatusFailed { transfer.FailureReason = strings.TrimSpace(failureReason) } else { transfer.FailureReason = "" } if hash := strings.TrimSpace(txHash); hash != "" { transfer.TxHash = strings.ToLower(hash) } transfer.LastStatusAt = time.Now().UTC() if err := t.repo.Update(ctx, transfer); err != nil { return nil, err } return transfer, nil } func sanitizeTransferLimit(requested int32) int64 { if requested <= 0 { return defaultTransferPageSize } if requested > int32(maxTransferPageSize) { return maxTransferPageSize } return int64(requested) } var _ storage.TransfersStore = (*Transfers)(nil)