151 lines
5.4 KiB
Go
151 lines
5.4 KiB
Go
package wallet
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/tech/sendico/gateway/chain/internal/service/gateway/shared"
|
|
"github.com/tech/sendico/gateway/chain/storage/model"
|
|
"github.com/tech/sendico/pkg/api/routers/gsresponse"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
"github.com/tech/sendico/pkg/mservice"
|
|
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
|
|
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
)
|
|
|
|
const fallbackBalanceCacheTTL = 2 * time.Minute
|
|
|
|
type getWalletBalanceCommand struct {
|
|
deps Deps
|
|
}
|
|
|
|
func NewGetWalletBalance(deps Deps) *getWalletBalanceCommand {
|
|
return &getWalletBalanceCommand{deps: deps}
|
|
}
|
|
|
|
func (c *getWalletBalanceCommand) Execute(ctx context.Context, req *chainv1.GetWalletBalanceRequest) gsresponse.Responder[chainv1.GetWalletBalanceResponse] {
|
|
if err := c.deps.EnsureRepository(ctx); err != nil {
|
|
c.deps.Logger.Warn("repository unavailable", zap.Error(err))
|
|
return gsresponse.Unavailable[chainv1.GetWalletBalanceResponse](c.deps.Logger, mservice.ChainGateway, err)
|
|
}
|
|
if req == nil {
|
|
c.deps.Logger.Warn("nil request")
|
|
return gsresponse.InvalidArgument[chainv1.GetWalletBalanceResponse](c.deps.Logger, mservice.ChainGateway, merrors.InvalidArgument("nil request"))
|
|
}
|
|
walletRef := strings.TrimSpace(req.GetWalletRef())
|
|
if walletRef == "" {
|
|
c.deps.Logger.Warn("wallet_ref missing")
|
|
return gsresponse.InvalidArgument[chainv1.GetWalletBalanceResponse](c.deps.Logger, mservice.ChainGateway, merrors.InvalidArgument("wallet_ref is required"))
|
|
}
|
|
wallet, err := c.deps.Storage.Wallets().Get(ctx, walletRef)
|
|
if err != nil {
|
|
if errors.Is(err, merrors.ErrNoData) {
|
|
c.deps.Logger.Warn("not found", zap.String("wallet_ref", walletRef))
|
|
return gsresponse.NotFound[chainv1.GetWalletBalanceResponse](c.deps.Logger, mservice.ChainGateway, err)
|
|
}
|
|
c.deps.Logger.Warn("storage get failed", zap.Error(err), zap.String("wallet_ref", walletRef))
|
|
return gsresponse.Auto[chainv1.GetWalletBalanceResponse](c.deps.Logger, mservice.ChainGateway, err)
|
|
}
|
|
|
|
tokenBalance, nativeBalance, chainErr := OnChainWalletBalances(ctx, c.deps, wallet)
|
|
if chainErr != nil {
|
|
c.deps.Logger.Warn("on-chain balance fetch failed, attempting cached balance", zap.Error(chainErr), zap.String("wallet_ref", walletRef))
|
|
stored, err := c.deps.Storage.Wallets().GetBalance(ctx, walletRef)
|
|
if err != nil {
|
|
if errors.Is(err, merrors.ErrNoData) {
|
|
c.deps.Logger.Warn("cached balance not found", zap.String("wallet_ref", walletRef))
|
|
return gsresponse.Auto[chainv1.GetWalletBalanceResponse](c.deps.Logger, mservice.ChainGateway, chainErr)
|
|
}
|
|
return gsresponse.Auto[chainv1.GetWalletBalanceResponse](c.deps.Logger, mservice.ChainGateway, err)
|
|
}
|
|
if c.isCachedBalanceStale(stored) {
|
|
c.deps.Logger.Warn("cached balance is stale",
|
|
zap.String("wallet_ref", walletRef),
|
|
zap.Time("calculated_at", stored.CalculatedAt),
|
|
zap.Duration("ttl", c.cacheTTL()),
|
|
)
|
|
return gsresponse.Auto[chainv1.GetWalletBalanceResponse](c.deps.Logger, mservice.ChainGateway, chainErr)
|
|
}
|
|
return gsresponse.Success(&chainv1.GetWalletBalanceResponse{Balance: toProtoWalletBalance(stored)})
|
|
}
|
|
|
|
calculatedAt := c.now()
|
|
c.persistCachedBalance(ctx, walletRef, tokenBalance, nativeBalance, calculatedAt)
|
|
|
|
return gsresponse.Success(&chainv1.GetWalletBalanceResponse{
|
|
Balance: onChainBalanceToProto(tokenBalance, nativeBalance, calculatedAt),
|
|
})
|
|
}
|
|
|
|
func onChainBalanceToProto(balance *moneyv1.Money, native *moneyv1.Money, calculatedAt time.Time) *chainv1.WalletBalance {
|
|
if balance == nil && native == nil {
|
|
return nil
|
|
}
|
|
currency := ""
|
|
if balance != nil {
|
|
currency = balance.Currency
|
|
}
|
|
zero := zeroMoney(currency)
|
|
return &chainv1.WalletBalance{
|
|
Available: balance,
|
|
NativeAvailable: native,
|
|
PendingInbound: zero,
|
|
PendingOutbound: zero,
|
|
CalculatedAt: timestamppb.New(calculatedAt.UTC()),
|
|
}
|
|
}
|
|
|
|
func (c *getWalletBalanceCommand) persistCachedBalance(ctx context.Context, walletRef string, available *moneyv1.Money, nativeAvailable *moneyv1.Money, calculatedAt time.Time) {
|
|
if available == nil && nativeAvailable == nil {
|
|
return
|
|
}
|
|
record := &model.WalletBalance{
|
|
WalletRef: walletRef,
|
|
Available: shared.CloneMoney(available),
|
|
NativeAvailable: shared.CloneMoney(nativeAvailable),
|
|
CalculatedAt: calculatedAt,
|
|
}
|
|
currency := ""
|
|
if available != nil {
|
|
currency = available.Currency
|
|
}
|
|
record.PendingInbound = zeroMoney(currency)
|
|
record.PendingOutbound = zeroMoney(currency)
|
|
if err := c.deps.Storage.Wallets().SaveBalance(ctx, record); err != nil {
|
|
c.deps.Logger.Warn("failed to cache wallet balance", zap.String("wallet_ref", walletRef), zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func (c *getWalletBalanceCommand) isCachedBalanceStale(balance *model.WalletBalance) bool {
|
|
if balance == nil || balance.CalculatedAt.IsZero() {
|
|
return true
|
|
}
|
|
return c.now().After(balance.CalculatedAt.Add(c.cacheTTL()))
|
|
}
|
|
|
|
func (c *getWalletBalanceCommand) cacheTTL() time.Duration {
|
|
if c.deps.BalanceCacheTTL > 0 {
|
|
return c.deps.BalanceCacheTTL
|
|
}
|
|
// Fallback to sane default if not configured.
|
|
return fallbackBalanceCacheTTL
|
|
}
|
|
|
|
func (c *getWalletBalanceCommand) now() time.Time {
|
|
if c.deps.Clock != nil {
|
|
return c.deps.Clock.Now().UTC()
|
|
}
|
|
return time.Now().UTC()
|
|
}
|
|
|
|
func zeroMoney(currency string) *moneyv1.Money {
|
|
if strings.TrimSpace(currency) == "" {
|
|
return nil
|
|
}
|
|
return &moneyv1.Money{Currency: currency, Amount: "0"}
|
|
}
|