balance cache
This commit is contained in:
@@ -55,3 +55,6 @@ key_management:
|
||||
namespace: ""
|
||||
mount_path: kv
|
||||
key_prefix: gateway/chain/wallets
|
||||
|
||||
cache:
|
||||
wallet_balance_ttl_seconds: 120
|
||||
|
||||
@@ -34,9 +34,10 @@ type Imp struct {
|
||||
|
||||
type config struct {
|
||||
*grpcapp.Config `yaml:",inline"`
|
||||
Chains []chainConfig `yaml:"chains"`
|
||||
ServiceWallet serviceWalletConfig `yaml:"service_wallet"`
|
||||
KeyManagement keymanager.Config `yaml:"key_management"`
|
||||
Chains []chainConfig `yaml:"chains"`
|
||||
ServiceWallet serviceWalletConfig `yaml:"service_wallet"`
|
||||
KeyManagement keymanager.Config `yaml:"key_management"`
|
||||
Settings gatewayservice.CacheSettings `yaml:"cache"`
|
||||
}
|
||||
|
||||
type chainConfig struct {
|
||||
@@ -111,6 +112,7 @@ func (i *Imp) Start() error {
|
||||
gatewayservice.WithServiceWallet(walletConfig),
|
||||
gatewayservice.WithKeyManager(keyManager),
|
||||
gatewayservice.WithTransferExecutor(executor),
|
||||
gatewayservice.WithSettings(cfg.Settings),
|
||||
}
|
||||
return gatewayservice.NewService(logger, repo, producer, opts...), nil
|
||||
}
|
||||
|
||||
@@ -4,7 +4,10 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/shared"
|
||||
"github.com/tech/sendico/gateway/chain/storage/model"
|
||||
"github.com/tech/sendico/pkg/api/routers/gsresponse"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"github.com/tech/sendico/pkg/mservice"
|
||||
@@ -14,6 +17,8 @@ import (
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
const fallbackBalanceCacheTTL = 2 * time.Minute
|
||||
|
||||
type getWalletBalanceCommand struct {
|
||||
deps Deps
|
||||
}
|
||||
@@ -48,30 +53,88 @@ func (c *getWalletBalanceCommand) Execute(ctx context.Context, req *chainv1.GetW
|
||||
|
||||
balance, chainErr := onChainWalletBalance(ctx, c.deps, wallet)
|
||||
if chainErr != nil {
|
||||
c.deps.Logger.Warn("on-chain balance fetch failed, falling back to stored balance", zap.Error(chainErr), zap.String("wallet_ref", walletRef))
|
||||
c.deps.Logger.Warn("on-chain balance fetch failed, attempting cached balance", zap.Error(chainErr), zap.String("wallet_ref", walletRef))
|
||||
stored, err := c.deps.Storage.Wallets().GetBalance(ctx, walletRef)
|
||||
if err != nil {
|
||||
if errors.Is(err, merrors.ErrNoData) {
|
||||
c.deps.Logger.Warn("stored balance not found", zap.String("wallet_ref", walletRef))
|
||||
return gsresponse.NotFound[chainv1.GetWalletBalanceResponse](c.deps.Logger, mservice.ChainGateway, err)
|
||||
c.deps.Logger.Warn("cached balance not found", zap.String("wallet_ref", walletRef))
|
||||
return gsresponse.Auto[chainv1.GetWalletBalanceResponse](c.deps.Logger, mservice.ChainGateway, chainErr)
|
||||
}
|
||||
return gsresponse.Auto[chainv1.GetWalletBalanceResponse](c.deps.Logger, mservice.ChainGateway, err)
|
||||
}
|
||||
if c.isCachedBalanceStale(stored) {
|
||||
c.deps.Logger.Warn("cached balance is stale",
|
||||
zap.String("wallet_ref", walletRef),
|
||||
zap.Time("calculated_at", stored.CalculatedAt),
|
||||
zap.Duration("ttl", c.cacheTTL()),
|
||||
)
|
||||
return gsresponse.Auto[chainv1.GetWalletBalanceResponse](c.deps.Logger, mservice.ChainGateway, chainErr)
|
||||
}
|
||||
return gsresponse.Success(&chainv1.GetWalletBalanceResponse{Balance: toProtoWalletBalance(stored)})
|
||||
}
|
||||
|
||||
return gsresponse.Success(&chainv1.GetWalletBalanceResponse{Balance: onChainBalanceToProto(balance)})
|
||||
calculatedAt := c.now()
|
||||
c.persistCachedBalance(ctx, walletRef, balance, calculatedAt)
|
||||
|
||||
return gsresponse.Success(&chainv1.GetWalletBalanceResponse{
|
||||
Balance: onChainBalanceToProto(balance, calculatedAt),
|
||||
})
|
||||
}
|
||||
|
||||
func onChainBalanceToProto(balance *moneyv1.Money) *chainv1.WalletBalance {
|
||||
func onChainBalanceToProto(balance *moneyv1.Money, calculatedAt time.Time) *chainv1.WalletBalance {
|
||||
if balance == nil {
|
||||
return nil
|
||||
}
|
||||
zero := &moneyv1.Money{Currency: balance.Currency, Amount: "0"}
|
||||
zero := zeroMoney(balance.Currency)
|
||||
return &chainv1.WalletBalance{
|
||||
Available: balance,
|
||||
PendingInbound: zero,
|
||||
PendingOutbound: zero,
|
||||
CalculatedAt: timestamppb.Now(),
|
||||
CalculatedAt: timestamppb.New(calculatedAt.UTC()),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *getWalletBalanceCommand) persistCachedBalance(ctx context.Context, walletRef string, available *moneyv1.Money, calculatedAt time.Time) {
|
||||
if available == nil {
|
||||
return
|
||||
}
|
||||
record := &model.WalletBalance{
|
||||
WalletRef: walletRef,
|
||||
Available: shared.CloneMoney(available),
|
||||
PendingInbound: zeroMoney(available.Currency),
|
||||
PendingOutbound: zeroMoney(available.Currency),
|
||||
CalculatedAt: calculatedAt,
|
||||
}
|
||||
if err := c.deps.Storage.Wallets().SaveBalance(ctx, record); err != nil {
|
||||
c.deps.Logger.Warn("failed to cache wallet balance", zap.String("wallet_ref", walletRef), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *getWalletBalanceCommand) isCachedBalanceStale(balance *model.WalletBalance) bool {
|
||||
if balance == nil || balance.CalculatedAt.IsZero() {
|
||||
return true
|
||||
}
|
||||
return c.now().After(balance.CalculatedAt.Add(c.cacheTTL()))
|
||||
}
|
||||
|
||||
func (c *getWalletBalanceCommand) cacheTTL() time.Duration {
|
||||
if c.deps.BalanceCacheTTL > 0 {
|
||||
return c.deps.BalanceCacheTTL
|
||||
}
|
||||
// Fallback to sane default if not configured.
|
||||
return fallbackBalanceCacheTTL
|
||||
}
|
||||
|
||||
func (c *getWalletBalanceCommand) now() time.Time {
|
||||
if c.deps.Clock != nil {
|
||||
return c.deps.Clock.Now().UTC()
|
||||
}
|
||||
return time.Now().UTC()
|
||||
}
|
||||
|
||||
func zeroMoney(currency string) *moneyv1.Money {
|
||||
if strings.TrimSpace(currency) == "" {
|
||||
return nil
|
||||
}
|
||||
return &moneyv1.Money{Currency: currency, Amount: "0"}
|
||||
}
|
||||
|
||||
@@ -2,10 +2,12 @@ package wallet
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/tech/sendico/gateway/chain/internal/keymanager"
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/shared"
|
||||
"github.com/tech/sendico/gateway/chain/storage"
|
||||
clockpkg "github.com/tech/sendico/pkg/clock"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
)
|
||||
|
||||
@@ -14,6 +16,8 @@ type Deps struct {
|
||||
Networks map[string]shared.Network
|
||||
KeyManager keymanager.Manager
|
||||
Storage storage.Repository
|
||||
Clock clockpkg.Clock
|
||||
BalanceCacheTTL time.Duration
|
||||
EnsureRepository func(context.Context) error
|
||||
}
|
||||
|
||||
|
||||
@@ -67,3 +67,10 @@ func WithClock(clk clockpkg.Clock) Option {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithSettings applies gateway settings.
|
||||
func WithSettings(settings CacheSettings) Option {
|
||||
return func(s *Service) {
|
||||
s.settings = settings.withDefaults()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,6 +36,8 @@ type Service struct {
|
||||
producer msg.Producer
|
||||
clock clockpkg.Clock
|
||||
|
||||
settings CacheSettings
|
||||
|
||||
networks map[string]shared.Network
|
||||
serviceWallet shared.ServiceWallet
|
||||
keyManager keymanager.Manager
|
||||
@@ -52,6 +54,7 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro
|
||||
storage: repo,
|
||||
producer: producer,
|
||||
clock: clockpkg.System{},
|
||||
settings: defaultSettings(),
|
||||
networks: map[string]shared.Network{},
|
||||
}
|
||||
|
||||
@@ -69,6 +72,7 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro
|
||||
if svc.networks == nil {
|
||||
svc.networks = map[string]shared.Network{}
|
||||
}
|
||||
svc.settings = svc.settings.withDefaults()
|
||||
|
||||
svc.commands = commands.NewRegistry(commands.RegistryDeps{
|
||||
Wallet: commandsWalletDeps(svc),
|
||||
@@ -130,6 +134,8 @@ func commandsWalletDeps(s *Service) wallet.Deps {
|
||||
Networks: s.networks,
|
||||
KeyManager: s.keyManager,
|
||||
Storage: s.storage,
|
||||
Clock: s.clock,
|
||||
BalanceCacheTTL: s.settings.walletBalanceCacheTTL(),
|
||||
EnsureRepository: s.ensureRepository,
|
||||
}
|
||||
}
|
||||
|
||||
30
api/gateway/chain/internal/service/gateway/settings.go
Normal file
30
api/gateway/chain/internal/service/gateway/settings.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package gateway
|
||||
|
||||
import "time"
|
||||
|
||||
const defaultWalletBalanceCacheTTL = 120 * time.Second
|
||||
|
||||
// CacheSettings holds tunable gateway behaviour.
|
||||
type CacheSettings struct {
|
||||
WalletBalanceCacheTTLSeconds int `yaml:"wallet_balance_ttl_seconds"`
|
||||
}
|
||||
|
||||
func defaultSettings() CacheSettings {
|
||||
return CacheSettings{
|
||||
WalletBalanceCacheTTLSeconds: int(defaultWalletBalanceCacheTTL.Seconds()),
|
||||
}
|
||||
}
|
||||
|
||||
func (s CacheSettings) withDefaults() CacheSettings {
|
||||
if s.WalletBalanceCacheTTLSeconds <= 0 {
|
||||
s.WalletBalanceCacheTTLSeconds = int(defaultWalletBalanceCacheTTL.Seconds())
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (s CacheSettings) walletBalanceCacheTTL() time.Duration {
|
||||
if s.WalletBalanceCacheTTLSeconds <= 0 {
|
||||
return defaultWalletBalanceCacheTTL
|
||||
}
|
||||
return time.Duration(s.WalletBalanceCacheTTLSeconds) * time.Second
|
||||
}
|
||||
Reference in New Issue
Block a user