Files
sendico/api/edge/bff/internal/server/walletapiimp/list.go
2026-02-28 00:39:20 +01:00

248 lines
7.8 KiB
Go

package walletapiimp
import (
"context"
"crypto/tls"
"net/http"
"strings"
"sync"
"github.com/tech/sendico/pkg/api/http/response"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/mservice"
"github.com/tech/sendico/pkg/mutil/mzap"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
"github.com/tech/sendico/server/interface/api/sresponse"
mutil "github.com/tech/sendico/server/internal/mutil/param"
"go.mongodb.org/mongo-driver/v2/bson"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/wrapperspb"
)
// listWallets builds the response for a wallet list request scoped to the
// organization referenced in r.
//
// The flow is: resolve the organization reference, ask the enforcer whether
// the account may read wallets, look up the discovery registry (bounded by
// discoveryLookupTimeout), keep the gateways selected by filterCryptoGateways,
// fan the ListAccounts request out to them, de-duplicate the results by wallet
// reference and record a route for every remaining wallet.
//
// When the enforcer denies read permission the request carries an owner filter
// set to the caller's account ID instead of being rejected.
func (a *WalletAPI) listWallets(r *http.Request, account *model.Account, token *sresponse.TokenData) http.HandlerFunc {
	organizationRef, err := a.oph.GetRef(r)
	if err != nil {
		a.logger.Warn("Failed to parse organization reference for wallet list", zap.Error(err), zap.String(a.oph.Name(), a.oph.GetID(r)))
		return response.BadReference(a.logger, a.Name(), a.oph.Name(), a.oph.GetID(r), err)
	}

	ctx := r.Context()
	canReadAll, err := a.enf.Enforce(ctx, a.walletsPermissionRef, account.ID, organizationRef, bson.NilObjectID, model.ActionRead)
	if err != nil {
		a.logger.Warn("Failed to check chain wallet access permissions", zap.Error(err), mutil.PLog(a.oph, r))
		return response.Auto(a.logger, a.Name(), err)
	}

	if a.discovery == nil {
		return response.Internal(a.logger, mservice.ChainGateway, merrors.Internal("discovery client is not configured"))
	}

	// The registry lookup gets its own deadline on top of the request context.
	lookupCtx, stopLookup := context.WithTimeout(ctx, discoveryLookupTimeout)
	defer stopLookup()

	registry, err := a.discovery.Lookup(lookupCtx)
	if err != nil {
		a.logger.Warn("Failed to lookup discovery registry", zap.Error(err))
		return response.Auto(a.logger, a.Name(), err)
	}

	gateways := filterCryptoGateways(registry.Gateways)
	gatewayCount := len(gateways)
	if gatewayCount == 0 {
		// Nothing to query: answer with an empty wallet list.
		a.logger.Debug("No CRYPTO rail gateways found in discovery")
		return sresponse.Wallets(a.logger, nil, token)
	}

	orgHex := organizationRef.Hex()
	a.logger.Debug("Resolved CRYPTO gateways for wallet list",
		zap.String("organization_ref", orgHex),
		zap.Int("gateway_count", gatewayCount))

	listReq := &connectorv1.ListAccountsRequest{
		OrganizationRef: orgHex,
		Kind:            connectorv1.AccountKind_CHAIN_MANAGED_WALLET,
	}
	// Without read permission the caller is limited to wallets whose owner
	// reference equals their own account ID.
	if !canReadAll {
		listReq.OwnerRefFilter = wrapperspb.String(account.ID.Hex())
		a.logger.Debug("Filtering wallets by owner due to limited permissions",
			mzap.ObjRef("owner_ref", account.ID), mutil.PLog(a.oph, r))
	}

	fetched := a.queryAllGateways(ctx, gateways, listReq)
	unique := dedupeAccountsByWalletRef(fetched)
	rawCount, uniqueCount := len(fetched), len(unique)

	a.logger.Debug("Wallet list fan-out completed",
		zap.String("organization_ref", orgHex),
		zap.Int("accounts_raw", rawCount),
		zap.Int("accounts_deduped", uniqueCount),
		zap.Int("gateway_count", gatewayCount))
	if rawCount != uniqueCount {
		a.logger.Debug("Deduplicated duplicate wallets from gateway fan-out",
			zap.Int("before", rawCount),
			zap.Int("after", uniqueCount))
	}

	// Record a route for every wallet that survived de-duplication; the
	// gateway ID argument is passed as an empty string here.
	for _, walletAccount := range unique {
		a.rememberWalletRoute(ctx, orgHex, accountWalletRef(walletAccount), accountNetwork(walletAccount), "")
	}

	return sresponse.WalletsFromAccounts(a.logger, unique, token)
}
// filterCryptoGateways returns the gateways that are healthy, whose rail
// matches cryptoRail (case-insensitively) and whose invoke URI is non-blank.
//
// Gateways are de-duplicated by their trimmed, lower-cased invoke URI. When
// the same URI appears more than once, the entry with the strictly higher
// RoutingPriority is kept in the position of the first occurrence. The result
// is always a non-nil slice.
func filterCryptoGateways(gateways []discovery.GatewaySummary) []discovery.GatewaySummary {
	selected := make([]discovery.GatewaySummary, 0)
	positionByURI := map[string]int{}

	for _, candidate := range gateways {
		if !strings.EqualFold(candidate.Rail, cryptoRail) || !candidate.Healthy {
			continue
		}
		trimmedURI := strings.TrimSpace(candidate.InvokeURI)
		if trimmedURI == "" {
			continue
		}

		key := strings.ToLower(trimmedURI)
		pos, known := positionByURI[key]
		if !known {
			positionByURI[key] = len(selected)
			selected = append(selected, candidate)
			continue
		}

		// Same backend announced again: only a higher priority replaces the
		// entry already selected.
		if candidate.RoutingPriority > selected[pos].RoutingPriority {
			selected[pos] = candidate
		}
	}

	return selected
}
// dedupeAccountsByWalletRef returns accounts with nil entries removed and
// with only the first account kept for each non-empty wallet reference (as
// computed by accountWalletRef). Input order is preserved.
//
// Accounts without a wallet reference are always kept, since they cannot be
// compared with each other. An empty input yields nil.
func dedupeAccountsByWalletRef(accounts []*connectorv1.Account) []*connectorv1.Account {
	total := len(accounts)
	if total == 0 {
		return nil
	}

	unique := make([]*connectorv1.Account, 0, total)
	knownRefs := make(map[string]struct{}, total)

	for _, candidate := range accounts {
		if candidate == nil {
			continue
		}
		// Only accounts that carry a reference take part in de-duplication.
		if ref := accountWalletRef(candidate); ref != "" {
			if _, duplicate := knownRefs[ref]; duplicate {
				continue
			}
			knownRefs[ref] = struct{}{}
		}
		unique = append(unique, candidate)
	}

	return unique
}
// accountWalletRef extracts the wallet reference of account.
//
// The trimmed account ID from the account's ref is preferred; if it is absent
// or blank, the trimmed string value of the "wallet_ref" provider-detail field
// is used. An empty string is returned when neither is available or account
// is nil.
func accountWalletRef(account *connectorv1.Account) string {
	if account == nil {
		return ""
	}

	if identity := account.GetRef(); identity != nil {
		if id := strings.TrimSpace(identity.GetAccountId()); id != "" {
			return id
		}
	}

	// Fall back to the provider details when the ref carries no usable ID.
	if details := account.GetProviderDetails(); details != nil {
		if value, present := details.GetFields()["wallet_ref"]; present && value != nil {
			return strings.TrimSpace(value.GetStringValue())
		}
	}

	return ""
}
// queryAllGateways sends req to every gateway concurrently and returns the
// accounts reported by the gateways that answered successfully.
//
// A gateway whose query fails is logged and skipped; it does not fail the
// whole fan-out. Results are concatenated in the order of gateways, so the
// returned slice does not depend on which goroutine finishes first. This keeps
// the wallet list order and any later "first occurrence wins" de-duplication
// stable between calls. The returned slice is never nil.
//
// Every account returned by a gateway is also passed to rememberWalletRoute
// together with the ID of the gateway that returned it.
func (a *WalletAPI) queryAllGateways(ctx context.Context, gateways []discovery.GatewaySummary, req *connectorv1.ListAccountsRequest) []*connectorv1.Account {
	orgRef := strings.TrimSpace(req.GetOrganizationRef())

	a.logger.Debug("Starting wallet list gateway fan-out",
		zap.String("organization_ref", orgRef),
		zap.Int("gateway_count", len(gateways)))

	// One result slot per gateway. Each goroutine writes only its own slot and
	// the slots are read after wg.Wait(), so no mutex is required.
	perGateway := make([][]*connectorv1.Account, len(gateways))

	var wg sync.WaitGroup
	for i, gw := range gateways {
		wg.Add(1)
		go func(slot int, gateway discovery.GatewaySummary) {
			defer wg.Done()

			accounts, err := a.queryGateway(ctx, gateway, req)
			if err != nil {
				a.logger.Warn("Failed to query gateway",
					zap.String("gateway_id", gateway.ID),
					zap.String("invoke_uri", gateway.InvokeURI),
					zap.String("network", gateway.Network),
					zap.Error(err))
				return
			}

			perGateway[slot] = accounts

			for _, account := range accounts {
				a.rememberWalletRoute(ctx, orgRef, accountWalletRef(account), accountNetwork(account), gateway.ID)
			}

			a.logger.Debug("Queried gateway successfully",
				zap.String("gateway_id", gateway.ID),
				zap.String("network", gateway.Network),
				zap.Int("accounts_count", len(accounts)))
		}(i, gw)
	}
	wg.Wait()

	// Merge the per-gateway results in gateway order.
	total := 0
	for _, accounts := range perGateway {
		total += len(accounts)
	}
	allAccounts := make([]*connectorv1.Account, 0, total)
	for _, accounts := range perGateway {
		allAccounts = append(allAccounts, accounts...)
	}

	a.logger.Debug("Finished wallet list gateway fan-out",
		zap.String("organization_ref", orgRef),
		zap.Int("accounts_raw", len(allAccounts)),
		zap.Int("gateway_count", len(gateways)))

	return allAccounts
}
// queryGateway opens a gRPC connection to gateway.InvokeURI, issues a single
// ListAccounts call with req and returns the accounts from the response.
//
// The dial uses a context bounded by a.dialTimeout and the call one bounded by
// a.callTimeout, both derived from ctx. Transport credentials are insecure
// when a.insecure is set, otherwise TLS with a default tls.Config. The
// connection is closed before returning. Dial failures are wrapped with
// merrors.InternalWrap; ListAccounts errors are returned as received.
func (a *WalletAPI) queryGateway(ctx context.Context, gateway discovery.GatewaySummary, req *connectorv1.ListAccountsRequest) ([]*connectorv1.Account, error) {
	dialCtx, stopDial := context.WithTimeout(ctx, a.dialTimeout)
	defer stopDial()

	var transport credentials.TransportCredentials
	if a.insecure {
		transport = insecure.NewCredentials()
	} else {
		transport = credentials.NewTLS(&tls.Config{})
	}

	conn, err := grpc.DialContext(dialCtx, gateway.InvokeURI, grpc.WithTransportCredentials(transport))
	if err != nil {
		return nil, merrors.InternalWrap(err, "dial gateway")
	}
	defer conn.Close()

	// The RPC gets its own deadline, independent of the dial deadline.
	callCtx, stopCall := context.WithTimeout(ctx, a.callTimeout)
	defer stopCall()

	listed, err := connectorv1.NewConnectorServiceClient(conn).ListAccounts(callCtx, req)
	if err != nil {
		return nil, err
	}
	return listed.GetAccounts(), nil
}