new tron gateway

This commit is contained in:
Stephan D
2026-01-30 15:51:28 +01:00
parent 51f5b0804a
commit 8788ff67ec
77 changed files with 11050 additions and 0 deletions

View File

@@ -0,0 +1,161 @@
package store
import (
"context"
"errors"
"strings"
"time"
"github.com/tech/sendico/gateway/tron/storage"
"github.com/tech/sendico/gateway/tron/storage/model"
"github.com/tech/sendico/pkg/db/repository"
ri "github.com/tech/sendico/pkg/db/repository/index"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
const (
defaultDepositPageSize int64 = 100
maxDepositPageSize int64 = 500
)
type Deposits struct {
logger mlogger.Logger
repo repository.Repository
}
// NewDeposits constructs a Mongo-backed deposits store.
func NewDeposits(logger mlogger.Logger, db *mongo.Database) (*Deposits, error) {
if db == nil {
return nil, merrors.InvalidArgument("mongo database is nil")
}
repo := repository.CreateMongoRepository(db, mservice.ChainDeposits)
indexes := []*ri.Definition{
{
Keys: []ri.Key{{Field: "depositRef", Sort: ri.Asc}},
Unique: true,
},
{
Keys: []ri.Key{{Field: "walletRef", Sort: ri.Asc}, {Field: "status", Sort: ri.Asc}},
},
{
Keys: []ri.Key{{Field: "txHash", Sort: ri.Asc}},
Unique: true,
},
}
for _, def := range indexes {
if err := repo.CreateIndex(def); err != nil {
logger.Error("Failed to ensure deposit index", zap.Error(err), zap.String("collection", repo.Collection()))
return nil, err
}
}
childLogger := logger.Named("deposits")
childLogger.Debug("Deposits store initialised")
return &Deposits{logger: childLogger, repo: repo}, nil
}
func (d *Deposits) Record(ctx context.Context, deposit *model.Deposit) error {
if deposit == nil {
return merrors.InvalidArgument("depositsStore: nil deposit")
}
deposit.Normalize()
if strings.TrimSpace(deposit.DepositRef) == "" {
return merrors.InvalidArgument("depositsStore: empty depositRef")
}
if deposit.Status == "" {
deposit.Status = model.DepositStatusPending
}
if deposit.ObservedAt.IsZero() {
deposit.ObservedAt = time.Now().UTC()
}
if deposit.RecordedAt.IsZero() {
deposit.RecordedAt = time.Now().UTC()
}
if deposit.LastStatusAt.IsZero() {
deposit.LastStatusAt = time.Now().UTC()
}
existing := &model.Deposit{}
err := d.repo.FindOneByFilter(ctx, repository.Filter("depositRef", deposit.DepositRef), existing)
switch {
case err == nil:
existing.Status = deposit.Status
existing.ObservedAt = deposit.ObservedAt
existing.RecordedAt = deposit.RecordedAt
existing.LastStatusAt = time.Now().UTC()
if deposit.Amount != nil {
existing.Amount = deposit.Amount
}
if deposit.BlockID != "" {
existing.BlockID = deposit.BlockID
}
if deposit.TxHash != "" {
existing.TxHash = deposit.TxHash
}
if deposit.Network != "" {
existing.Network = deposit.Network
}
if deposit.TokenSymbol != "" {
existing.TokenSymbol = deposit.TokenSymbol
}
if deposit.ContractAddress != "" {
existing.ContractAddress = deposit.ContractAddress
}
if deposit.SourceAddress != "" {
existing.SourceAddress = deposit.SourceAddress
}
if err := d.repo.Update(ctx, existing); err != nil {
return err
}
return nil
case errors.Is(err, merrors.ErrNoData):
if err := d.repo.Insert(ctx, deposit, repository.Filter("depositRef", deposit.DepositRef)); err != nil {
return err
}
return nil
default:
return err
}
}
func (d *Deposits) ListPending(ctx context.Context, network string, limit int32) ([]*model.Deposit, error) {
query := repository.Query().Filter(repository.Field("status"), model.DepositStatusPending)
if net := strings.TrimSpace(network); net != "" {
query = query.Filter(repository.Field("network"), strings.ToLower(net))
}
pageSize := sanitizeDepositLimit(limit)
query = query.Sort(repository.Field("observedAt"), true).Limit(&pageSize)
deposits := make([]*model.Deposit, 0, pageSize)
decoder := func(cur *mongo.Cursor) error {
item := &model.Deposit{}
if err := cur.Decode(item); err != nil {
return err
}
deposits = append(deposits, item)
return nil
}
if err := d.repo.FindManyByFilter(ctx, query, decoder); err != nil && !errors.Is(err, merrors.ErrNoData) {
return nil, err
}
return deposits, nil
}
func sanitizeDepositLimit(requested int32) int64 {
if requested <= 0 {
return defaultDepositPageSize
}
if requested > int32(maxDepositPageSize) {
return maxDepositPageSize
}
return int64(requested)
}
var _ storage.DepositsStore = (*Deposits)(nil)

View File

@@ -0,0 +1,200 @@
package store
import (
"context"
"errors"
"strings"
"time"
"github.com/tech/sendico/gateway/tron/storage"
"github.com/tech/sendico/gateway/tron/storage/model"
"github.com/tech/sendico/pkg/db/repository"
"github.com/tech/sendico/pkg/db/repository/builder"
ri "github.com/tech/sendico/pkg/db/repository/index"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
const (
defaultTransferPageSize int64 = 50
maxTransferPageSize int64 = 200
)
type Transfers struct {
logger mlogger.Logger
repo repository.Repository
}
// NewTransfers constructs a Mongo-backed transfers store.
func NewTransfers(logger mlogger.Logger, db *mongo.Database) (*Transfers, error) {
if db == nil {
return nil, merrors.InvalidArgument("mongo database is nil")
}
repo := repository.CreateMongoRepository(db, mservice.ChainTransfers)
indexes := []*ri.Definition{
{
Keys: []ri.Key{{Field: "transferRef", Sort: ri.Asc}},
Unique: true,
},
{
Keys: []ri.Key{{Field: "idempotencyKey", Sort: ri.Asc}},
Unique: true,
},
{
Keys: []ri.Key{{Field: "sourceWalletRef", Sort: ri.Asc}, {Field: "status", Sort: ri.Asc}},
},
{
Keys: []ri.Key{{Field: "destination.managedWalletRef", Sort: ri.Asc}},
},
}
for _, def := range indexes {
if err := repo.CreateIndex(def); err != nil {
logger.Error("Failed to ensure transfer index", zap.Error(err), zap.String("collection", repo.Collection()))
return nil, err
}
}
childLogger := logger.Named("transfers")
childLogger.Debug("Transfers store initialised")
return &Transfers{
logger: childLogger,
repo: repo,
}, nil
}
func (t *Transfers) Create(ctx context.Context, transfer *model.Transfer) (*model.Transfer, error) {
if transfer == nil {
return nil, merrors.InvalidArgument("transfersStore: nil transfer")
}
transfer.Normalize()
if strings.TrimSpace(transfer.TransferRef) == "" {
return nil, merrors.InvalidArgument("transfersStore: empty transferRef")
}
if strings.TrimSpace(transfer.IdempotencyKey) == "" {
return nil, merrors.InvalidArgument("transfersStore: empty idempotencyKey")
}
if transfer.Status == "" {
transfer.Status = model.TransferStatusPending
}
if transfer.LastStatusAt.IsZero() {
transfer.LastStatusAt = time.Now().UTC()
}
if strings.TrimSpace(transfer.IdempotencyKey) == "" {
return nil, merrors.InvalidArgument("transfersStore: empty idempotencyKey")
}
if err := t.repo.Insert(ctx, transfer, repository.Filter("idempotencyKey", transfer.IdempotencyKey)); err != nil {
if errors.Is(err, merrors.ErrDataConflict) {
t.logger.Debug("Transfer already exists", zap.String("transfer_ref", transfer.TransferRef), zap.String("idempotency_key", transfer.IdempotencyKey))
return transfer, nil
}
return nil, err
}
t.logger.Debug("Transfer created", zap.String("transfer_ref", transfer.TransferRef))
return transfer, nil
}
func (t *Transfers) Get(ctx context.Context, transferRef string) (*model.Transfer, error) {
transferRef = strings.TrimSpace(transferRef)
if transferRef == "" {
return nil, merrors.InvalidArgument("transfersStore: empty transferRef")
}
transfer := &model.Transfer{}
if err := t.repo.FindOneByFilter(ctx, repository.Filter("transferRef", transferRef), transfer); err != nil {
return nil, err
}
return transfer, nil
}
func (t *Transfers) List(ctx context.Context, filter model.TransferFilter) (*model.TransferList, error) {
query := repository.Query()
if src := strings.TrimSpace(filter.SourceWalletRef); src != "" {
query = query.Filter(repository.Field("sourceWalletRef"), src)
}
if dst := strings.TrimSpace(filter.DestinationWalletRef); dst != "" {
query = query.Filter(repository.Field("destination.managedWalletRef"), dst)
}
if status := strings.TrimSpace(string(filter.Status)); status != "" {
query = query.Filter(repository.Field("status"), status)
}
if cursor := strings.TrimSpace(filter.Cursor); cursor != "" {
if oid, err := primitive.ObjectIDFromHex(cursor); err == nil {
query = query.Comparison(repository.IDField(), builder.Gt, oid)
} else {
t.logger.Warn("Ignoring invalid transfer cursor", zap.String("cursor", cursor), zap.Error(err))
}
}
limit := sanitizeTransferLimit(filter.Limit)
fetchLimit := limit + 1
query = query.Sort(repository.IDField(), true).Limit(&fetchLimit)
transfers := make([]*model.Transfer, 0, fetchLimit)
decoder := func(cur *mongo.Cursor) error {
item := &model.Transfer{}
if err := cur.Decode(item); err != nil {
return err
}
transfers = append(transfers, item)
return nil
}
if err := t.repo.FindManyByFilter(ctx, query, decoder); err != nil && !errors.Is(err, merrors.ErrNoData) {
return nil, err
}
nextCursor := ""
if int64(len(transfers)) == fetchLimit {
last := transfers[len(transfers)-1]
nextCursor = last.ID.Hex()
transfers = transfers[:len(transfers)-1]
}
return &model.TransferList{
Items: transfers,
NextCursor: nextCursor,
}, nil
}
func (t *Transfers) UpdateStatus(ctx context.Context, transferRef string, status model.TransferStatus, failureReason string, txHash string) (*model.Transfer, error) {
transferRef = strings.TrimSpace(transferRef)
if transferRef == "" {
return nil, merrors.InvalidArgument("transfersStore: empty transferRef")
}
transfer := &model.Transfer{}
if err := t.repo.FindOneByFilter(ctx, repository.Filter("transferRef", transferRef), transfer); err != nil {
return nil, err
}
transfer.Status = status
if status == model.TransferStatusFailed {
transfer.FailureReason = strings.TrimSpace(failureReason)
} else {
transfer.FailureReason = ""
}
if hash := strings.TrimSpace(txHash); hash != "" {
transfer.TxHash = strings.ToLower(hash)
}
transfer.LastStatusAt = time.Now().UTC()
if err := t.repo.Update(ctx, transfer); err != nil {
return nil, err
}
return transfer, nil
}
func sanitizeTransferLimit(requested int32) int64 {
if requested <= 0 {
return defaultTransferPageSize
}
if requested > int32(maxTransferPageSize) {
return maxTransferPageSize
}
return int64(requested)
}
var _ storage.TransfersStore = (*Transfers)(nil)

View File

@@ -0,0 +1,288 @@
package store
import (
"context"
"errors"
"strings"
"time"
"github.com/tech/sendico/gateway/tron/storage"
"github.com/tech/sendico/gateway/tron/storage/model"
"github.com/tech/sendico/pkg/db/repository"
"github.com/tech/sendico/pkg/db/repository/builder"
ri "github.com/tech/sendico/pkg/db/repository/index"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
mutil "github.com/tech/sendico/pkg/mutil/db"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
const (
defaultWalletPageSize int64 = 50
maxWalletPageSize int64 = 200
)
type Wallets struct {
logger mlogger.Logger
walletRepo repository.Repository
balanceRepo repository.Repository
}
// NewWallets constructs a Mongo-backed wallets store.
func NewWallets(logger mlogger.Logger, db *mongo.Database) (*Wallets, error) {
if db == nil {
return nil, merrors.InvalidArgument("mongo database is nil")
}
walletRepo := repository.CreateMongoRepository(db, mservice.ChainWallets)
walletIndexes := []*ri.Definition{
{
Keys: []ri.Key{{Field: "walletRef", Sort: ri.Asc}},
Unique: true,
},
{
Keys: []ri.Key{{Field: "idempotencyKey", Sort: ri.Asc}},
Unique: true,
},
{
Keys: []ri.Key{{Field: "depositAddress", Sort: ri.Asc}},
Unique: true,
},
{
Keys: []ri.Key{{Field: "organizationRef", Sort: ri.Asc}, {Field: "ownerRef", Sort: ri.Asc}},
},
}
for _, def := range walletIndexes {
if err := walletRepo.CreateIndex(def); err != nil {
logger.Error("Failed to ensure wallet index", zap.String("collection", walletRepo.Collection()), zap.Error(err))
return nil, err
}
}
balanceRepo := repository.CreateMongoRepository(db, mservice.ChainWalletBalances)
balanceIndexes := []*ri.Definition{
{
Keys: []ri.Key{{Field: "walletRef", Sort: ri.Asc}},
Unique: true,
},
}
for _, def := range balanceIndexes {
if err := balanceRepo.CreateIndex(def); err != nil {
logger.Error("Failed to ensure wallet balance index", zap.String("collection", balanceRepo.Collection()), zap.Error(err))
return nil, err
}
}
childLogger := logger.Named("wallets")
childLogger.Debug("Wallet stores initialised")
return &Wallets{
logger: childLogger,
walletRepo: walletRepo,
balanceRepo: balanceRepo,
}, nil
}
func (w *Wallets) Create(ctx context.Context, wallet *model.ManagedWallet) (*model.ManagedWallet, error) {
if wallet == nil {
return nil, merrors.InvalidArgument("walletsStore: nil wallet")
}
wallet.Normalize()
if strings.TrimSpace(wallet.WalletRef) == "" {
return nil, merrors.InvalidArgument("walletsStore: empty walletRef")
}
if wallet.Status == "" {
wallet.Status = model.ManagedWalletStatusActive
}
if strings.TrimSpace(wallet.IdempotencyKey) == "" {
return nil, merrors.InvalidArgument("walletsStore: empty idempotencyKey")
}
fields := []zap.Field{
zap.String("wallet_ref", wallet.WalletRef),
zap.String("idempotency_key", wallet.IdempotencyKey),
}
if wallet.OrganizationRef != "" {
fields = append(fields, zap.String("organization_ref", wallet.OrganizationRef))
}
if wallet.OwnerRef != "" {
fields = append(fields, zap.String("owner_ref", wallet.OwnerRef))
}
if wallet.Network != "" {
fields = append(fields, zap.String("network", wallet.Network))
}
if wallet.TokenSymbol != "" {
fields = append(fields, zap.String("token_symbol", wallet.TokenSymbol))
}
if err := w.walletRepo.Insert(ctx, wallet, repository.Filter("idempotencyKey", wallet.IdempotencyKey)); err != nil {
if errors.Is(err, merrors.ErrDataConflict) {
w.logger.Debug("Wallet already exists", fields...)
return wallet, nil
}
w.logger.Warn("Wallet create failed", append(fields, zap.Error(err))...)
return nil, err
}
w.logger.Debug("Wallet created", fields...)
return wallet, nil
}
func (w *Wallets) Get(ctx context.Context, walletID string) (*model.ManagedWallet, error) {
walletID = strings.TrimSpace(walletID)
if walletID == "" {
return nil, merrors.InvalidArgument("walletsStore: empty walletRef")
}
fields := []zap.Field{
zap.String("wallet_id", walletID),
}
wallet := &model.ManagedWallet{}
if err := w.walletRepo.FindOneByFilter(ctx, repository.Filter("walletRef", walletID), wallet); err != nil {
if errors.Is(err, merrors.ErrNoData) {
w.logger.Debug("Wallet not found", fields...)
} else {
w.logger.Warn("Wallet lookup failed", append(fields, zap.Error(err))...)
}
return nil, err
}
return wallet, nil
}
func (w *Wallets) List(ctx context.Context, filter model.ManagedWalletFilter) (*model.ManagedWalletList, error) {
query := repository.Query()
fields := make([]zap.Field, 0, 6)
if org := strings.TrimSpace(filter.OrganizationRef); org != "" {
query = query.Filter(repository.Field("organizationRef"), org)
fields = append(fields, zap.String("organization_ref", org))
}
if filter.OwnerRefFilter != nil {
ownerRef := strings.TrimSpace(*filter.OwnerRefFilter)
query = query.Filter(repository.Field("ownerRef"), ownerRef)
fields = append(fields, zap.String("owner_ref_filter", ownerRef))
}
if network := strings.TrimSpace(filter.Network); network != "" {
normalized := strings.ToLower(network)
query = query.Filter(repository.Field("network"), normalized)
fields = append(fields, zap.String("network", normalized))
}
if token := strings.TrimSpace(filter.TokenSymbol); token != "" {
normalized := strings.ToUpper(token)
query = query.Filter(repository.Field("tokenSymbol"), normalized)
fields = append(fields, zap.String("token_symbol", normalized))
}
if cursor := strings.TrimSpace(filter.Cursor); cursor != "" {
if oid, err := primitive.ObjectIDFromHex(cursor); err == nil {
query = query.Comparison(repository.IDField(), builder.Gt, oid)
fields = append(fields, zap.String("cursor", cursor))
} else {
w.logger.Warn("Ignoring invalid wallet cursor", zap.String("cursor", cursor), zap.Error(err))
}
}
limit := sanitizeWalletLimit(filter.Limit)
fields = append(fields, zap.Int64("limit", limit))
fetchLimit := limit + 1
query = query.Sort(repository.IDField(), true).Limit(&fetchLimit)
wallets, listErr := mutil.GetObjects[model.ManagedWallet](ctx, w.logger, query, nil, w.walletRepo)
if listErr != nil && !errors.Is(listErr, merrors.ErrNoData) {
w.logger.Warn("Wallet list failed", append(fields, zap.Error(listErr))...)
return nil, listErr
}
nextCursor := ""
if int64(len(wallets)) == fetchLimit {
last := wallets[len(wallets)-1]
nextCursor = last.ID.Hex()
wallets = wallets[:len(wallets)-1]
}
result := &model.ManagedWalletList{
Items: wallets,
NextCursor: nextCursor,
}
fields = append(fields,
zap.Int("count", len(result.Items)),
zap.String("next_cursor", result.NextCursor),
)
if errors.Is(listErr, merrors.ErrNoData) {
w.logger.Debug("Wallet list empty", fields...)
} else {
w.logger.Debug("Wallet list fetched", fields...)
}
return result, nil
}
func (w *Wallets) SaveBalance(ctx context.Context, balance *model.WalletBalance) error {
if balance == nil {
return merrors.InvalidArgument("walletsStore: nil balance")
}
balance.Normalize()
if strings.TrimSpace(balance.WalletRef) == "" {
return merrors.InvalidArgument("walletsStore: empty walletRef for balance")
}
if balance.CalculatedAt.IsZero() {
balance.CalculatedAt = time.Now().UTC()
}
fields := []zap.Field{zap.String("wallet_ref", balance.WalletRef)}
existing := &model.WalletBalance{}
err := w.balanceRepo.FindOneByFilter(ctx, repository.Filter("walletRef", balance.WalletRef), existing)
switch {
case err == nil:
existing.Available = balance.Available
existing.PendingInbound = balance.PendingInbound
existing.PendingOutbound = balance.PendingOutbound
existing.CalculatedAt = balance.CalculatedAt
if err := w.balanceRepo.Update(ctx, existing); err != nil {
w.logger.Warn("Wallet balance update failed", append(fields, zap.Error(err))...)
return err
}
w.logger.Debug("Wallet balance updated", fields...)
return nil
case errors.Is(err, merrors.ErrNoData):
if err := w.balanceRepo.Insert(ctx, balance, repository.Filter("walletRef", balance.WalletRef)); err != nil {
w.logger.Warn("Wallet balance create failed", append(fields, zap.Error(err))...)
return err
}
w.logger.Debug("Wallet balance created", fields...)
return nil
default:
w.logger.Warn("Wallet balance lookup failed", append(fields, zap.Error(err))...)
return err
}
}
func (w *Wallets) GetBalance(ctx context.Context, walletID string) (*model.WalletBalance, error) {
walletID = strings.TrimSpace(walletID)
if walletID == "" {
return nil, merrors.InvalidArgument("walletsStore: empty walletRef")
}
fields := []zap.Field{zap.String("wallet_ref", walletID)}
balance := &model.WalletBalance{}
if err := w.balanceRepo.FindOneByFilter(ctx, repository.Filter("walletRef", walletID), balance); err != nil {
if errors.Is(err, merrors.ErrNoData) {
w.logger.Debug("Wallet balance not found", fields...)
} else {
w.logger.Warn("Wallet balance lookup failed", append(fields, zap.Error(err))...)
}
return nil, err
}
w.logger.Debug("Wallet balance fetched", fields...)
return balance, nil
}
func sanitizeWalletLimit(requested int32) int64 {
if requested <= 0 {
return defaultWalletPageSize
}
if requested > int32(maxWalletPageSize) {
return maxWalletPageSize
}
return int64(requested)
}
var _ storage.WalletsStore = (*Wallets)(nil)