wallets listing dedupe
This commit is contained in:
@@ -75,20 +75,89 @@ func (a *WalletAPI) listWallets(r *http.Request, account *model.Account, token *
|
||||
|
||||
// Query all gateways in parallel
|
||||
allAccounts := a.queryAllGateways(ctx, cryptoGateways, req)
|
||||
dedupedAccounts := dedupeAccountsByWalletRef(allAccounts)
|
||||
if len(dedupedAccounts) != len(allAccounts) {
|
||||
a.logger.Debug("Deduplicated duplicate wallets from gateway fan-out",
|
||||
zap.Int("before", len(allAccounts)),
|
||||
zap.Int("after", len(dedupedAccounts)))
|
||||
}
|
||||
|
||||
return sresponse.WalletsFromAccounts(a.logger, allAccounts, token)
|
||||
return sresponse.WalletsFromAccounts(a.logger, dedupedAccounts, token)
|
||||
}
|
||||
|
||||
func filterCryptoGateways(gateways []discovery.GatewaySummary) []discovery.GatewaySummary {
|
||||
result := make([]discovery.GatewaySummary, 0)
|
||||
indexByInvokeURI := map[string]int{}
|
||||
for _, gw := range gateways {
|
||||
if strings.EqualFold(gw.Rail, cryptoRail) && gw.Healthy && strings.TrimSpace(gw.InvokeURI) != "" {
|
||||
invokeURI := strings.ToLower(strings.TrimSpace(gw.InvokeURI))
|
||||
if idx, ok := indexByInvokeURI[invokeURI]; ok {
|
||||
// Keep the entry with higher priority if the same backend was announced multiple times.
|
||||
if gw.RoutingPriority > result[idx].RoutingPriority {
|
||||
result[idx] = gw
|
||||
}
|
||||
continue
|
||||
}
|
||||
indexByInvokeURI[invokeURI] = len(result)
|
||||
result = append(result, gw)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func dedupeAccountsByWalletRef(accounts []*connectorv1.Account) []*connectorv1.Account {
|
||||
if len(accounts) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
result := make([]*connectorv1.Account, 0, len(accounts))
|
||||
seen := make(map[string]struct{}, len(accounts))
|
||||
|
||||
for _, account := range accounts {
|
||||
if account == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
walletRef := accountWalletRef(account)
|
||||
if walletRef == "" {
|
||||
// If ref is missing, keep item to avoid dropping potentially valid records.
|
||||
result = append(result, account)
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[walletRef]; ok {
|
||||
continue
|
||||
}
|
||||
seen[walletRef] = struct{}{}
|
||||
result = append(result, account)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func accountWalletRef(account *connectorv1.Account) string {
|
||||
if account == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
if ref := account.GetRef(); ref != nil {
|
||||
accountID := strings.TrimSpace(ref.GetAccountId())
|
||||
if accountID != "" {
|
||||
return accountID
|
||||
}
|
||||
}
|
||||
|
||||
details := account.GetProviderDetails()
|
||||
if details == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
field, ok := details.GetFields()["wallet_ref"]
|
||||
if !ok || field == nil {
|
||||
return ""
|
||||
}
|
||||
return strings.TrimSpace(field.GetStringValue())
|
||||
}
|
||||
|
||||
func (a *WalletAPI) queryAllGateways(ctx context.Context, gateways []discovery.GatewaySummary, req *connectorv1.ListAccountsRequest) []*connectorv1.Account {
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
|
||||
116
api/server/internal/server/walletapiimp/list_test.go
Normal file
116
api/server/internal/server/walletapiimp/list_test.go
Normal file
@@ -0,0 +1,116 @@
|
||||
package walletapiimp
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/tech/sendico/pkg/discovery"
|
||||
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
|
||||
"google.golang.org/protobuf/types/known/structpb"
|
||||
)
|
||||
|
||||
func TestFilterCryptoGateways_DedupesByInvokeURI(t *testing.T) {
|
||||
gateways := []discovery.GatewaySummary{
|
||||
{
|
||||
ID: "gw-low",
|
||||
Rail: "CRYPTO",
|
||||
Healthy: true,
|
||||
InvokeURI: "dev-tron-gateway:50071",
|
||||
Network: "TRON_MAINNET",
|
||||
RoutingPriority: 10,
|
||||
},
|
||||
{
|
||||
ID: "gw-high",
|
||||
Rail: "CRYPTO",
|
||||
Healthy: true,
|
||||
InvokeURI: "dev-tron-gateway:50071",
|
||||
Network: "TRON_NILE",
|
||||
RoutingPriority: 20,
|
||||
},
|
||||
{
|
||||
ID: "gw-chain",
|
||||
Rail: "CRYPTO",
|
||||
Healthy: true,
|
||||
InvokeURI: "dev-chain-gateway:50070",
|
||||
Network: "ARBITRUM_SEPOLIA",
|
||||
RoutingPriority: 5,
|
||||
},
|
||||
{
|
||||
ID: "gw-unhealthy",
|
||||
Rail: "CRYPTO",
|
||||
Healthy: false,
|
||||
InvokeURI: "dev-unhealthy:50070",
|
||||
Network: "TRON_MAINNET",
|
||||
RoutingPriority: 99,
|
||||
},
|
||||
}
|
||||
|
||||
filtered := filterCryptoGateways(gateways)
|
||||
if len(filtered) != 2 {
|
||||
t.Fatalf("expected 2 filtered gateways, got %d", len(filtered))
|
||||
}
|
||||
|
||||
byInvoke := map[string]discovery.GatewaySummary{}
|
||||
for _, gw := range filtered {
|
||||
byInvoke[gw.InvokeURI] = gw
|
||||
}
|
||||
|
||||
tron, ok := byInvoke["dev-tron-gateway:50071"]
|
||||
if !ok {
|
||||
t.Fatalf("expected tron gateway entry")
|
||||
}
|
||||
if tron.ID != "gw-high" {
|
||||
t.Fatalf("expected higher-priority duplicate to win, got %q", tron.ID)
|
||||
}
|
||||
|
||||
if _, ok := byInvoke["dev-chain-gateway:50070"]; !ok {
|
||||
t.Fatalf("expected chain gateway entry")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDedupeAccountsByWalletRef(t *testing.T) {
|
||||
detailsA, err := structpb.NewStruct(map[string]interface{}{"wallet_ref": "wallet-3"})
|
||||
if err != nil {
|
||||
t.Fatalf("build provider details: %v", err)
|
||||
}
|
||||
detailsB, err := structpb.NewStruct(map[string]interface{}{"wallet_ref": "wallet-3"})
|
||||
if err != nil {
|
||||
t.Fatalf("build provider details: %v", err)
|
||||
}
|
||||
|
||||
accounts := []*connectorv1.Account{
|
||||
{Ref: &connectorv1.AccountRef{AccountId: "wallet-1"}},
|
||||
{Ref: &connectorv1.AccountRef{AccountId: "wallet-1"}}, // duplicate
|
||||
{Ref: &connectorv1.AccountRef{AccountId: "wallet-2"}},
|
||||
{ProviderDetails: detailsA},
|
||||
{ProviderDetails: detailsB}, // duplicate via provider_details.wallet_ref
|
||||
{Ref: &connectorv1.AccountRef{AccountId: ""}}, // kept: missing ref
|
||||
nil, // ignored
|
||||
}
|
||||
|
||||
deduped := dedupeAccountsByWalletRef(accounts)
|
||||
if len(deduped) != 4 {
|
||||
t.Fatalf("expected 4 accounts after dedupe, got %d", len(deduped))
|
||||
}
|
||||
|
||||
seen := map[string]int{}
|
||||
for _, acc := range deduped {
|
||||
if acc == nil {
|
||||
t.Fatalf("deduped account should never be nil")
|
||||
}
|
||||
ref := accountWalletRef(acc)
|
||||
seen[ref]++
|
||||
}
|
||||
|
||||
if seen["wallet-1"] != 1 {
|
||||
t.Fatalf("expected wallet-1 once, got %d", seen["wallet-1"])
|
||||
}
|
||||
if seen["wallet-2"] != 1 {
|
||||
t.Fatalf("expected wallet-2 once, got %d", seen["wallet-2"])
|
||||
}
|
||||
if seen["wallet-3"] != 1 {
|
||||
t.Fatalf("expected wallet-3 once, got %d", seen["wallet-3"])
|
||||
}
|
||||
if seen[""] != 1 {
|
||||
t.Fatalf("expected one account with missing wallet ref, got %d", seen[""])
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user