bff dev upgrde

This commit is contained in:
Stephan D
2026-01-30 16:39:12 +01:00
parent 51f5b0804a
commit e1f58b0982
22 changed files with 969 additions and 185 deletions

View File

@@ -10,18 +10,18 @@ import (
func Network2Proto(network model.ChainNetwork) (chainv1.ChainNetwork, error) {
switch network {
case model.ChainNetworkARB:
case model.ChainNetworkArbitrumOne:
return chainv1.ChainNetwork_CHAIN_NETWORK_ARBITRUM_ONE, nil
case model.ChainNetworkEthMain:
case model.ChainNetworkEthereumMainnet:
return chainv1.ChainNetwork_CHAIN_NETWORK_ETHEREUM_MAINNET, nil
case model.ChainNetworkTronMain:
case model.ChainNetworkTronMainnet:
return chainv1.ChainNetwork_CHAIN_NETWORK_TRON_MAINNET, nil
case model.ChainNetworkTronNile:
return chainv1.ChainNetwork_CHAIN_NETWORK_TRON_NILE, nil
case model.ChainNetworkUnspecified:
return chainv1.ChainNetwork_CHAIN_NETWORK_UNSPECIFIED, nil
default:
return chainv1.ChainNetwork_CHAIN_NETWORK_UNSPECIFIED, merrors.InvalidArgument(fmt.Sprintf("Unkwnown chain network value '%s'", network), "network")
return chainv1.ChainNetwork_CHAIN_NETWORK_UNSPECIFIED, merrors.InvalidArgument(fmt.Sprintf("unknown chain network value '%s'", network), "network")
}
}

View File

@@ -47,6 +47,10 @@ func (a *LedgerAPI) createAccount(r *http.Request, account *model.Account, token
if err != nil {
return response.BadPayload(a.logger, a.Name(), err)
}
accountRole, err := mapLedgerAccountRole(payload.Role)
if err != nil {
return response.BadPayload(a.logger, a.Name(), err)
}
if a.client == nil {
return response.Internal(a.logger, mservice.Ledger, merrors.Internal("ledger client is not configured"))
}
@@ -78,7 +82,7 @@ func (a *LedgerAPI) createAccount(r *http.Request, account *model.Account, token
Currency: payload.Currency,
Status: ledgerv1.AccountStatus_ACCOUNT_STATUS_ACTIVE,
AllowNegative: payload.AllowNegative,
IsSettlement: payload.IsSettlement,
Role: accountRole,
Metadata: payload.Metadata,
Describable: describable,
})
@@ -128,14 +132,14 @@ func mapLedgerAccountType(accountType srequest.LedgerAccountType) (ledgerv1.Acco
return parsed, nil
}
func mapLedgerAccountStatus(status srequest.LedgerAccountStatus) (ledgerv1.AccountStatus, error) {
raw := string(status)
if ledgerconv.IsAccountStatusUnspecified(raw) {
return ledgerv1.AccountStatus_ACCOUNT_STATUS_UNSPECIFIED, nil
func mapLedgerAccountRole(role model.AccountRole) (ledgerv1.AccountRole, error) {
raw := strings.TrimSpace(string(role))
if ledgerconv.IsAccountRoleUnspecified(raw) {
return ledgerv1.AccountRole_ACCOUNT_ROLE_OPERATING, nil
}
parsed, ok := ledgerconv.ParseAccountStatus(raw)
parsed, ok := ledgerconv.ParseAccountRole(raw)
if !ok {
return ledgerv1.AccountStatus_ACCOUNT_STATUS_UNSPECIFIED, merrors.InvalidArgument("unsupported status: "+string(status), "status")
return ledgerv1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED, merrors.InvalidArgument("unsupported role: "+raw, "role")
}
return parsed, nil
}

View File

@@ -7,11 +7,13 @@ import (
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/mservice"
"github.com/tech/sendico/pkg/mutil/mzap"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
"github.com/tech/sendico/server/interface/api/sresponse"
mutil "github.com/tech/sendico/server/internal/mutil/param"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/wrapperspb"
)
func (a *LedgerAPI) listAccounts(r *http.Request, account *model.Account, token *sresponse.TokenData) http.HandlerFunc {
@@ -22,22 +24,28 @@ func (a *LedgerAPI) listAccounts(r *http.Request, account *model.Account, token
}
ctx := r.Context()
res, err := a.enf.Enforce(ctx, a.permissionRef, account.ID, orgRef, primitive.NilObjectID, model.ActionRead)
hasReadPermission, err := a.enf.Enforce(ctx, a.permissionRef, account.ID, orgRef, primitive.NilObjectID, model.ActionRead)
if err != nil {
a.logger.Warn("Failed to check ledger accounts access permissions", zap.Error(err), mutil.PLog(a.oph, r))
return response.Auto(a.logger, a.Name(), err)
}
if !res {
a.logger.Debug("Access denied when listing ledger accounts", mutil.PLog(a.oph, r))
return response.AccessDenied(a.logger, a.Name(), "ledger accounts read permission denied")
}
if a.client == nil {
return response.Internal(a.logger, mservice.Ledger, merrors.Internal("ledger client is not configured"))
}
resp, err := a.client.ListAccounts(ctx, &ledgerv1.ListAccountsRequest{
req := &ledgerv1.ListAccountsRequest{
OrganizationRef: orgRef.Hex(),
})
}
// If user has read permission, return all accounts in organization.
// Otherwise, filter to only accounts owned by the requesting account.
if !hasReadPermission {
req.OwnerRefFilter = wrapperspb.String(account.ID.Hex())
a.logger.Debug("Filtering ledger accounts by owner due to limited permissions",
mzap.ObjRef("owner_ref", account.ID), mutil.PLog(a.oph, r))
}
resp, err := a.client.ListAccounts(ctx, req)
if err != nil {
a.logger.Warn("Failed to list ledger accounts", zap.Error(err), zap.String("organization_ref", orgRef.Hex()))
return response.Auto(a.logger, mservice.Ledger, err)

View File

@@ -1,18 +1,25 @@
package walletapiimp
import (
"context"
"crypto/tls"
"net/http"
"strings"
"sync"
"github.com/tech/sendico/pkg/api/http/response"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/mservice"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
"github.com/tech/sendico/server/interface/api/sresponse"
mutil "github.com/tech/sendico/server/internal/mutil/param"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)
func (a *WalletAPI) getWalletBalance(r *http.Request, account *model.Account, token *sresponse.TokenData) http.HandlerFunc {
@@ -36,21 +43,126 @@ func (a *WalletAPI) getWalletBalance(r *http.Request, account *model.Account, to
a.logger.Debug("Access denied when reading wallet balance", mutil.PLog(a.oph, r), zap.String("wallet_ref", walletRef))
return response.AccessDenied(a.logger, a.Name(), "wallet balance read permission denied")
}
if a.chainGateway == nil {
return response.Internal(a.logger, mservice.ChainGateway, merrors.Internal("chain gateway client is not configured"))
if a.discovery == nil {
return response.Internal(a.logger, mservice.ChainGateway, merrors.Internal("discovery client is not configured"))
}
resp, err := a.chainGateway.GetWalletBalance(ctx, &chainv1.GetWalletBalanceRequest{WalletRef: walletRef})
// Discover CRYPTO rail gateways
lookupCtx, cancel := context.WithTimeout(ctx, discoveryLookupTimeout)
defer cancel()
lookupResp, err := a.discovery.Lookup(lookupCtx)
if err != nil {
a.logger.Warn("Failed to fetch wallet balance", zap.Error(err), zap.String("wallet_ref", walletRef))
a.logger.Warn("Failed to lookup discovery registry", zap.Error(err))
return response.Auto(a.logger, a.Name(), err)
}
// Filter gateways by CRYPTO rail
cryptoGateways := filterCryptoGateways(lookupResp.Gateways)
if len(cryptoGateways) == 0 {
a.logger.Debug("No CRYPTO rail gateways found in discovery")
return response.Auto(a.logger, a.Name(), merrors.NoData("no crypto gateways available"))
}
// Query all gateways in parallel to find the wallet balance
bal, err := a.queryBalanceFromGateways(ctx, cryptoGateways, walletRef)
if err != nil {
a.logger.Warn("Failed to fetch wallet balance from gateways", zap.Error(err), zap.String("wallet_ref", walletRef))
return response.Auto(a.logger, mservice.ChainGateway, err)
}
bal := resp.GetBalance()
if bal == nil {
a.logger.Warn("Wallet balance missing in response", zap.String("wallet_ref", walletRef))
return response.Auto(a.logger, mservice.ChainGateway, merrors.Internal("wallet balance not available"))
a.logger.Warn("Wallet balance not found on any gateway", zap.String("wallet_ref", walletRef))
return response.Auto(a.logger, mservice.ChainGateway, merrors.NoData("wallet not found"))
}
return sresponse.WalletBalance(a.logger, bal, token)
return sresponse.WalletBalanceFromConnector(a.logger, bal, token)
}
func (a *WalletAPI) queryBalanceFromGateways(ctx context.Context, gateways []discovery.GatewaySummary, walletRef string) (*connectorv1.Balance, error) {
var mu sync.Mutex
var wg sync.WaitGroup
var result *connectorv1.Balance
var lastErr error
for _, gw := range gateways {
wg.Add(1)
go func(gateway discovery.GatewaySummary) {
defer wg.Done()
bal, err := a.queryGatewayBalance(ctx, gateway, walletRef)
if err != nil {
a.logger.Debug("Failed to query gateway for balance",
zap.String("gateway_id", gateway.ID),
zap.String("invoke_uri", gateway.InvokeURI),
zap.String("wallet_ref", walletRef),
zap.Error(err))
mu.Lock()
lastErr = err
mu.Unlock()
return
}
if bal != nil {
mu.Lock()
if result == nil {
result = bal
a.logger.Debug("Found wallet balance on gateway",
zap.String("gateway_id", gateway.ID),
zap.String("network", gateway.Network),
zap.String("wallet_ref", walletRef))
}
mu.Unlock()
}
}(gw)
}
wg.Wait()
if result != nil {
return result, nil
}
if lastErr != nil {
return nil, lastErr
}
return nil, nil
}
func (a *WalletAPI) queryGatewayBalance(ctx context.Context, gateway discovery.GatewaySummary, walletRef string) (*connectorv1.Balance, error) {
// Create connection with timeout
dialCtx, cancel := context.WithTimeout(ctx, a.dialTimeout)
defer cancel()
var dialOpts []grpc.DialOption
if a.insecure {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
}
conn, err := grpc.DialContext(dialCtx, gateway.InvokeURI, dialOpts...)
if err != nil {
return nil, merrors.InternalWrap(err, "dial gateway")
}
defer conn.Close()
client := connectorv1.NewConnectorServiceClient(conn)
// Call with timeout
callCtx, callCancel := context.WithTimeout(ctx, a.callTimeout)
defer callCancel()
req := &connectorv1.GetBalanceRequest{
AccountRef: &connectorv1.AccountRef{
AccountId: walletRef,
},
}
resp, err := client.GetBalance(callCtx, req)
if err != nil {
return nil, err
}
return resp.GetBalance(), nil
}

View File

@@ -1,24 +1,29 @@
package walletapiimp
import (
"context"
"crypto/tls"
"encoding/json"
"net/http"
"strings"
"github.com/google/uuid"
"github.com/tech/sendico/pkg/api/http/response"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/mservice"
"github.com/tech/sendico/pkg/mutil/mzap"
describablev1 "github.com/tech/sendico/pkg/proto/common/describable/v1"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
"github.com/tech/sendico/server/interface/api/srequest"
"github.com/tech/sendico/server/interface/api/sresponse"
mutil "github.com/tech/sendico/server/internal/mutil/param"
ast "github.com/tech/sendico/server/internal/mutil/proto"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/structpb"
)
func (a *WalletAPI) create(r *http.Request, account *model.Account, token *sresponse.TokenData) http.HandlerFunc {
@@ -52,47 +57,134 @@ func (a *WalletAPI) create(r *http.Request, account *model.Account, token *sresp
return response.Auto(a.logger, a.Name(), err)
}
if a.chainGateway == nil {
return response.Internal(a.logger, mservice.ChainGateway, merrors.Internal("chain gateway client is not configured"))
if a.discovery == nil {
return response.Internal(a.logger, mservice.ChainGateway, merrors.Internal("discovery client is not configured"))
}
// Find gateway for this network
lookupCtx, cancel := context.WithTimeout(ctx, discoveryLookupTimeout)
defer cancel()
lookupResp, err := a.discovery.Lookup(lookupCtx)
if err != nil {
a.logger.Warn("Failed to lookup discovery registry", zap.Error(err))
return response.Auto(a.logger, a.Name(), err)
}
// Find gateway that handles this network
networkName := strings.ToLower(string(asset.Asset.Chain))
gateway := findGatewayForNetwork(lookupResp.Gateways, networkName)
if gateway == nil {
a.logger.Warn("No gateway found for network",
zap.String("network", networkName),
zap.String("chain", string(sr.Asset.Chain)))
return response.Auto(a.logger, a.Name(), merrors.InvalidArgument("no gateway available for network: "+networkName))
}
var ownerRef string
if sr.OwnerRef != nil && !sr.OwnerRef.IsZero() {
ownerRef = sr.OwnerRef.Hex()
}
passet, err := ast.Asset2Proto(&asset.Asset)
if err != nil {
a.logger.Warn("Failed to convert asset to proto asset", zap.Error(err),
mzap.StorableRef(asset), mzap.StorableRef(account))
return response.Auto(a.logger, a.Name(), err)
// Build params for connector OpenAccount
params := map[string]interface{}{
"organization_ref": orgRef.Hex(),
"network": networkName,
"token_symbol": asset.Asset.TokenSymbol,
"contract_address": asset.Asset.ContractAddress,
}
if sr.Description.Description != nil {
params["description"] = *sr.Description.Description
}
params["metadata"] = map[string]interface{}{
"source": "create",
"login": account.Login,
}
req := &chainv1.CreateManagedWalletRequest{
IdempotencyKey: uuid.NewString(),
OrganizationRef: orgRef.Hex(),
OwnerRef: ownerRef,
Describable: &describablev1.Describable{
Name: sr.Description.Name,
Description: sr.Description.Description,
},
Asset: passet,
Metadata: map[string]string{
"source": "create",
"login": account.Login,
},
paramsStruct, _ := structpb.NewStruct(params)
assetString := networkName + "-" + asset.Asset.TokenSymbol
req := &connectorv1.OpenAccountRequest{
IdempotencyKey: uuid.NewString(),
Kind: connectorv1.AccountKind_CHAIN_MANAGED_WALLET,
Asset: assetString,
OwnerRef: ownerRef,
Label: sr.Description.Name,
Params: paramsStruct,
}
resp, err := a.chainGateway.CreateManagedWallet(ctx, req)
// Connect to gateway and create wallet
walletRef, err := a.createWalletOnGateway(ctx, *gateway, req)
if err != nil {
a.logger.Warn("Failed to create managed wallet", zap.Error(err), mzap.ObjRef("organization_ref", orgRef), mzap.StorableRef(account))
a.logger.Warn("Failed to create managed wallet", zap.Error(err),
mzap.ObjRef("organization_ref", orgRef), mzap.StorableRef(account),
zap.String("gateway_id", gateway.ID), zap.String("network", gateway.Network))
return response.Auto(a.logger, a.Name(), err)
}
if resp == nil || resp.Wallet == nil || strings.TrimSpace(resp.Wallet.WalletRef) == "" {
return response.Auto(a.logger, a.Name(), merrors.Internal("chain gateway returned empty wallet reference"))
}
a.logger.Info("Managed wallet created for organization", mzap.ObjRef("organization_ref", orgRef),
zap.String("wallet_ref", resp.Wallet.WalletRef), mzap.StorableRef(account))
zap.String("wallet_ref", walletRef), mzap.StorableRef(account),
zap.String("gateway_id", gateway.ID), zap.String("network", gateway.Network))
return sresponse.Success(a.logger, token)
}
func findGatewayForNetwork(gateways []discovery.GatewaySummary, network string) *discovery.GatewaySummary {
network = strings.ToLower(strings.TrimSpace(network))
for _, gw := range gateways {
if !strings.EqualFold(gw.Rail, cryptoRail) || !gw.Healthy || strings.TrimSpace(gw.InvokeURI) == "" {
continue
}
// Check if gateway network matches
gwNetwork := strings.ToLower(strings.TrimSpace(gw.Network))
if gwNetwork == network {
return &gw
}
// Also check if network starts with gateway network prefix (e.g., "tron" matches "tron_mainnet")
if strings.HasPrefix(network, gwNetwork) || strings.HasPrefix(gwNetwork, network) {
return &gw
}
}
return nil
}
func (a *WalletAPI) createWalletOnGateway(ctx context.Context, gateway discovery.GatewaySummary, req *connectorv1.OpenAccountRequest) (string, error) {
// Create connection with timeout
dialCtx, cancel := context.WithTimeout(ctx, a.dialTimeout)
defer cancel()
var dialOpts []grpc.DialOption
if a.insecure {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
}
conn, err := grpc.DialContext(dialCtx, gateway.InvokeURI, dialOpts...)
if err != nil {
return "", merrors.InternalWrap(err, "dial gateway")
}
defer conn.Close()
client := connectorv1.NewConnectorServiceClient(conn)
// Call with timeout
callCtx, callCancel := context.WithTimeout(ctx, a.callTimeout)
defer callCancel()
resp, err := client.OpenAccount(callCtx, req)
if err != nil {
return "", err
}
if resp.GetError() != nil {
return "", merrors.Internal(resp.GetError().GetMessage())
}
account := resp.GetAccount()
if account == nil || account.GetRef() == nil {
return "", merrors.Internal("gateway returned empty account")
}
return strings.TrimSpace(account.GetRef().GetAccountId()), nil
}

View File

@@ -1,18 +1,27 @@
package walletapiimp
import (
"context"
"crypto/tls"
"net/http"
"strings"
"sync"
"github.com/tech/sendico/pkg/api/http/response"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/mservice"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
"github.com/tech/sendico/pkg/mutil/mzap"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
"github.com/tech/sendico/server/interface/api/sresponse"
mutil "github.com/tech/sendico/server/internal/mutil/param"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/wrapperspb"
)
func (a *WalletAPI) listWallets(r *http.Request, account *model.Account, token *sresponse.TokenData) http.HandlerFunc {
@@ -23,31 +32,126 @@ func (a *WalletAPI) listWallets(r *http.Request, account *model.Account, token *
}
ctx := r.Context()
res, err := a.enf.Enforce(ctx, a.walletsPermissionRef, account.ID, orgRef, primitive.NilObjectID, model.ActionRead)
hasReadPermission, err := a.enf.Enforce(ctx, a.walletsPermissionRef, account.ID, orgRef, primitive.NilObjectID, model.ActionRead)
if err != nil {
a.logger.Warn("Failed to check chain wallet access permissions", zap.Error(err), mutil.PLog(a.oph, r))
return response.Auto(a.logger, a.Name(), err)
}
if !res {
a.logger.Debug("Access denied when listing organization wallets", mutil.PLog(a.oph, r))
return response.AccessDenied(a.logger, a.Name(), "wallets read permission denied")
}
if a.chainGateway == nil {
return response.Internal(a.logger, mservice.ChainGateway, merrors.Internal("chain gateway client is not configured"))
if a.discovery == nil {
return response.Internal(a.logger, mservice.ChainGateway, merrors.Internal("discovery client is not configured"))
}
req := &chainv1.ListManagedWalletsRequest{
OrganizationRef: orgRef.Hex(),
}
if owner := strings.TrimSpace(r.URL.Query().Get("owner_ref")); owner != "" {
req.OwnerRef = owner
}
// Discover CRYPTO rail gateways
lookupCtx, cancel := context.WithTimeout(ctx, discoveryLookupTimeout)
defer cancel()
resp, err := a.chainGateway.ListManagedWallets(ctx, req)
lookupResp, err := a.discovery.Lookup(lookupCtx)
if err != nil {
a.logger.Warn("Failed to list managed wallets", zap.Error(err), zap.String("organization_ref", orgRef.Hex()))
return response.Auto(a.logger, mservice.ChainGateway, err)
a.logger.Warn("Failed to lookup discovery registry", zap.Error(err))
return response.Auto(a.logger, a.Name(), err)
}
return sresponse.Wallets(a.logger, resp, token)
// Filter gateways by CRYPTO rail
cryptoGateways := filterCryptoGateways(lookupResp.Gateways)
if len(cryptoGateways) == 0 {
a.logger.Debug("No CRYPTO rail gateways found in discovery")
return sresponse.Wallets(a.logger, nil, token)
}
// Build request
req := &connectorv1.ListAccountsRequest{
OrganizationRef: orgRef.Hex(),
Kind: connectorv1.AccountKind_CHAIN_MANAGED_WALLET,
}
// If user has read permission, return all wallets in organization.
// Otherwise, filter to only wallets owned by the requesting account.
if !hasReadPermission {
req.OwnerRefFilter = wrapperspb.String(account.ID.Hex())
a.logger.Debug("Filtering wallets by owner due to limited permissions",
mzap.ObjRef("owner_ref", account.ID), mutil.PLog(a.oph, r))
}
// Query all gateways in parallel
allAccounts := a.queryAllGateways(ctx, cryptoGateways, req)
return sresponse.WalletsFromAccounts(a.logger, allAccounts, token)
}
func filterCryptoGateways(gateways []discovery.GatewaySummary) []discovery.GatewaySummary {
result := make([]discovery.GatewaySummary, 0)
for _, gw := range gateways {
if strings.EqualFold(gw.Rail, cryptoRail) && gw.Healthy && strings.TrimSpace(gw.InvokeURI) != "" {
result = append(result, gw)
}
}
return result
}
func (a *WalletAPI) queryAllGateways(ctx context.Context, gateways []discovery.GatewaySummary, req *connectorv1.ListAccountsRequest) []*connectorv1.Account {
var mu sync.Mutex
var wg sync.WaitGroup
allAccounts := make([]*connectorv1.Account, 0)
for _, gw := range gateways {
wg.Add(1)
go func(gateway discovery.GatewaySummary) {
defer wg.Done()
accounts, err := a.queryGateway(ctx, gateway, req)
if err != nil {
a.logger.Warn("Failed to query gateway",
zap.String("gateway_id", gateway.ID),
zap.String("invoke_uri", gateway.InvokeURI),
zap.String("network", gateway.Network),
zap.Error(err))
return
}
mu.Lock()
allAccounts = append(allAccounts, accounts...)
mu.Unlock()
a.logger.Debug("Queried gateway successfully",
zap.String("gateway_id", gateway.ID),
zap.String("network", gateway.Network),
zap.Int("accounts_count", len(accounts)))
}(gw)
}
wg.Wait()
return allAccounts
}
func (a *WalletAPI) queryGateway(ctx context.Context, gateway discovery.GatewaySummary, req *connectorv1.ListAccountsRequest) ([]*connectorv1.Account, error) {
// Create connection with timeout
dialCtx, cancel := context.WithTimeout(ctx, a.dialTimeout)
defer cancel()
var dialOpts []grpc.DialOption
if a.insecure {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
}
conn, err := grpc.DialContext(dialCtx, gateway.InvokeURI, dialOpts...)
if err != nil {
return nil, merrors.InternalWrap(err, "dial gateway")
}
defer conn.Close()
client := connectorv1.NewConnectorServiceClient(conn)
// Call with timeout
callCtx, callCancel := context.WithTimeout(ctx, a.callTimeout)
defer callCancel()
resp, err := client.ListAccounts(callCtx, req)
if err != nil {
return nil, err
}
return resp.GetAccounts(), nil
}

View File

@@ -2,60 +2,63 @@ package walletapiimp
import (
"context"
"fmt"
"os"
"strings"
"time"
chaingatewayclient "github.com/tech/sendico/gateway/chain/client"
api "github.com/tech/sendico/pkg/api/http"
"github.com/tech/sendico/pkg/auth"
"github.com/tech/sendico/pkg/db/chainassets"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
msg "github.com/tech/sendico/pkg/messaging"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
eapi "github.com/tech/sendico/server/interface/api"
mutil "github.com/tech/sendico/server/internal/mutil/param"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
)
const (
cryptoRail = "CRYPTO"
defaultDialTimeout = 5 * time.Second
defaultCallTimeout = 10 * time.Second
discoveryLookupTimeout = 3 * time.Second
)
type WalletAPI struct {
logger mlogger.Logger
chainGateway chainWalletClient
discovery *discovery.Client
enf auth.Enforcer
oph mutil.ParamHelper
wph mutil.ParamHelper
walletsPermissionRef primitive.ObjectID
balancesPermissionRef primitive.ObjectID
assets chainassets.DB
}
type chainWalletClient interface {
CreateManagedWallet(ctx context.Context, req *chainv1.CreateManagedWalletRequest) (*chainv1.CreateManagedWalletResponse, error)
ListManagedWallets(ctx context.Context, req *chainv1.ListManagedWalletsRequest) (*chainv1.ListManagedWalletsResponse, error)
GetWalletBalance(ctx context.Context, req *chainv1.GetWalletBalanceRequest) (*chainv1.GetWalletBalanceResponse, error)
Close() error
// Gateway connection settings
dialTimeout time.Duration
callTimeout time.Duration
insecure bool
}
func (a *WalletAPI) Name() mservice.Type { return mservice.ChainWallets }
func (a *WalletAPI) Finish(ctx context.Context) error {
if a.chainGateway != nil {
if err := a.chainGateway.Close(); err != nil {
a.logger.Warn("Failed to close chain gateway client", zap.Error(err))
}
if a.discovery != nil {
a.discovery.Close()
}
return nil
}
func CreateAPI(apiCtx eapi.API) (*WalletAPI, error) {
p := &WalletAPI{
logger: apiCtx.Logger().Named(mservice.Wallets),
enf: apiCtx.Permissions().Enforcer(),
oph: mutil.CreatePH(mservice.Organizations),
wph: mutil.CreatePH(mservice.Wallets),
logger: apiCtx.Logger().Named(mservice.Wallets),
enf: apiCtx.Permissions().Enforcer(),
oph: mutil.CreatePH(mservice.Organizations),
wph: mutil.CreatePH(mservice.Wallets),
dialTimeout: defaultDialTimeout,
callTimeout: defaultCallTimeout,
insecure: true,
}
var err error
@@ -83,9 +86,22 @@ func CreateAPI(apiCtx eapi.API) (*WalletAPI, error) {
p.logger.Error("Failed to fetch service configuration")
return nil, merrors.InvalidArgument("No configuration provided")
}
if err := p.initChainGateway(cfg.ChainGateway); err != nil {
p.logger.Error("Failed to initialize chain gateway client", zap.Error(err))
return nil, err
// Apply gateway connection settings from config
if gatewayCfg := cfg.ChainGateway; gatewayCfg != nil {
if gatewayCfg.DialTimeoutSeconds > 0 {
p.dialTimeout = time.Duration(gatewayCfg.DialTimeoutSeconds) * time.Second
}
if gatewayCfg.CallTimeoutSeconds > 0 {
p.callTimeout = time.Duration(gatewayCfg.CallTimeoutSeconds) * time.Second
}
p.insecure = gatewayCfg.Insecure
}
// Initialize discovery client
if err := p.initDiscoveryClient(cfg); err != nil {
p.logger.Warn("Failed to initialize discovery client", zap.Error(err))
// Not fatal - we can still work without discovery
}
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/"), api.Get, p.listWallets)
@@ -95,31 +111,22 @@ func CreateAPI(apiCtx eapi.API) (*WalletAPI, error) {
return p, nil
}
func (a *WalletAPI) initChainGateway(cfg *eapi.ChainGatewayConfig) error {
if cfg == nil {
return merrors.InvalidArgument("chain gateway configuration is not provided")
func (a *WalletAPI) initDiscoveryClient(cfg *eapi.Config) error {
if cfg == nil || cfg.Mw == nil {
return nil
}
cfg.Address = strings.TrimSpace(cfg.Address)
if cfg.Address == "" {
cfg.Address = strings.TrimSpace(os.Getenv(cfg.AddressEnv))
msgCfg := cfg.Mw.Messaging
if msgCfg.Driver == "" {
return nil
}
if cfg.Address == "" {
return merrors.InvalidArgument(fmt.Sprintf("chain gateway address is not specified and address env %s is empty", cfg.AddressEnv))
}
clientCfg := chaingatewayclient.Config{
Address: cfg.Address,
DialTimeout: time.Duration(cfg.DialTimeoutSeconds) * time.Second,
CallTimeout: time.Duration(cfg.CallTimeoutSeconds) * time.Second,
Insecure: cfg.Insecure,
}
client, err := chaingatewayclient.New(context.Background(), clientCfg)
broker, err := msg.CreateMessagingBroker(a.logger.Named("discovery_bus"), &msgCfg)
if err != nil {
return err
}
a.chainGateway = client
client, err := discovery.NewClient(a.logger, broker, nil, string(a.Name()))
if err != nil {
return err
}
a.discovery = client
return nil
}