Files
sendico/api/server/internal/server/walletapiimp/list.go
2026-01-30 16:39:12 +01:00

158 lines
5.0 KiB
Go

package walletapiimp
import (
"context"
"crypto/tls"
"net/http"
"strings"
"sync"
"github.com/tech/sendico/pkg/api/http/response"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/mservice"
"github.com/tech/sendico/pkg/mutil/mzap"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
"github.com/tech/sendico/server/interface/api/sresponse"
mutil "github.com/tech/sendico/server/internal/mutil/param"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/wrapperspb"
)
func (a *WalletAPI) listWallets(r *http.Request, account *model.Account, token *sresponse.TokenData) http.HandlerFunc {
orgRef, err := a.oph.GetRef(r)
if err != nil {
a.logger.Warn("Failed to parse organization reference for wallet list", zap.Error(err), zap.String(a.oph.Name(), a.oph.GetID(r)))
return response.BadReference(a.logger, a.Name(), a.oph.Name(), a.oph.GetID(r), err)
}
ctx := r.Context()
hasReadPermission, err := a.enf.Enforce(ctx, a.walletsPermissionRef, account.ID, orgRef, primitive.NilObjectID, model.ActionRead)
if err != nil {
a.logger.Warn("Failed to check chain wallet access permissions", zap.Error(err), mutil.PLog(a.oph, r))
return response.Auto(a.logger, a.Name(), err)
}
if a.discovery == nil {
return response.Internal(a.logger, mservice.ChainGateway, merrors.Internal("discovery client is not configured"))
}
// Discover CRYPTO rail gateways
lookupCtx, cancel := context.WithTimeout(ctx, discoveryLookupTimeout)
defer cancel()
lookupResp, err := a.discovery.Lookup(lookupCtx)
if err != nil {
a.logger.Warn("Failed to lookup discovery registry", zap.Error(err))
return response.Auto(a.logger, a.Name(), err)
}
// Filter gateways by CRYPTO rail
cryptoGateways := filterCryptoGateways(lookupResp.Gateways)
if len(cryptoGateways) == 0 {
a.logger.Debug("No CRYPTO rail gateways found in discovery")
return sresponse.Wallets(a.logger, nil, token)
}
// Build request
req := &connectorv1.ListAccountsRequest{
OrganizationRef: orgRef.Hex(),
Kind: connectorv1.AccountKind_CHAIN_MANAGED_WALLET,
}
// If user has read permission, return all wallets in organization.
// Otherwise, filter to only wallets owned by the requesting account.
if !hasReadPermission {
req.OwnerRefFilter = wrapperspb.String(account.ID.Hex())
a.logger.Debug("Filtering wallets by owner due to limited permissions",
mzap.ObjRef("owner_ref", account.ID), mutil.PLog(a.oph, r))
}
// Query all gateways in parallel
allAccounts := a.queryAllGateways(ctx, cryptoGateways, req)
return sresponse.WalletsFromAccounts(a.logger, allAccounts, token)
}
func filterCryptoGateways(gateways []discovery.GatewaySummary) []discovery.GatewaySummary {
result := make([]discovery.GatewaySummary, 0)
for _, gw := range gateways {
if strings.EqualFold(gw.Rail, cryptoRail) && gw.Healthy && strings.TrimSpace(gw.InvokeURI) != "" {
result = append(result, gw)
}
}
return result
}
func (a *WalletAPI) queryAllGateways(ctx context.Context, gateways []discovery.GatewaySummary, req *connectorv1.ListAccountsRequest) []*connectorv1.Account {
var mu sync.Mutex
var wg sync.WaitGroup
allAccounts := make([]*connectorv1.Account, 0)
for _, gw := range gateways {
wg.Add(1)
go func(gateway discovery.GatewaySummary) {
defer wg.Done()
accounts, err := a.queryGateway(ctx, gateway, req)
if err != nil {
a.logger.Warn("Failed to query gateway",
zap.String("gateway_id", gateway.ID),
zap.String("invoke_uri", gateway.InvokeURI),
zap.String("network", gateway.Network),
zap.Error(err))
return
}
mu.Lock()
allAccounts = append(allAccounts, accounts...)
mu.Unlock()
a.logger.Debug("Queried gateway successfully",
zap.String("gateway_id", gateway.ID),
zap.String("network", gateway.Network),
zap.Int("accounts_count", len(accounts)))
}(gw)
}
wg.Wait()
return allAccounts
}
func (a *WalletAPI) queryGateway(ctx context.Context, gateway discovery.GatewaySummary, req *connectorv1.ListAccountsRequest) ([]*connectorv1.Account, error) {
// Create connection with timeout
dialCtx, cancel := context.WithTimeout(ctx, a.dialTimeout)
defer cancel()
var dialOpts []grpc.DialOption
if a.insecure {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
}
conn, err := grpc.DialContext(dialCtx, gateway.InvokeURI, dialOpts...)
if err != nil {
return nil, merrors.InternalWrap(err, "dial gateway")
}
defer conn.Close()
client := connectorv1.NewConnectorServiceClient(conn)
// Call with timeout
callCtx, callCancel := context.WithTimeout(ctx, a.callTimeout)
defer callCancel()
resp, err := client.ListAccounts(callCtx, req)
if err != nil {
return nil, err
}
return resp.GetAccounts(), nil
}