cached gateway routing

This commit is contained in:
Stephan D
2026-02-20 15:38:22 +01:00
parent bc2bc3770d
commit 671ccc55a0
23 changed files with 777 additions and 23 deletions

View File

@@ -64,9 +64,83 @@ func (a *WalletAPI) getWalletBalance(r *http.Request, account *model.Account, to
a.logger.Debug("No CRYPTO rail gateways found in discovery")
return response.Auto(a.logger, a.Name(), merrors.NoData("no crypto gateways available"))
}
a.logger.Debug("Resolved CRYPTO gateways for wallet balance lookup",
zap.String("organization_ref", orgRef.Hex()),
zap.String("wallet_ref", walletRef),
zap.Int("gateway_count", len(cryptoGateways)))
// Query all gateways in parallel to find the wallet balance
bal, err := a.queryBalanceFromGateways(ctx, cryptoGateways, walletRef)
route, routeErr := a.walletRoute(ctx, orgRef.Hex(), walletRef)
if routeErr != nil {
a.logger.Warn("Failed to resolve wallet route", zap.Error(routeErr), zap.String("wallet_ref", walletRef), zap.String("organization_ref", orgRef.Hex()))
}
if route != nil {
a.logger.Debug("Resolved stored wallet route",
zap.String("organization_ref", orgRef.Hex()),
zap.String("wallet_ref", walletRef),
zap.String("route_network", route.Network),
zap.String("route_gateway_id", route.GatewayID))
preferred := findGatewayForRoute(cryptoGateways, route)
if preferred != nil {
a.logger.Debug("Using preferred gateway from stored wallet route",
zap.String("organization_ref", orgRef.Hex()),
zap.String("wallet_ref", walletRef),
zap.String("gateway_id", preferred.ID),
zap.String("network", preferred.Network),
zap.String("invoke_uri", preferred.InvokeURI))
bal, preferredErr := a.queryGatewayBalance(ctx, *preferred, walletRef)
if preferredErr == nil && bal != nil {
a.rememberWalletRoute(ctx, orgRef.Hex(), walletRef, preferred.Network, preferred.ID)
a.logger.Debug("Wallet balance resolved via preferred gateway",
zap.String("organization_ref", orgRef.Hex()),
zap.String("wallet_ref", walletRef),
zap.String("gateway_id", preferred.ID),
zap.String("network", preferred.Network))
return sresponse.WalletBalanceFromConnector(a.logger, bal, token)
}
if preferredErr != nil {
a.logger.Debug("Preferred gateway balance lookup failed, falling back to fan-out",
zap.String("wallet_ref", walletRef),
zap.String("network", route.Network),
zap.String("gateway_id", preferred.ID),
zap.String("invoke_uri", preferred.InvokeURI),
zap.Error(preferredErr))
} else {
a.logger.Debug("Preferred gateway returned empty balance, falling back to fan-out",
zap.String("wallet_ref", walletRef),
zap.String("network", route.Network),
zap.String("gateway_id", preferred.ID),
zap.String("invoke_uri", preferred.InvokeURI))
}
cryptoGateways = dropGatewayByInvokeURI(cryptoGateways, preferred.InvokeURI)
if len(cryptoGateways) == 0 {
if preferredErr != nil {
a.logger.Warn("Failed to fetch wallet balance from preferred gateway", zap.Error(preferredErr), zap.String("wallet_ref", walletRef))
return response.Auto(a.logger, a.Name(), preferredErr)
}
a.logger.Warn("Wallet balance not found on preferred gateway", zap.String("wallet_ref", walletRef))
return response.Auto(a.logger, a.Name(), merrors.NoData("wallet not found"))
}
} else {
a.logger.Warn("Stored wallet route did not match any healthy discovery gateway",
zap.String("organization_ref", orgRef.Hex()),
zap.String("wallet_ref", walletRef),
zap.String("route_network", route.Network),
zap.String("route_gateway_id", route.GatewayID))
}
} else {
a.logger.Debug("Stored wallet route not found; using gateway fallback",
zap.String("organization_ref", orgRef.Hex()),
zap.String("wallet_ref", walletRef))
}
// Fall back to querying remaining gateways in parallel.
a.logger.Debug("Starting fallback wallet balance fan-out",
zap.String("organization_ref", orgRef.Hex()),
zap.String("wallet_ref", walletRef),
zap.Int("gateway_count", len(cryptoGateways)))
bal, err := a.queryBalanceFromGateways(ctx, cryptoGateways, orgRef.Hex(), walletRef)
if err != nil {
a.logger.Warn("Failed to fetch wallet balance from gateways", zap.Error(err), zap.String("wallet_ref", walletRef))
return response.Auto(a.logger, a.Name(), err)
@@ -80,11 +154,18 @@ func (a *WalletAPI) getWalletBalance(r *http.Request, account *model.Account, to
return sresponse.WalletBalanceFromConnector(a.logger, bal, token)
}
func (a *WalletAPI) queryBalanceFromGateways(ctx context.Context, gateways []discovery.GatewaySummary, walletRef string) (*connectorv1.Balance, error) {
func (a *WalletAPI) queryBalanceFromGateways(ctx context.Context, gateways []discovery.GatewaySummary, organizationRef string, walletRef string) (*connectorv1.Balance, error) {
var mu sync.Mutex
var wg sync.WaitGroup
var result *connectorv1.Balance
var lastErr error
selectedGatewayID := ""
selectedNetwork := ""
a.logger.Debug("Querying wallet balance across gateways",
zap.String("organization_ref", organizationRef),
zap.String("wallet_ref", walletRef),
zap.Int("gateway_count", len(gateways)))
for _, gw := range gateways {
wg.Add(1)
@@ -108,6 +189,9 @@ func (a *WalletAPI) queryBalanceFromGateways(ctx context.Context, gateways []dis
mu.Lock()
if result == nil {
result = bal
a.rememberWalletRoute(ctx, organizationRef, walletRef, gateway.Network, gateway.ID)
selectedGatewayID = gateway.ID
selectedNetwork = gateway.Network
a.logger.Debug("Found wallet balance on gateway",
zap.String("gateway_id", gateway.ID),
zap.String("network", gateway.Network),
@@ -121,11 +205,23 @@ func (a *WalletAPI) queryBalanceFromGateways(ctx context.Context, gateways []dis
wg.Wait()
if result != nil {
a.logger.Debug("Wallet balance fan-out completed with result",
zap.String("organization_ref", organizationRef),
zap.String("wallet_ref", walletRef),
zap.String("gateway_id", selectedGatewayID),
zap.String("network", selectedNetwork))
return result, nil
}
if lastErr != nil {
a.logger.Debug("Wallet balance fan-out completed with errors",
zap.String("organization_ref", organizationRef),
zap.String("wallet_ref", walletRef),
zap.Error(lastErr))
return nil, lastErr
}
a.logger.Debug("Wallet balance fan-out completed without result",
zap.String("organization_ref", organizationRef),
zap.String("wallet_ref", walletRef))
return nil, nil
}

View File

@@ -80,6 +80,12 @@ func (a *WalletAPI) create(r *http.Request, account *model.Account, token *sresp
zap.String("chain", string(sr.Asset.Chain)))
return response.Auto(a.logger, a.Name(), merrors.InvalidArgument("no gateway available for network: "+networkName))
}
a.logger.Debug("Selected gateway for wallet creation",
zap.String("organization_ref", orgRef.Hex()),
zap.String("network", networkName),
zap.String("gateway_id", gateway.ID),
zap.String("gateway_network", gateway.Network),
zap.String("invoke_uri", gateway.InvokeURI))
var ownerRef string
if sr.OwnerRef != nil && !sr.OwnerRef.IsZero() {
@@ -125,6 +131,13 @@ func (a *WalletAPI) create(r *http.Request, account *model.Account, token *sresp
a.logger.Info("Managed wallet created for organization", mzap.ObjRef("organization_ref", orgRef),
zap.String("wallet_ref", walletRef), mzap.StorableRef(account),
zap.String("gateway_id", gateway.ID), zap.String("network", gateway.Network))
a.rememberWalletRoute(ctx, orgRef.Hex(), walletRef, networkName, gateway.ID)
a.rememberWalletRoute(ctx, orgRef.Hex(), walletRef, gateway.Network, gateway.ID)
a.logger.Debug("Persisted wallet route after wallet creation",
zap.String("organization_ref", orgRef.Hex()),
zap.String("wallet_ref", walletRef),
zap.String("network", networkName),
zap.String("gateway_id", gateway.ID))
return sresponse.Success(a.logger, token)
}

View File

@@ -58,6 +58,9 @@ func (a *WalletAPI) listWallets(r *http.Request, account *model.Account, token *
a.logger.Debug("No CRYPTO rail gateways found in discovery")
return sresponse.Wallets(a.logger, nil, token)
}
a.logger.Debug("Resolved CRYPTO gateways for wallet list",
zap.String("organization_ref", orgRef.Hex()),
zap.Int("gateway_count", len(cryptoGateways)))
// Build request
req := &connectorv1.ListAccountsRequest{
@@ -76,11 +79,19 @@ func (a *WalletAPI) listWallets(r *http.Request, account *model.Account, token *
// Query all gateways in parallel
allAccounts := a.queryAllGateways(ctx, cryptoGateways, req)
dedupedAccounts := dedupeAccountsByWalletRef(allAccounts)
a.logger.Debug("Wallet list fan-out completed",
zap.String("organization_ref", orgRef.Hex()),
zap.Int("accounts_raw", len(allAccounts)),
zap.Int("accounts_deduped", len(dedupedAccounts)),
zap.Int("gateway_count", len(cryptoGateways)))
if len(dedupedAccounts) != len(allAccounts) {
a.logger.Debug("Deduplicated duplicate wallets from gateway fan-out",
zap.Int("before", len(allAccounts)),
zap.Int("after", len(dedupedAccounts)))
}
for _, account := range dedupedAccounts {
a.rememberWalletRoute(ctx, orgRef.Hex(), accountWalletRef(account), accountNetwork(account), "")
}
return sresponse.WalletsFromAccounts(a.logger, dedupedAccounts, token)
}
@@ -162,6 +173,9 @@ func (a *WalletAPI) queryAllGateways(ctx context.Context, gateways []discovery.G
var mu sync.Mutex
var wg sync.WaitGroup
allAccounts := make([]*connectorv1.Account, 0)
a.logger.Debug("Starting wallet list gateway fan-out",
zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())),
zap.Int("gateway_count", len(gateways)))
for _, gw := range gateways {
wg.Add(1)
@@ -181,6 +195,9 @@ func (a *WalletAPI) queryAllGateways(ctx context.Context, gateways []discovery.G
mu.Lock()
allAccounts = append(allAccounts, accounts...)
mu.Unlock()
for _, account := range accounts {
a.rememberWalletRoute(ctx, strings.TrimSpace(req.GetOrganizationRef()), accountWalletRef(account), accountNetwork(account), gateway.ID)
}
a.logger.Debug("Queried gateway successfully",
zap.String("gateway_id", gateway.ID),
@@ -190,6 +207,10 @@ func (a *WalletAPI) queryAllGateways(ctx context.Context, gateways []discovery.G
}
wg.Wait()
a.logger.Debug("Finished wallet list gateway fan-out",
zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())),
zap.Int("accounts_raw", len(allAccounts)),
zap.Int("gateway_count", len(gateways)))
return allAccounts
}

View File

@@ -0,0 +1,136 @@
package walletapiimp
import (
"context"
"errors"
"strings"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/model"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
"go.uber.org/zap"
)
func normalizeNetworkName(raw string) string {
value := strings.TrimSpace(raw)
if value == "" {
return ""
}
if idx := strings.Index(value, "-"); idx > 0 {
value = value[:idx]
}
value = strings.ToLower(value)
value = strings.TrimPrefix(value, "chain_network_")
return strings.TrimSpace(value)
}
func (a *WalletAPI) rememberWalletRoute(ctx context.Context, organizationRef string, walletRef string, network string, gatewayID string) {
if a.routes == nil {
return
}
walletRef = strings.TrimSpace(walletRef)
organizationRef = strings.TrimSpace(organizationRef)
network = normalizeNetworkName(network)
gatewayID = strings.TrimSpace(gatewayID)
if walletRef == "" || organizationRef == "" || (network == "" && gatewayID == "") {
return
}
if err := a.routes.Upsert(ctx, &model.ChainWalletRoute{
OrganizationRef: organizationRef,
WalletRef: walletRef,
Network: network,
GatewayID: gatewayID,
}); err != nil {
a.logger.Warn("Failed to persist wallet route",
zap.String("organization_ref", organizationRef),
zap.String("wallet_ref", walletRef),
zap.String("network", network),
zap.String("gateway_id", gatewayID),
zap.Error(err))
} else {
a.logger.Debug("Persisted wallet route",
zap.String("organization_ref", organizationRef),
zap.String("wallet_ref", walletRef),
zap.String("network", network),
zap.String("gateway_id", gatewayID))
}
}
func (a *WalletAPI) walletRoute(ctx context.Context, organizationRef string, walletRef string) (*model.ChainWalletRoute, error) {
if a.routes == nil {
return nil, nil
}
walletRef = strings.TrimSpace(walletRef)
organizationRef = strings.TrimSpace(organizationRef)
if walletRef == "" || organizationRef == "" {
return nil, nil
}
route, err := a.routes.Get(ctx, organizationRef, walletRef)
if err != nil {
if errors.Is(err, merrors.ErrNoData) {
return nil, nil
}
return nil, err
}
return route, nil
}
func findGatewayForRoute(gateways []discovery.GatewaySummary, route *model.ChainWalletRoute) *discovery.GatewaySummary {
if route == nil {
return nil
}
gatewayID := strings.TrimSpace(route.GatewayID)
if gatewayID != "" {
for _, gw := range gateways {
if strings.EqualFold(strings.TrimSpace(gw.ID), gatewayID) &&
strings.EqualFold(gw.Rail, cryptoRail) &&
gw.Healthy &&
strings.TrimSpace(gw.InvokeURI) != "" {
return &gw
}
}
}
return findGatewayForNetwork(gateways, route.Network)
}
func accountNetwork(account *connectorv1.Account) string {
if account == nil {
return ""
}
if details := account.GetProviderDetails(); details != nil {
if field, ok := details.GetFields()["network"]; ok && field != nil {
if network := normalizeNetworkName(field.GetStringValue()); network != "" {
return network
}
}
}
return normalizeNetworkName(account.GetAsset())
}
func dropGatewayByInvokeURI(gateways []discovery.GatewaySummary, invokeURI string) []discovery.GatewaySummary {
invokeURI = strings.ToLower(strings.TrimSpace(invokeURI))
if invokeURI == "" || len(gateways) == 0 {
return gateways
}
result := make([]discovery.GatewaySummary, 0, len(gateways))
for _, gw := range gateways {
if strings.ToLower(strings.TrimSpace(gw.InvokeURI)) == invokeURI {
continue
}
result = append(result, gw)
}
return result
}

View File

@@ -0,0 +1,93 @@
package walletapiimp
import (
"testing"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/model"
)
func TestFindGatewayForRoute_PrefersGatewayID(t *testing.T) {
gateways := []discovery.GatewaySummary{
{
ID: "gw-fallback",
Rail: "CRYPTO",
Healthy: true,
InvokeURI: "chain-gw:50070",
Network: "ethereum_mainnet",
},
{
ID: "gw-route",
Rail: "CRYPTO",
Healthy: true,
InvokeURI: "tron-gw:50071",
Network: "tron_mainnet",
},
}
route := &model.ChainWalletRoute{
WalletRef: "wallet-1",
Network: "ethereum_mainnet",
GatewayID: "gw-route",
}
selected := findGatewayForRoute(gateways, route)
if selected == nil {
t.Fatal("expected selected gateway")
}
if selected.ID != "gw-route" {
t.Fatalf("expected gw-route, got %q", selected.ID)
}
}
func TestFindGatewayForRoute_FallsBackToNetwork(t *testing.T) {
gateways := []discovery.GatewaySummary{
{
ID: "gw-chain",
Rail: "CRYPTO",
Healthy: true,
InvokeURI: "chain-gw:50070",
Network: "ethereum_mainnet",
},
{
ID: "gw-tron",
Rail: "CRYPTO",
Healthy: true,
InvokeURI: "tron-gw:50071",
Network: "tron_mainnet",
},
}
route := &model.ChainWalletRoute{
WalletRef: "wallet-1",
Network: "tron_mainnet",
GatewayID: "unknown",
}
selected := findGatewayForRoute(gateways, route)
if selected == nil {
t.Fatal("expected selected gateway")
}
if selected.ID != "gw-tron" {
t.Fatalf("expected gw-tron, got %q", selected.ID)
}
}
func TestNormalizeNetworkName(t *testing.T) {
tests := []struct {
in string
want string
}{
{in: "CHAIN_NETWORK_TRON_MAINNET", want: "tron_mainnet"},
{in: "tron_mainnet-USDT", want: "tron_mainnet"},
{in: " ethereum_mainnet ", want: "ethereum_mainnet"},
{in: "", want: ""},
}
for _, tc := range tests {
got := normalizeNetworkName(tc.in)
if got != tc.want {
t.Fatalf("normalizeNetworkName(%q) = %q, want %q", tc.in, got, tc.want)
}
}
}

View File

@@ -7,6 +7,7 @@ import (
api "github.com/tech/sendico/pkg/api/http"
"github.com/tech/sendico/pkg/auth"
"github.com/tech/sendico/pkg/db/chainassets"
"github.com/tech/sendico/pkg/db/chainwalletroutes"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
msg "github.com/tech/sendico/pkg/messaging"
@@ -34,6 +35,7 @@ type WalletAPI struct {
walletsPermissionRef bson.ObjectID
balancesPermissionRef bson.ObjectID
assets chainassets.DB
routes chainwalletroutes.DB
// Gateway connection settings
dialTimeout time.Duration
@@ -66,6 +68,10 @@ func CreateAPI(apiCtx eapi.API) (*WalletAPI, error) {
p.logger.Warn("Failed to create asstes db", zap.Error(err))
return nil, err
}
if p.routes, err = apiCtx.DBFactory().NewChainWalletRoutesDB(); err != nil {
p.logger.Warn("Failed to create chain wallet routes db", zap.Error(err))
return nil, err
}
walletsPolicy, err := apiCtx.Permissions().GetPolicyDescription(context.Background(), mservice.ChainWallets)
if err != nil {