refactored initialization
This commit is contained in:
@@ -3,6 +3,7 @@ package transfer
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/rpcclient"
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/shared"
|
||||
"github.com/tech/sendico/gateway/chain/storage"
|
||||
clockpkg "github.com/tech/sendico/pkg/clock"
|
||||
@@ -11,7 +12,7 @@ import (
|
||||
|
||||
type Deps struct {
|
||||
Logger mlogger.Logger
|
||||
Networks map[string]shared.Network
|
||||
Networks *rpcclient.Registry
|
||||
Storage storage.Repository
|
||||
Clock clockpkg.Clock
|
||||
EnsureRepository func(context.Context) error
|
||||
|
||||
@@ -63,7 +63,7 @@ func (c *estimateTransferFeeCommand) Execute(ctx context.Context, req *chainv1.E
|
||||
}
|
||||
|
||||
networkKey := strings.ToLower(strings.TrimSpace(sourceWallet.Network))
|
||||
networkCfg, ok := c.deps.Networks[networkKey]
|
||||
networkCfg, ok := c.deps.Networks.Network(networkKey)
|
||||
if !ok {
|
||||
c.deps.Logger.Warn("unsupported chain", zap.String("network", networkKey))
|
||||
return gsresponse.InvalidArgument[chainv1.EstimateTransferFeeResponse](c.deps.Logger, mservice.ChainGateway, merrors.InvalidArgument("unsupported chain for wallet"))
|
||||
|
||||
@@ -78,7 +78,7 @@ func (c *submitTransferCommand) Execute(ctx context.Context, req *chainv1.Submit
|
||||
return gsresponse.InvalidArgument[chainv1.SubmitTransferResponse](c.deps.Logger, mservice.ChainGateway, merrors.InvalidArgument("organization_ref mismatch with wallet"))
|
||||
}
|
||||
networkKey := strings.ToLower(strings.TrimSpace(sourceWallet.Network))
|
||||
networkCfg, ok := c.deps.Networks[networkKey]
|
||||
networkCfg, ok := c.deps.Networks.Network(networkKey)
|
||||
if !ok {
|
||||
c.deps.Logger.Warn("unsupported chain", zap.String("network", networkKey))
|
||||
return gsresponse.InvalidArgument[chainv1.SubmitTransferResponse](c.deps.Logger, mservice.ChainGateway, merrors.InvalidArgument("unsupported chain for wallet"))
|
||||
|
||||
@@ -60,7 +60,7 @@ func (c *createManagedWalletCommand) Execute(ctx context.Context, req *chainv1.C
|
||||
c.deps.Logger.Warn("unsupported chain", zap.Any("chain", asset.GetChain()))
|
||||
return gsresponse.InvalidArgument[chainv1.CreateManagedWalletResponse](c.deps.Logger, mservice.ChainGateway, merrors.InvalidArgument("unsupported chain"))
|
||||
}
|
||||
networkCfg, ok := c.deps.Networks[chainKey]
|
||||
networkCfg, ok := c.deps.Networks.Network(chainKey)
|
||||
if !ok {
|
||||
c.deps.Logger.Warn("unsupported chain in config", zap.String("chain", chainKey))
|
||||
return gsresponse.InvalidArgument[chainv1.CreateManagedWalletResponse](c.deps.Logger, mservice.ChainGateway, merrors.InvalidArgument("unsupported chain"))
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/tech/sendico/gateway/chain/internal/keymanager"
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/shared"
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/rpcclient"
|
||||
"github.com/tech/sendico/gateway/chain/storage"
|
||||
clockpkg "github.com/tech/sendico/pkg/clock"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
@@ -13,7 +13,7 @@ import (
|
||||
|
||||
type Deps struct {
|
||||
Logger mlogger.Logger
|
||||
Networks map[string]shared.Network
|
||||
Networks *rpcclient.Registry
|
||||
KeyManager keymanager.Manager
|
||||
Storage storage.Repository
|
||||
Clock clockpkg.Clock
|
||||
|
||||
@@ -2,6 +2,7 @@ package wallet
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -19,8 +20,18 @@ import (
|
||||
|
||||
func onChainWalletBalance(ctx context.Context, deps Deps, wallet *model.ManagedWallet) (*moneyv1.Money, error) {
|
||||
logger := deps.Logger
|
||||
registry := deps.Networks
|
||||
|
||||
networkKey := strings.ToLower(strings.TrimSpace(wallet.Network))
|
||||
network := deps.Networks[networkKey]
|
||||
network, ok := registry.Network(networkKey)
|
||||
if !ok {
|
||||
logger.Warn("Requested network is not configured",
|
||||
zap.String("wallet_ref", wallet.WalletRef),
|
||||
zap.String("network", networkKey),
|
||||
)
|
||||
return nil, merrors.Internal(fmt.Sprintf("Requested network '%s' is not configured", networkKey))
|
||||
}
|
||||
|
||||
rpcURL := strings.TrimSpace(network.RPCURL)
|
||||
|
||||
logFields := []zap.Field{
|
||||
@@ -32,55 +43,54 @@ func onChainWalletBalance(ctx context.Context, deps Deps, wallet *model.ManagedW
|
||||
}
|
||||
|
||||
if rpcURL == "" {
|
||||
logger.Warn("network rpc url is not configured", logFields...)
|
||||
logger.Warn("Network rpc url is not configured", logFields...)
|
||||
return nil, merrors.Internal("network rpc url is not configured")
|
||||
}
|
||||
contract := strings.TrimSpace(wallet.ContractAddress)
|
||||
if contract == "" || !common.IsHexAddress(contract) {
|
||||
logger.Warn("invalid contract address for balance fetch", logFields...)
|
||||
logger.Warn("Invalid contract address for balance fetch", logFields...)
|
||||
return nil, merrors.InvalidArgument("invalid contract address")
|
||||
}
|
||||
if wallet.DepositAddress == "" || !common.IsHexAddress(wallet.DepositAddress) {
|
||||
logger.Warn("invalid wallet address for balance fetch", logFields...)
|
||||
logger.Warn("Invalid wallet address for balance fetch", logFields...)
|
||||
return nil, merrors.InvalidArgument("invalid wallet address")
|
||||
}
|
||||
|
||||
logger.Info("fetching on-chain wallet balance", logFields...)
|
||||
logger.Info("Fetching on-chain wallet balance", logFields...)
|
||||
|
||||
client, err := ethclient.DialContext(ctx, rpcURL)
|
||||
client, err := registry.Client(networkKey)
|
||||
if err != nil {
|
||||
logger.Warn("failed to connect rpc", append(logFields, zap.Error(err))...)
|
||||
return nil, merrors.Internal("failed to connect rpc: " + err.Error())
|
||||
logger.Warn("Failed to fetch rpc client", append(logFields, zap.Error(err))...)
|
||||
return nil, err
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
tokenABI, err := abi.JSON(strings.NewReader(erc20ABIJSON))
|
||||
if err != nil {
|
||||
logger.Warn("failed to parse erc20 abi", append(logFields, zap.Error(err))...)
|
||||
logger.Warn("Failed to parse erc20 abi", append(logFields, zap.Error(err))...)
|
||||
return nil, merrors.Internal("failed to parse erc20 abi: " + err.Error())
|
||||
}
|
||||
tokenAddr := common.HexToAddress(contract)
|
||||
walletAddr := common.HexToAddress(wallet.DepositAddress)
|
||||
|
||||
logger.Debug("calling token decimals", logFields...)
|
||||
logger.Debug("Calling token decimals", logFields...)
|
||||
decimals, err := readDecimals(timeoutCtx, client, tokenABI, tokenAddr)
|
||||
if err != nil {
|
||||
logger.Warn("token decimals call failed", append(logFields, zap.Error(err))...)
|
||||
logger.Warn("Token decimals call failed", append(logFields, zap.Error(err))...)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logger.Debug("calling token balanceOf", append(logFields, zap.Uint8("decimals", decimals))...)
|
||||
logger.Debug("Calling token balanceOf", append(logFields, zap.Uint8("decimals", decimals))...)
|
||||
bal, err := readBalanceOf(timeoutCtx, client, tokenABI, tokenAddr, walletAddr)
|
||||
if err != nil {
|
||||
logger.Warn("token balanceOf call failed", append(logFields, zap.Uint8("decimals", decimals), zap.Error(err))...)
|
||||
logger.Warn("Token balanceOf call failed", append(logFields, zap.Uint8("decimals", decimals), zap.Error(err))...)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dec := decimal.NewFromBigInt(bal, 0).Shift(-int32(decimals))
|
||||
logger.Info("on-chain wallet balance fetched",
|
||||
logger.Info("On-chain wallet balance fetched",
|
||||
append(logFields,
|
||||
zap.Uint8("decimals", decimals),
|
||||
zap.String("balance_raw", bal.String()),
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"errors"
|
||||
"math/big"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum"
|
||||
@@ -14,6 +13,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/ethclient"
|
||||
"github.com/shopspring/decimal"
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/rpcclient"
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/shared"
|
||||
"go.uber.org/zap"
|
||||
|
||||
@@ -30,11 +30,11 @@ type TransferExecutor interface {
|
||||
}
|
||||
|
||||
// NewOnChainExecutor constructs a TransferExecutor that talks to an EVM-compatible chain.
|
||||
func NewOnChainExecutor(logger mlogger.Logger, keyManager keymanager.Manager) TransferExecutor {
|
||||
func NewOnChainExecutor(logger mlogger.Logger, keyManager keymanager.Manager, clients *rpcclient.Clients) TransferExecutor {
|
||||
return &onChainExecutor{
|
||||
logger: logger.Named("executor"),
|
||||
keyManager: keyManager,
|
||||
clients: map[string]*ethclient.Client{},
|
||||
clients: clients,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,8 +42,7 @@ type onChainExecutor struct {
|
||||
logger mlogger.Logger
|
||||
keyManager keymanager.Manager
|
||||
|
||||
mu sync.Mutex
|
||||
clients map[string]*ethclient.Client
|
||||
clients *rpcclient.Clients
|
||||
}
|
||||
|
||||
func (o *onChainExecutor) SubmitTransfer(ctx context.Context, transfer *model.Transfer, source *model.ManagedWallet, destinationAddress string, network shared.Network) (string, error) {
|
||||
@@ -80,7 +79,7 @@ func (o *onChainExecutor) SubmitTransfer(ctx context.Context, transfer *model.Tr
|
||||
zap.String("destination", strings.ToLower(destinationAddress)),
|
||||
)
|
||||
|
||||
client, err := o.getClient(ctx, rpcURL)
|
||||
client, err := o.clients.Client(network.Name)
|
||||
if err != nil {
|
||||
o.logger.Warn("failed to initialise rpc client",
|
||||
zap.String("network", network.Name),
|
||||
@@ -214,30 +213,6 @@ func (o *onChainExecutor) SubmitTransfer(ctx context.Context, transfer *model.Tr
|
||||
return txHash, nil
|
||||
}
|
||||
|
||||
func (o *onChainExecutor) getClient(ctx context.Context, rpcURL string) (*ethclient.Client, error) {
|
||||
o.mu.Lock()
|
||||
client, ok := o.clients[rpcURL]
|
||||
o.mu.Unlock()
|
||||
if ok {
|
||||
return client, nil
|
||||
}
|
||||
|
||||
c, err := ethclient.DialContext(ctx, rpcURL)
|
||||
if err != nil {
|
||||
return nil, executorInternal("failed to connect to rpc "+rpcURL, err)
|
||||
}
|
||||
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
if existing, ok := o.clients[rpcURL]; ok {
|
||||
// Another routine initialised it in the meantime; prefer the existing client and close the new one.
|
||||
c.Close()
|
||||
return existing, nil
|
||||
}
|
||||
o.clients[rpcURL] = c
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (o *onChainExecutor) AwaitConfirmation(ctx context.Context, network shared.Network, txHash string) (*types.Receipt, error) {
|
||||
if strings.TrimSpace(txHash) == "" {
|
||||
o.logger.Warn("missing transaction hash for confirmation", zap.String("network", network.Name))
|
||||
@@ -249,7 +224,7 @@ func (o *onChainExecutor) AwaitConfirmation(ctx context.Context, network shared.
|
||||
return nil, executorInvalid("network rpc url is not configured")
|
||||
}
|
||||
|
||||
client, err := o.getClient(ctx, rpcURL)
|
||||
client, err := o.clients.Client(network.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/tech/sendico/gateway/chain/internal/keymanager"
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/rpcclient"
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/shared"
|
||||
clockpkg "github.com/tech/sendico/pkg/clock"
|
||||
)
|
||||
@@ -25,6 +26,13 @@ func WithTransferExecutor(executor TransferExecutor) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// WithRPCClients configures pre-initialised RPC clients.
|
||||
func WithRPCClients(clients *rpcclient.Clients) Option {
|
||||
return func(s *Service) {
|
||||
s.rpcClients = clients
|
||||
}
|
||||
}
|
||||
|
||||
// WithNetworks configures supported blockchain networks.
|
||||
func WithNetworks(networks []shared.Network) Option {
|
||||
return func(s *Service) {
|
||||
|
||||
@@ -37,7 +37,7 @@ func Prepare(ctx context.Context, logger mlogger.Logger, networks []shared.Netwo
|
||||
name := strings.ToLower(strings.TrimSpace(network.Name))
|
||||
rpcURL := strings.TrimSpace(network.RPCURL)
|
||||
if name == "" {
|
||||
clientLogger.Warn("skipping network with empty name during rpc client preparation")
|
||||
clientLogger.Warn("Skipping network with empty name during rpc client preparation")
|
||||
continue
|
||||
}
|
||||
if rpcURL == "" {
|
||||
@@ -74,7 +74,10 @@ func Prepare(ctx context.Context, logger mlogger.Logger, networks []shared.Netwo
|
||||
}
|
||||
|
||||
if len(result.clients) == 0 {
|
||||
clientLogger.Warn("No rpc clients were initialised")
|
||||
return nil, merrors.InvalidArgument("no rpc clients initialised")
|
||||
} else {
|
||||
clientLogger.Info("RPC clients initialised", zap.Int("count", len(result.clients)))
|
||||
}
|
||||
|
||||
return result, nil
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
package rpcclient
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/ethereum/go-ethereum/ethclient"
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/shared"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
)
|
||||
|
||||
// Registry binds static network metadata with prepared RPC clients.
|
||||
type Registry struct {
|
||||
networks map[string]shared.Network
|
||||
clients *Clients
|
||||
}
|
||||
|
||||
// NewRegistry constructs a registry keyed by lower-cased network name.
|
||||
func NewRegistry(networks map[string]shared.Network, clients *Clients) *Registry {
|
||||
return &Registry{
|
||||
networks: networks,
|
||||
clients: clients,
|
||||
}
|
||||
}
|
||||
|
||||
// Network fetches network metadata by key (case-insensitive).
|
||||
func (r *Registry) Network(key string) (shared.Network, bool) {
|
||||
if r == nil || len(r.networks) == 0 {
|
||||
return shared.Network{}, false
|
||||
}
|
||||
n, ok := r.networks[strings.ToLower(strings.TrimSpace(key))]
|
||||
return n, ok
|
||||
}
|
||||
|
||||
// Client returns the prepared RPC client for the given network name.
|
||||
func (r *Registry) Client(key string) (*ethclient.Client, error) {
|
||||
if r == nil || r.clients == nil {
|
||||
return nil, merrors.Internal("rpc clients not initialised")
|
||||
}
|
||||
return r.clients.Client(strings.ToLower(strings.TrimSpace(key)))
|
||||
}
|
||||
|
||||
// Networks exposes the registry map for iteration when needed.
|
||||
func (r *Registry) Networks() map[string]shared.Network {
|
||||
return r.networks
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/commands"
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/commands/transfer"
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/commands/wallet"
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/rpcclient"
|
||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/shared"
|
||||
"github.com/tech/sendico/gateway/chain/storage"
|
||||
"github.com/tech/sendico/pkg/api/routers"
|
||||
@@ -38,11 +39,13 @@ type Service struct {
|
||||
|
||||
settings CacheSettings
|
||||
|
||||
networks map[string]shared.Network
|
||||
serviceWallet shared.ServiceWallet
|
||||
keyManager keymanager.Manager
|
||||
executor TransferExecutor
|
||||
commands commands.Registry
|
||||
networks map[string]shared.Network
|
||||
serviceWallet shared.ServiceWallet
|
||||
keyManager keymanager.Manager
|
||||
executor TransferExecutor
|
||||
rpcClients *rpcclient.Clients
|
||||
networkRegistry *rpcclient.Registry
|
||||
commands commands.Registry
|
||||
|
||||
chainv1.UnimplementedChainGatewayServiceServer
|
||||
}
|
||||
@@ -73,6 +76,7 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro
|
||||
svc.networks = map[string]shared.Network{}
|
||||
}
|
||||
svc.settings = svc.settings.withDefaults()
|
||||
svc.networkRegistry = rpcclient.NewRegistry(svc.networks, svc.rpcClients)
|
||||
|
||||
svc.commands = commands.NewRegistry(commands.RegistryDeps{
|
||||
Wallet: commandsWalletDeps(svc),
|
||||
@@ -131,7 +135,7 @@ func (s *Service) ensureRepository(ctx context.Context) error {
|
||||
func commandsWalletDeps(s *Service) wallet.Deps {
|
||||
return wallet.Deps{
|
||||
Logger: s.logger.Named("command"),
|
||||
Networks: s.networks,
|
||||
Networks: s.networkRegistry,
|
||||
KeyManager: s.keyManager,
|
||||
Storage: s.storage,
|
||||
Clock: s.clock,
|
||||
@@ -143,7 +147,7 @@ func commandsWalletDeps(s *Service) wallet.Deps {
|
||||
func commandsTransferDeps(s *Service) transfer.Deps {
|
||||
return transfer.Deps{
|
||||
Logger: s.logger.Named("transfer_cmd"),
|
||||
Networks: s.networks,
|
||||
Networks: s.networkRegistry,
|
||||
Storage: s.storage,
|
||||
Clock: s.clock,
|
||||
EnsureRepository: s.ensureRepository,
|
||||
|
||||
Reference in New Issue
Block a user