200 lines
5.3 KiB
Go
200 lines
5.3 KiB
Go
package rpcclient
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/ethclient"
|
|
"github.com/ethereum/go-ethereum/rpc"
|
|
"github.com/tech/sendico/gateway/chain/internal/service/gateway/shared"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Clients holds pre-initialised RPC clients keyed by network name.
|
|
type Clients struct {
|
|
logger mlogger.Logger
|
|
clients map[string]clientEntry
|
|
}
|
|
|
|
type clientEntry struct {
|
|
eth *ethclient.Client
|
|
rpc *rpc.Client
|
|
}
|
|
|
|
// Prepare dials all configured networks up front and returns a ready-to-use client set.
|
|
func Prepare(ctx context.Context, logger mlogger.Logger, networks []shared.Network) (*Clients, error) {
|
|
if logger == nil {
|
|
return nil, merrors.Internal("rpc clients: logger is required")
|
|
}
|
|
clientLogger := logger.Named("rpc_client")
|
|
result := &Clients{
|
|
logger: clientLogger,
|
|
clients: make(map[string]clientEntry),
|
|
}
|
|
|
|
for _, network := range networks {
|
|
name := strings.ToLower(strings.TrimSpace(network.Name))
|
|
rpcURL := strings.TrimSpace(network.RPCURL)
|
|
if name == "" {
|
|
clientLogger.Warn("Skipping network with empty name during rpc client preparation")
|
|
continue
|
|
}
|
|
if rpcURL == "" {
|
|
result.Close()
|
|
err := merrors.InvalidArgument(fmt.Sprintf("rpc url not configured for network %s", name))
|
|
clientLogger.Warn("rpc url missing", zap.String("network", name))
|
|
return nil, err
|
|
}
|
|
|
|
fields := []zap.Field{
|
|
zap.String("network", name),
|
|
}
|
|
clientLogger.Info("initialising rpc client", fields...)
|
|
|
|
dialCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
|
|
httpClient := &http.Client{
|
|
Transport: &loggingRoundTripper{
|
|
logger: clientLogger,
|
|
network: name,
|
|
endpoint: rpcURL,
|
|
base: http.DefaultTransport,
|
|
},
|
|
}
|
|
rpcCli, err := rpc.DialOptions(dialCtx, rpcURL, rpc.WithHTTPClient(httpClient))
|
|
cancel()
|
|
if err != nil {
|
|
result.Close()
|
|
clientLogger.Warn("failed to dial rpc endpoint", append(fields, zap.Error(err))...)
|
|
return nil, merrors.Internal(fmt.Sprintf("rpc dial failed for %s: %s", name, err.Error()))
|
|
}
|
|
client := ethclient.NewClient(rpcCli)
|
|
result.clients[name] = clientEntry{
|
|
eth: client,
|
|
rpc: rpcCli,
|
|
}
|
|
clientLogger.Info("rpc client ready", fields...)
|
|
}
|
|
|
|
if len(result.clients) == 0 {
|
|
clientLogger.Warn("No rpc clients were initialised")
|
|
return nil, merrors.InvalidArgument("no rpc clients initialised")
|
|
} else {
|
|
clientLogger.Info("RPC clients initialised", zap.Int("count", len(result.clients)))
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// Client returns a prepared client for the given network name.
|
|
func (c *Clients) Client(network string) (*ethclient.Client, error) {
|
|
if c == nil {
|
|
return nil, merrors.Internal("rpc clients not initialised")
|
|
}
|
|
name := strings.ToLower(strings.TrimSpace(network))
|
|
entry, ok := c.clients[name]
|
|
if !ok || entry.eth == nil {
|
|
return nil, merrors.InvalidArgument(fmt.Sprintf("rpc client not configured for network %s", name))
|
|
}
|
|
return entry.eth, nil
|
|
}
|
|
|
|
// RPCClient returns the raw RPC client for low-level calls.
|
|
func (c *Clients) RPCClient(network string) (*rpc.Client, error) {
|
|
if c == nil {
|
|
return nil, merrors.Internal("rpc clients not initialised")
|
|
}
|
|
name := strings.ToLower(strings.TrimSpace(network))
|
|
entry, ok := c.clients[name]
|
|
if !ok || entry.rpc == nil {
|
|
return nil, merrors.InvalidArgument(fmt.Sprintf("rpc client not configured for network %s", name))
|
|
}
|
|
return entry.rpc, nil
|
|
}
|
|
|
|
// Close tears down all RPC clients, logging each close.
|
|
func (c *Clients) Close() {
|
|
if c == nil {
|
|
return
|
|
}
|
|
for name, entry := range c.clients {
|
|
if entry.rpc != nil {
|
|
entry.rpc.Close()
|
|
} else if entry.eth != nil {
|
|
entry.eth.Close()
|
|
}
|
|
if c.logger != nil {
|
|
c.logger.Info("rpc client closed", zap.String("network", name))
|
|
}
|
|
}
|
|
}
|
|
|
|
type loggingRoundTripper struct {
|
|
logger mlogger.Logger
|
|
network string
|
|
endpoint string
|
|
base http.RoundTripper
|
|
}
|
|
|
|
func (l *loggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
if l.base == nil {
|
|
l.base = http.DefaultTransport
|
|
}
|
|
|
|
var reqBody []byte
|
|
if req.Body != nil {
|
|
raw, _ := io.ReadAll(req.Body)
|
|
reqBody = raw
|
|
req.Body = io.NopCloser(strings.NewReader(string(raw)))
|
|
}
|
|
|
|
fields := []zap.Field{
|
|
zap.String("network", l.network),
|
|
zap.String("rpc_endpoint", l.endpoint),
|
|
}
|
|
if len(reqBody) > 0 {
|
|
fields = append(fields, zap.String("rpc_request", truncate(string(reqBody), 2048)))
|
|
}
|
|
l.logger.Debug("rpc request", fields...)
|
|
|
|
resp, err := l.base.RoundTrip(req)
|
|
if err != nil {
|
|
l.logger.Warn("rpc http request failed", append(fields, zap.Error(err))...)
|
|
return nil, err
|
|
}
|
|
|
|
bodyBytes, _ := io.ReadAll(resp.Body)
|
|
resp.Body.Close()
|
|
resp.Body = io.NopCloser(strings.NewReader(string(bodyBytes)))
|
|
|
|
respFields := append(fields,
|
|
zap.Int("status_code", resp.StatusCode),
|
|
)
|
|
if len(bodyBytes) > 0 {
|
|
respFields = append(respFields, zap.String("rpc_response", truncate(string(bodyBytes), 2048)))
|
|
}
|
|
if resp.StatusCode >= 400 {
|
|
l.logger.Warn("RPC response error", respFields...)
|
|
} else {
|
|
// Log response content so downstream parse failures can be inspected without debug logs.
|
|
l.logger.Warn("RPC response", respFields...)
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func truncate(s string, max int) string {
|
|
if len(s) <= max {
|
|
return s
|
|
}
|
|
if max <= 3 {
|
|
return s[:max]
|
|
}
|
|
return s[:max-3] + "..."
|
|
}
|