package walletapiimp

import (
	"context"
	"crypto/tls"
	"net/http"
	"strings"
	"sync"

	"github.com/tech/sendico/pkg/api/http/response"
	"github.com/tech/sendico/pkg/discovery"
	"github.com/tech/sendico/pkg/merrors"
	"github.com/tech/sendico/pkg/model"
	"github.com/tech/sendico/pkg/mservice"
	"github.com/tech/sendico/pkg/mutil/mzap"
	connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
	"github.com/tech/sendico/server/interface/api/sresponse"
	mutil "github.com/tech/sendico/server/internal/mutil/param"
	"go.mongodb.org/mongo-driver/v2/bson"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/wrapperspb"
)

// listWallets builds the response for a wallet list request.
//
// It resolves the organization reference from the request, checks whether the
// account may read wallets in that organization, looks up the discovery
// registry, keeps only usable CRYPTO rail gateways and queries all of them in
// parallel. Accounts returned by the gateways are de-duplicated by wallet
// reference before being turned into the response.
//
// When the account lacks the read permission the request is not rejected;
// instead the gateway request is restricted to accounts owned by the caller.
func (a *WalletAPI) listWallets(r *http.Request, account *model.Account, token *sresponse.TokenData) http.HandlerFunc {
	orgRef, err := a.oph.GetRef(r)
	if err != nil {
		a.logger.Warn("Failed to parse organization reference for wallet list", zap.Error(err), zap.String(a.oph.Name(), a.oph.GetID(r)))
		return response.BadReference(a.logger, a.Name(), a.oph.Name(), a.oph.GetID(r), err)
	}

	ctx := r.Context()
	hasReadPermission, err := a.enf.Enforce(ctx, a.walletsPermissionRef, account.ID, orgRef, bson.NilObjectID, model.ActionRead)
	if err != nil {
		a.logger.Warn("Failed to check chain wallet access permissions", zap.Error(err), mutil.PLog(a.oph, r))
		return response.Auto(a.logger, a.Name(), err)
	}

	if a.discovery == nil {
		return response.Internal(a.logger, mservice.ChainGateway, merrors.Internal("discovery client is not configured"))
	}

	// Discover CRYPTO rail gateways.
	lookupCtx, cancel := context.WithTimeout(ctx, discoveryLookupTimeout)
	defer cancel()

	lookupResp, err := a.discovery.Lookup(lookupCtx)
	if err != nil {
		a.logger.Warn("Failed to lookup discovery registry", zap.Error(err))
		return response.Auto(a.logger, a.Name(), err)
	}

	// Filter gateways by CRYPTO rail.
	cryptoGateways := filterCryptoGateways(lookupResp.Gateways)
	if len(cryptoGateways) == 0 {
		a.logger.Debug("No CRYPTO rail gateways found in discovery")
		return sresponse.Wallets(a.logger, nil, token)
	}
	a.logger.Debug("Resolved CRYPTO gateways for wallet list",
		mzap.ObjRef("organization_ref", orgRef),
		zap.Int("gateway_count", len(cryptoGateways)))

	// The hex form is needed for the request and for every remembered route.
	orgRefHex := orgRef.Hex()

	// Build request.
	req := &connectorv1.ListAccountsRequest{
		OrganizationRef: orgRefHex,
		Kind:            connectorv1.AccountKind_CHAIN_MANAGED_WALLET,
	}

	// If user has read permission, return all wallets in organization.
	// Otherwise, filter to only wallets owned by the requesting account.
	if !hasReadPermission {
		req.OwnerRefFilter = wrapperspb.String(account.ID.Hex())
		a.logger.Debug("Filtering wallets by owner due to limited permissions",
			mzap.ObjRef("owner_ref", account.ID), mutil.PLog(a.oph, r))
	}

	// Query all gateways in parallel.
	allAccounts := a.queryAllGateways(ctx, cryptoGateways, req)
	dedupedAccounts := dedupeAccountsByWalletRef(allAccounts)
	a.logger.Debug("Wallet list fan-out completed",
		mzap.ObjRef("organization_ref", orgRef),
		zap.Int("accounts_raw", len(allAccounts)),
		zap.Int("accounts_deduped", len(dedupedAccounts)),
		zap.Int("gateway_count", len(cryptoGateways)))
	if len(dedupedAccounts) != len(allAccounts) {
		a.logger.Debug("Deduplicated duplicate wallets from gateway fan-out",
			zap.Int("before", len(allAccounts)),
			zap.Int("after", len(dedupedAccounts)))
	}

	// The loop variable is deliberately not called "account" so it does not
	// shadow the requesting account passed in as a parameter.
	for _, walletAccount := range dedupedAccounts {
		a.rememberWalletRoute(ctx, orgRefHex, accountWalletRef(walletAccount), accountNetwork(walletAccount), "")
	}

	return sresponse.WalletsFromAccounts(a.logger, dedupedAccounts, token)
}

// filterCryptoGateways returns the gateways that are on the CRYPTO rail
// (compared case-insensitively), are marked healthy and have a non-blank
// invoke URI.
//
// Gateways are de-duplicated by their trimmed, lower-cased invoke URI. When the
// same URI appears more than once, the entry with the strictly higher routing
// priority is kept, in the position of the first occurrence. The result is
// never nil.
func filterCryptoGateways(gateways []discovery.GatewaySummary) []discovery.GatewaySummary {
	result := make([]discovery.GatewaySummary, 0, len(gateways))
	indexByInvokeURI := make(map[string]int, len(gateways))

	for _, gw := range gateways {
		trimmedURI := strings.TrimSpace(gw.InvokeURI)
		if !strings.EqualFold(gw.Rail, cryptoRail) || !gw.Healthy || trimmedURI == "" {
			continue
		}

		invokeURI := strings.ToLower(trimmedURI)
		if idx, ok := indexByInvokeURI[invokeURI]; ok {
			// Keep the entry with higher priority if the same backend
			// was announced multiple times.
			if gw.RoutingPriority > result[idx].RoutingPriority {
				result[idx] = gw
			}
			continue
		}

		indexByInvokeURI[invokeURI] = len(result)
		result = append(result, gw)
	}

	return result
}

// dedupeAccountsByWalletRef drops nil entries and repeated wallet references
// from accounts, keeping the first account seen for each reference and
// preserving input order. It returns nil when accounts is empty.
func dedupeAccountsByWalletRef(accounts []*connectorv1.Account) []*connectorv1.Account {
	if len(accounts) == 0 {
		return nil
	}

	result := make([]*connectorv1.Account, 0, len(accounts))
	seen := make(map[string]struct{}, len(accounts))

	for _, account := range accounts {
		if account == nil {
			continue
		}

		walletRef := accountWalletRef(account)
		if walletRef == "" {
			// If ref is missing, keep item to avoid dropping potentially valid records.
			result = append(result, account)
			continue
		}

		if _, ok := seen[walletRef]; ok {
			continue
		}
		seen[walletRef] = struct{}{}
		result = append(result, account)
	}

	return result
}

// accountWalletRef extracts the wallet reference of an account.
//
// The trimmed account ID from the account's ref is preferred. If it is absent
// or blank, the trimmed string value of the "wallet_ref" provider-details field
// is used. An empty string is returned when neither is available or account is
// nil.
func accountWalletRef(account *connectorv1.Account) string {
	if account == nil {
		return ""
	}

	if ref := account.GetRef(); ref != nil {
		if accountID := strings.TrimSpace(ref.GetAccountId()); accountID != "" {
			return accountID
		}
	}

	details := account.GetProviderDetails()
	if details == nil {
		return ""
	}

	field, ok := details.GetFields()["wallet_ref"]
	if !ok || field == nil {
		return ""
	}

	return strings.TrimSpace(field.GetStringValue())
}

// queryAllGateways sends req to every gateway concurrently and returns the
// concatenation of all accounts from the gateways that answered successfully.
//
// A failing gateway is logged and skipped; it does not fail the whole fan-out.
// The order of accounts in the result depends on which goroutine finishes
// first. Every returned account is also passed to rememberWalletRoute together
// with the ID of the gateway that reported it.
func (a *WalletAPI) queryAllGateways(ctx context.Context, gateways []discovery.GatewaySummary, req *connectorv1.ListAccountsRequest) []*connectorv1.Account {
	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	allAccounts := make([]*connectorv1.Account, 0)

	// The request is shared read-only by all goroutines, so its organization
	// reference can be resolved once up front.
	organizationRef := strings.TrimSpace(req.GetOrganizationRef())

	a.logger.Debug("Starting wallet list gateway fan-out",
		zap.String("organization_ref", organizationRef),
		zap.Int("gateway_count", len(gateways)))

	for _, gw := range gateways {
		wg.Add(1)
		go func(gateway discovery.GatewaySummary) {
			defer wg.Done()

			accounts, err := a.queryGateway(ctx, gateway, req)
			if err != nil {
				a.logger.Warn("Failed to query gateway",
					zap.String("gateway_id", gateway.ID),
					zap.String("invoke_uri", gateway.InvokeURI),
					zap.String("network", gateway.Network),
					zap.Error(err))
				return
			}

			// Only the shared slice needs the lock.
			mu.Lock()
			allAccounts = append(allAccounts, accounts...)
			mu.Unlock()

			for _, account := range accounts {
				a.rememberWalletRoute(ctx, organizationRef, accountWalletRef(account), accountNetwork(account), gateway.ID)
			}

			a.logger.Debug("Queried gateway successfully",
				zap.String("gateway_id", gateway.ID),
				zap.String("network", gateway.Network),
				zap.Int("accounts_count", len(accounts)))
		}(gw)
	}

	wg.Wait()

	a.logger.Debug("Finished wallet list gateway fan-out",
		zap.String("organization_ref", organizationRef),
		zap.Int("accounts_raw", len(allAccounts)),
		zap.Int("gateway_count", len(gateways)))

	return allAccounts
}

// queryGateway opens a gRPC client to a single gateway, calls ListAccounts with
// req and returns the accounts from the response.
//
// Transport credentials are insecure when a.insecure is set and TLS otherwise.
// The call is bounded by a.callTimeout on top of ctx. The connection is closed
// before the function returns. A failure to create the client is wrapped as an
// internal error; an error from ListAccounts is returned as-is.
func (a *WalletAPI) queryGateway(ctx context.Context, gateway discovery.GatewaySummary, req *connectorv1.ListAccountsRequest) ([]*connectorv1.Account, error) {
	var dialOpts []grpc.DialOption
	if a.insecure {
		dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
	} else {
		dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
	}

	// filterCryptoGateways validates and de-duplicates on the trimmed URI, so
	// dial the trimmed value too; surrounding whitespace is never part of a
	// valid target.
	target := strings.TrimSpace(gateway.InvokeURI)

	conn, err := grpc.NewClient(target, dialOpts...)
	if err != nil {
		return nil, merrors.InternalWrap(err, "dial gateway")
	}
	defer conn.Close()

	client := connectorv1.NewConnectorServiceClient(conn)

	// Call with timeout.
	callCtx, callCancel := context.WithTimeout(ctx, a.callTimeout)
	defer callCancel()

	resp, err := client.ListAccounts(callCtx, req)
	if err != nil {
		return nil, err
	}

	return resp.GetAccounts(), nil
}