Merge pull request 'cached gateway routing' (#543) from gw-542 into main
All checks were successful
ci/woodpecker/push/bff Pipeline was successful
ci/woodpecker/push/billing_documents Pipeline was successful
ci/woodpecker/push/billing_fees Pipeline was successful
ci/woodpecker/push/discovery Pipeline was successful
ci/woodpecker/push/fx_ingestor Pipeline was successful
ci/woodpecker/push/fx_oracle Pipeline was successful
ci/woodpecker/push/frontend Pipeline was successful
ci/woodpecker/push/gateway_chain Pipeline was successful
ci/woodpecker/push/gateway_mntx Pipeline was successful
ci/woodpecker/push/gateway_tgsettle Pipeline was successful
ci/woodpecker/push/gateway_tron Pipeline was successful
ci/woodpecker/push/ledger Pipeline was successful
ci/woodpecker/push/notification Pipeline was successful
ci/woodpecker/push/payments_methods Pipeline was successful
ci/woodpecker/push/payments_orchestrator Pipeline was successful
ci/woodpecker/push/payments_quotation Pipeline was successful
All checks were successful
ci/woodpecker/push/bff Pipeline was successful
ci/woodpecker/push/billing_documents Pipeline was successful
ci/woodpecker/push/billing_fees Pipeline was successful
ci/woodpecker/push/discovery Pipeline was successful
ci/woodpecker/push/fx_ingestor Pipeline was successful
ci/woodpecker/push/fx_oracle Pipeline was successful
ci/woodpecker/push/frontend Pipeline was successful
ci/woodpecker/push/gateway_chain Pipeline was successful
ci/woodpecker/push/gateway_mntx Pipeline was successful
ci/woodpecker/push/gateway_tgsettle Pipeline was successful
ci/woodpecker/push/gateway_tron Pipeline was successful
ci/woodpecker/push/ledger Pipeline was successful
ci/woodpecker/push/notification Pipeline was successful
ci/woodpecker/push/payments_methods Pipeline was successful
ci/woodpecker/push/payments_orchestrator Pipeline was successful
ci/woodpecker/push/payments_quotation Pipeline was successful
Reviewed-on: #543
This commit was merged in pull request #543.
This commit is contained in:
@@ -33,6 +33,7 @@ func TestResolver_GlobalFallbackWhenOrgMissing(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("expected fallback to global, got error: %v", err)
|
t.Fatalf("expected fallback to global, got error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if plan.OrganizationRef != nil && !plan.OrganizationRef.IsZero() {
|
if plan.OrganizationRef != nil && !plan.OrganizationRef.IsZero() {
|
||||||
t.Fatalf("expected global plan, got orgRef %s", plan.OrganizationRef.Hex())
|
t.Fatalf("expected global plan, got orgRef %s", plan.OrganizationRef.Hex())
|
||||||
}
|
}
|
||||||
@@ -158,6 +159,7 @@ func TestResolver_EffectiveDateFiltering(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("expected fallback to global, got error: %v", err)
|
t.Fatalf("expected fallback to global, got error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if rule.RuleID != "current" {
|
if rule.RuleID != "current" {
|
||||||
t.Fatalf("expected current global rule, got %s", rule.RuleID)
|
t.Fatalf("expected current global rule, got %s", rule.RuleID)
|
||||||
}
|
}
|
||||||
@@ -182,6 +184,7 @@ func TestResolver_AppliesToFiltering(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("expected card rule, got error: %v", err)
|
t.Fatalf("expected card rule, got error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if rule.RuleID != "card" {
|
if rule.RuleID != "card" {
|
||||||
t.Fatalf("expected card rule, got %s", rule.RuleID)
|
t.Fatalf("expected card rule, got %s", rule.RuleID)
|
||||||
}
|
}
|
||||||
@@ -216,6 +219,7 @@ func TestResolver_AppliesToFilteringSupportsListsAndWildcard(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("expected list match rule, got error: %v", err)
|
t.Fatalf("expected list match rule, got error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if rule.RuleID != "network_multi" {
|
if rule.RuleID != "network_multi" {
|
||||||
t.Fatalf("expected network list rule, got %s", rule.RuleID)
|
t.Fatalf("expected network list rule, got %s", rule.RuleID)
|
||||||
}
|
}
|
||||||
@@ -224,6 +228,7 @@ func TestResolver_AppliesToFilteringSupportsListsAndWildcard(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("expected wildcard rule, got error: %v", err)
|
t.Fatalf("expected wildcard rule, got error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if rule.RuleID != "asset_any" {
|
if rule.RuleID != "asset_any" {
|
||||||
t.Fatalf("expected asset wildcard rule, got %s", rule.RuleID)
|
t.Fatalf("expected asset wildcard rule, got %s", rule.RuleID)
|
||||||
}
|
}
|
||||||
@@ -232,6 +237,7 @@ func TestResolver_AppliesToFilteringSupportsListsAndWildcard(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("expected default rule, got error: %v", err)
|
t.Fatalf("expected default rule, got error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if rule.RuleID != "default" {
|
if rule.RuleID != "default" {
|
||||||
t.Fatalf("expected default rule, got %s", rule.RuleID)
|
t.Fatalf("expected default rule, got %s", rule.RuleID)
|
||||||
}
|
}
|
||||||
@@ -315,9 +321,11 @@ func (m *memoryPlansStore) FindActiveOrgPlan(_ context.Context, orgRef bson.Obje
|
|||||||
if plan == nil || plan.OrganizationRef == nil || plan.OrganizationRef.IsZero() || (*plan.OrganizationRef != orgRef) {
|
if plan == nil || plan.OrganizationRef == nil || plan.OrganizationRef.IsZero() || (*plan.OrganizationRef != orgRef) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if !plan.Active {
|
if !plan.Active {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if plan.EffectiveFrom.After(asOf) {
|
if plan.EffectiveFrom.After(asOf) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -328,9 +336,11 @@ func (m *memoryPlansStore) FindActiveOrgPlan(_ context.Context, orgRef bson.Obje
|
|||||||
|
|
||||||
matches = append(matches, plan)
|
matches = append(matches, plan)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(matches) == 0 {
|
if len(matches) == 0 {
|
||||||
return nil, storage.ErrFeePlanNotFound
|
return nil, storage.ErrFeePlanNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(matches) > 1 {
|
if len(matches) > 1 {
|
||||||
return nil, storage.ErrConflictingFeePlans
|
return nil, storage.ErrConflictingFeePlans
|
||||||
}
|
}
|
||||||
@@ -349,18 +359,22 @@ func (m *memoryPlansStore) FindActiveGlobalPlan(_ context.Context, asOf time.Tim
|
|||||||
if !plan.Active {
|
if !plan.Active {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if plan.EffectiveFrom.After(asOf) {
|
if plan.EffectiveFrom.After(asOf) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if plan.EffectiveTo != nil && !plan.EffectiveTo.After(asOf) {
|
if plan.EffectiveTo != nil && !plan.EffectiveTo.After(asOf) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
matches = append(matches, plan)
|
matches = append(matches, plan)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(matches) == 0 {
|
if len(matches) == 0 {
|
||||||
return nil, storage.ErrFeePlanNotFound
|
return nil, storage.ErrFeePlanNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(matches) > 1 {
|
if len(matches) > 1 {
|
||||||
return nil, storage.ErrConflictingFeePlans
|
return nil, storage.ErrConflictingFeePlans
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -479,6 +479,7 @@ func (s *Service) observePrecomputeFees(logger mlogger.Logger, err error, resp *
|
|||||||
if !expiresAt.IsZero() {
|
if !expiresAt.IsZero() {
|
||||||
logFields = append(logFields, zap.Time("expires_at", expiresAt))
|
logFields = append(logFields, zap.Time("expires_at", expiresAt))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warn("PrecomputeFees finished", append(logFields, zap.Error(err))...)
|
logger.Warn("PrecomputeFees finished", append(logFields, zap.Error(err))...)
|
||||||
|
|
||||||
|
|||||||
@@ -93,9 +93,11 @@ func TestQuoteFees_ComputesDerivedLines(t *testing.T) {
|
|||||||
if got := line.GetMoney().GetAmount(); got != "3.20" {
|
if got := line.GetMoney().GetAmount(); got != "3.20" {
|
||||||
t.Fatalf("expected fee amount 3.20, got %s", got)
|
t.Fatalf("expected fee amount 3.20, got %s", got)
|
||||||
}
|
}
|
||||||
|
|
||||||
if line.GetMoney().GetCurrency() != "USD" {
|
if line.GetMoney().GetCurrency() != "USD" {
|
||||||
t.Fatalf("expected currency USD, got %s", line.GetMoney().GetCurrency())
|
t.Fatalf("expected currency USD, got %s", line.GetMoney().GetCurrency())
|
||||||
}
|
}
|
||||||
|
|
||||||
if line.GetLedgerAccountRef() != "acct:fees" {
|
if line.GetLedgerAccountRef() != "acct:fees" {
|
||||||
t.Fatalf("unexpected ledger account ref %s", line.GetLedgerAccountRef())
|
t.Fatalf("unexpected ledger account ref %s", line.GetLedgerAccountRef())
|
||||||
}
|
}
|
||||||
@@ -111,9 +113,11 @@ func TestQuoteFees_ComputesDerivedLines(t *testing.T) {
|
|||||||
if applied.GetTaxCode() != "VAT" || applied.GetTaxRate() != "0.20" {
|
if applied.GetTaxCode() != "VAT" || applied.GetTaxRate() != "0.20" {
|
||||||
t.Fatalf("applied rule metadata mismatch: %+v", applied)
|
t.Fatalf("applied rule metadata mismatch: %+v", applied)
|
||||||
}
|
}
|
||||||
|
|
||||||
if applied.GetRounding() != moneyv1.RoundingMode_ROUND_HALF_UP {
|
if applied.GetRounding() != moneyv1.RoundingMode_ROUND_HALF_UP {
|
||||||
t.Fatalf("expected rounding HALF_UP, got %v", applied.GetRounding())
|
t.Fatalf("expected rounding HALF_UP, got %v", applied.GetRounding())
|
||||||
}
|
}
|
||||||
|
|
||||||
if applied.GetParameters()["scale"] != "2" {
|
if applied.GetParameters()["scale"] != "2" {
|
||||||
t.Fatalf("expected parameters to carry metadata scale, got %+v", applied.GetParameters())
|
t.Fatalf("expected parameters to carry metadata scale, got %+v", applied.GetParameters())
|
||||||
}
|
}
|
||||||
@@ -189,6 +193,7 @@ func TestQuoteFees_FiltersByAttributesAndDates(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("QuoteFees returned error: %v", err)
|
t.Fatalf("QuoteFees returned error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(resp.GetLines()) != 1 {
|
if len(resp.GetLines()) != 1 {
|
||||||
t.Fatalf("expected only base rule to fire, got %d lines", len(resp.GetLines()))
|
t.Fatalf("expected only base rule to fire, got %d lines", len(resp.GetLines()))
|
||||||
}
|
}
|
||||||
@@ -197,6 +202,7 @@ func TestQuoteFees_FiltersByAttributesAndDates(t *testing.T) {
|
|||||||
if line.GetLedgerAccountRef() != "acct:base" {
|
if line.GetLedgerAccountRef() != "acct:base" {
|
||||||
t.Fatalf("expected base rule to apply, got %s", line.GetLedgerAccountRef())
|
t.Fatalf("expected base rule to apply, got %s", line.GetLedgerAccountRef())
|
||||||
}
|
}
|
||||||
|
|
||||||
if line.GetMoney().GetAmount() != "5.00" {
|
if line.GetMoney().GetAmount() != "5.00" {
|
||||||
t.Fatalf("expected 5.00 amount, got %s", line.GetMoney().GetAmount())
|
t.Fatalf("expected 5.00 amount, got %s", line.GetMoney().GetAmount())
|
||||||
}
|
}
|
||||||
@@ -250,9 +256,11 @@ func TestQuoteFees_RoundingDown(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("QuoteFees returned error: %v", err)
|
t.Fatalf("QuoteFees returned error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(resp.GetLines()) != 1 {
|
if len(resp.GetLines()) != 1 {
|
||||||
t.Fatalf("expected single derived line, got %d", len(resp.GetLines()))
|
t.Fatalf("expected single derived line, got %d", len(resp.GetLines()))
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp.GetLines()[0].GetMoney().GetAmount() != "0.01" {
|
if resp.GetLines()[0].GetMoney().GetAmount() != "0.01" {
|
||||||
t.Fatalf("expected rounding down to 0.01, got %s", resp.GetLines()[0].GetMoney().GetAmount())
|
t.Fatalf("expected rounding down to 0.01, got %s", resp.GetLines()[0].GetMoney().GetAmount())
|
||||||
}
|
}
|
||||||
@@ -317,15 +325,19 @@ func TestQuoteFees_UsesInjectedCalculator(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("QuoteFees returned error: %v", err)
|
t.Fatalf("QuoteFees returned error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !calc.called {
|
if !calc.called {
|
||||||
t.Fatalf("expected calculator to be invoked")
|
t.Fatalf("expected calculator to be invoked")
|
||||||
}
|
}
|
||||||
|
|
||||||
if calc.gotPlan != plan {
|
if calc.gotPlan != plan {
|
||||||
t.Fatalf("expected calculator to receive plan pointer")
|
t.Fatalf("expected calculator to receive plan pointer")
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(resp.GetLines()) != len(result.Lines) {
|
if len(resp.GetLines()) != len(result.Lines) {
|
||||||
t.Fatalf("expected %d lines, got %d", len(result.Lines), len(resp.GetLines()))
|
t.Fatalf("expected %d lines, got %d", len(result.Lines), len(resp.GetLines()))
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp.GetLines()[0].GetLedgerAccountRef() != "acct:stub" {
|
if resp.GetLines()[0].GetLedgerAccountRef() != "acct:stub" {
|
||||||
t.Fatalf("unexpected ledger account in response: %s", resp.GetLines()[0].GetLedgerAccountRef())
|
t.Fatalf("unexpected ledger account in response: %s", resp.GetLines()[0].GetLedgerAccountRef())
|
||||||
}
|
}
|
||||||
@@ -405,6 +417,7 @@ func TestQuoteFees_PopulatesFxUsed(t *testing.T) {
|
|||||||
if fx.GetProvider() != "TestProvider" || fx.GetRate().GetValue() != "1.2300" {
|
if fx.GetProvider() != "TestProvider" || fx.GetRate().GetValue() != "1.2300" {
|
||||||
t.Fatalf("unexpected FxUsed payload: %+v", fx)
|
t.Fatalf("unexpected FxUsed payload: %+v", fx)
|
||||||
}
|
}
|
||||||
|
|
||||||
if fx.GetPair().GetBase() != "USD" || fx.GetPair().GetQuote() != "EUR" {
|
if fx.GetPair().GetBase() != "USD" || fx.GetPair().GetQuote() != "EUR" {
|
||||||
t.Fatalf("unexpected currency pair: %+v", fx.GetPair())
|
t.Fatalf("unexpected currency pair: %+v", fx.GetPair())
|
||||||
}
|
}
|
||||||
@@ -447,6 +460,7 @@ func (s *stubPlansStore) GetActivePlan(ctx context.Context, orgRef bson.ObjectID
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.FindActiveGlobalPlan(ctx, asOf)
|
return s.FindActiveGlobalPlan(ctx, asOf)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -454,15 +468,19 @@ func (s *stubPlansStore) FindActiveOrgPlan(_ context.Context, orgRef bson.Object
|
|||||||
if s.plan == nil {
|
if s.plan == nil {
|
||||||
return nil, storage.ErrFeePlanNotFound
|
return nil, storage.ErrFeePlanNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if (s.plan.OrganizationRef != nil) && (*s.plan.OrganizationRef != orgRef) {
|
if (s.plan.OrganizationRef != nil) && (*s.plan.OrganizationRef != orgRef) {
|
||||||
return nil, storage.ErrFeePlanNotFound
|
return nil, storage.ErrFeePlanNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if !s.plan.Active {
|
if !s.plan.Active {
|
||||||
return nil, storage.ErrFeePlanNotFound
|
return nil, storage.ErrFeePlanNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.plan.EffectiveFrom.After(asOf) {
|
if s.plan.EffectiveFrom.After(asOf) {
|
||||||
return nil, storage.ErrFeePlanNotFound
|
return nil, storage.ErrFeePlanNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.plan.EffectiveTo != nil && !s.plan.EffectiveTo.After(asOf) {
|
if s.plan.EffectiveTo != nil && !s.plan.EffectiveTo.After(asOf) {
|
||||||
return nil, storage.ErrFeePlanNotFound
|
return nil, storage.ErrFeePlanNotFound
|
||||||
}
|
}
|
||||||
@@ -474,12 +492,15 @@ func (s *stubPlansStore) FindActiveGlobalPlan(_ context.Context, asOf time.Time)
|
|||||||
if s.globalPlan == nil {
|
if s.globalPlan == nil {
|
||||||
return nil, storage.ErrFeePlanNotFound
|
return nil, storage.ErrFeePlanNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if !s.globalPlan.Active {
|
if !s.globalPlan.Active {
|
||||||
return nil, storage.ErrFeePlanNotFound
|
return nil, storage.ErrFeePlanNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.globalPlan.EffectiveFrom.After(asOf) {
|
if s.globalPlan.EffectiveFrom.After(asOf) {
|
||||||
return nil, storage.ErrFeePlanNotFound
|
return nil, storage.ErrFeePlanNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.globalPlan.EffectiveTo != nil && !s.globalPlan.EffectiveTo.After(asOf) {
|
if s.globalPlan.EffectiveTo != nil && !s.globalPlan.EffectiveTo.After(asOf) {
|
||||||
return nil, storage.ErrFeePlanNotFound
|
return nil, storage.ErrFeePlanNotFound
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,11 +2,13 @@ package wallet
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/tech/sendico/gateway/chain/internal/service/gateway/shared"
|
"github.com/tech/sendico/gateway/chain/internal/service/gateway/shared"
|
||||||
"github.com/tech/sendico/gateway/chain/storage/model"
|
"github.com/tech/sendico/gateway/chain/storage/model"
|
||||||
"github.com/tech/sendico/pkg/api/routers/gsresponse"
|
"github.com/tech/sendico/pkg/api/routers/gsresponse"
|
||||||
|
"github.com/tech/sendico/pkg/merrors"
|
||||||
"github.com/tech/sendico/pkg/mservice"
|
"github.com/tech/sendico/pkg/mservice"
|
||||||
paginationv1 "github.com/tech/sendico/pkg/proto/common/pagination/v1"
|
paginationv1 "github.com/tech/sendico/pkg/proto/common/pagination/v1"
|
||||||
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
|
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
|
||||||
@@ -45,8 +47,15 @@ func (c *listManagedWalletsCommand) Execute(ctx context.Context, req *chainv1.Li
|
|||||||
|
|
||||||
result, err := c.deps.Storage.Wallets().List(ctx, filter)
|
result, err := c.deps.Storage.Wallets().List(ctx, filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.deps.Logger.Warn("Storage list failed", zap.Error(err))
|
if errors.Is(err, merrors.ErrNoData) {
|
||||||
return gsresponse.Auto[chainv1.ListManagedWalletsResponse](c.deps.Logger, mservice.ChainGateway, err)
|
result = &model.ManagedWalletList{}
|
||||||
|
} else {
|
||||||
|
c.deps.Logger.Warn("Storage list failed", zap.Error(err))
|
||||||
|
return gsresponse.Auto[chainv1.ListManagedWalletsResponse](c.deps.Logger, mservice.ChainGateway, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if result == nil {
|
||||||
|
result = &model.ManagedWalletList{}
|
||||||
}
|
}
|
||||||
|
|
||||||
protoWallets := make([]*chainv1.ManagedWallet, 0, len(result.Items))
|
protoWallets := make([]*chainv1.ManagedWallet, 0, len(result.Items))
|
||||||
|
|||||||
@@ -127,6 +127,20 @@ func TestListAccounts_OrganizationRefFilters(t *testing.T) {
|
|||||||
require.Equal(t, "org-1", orgField.GetStringValue())
|
require.Equal(t, "org-1", orgField.GetStringValue())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestListAccounts_NoWalletsNotError(t *testing.T) {
|
||||||
|
baseRepo := newInMemoryRepository()
|
||||||
|
svc := newTestServiceWithRepository(t, &walletsNoDataRepository{inMemoryRepository: baseRepo})
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
resp, err := svc.ListAccounts(ctx, &connectorv1.ListAccountsRequest{
|
||||||
|
OrganizationRef: "org-1",
|
||||||
|
Kind: connectorv1.AccountKind_CHAIN_MANAGED_WALLET,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, resp)
|
||||||
|
require.Empty(t, resp.GetAccounts())
|
||||||
|
}
|
||||||
|
|
||||||
func TestSubmitTransfer_ManagedDestination(t *testing.T) {
|
func TestSubmitTransfer_ManagedDestination(t *testing.T) {
|
||||||
svc, repo := newTestService(t)
|
svc, repo := newTestService(t)
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
@@ -253,6 +267,22 @@ func newInMemoryRepository() *inMemoryRepository {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type walletsNoDataRepository struct {
|
||||||
|
*inMemoryRepository
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *walletsNoDataRepository) Wallets() storage.WalletsStore {
|
||||||
|
return &walletsNoDataStore{WalletsStore: r.inMemoryRepository.wallets}
|
||||||
|
}
|
||||||
|
|
||||||
|
type walletsNoDataStore struct {
|
||||||
|
storage.WalletsStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *walletsNoDataStore) List(context.Context, model.ManagedWalletFilter) (*model.ManagedWalletList, error) {
|
||||||
|
return nil, merrors.NoData("no wallets")
|
||||||
|
}
|
||||||
|
|
||||||
func (r *inMemoryRepository) Ping(context.Context) error { return nil }
|
func (r *inMemoryRepository) Ping(context.Context) error { return nil }
|
||||||
func (r *inMemoryRepository) Wallets() storage.WalletsStore { return r.wallets }
|
func (r *inMemoryRepository) Wallets() storage.WalletsStore { return r.wallets }
|
||||||
func (r *inMemoryRepository) Transfers() storage.TransfersStore { return r.transfers }
|
func (r *inMemoryRepository) Transfers() storage.TransfersStore { return r.transfers }
|
||||||
@@ -625,6 +655,11 @@ func sanitizeLimit(requested int32, def, max int64) int64 {
|
|||||||
|
|
||||||
func newTestService(t *testing.T) (*Service, *inMemoryRepository) {
|
func newTestService(t *testing.T) (*Service, *inMemoryRepository) {
|
||||||
repo := newInMemoryRepository()
|
repo := newInMemoryRepository()
|
||||||
|
svc := newTestServiceWithRepository(t, repo)
|
||||||
|
return svc, repo
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestServiceWithRepository(t *testing.T, repo storage.Repository) *Service {
|
||||||
logger := zap.NewNop()
|
logger := zap.NewNop()
|
||||||
networks := []shared.Network{{
|
networks := []shared.Network{{
|
||||||
Name: "ethereum_mainnet",
|
Name: "ethereum_mainnet",
|
||||||
@@ -641,7 +676,7 @@ func newTestService(t *testing.T) (*Service, *inMemoryRepository) {
|
|||||||
WithServiceWallet(shared.ServiceWallet{Network: "ethereum_mainnet", Address: "0xservice"}),
|
WithServiceWallet(shared.ServiceWallet{Network: "ethereum_mainnet", Address: "0xservice"}),
|
||||||
WithDriverRegistry(driverRegistry),
|
WithDriverRegistry(driverRegistry),
|
||||||
)
|
)
|
||||||
return svc, repo
|
return svc
|
||||||
}
|
}
|
||||||
|
|
||||||
type fakeKeyManager struct{}
|
type fakeKeyManager struct{}
|
||||||
|
|||||||
@@ -183,9 +183,14 @@ func (w *Wallets) List(ctx context.Context, filter model.ManagedWalletFilter) (*
|
|||||||
query = query.Sort(repository.IDField(), true).Limit(&fetchLimit)
|
query = query.Sort(repository.IDField(), true).Limit(&fetchLimit)
|
||||||
|
|
||||||
wallets, listErr := mutil.GetObjects[model.ManagedWallet](ctx, w.logger, query, nil, w.walletRepo)
|
wallets, listErr := mutil.GetObjects[model.ManagedWallet](ctx, w.logger, query, nil, w.walletRepo)
|
||||||
if listErr != nil && !errors.Is(listErr, merrors.ErrNoData) {
|
if listErr != nil {
|
||||||
w.logger.Warn("Wallet list failed", append(fields, zap.Error(listErr))...)
|
if errors.Is(listErr, merrors.ErrNoData) {
|
||||||
return nil, listErr
|
wallets = make([]model.ManagedWallet, 0)
|
||||||
|
listErr = nil
|
||||||
|
} else {
|
||||||
|
w.logger.Warn("Wallet list failed", append(fields, zap.Error(listErr))...)
|
||||||
|
return nil, listErr
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
nextCursor := ""
|
nextCursor := ""
|
||||||
@@ -204,7 +209,7 @@ func (w *Wallets) List(ctx context.Context, filter model.ManagedWalletFilter) (*
|
|||||||
zap.Int("count", len(result.Items)),
|
zap.Int("count", len(result.Items)),
|
||||||
zap.String("next_cursor", result.NextCursor),
|
zap.String("next_cursor", result.NextCursor),
|
||||||
)
|
)
|
||||||
if errors.Is(listErr, merrors.ErrNoData) {
|
if len(result.Items) == 0 {
|
||||||
w.logger.Debug("Wallet list empty", fields...)
|
w.logger.Debug("Wallet list empty", fields...)
|
||||||
} else {
|
} else {
|
||||||
w.logger.Debug("Wallet list fetched", fields...)
|
w.logger.Debug("Wallet list fetched", fields...)
|
||||||
|
|||||||
@@ -2,11 +2,13 @@ package wallet
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/tech/sendico/gateway/tron/shared"
|
"github.com/tech/sendico/gateway/tron/shared"
|
||||||
"github.com/tech/sendico/gateway/tron/storage/model"
|
"github.com/tech/sendico/gateway/tron/storage/model"
|
||||||
"github.com/tech/sendico/pkg/api/routers/gsresponse"
|
"github.com/tech/sendico/pkg/api/routers/gsresponse"
|
||||||
|
"github.com/tech/sendico/pkg/merrors"
|
||||||
"github.com/tech/sendico/pkg/mservice"
|
"github.com/tech/sendico/pkg/mservice"
|
||||||
paginationv1 "github.com/tech/sendico/pkg/proto/common/pagination/v1"
|
paginationv1 "github.com/tech/sendico/pkg/proto/common/pagination/v1"
|
||||||
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
|
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
|
||||||
@@ -45,8 +47,15 @@ func (c *listManagedWalletsCommand) Execute(ctx context.Context, req *chainv1.Li
|
|||||||
|
|
||||||
result, err := c.deps.Storage.Wallets().List(ctx, filter)
|
result, err := c.deps.Storage.Wallets().List(ctx, filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.deps.Logger.Warn("Storage list failed", zap.Error(err))
|
if errors.Is(err, merrors.ErrNoData) {
|
||||||
return gsresponse.Auto[chainv1.ListManagedWalletsResponse](c.deps.Logger, mservice.ChainGateway, err)
|
result = &model.ManagedWalletList{}
|
||||||
|
} else {
|
||||||
|
c.deps.Logger.Warn("Storage list failed", zap.Error(err))
|
||||||
|
return gsresponse.Auto[chainv1.ListManagedWalletsResponse](c.deps.Logger, mservice.ChainGateway, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if result == nil {
|
||||||
|
result = &model.ManagedWalletList{}
|
||||||
}
|
}
|
||||||
|
|
||||||
protoWallets := make([]*chainv1.ManagedWallet, 0, len(result.Items))
|
protoWallets := make([]*chainv1.ManagedWallet, 0, len(result.Items))
|
||||||
|
|||||||
@@ -128,6 +128,20 @@ func TestListAccounts_OrganizationRefFilters(t *testing.T) {
|
|||||||
require.Equal(t, "org-1", orgField.GetStringValue())
|
require.Equal(t, "org-1", orgField.GetStringValue())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestListAccounts_NoWalletsNotError(t *testing.T) {
|
||||||
|
baseRepo := newInMemoryRepository()
|
||||||
|
svc := newTestServiceWithRepository(t, &walletsNoDataRepository{inMemoryRepository: baseRepo})
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
resp, err := svc.ListAccounts(ctx, &connectorv1.ListAccountsRequest{
|
||||||
|
OrganizationRef: "org-1",
|
||||||
|
Kind: connectorv1.AccountKind_CHAIN_MANAGED_WALLET,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, resp)
|
||||||
|
require.Empty(t, resp.GetAccounts())
|
||||||
|
}
|
||||||
|
|
||||||
func TestSubmitTransfer_ManagedDestination(t *testing.T) {
|
func TestSubmitTransfer_ManagedDestination(t *testing.T) {
|
||||||
svc, repo := newTestService(t)
|
svc, repo := newTestService(t)
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
@@ -256,6 +270,22 @@ func newInMemoryRepository() *inMemoryRepository {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type walletsNoDataRepository struct {
|
||||||
|
*inMemoryRepository
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *walletsNoDataRepository) Wallets() storage.WalletsStore {
|
||||||
|
return &walletsNoDataStore{WalletsStore: r.inMemoryRepository.wallets}
|
||||||
|
}
|
||||||
|
|
||||||
|
type walletsNoDataStore struct {
|
||||||
|
storage.WalletsStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *walletsNoDataStore) List(context.Context, model.ManagedWalletFilter) (*model.ManagedWalletList, error) {
|
||||||
|
return nil, merrors.NoData("no wallets")
|
||||||
|
}
|
||||||
|
|
||||||
func (r *inMemoryRepository) Ping(context.Context) error { return nil }
|
func (r *inMemoryRepository) Ping(context.Context) error { return nil }
|
||||||
func (r *inMemoryRepository) Wallets() storage.WalletsStore { return r.wallets }
|
func (r *inMemoryRepository) Wallets() storage.WalletsStore { return r.wallets }
|
||||||
func (r *inMemoryRepository) Transfers() storage.TransfersStore { return r.transfers }
|
func (r *inMemoryRepository) Transfers() storage.TransfersStore { return r.transfers }
|
||||||
@@ -628,6 +658,11 @@ func sanitizeLimit(requested int32, def, max int64) int64 {
|
|||||||
|
|
||||||
func newTestService(t *testing.T) (*Service, *inMemoryRepository) {
|
func newTestService(t *testing.T) (*Service, *inMemoryRepository) {
|
||||||
repo := newInMemoryRepository()
|
repo := newInMemoryRepository()
|
||||||
|
svc := newTestServiceWithRepository(t, repo)
|
||||||
|
return svc, repo
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestServiceWithRepository(t *testing.T, repo storage.Repository) *Service {
|
||||||
logger := zap.NewNop()
|
logger := zap.NewNop()
|
||||||
networks := []shared.Network{{
|
networks := []shared.Network{{
|
||||||
Name: pmodel.ChainNetworkTronMainnet,
|
Name: pmodel.ChainNetworkTronMainnet,
|
||||||
@@ -644,7 +679,7 @@ func newTestService(t *testing.T) (*Service, *inMemoryRepository) {
|
|||||||
WithServiceWallet(shared.ServiceWallet{Network: pmodel.ChainNetworkTronMainnet, Address: "TServiceWalletAddress"}),
|
WithServiceWallet(shared.ServiceWallet{Network: pmodel.ChainNetworkTronMainnet, Address: "TServiceWalletAddress"}),
|
||||||
WithDriverRegistry(driverRegistry),
|
WithDriverRegistry(driverRegistry),
|
||||||
)
|
)
|
||||||
return svc, repo
|
return svc
|
||||||
}
|
}
|
||||||
|
|
||||||
type fakeKeyManager struct{}
|
type fakeKeyManager struct{}
|
||||||
|
|||||||
@@ -187,9 +187,14 @@ func (w *Wallets) List(ctx context.Context, filter model.ManagedWalletFilter) (*
|
|||||||
query = query.Sort(repository.IDField(), true).Limit(&fetchLimit)
|
query = query.Sort(repository.IDField(), true).Limit(&fetchLimit)
|
||||||
|
|
||||||
wallets, listErr := mutil.GetObjects[model.ManagedWallet](ctx, w.logger, query, nil, w.walletRepo)
|
wallets, listErr := mutil.GetObjects[model.ManagedWallet](ctx, w.logger, query, nil, w.walletRepo)
|
||||||
if listErr != nil && !errors.Is(listErr, merrors.ErrNoData) {
|
if listErr != nil {
|
||||||
w.logger.Warn("Wallet list failed", append(fields, zap.Error(listErr))...)
|
if errors.Is(listErr, merrors.ErrNoData) {
|
||||||
return nil, listErr
|
wallets = make([]model.ManagedWallet, 0)
|
||||||
|
listErr = nil
|
||||||
|
} else {
|
||||||
|
w.logger.Warn("Wallet list failed", append(fields, zap.Error(listErr))...)
|
||||||
|
return nil, listErr
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
nextCursor := ""
|
nextCursor := ""
|
||||||
@@ -208,7 +213,7 @@ func (w *Wallets) List(ctx context.Context, filter model.ManagedWalletFilter) (*
|
|||||||
zap.Int("count", len(result.Items)),
|
zap.Int("count", len(result.Items)),
|
||||||
zap.String("next_cursor", result.NextCursor),
|
zap.String("next_cursor", result.NextCursor),
|
||||||
)
|
)
|
||||||
if errors.Is(listErr, merrors.ErrNoData) {
|
if len(result.Items) == 0 {
|
||||||
w.logger.Debug("Wallet list empty", fields...)
|
w.logger.Debug("Wallet list empty", fields...)
|
||||||
} else {
|
} else {
|
||||||
w.logger.Debug("Wallet list fetched", fields...)
|
w.logger.Debug("Wallet list fetched", fields...)
|
||||||
|
|||||||
12
api/pkg/db/chainwalletroutes/routes.go
Normal file
12
api/pkg/db/chainwalletroutes/routes.go
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
package chainwalletroutes
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/tech/sendico/pkg/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DB interface {
|
||||||
|
Get(ctx context.Context, organizationRef string, walletRef string) (*model.ChainWalletRoute, error)
|
||||||
|
Upsert(ctx context.Context, route *model.ChainWalletRoute) error
|
||||||
|
}
|
||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"github.com/tech/sendico/pkg/auth"
|
"github.com/tech/sendico/pkg/auth"
|
||||||
"github.com/tech/sendico/pkg/db/account"
|
"github.com/tech/sendico/pkg/db/account"
|
||||||
"github.com/tech/sendico/pkg/db/chainassets"
|
"github.com/tech/sendico/pkg/db/chainassets"
|
||||||
|
"github.com/tech/sendico/pkg/db/chainwalletroutes"
|
||||||
mongoimpl "github.com/tech/sendico/pkg/db/internal/mongo"
|
mongoimpl "github.com/tech/sendico/pkg/db/internal/mongo"
|
||||||
"github.com/tech/sendico/pkg/db/invitation"
|
"github.com/tech/sendico/pkg/db/invitation"
|
||||||
"github.com/tech/sendico/pkg/db/organization"
|
"github.com/tech/sendico/pkg/db/organization"
|
||||||
@@ -22,6 +23,7 @@ type Factory interface {
|
|||||||
NewRefreshTokensDB() (refreshtokens.DB, error)
|
NewRefreshTokensDB() (refreshtokens.DB, error)
|
||||||
|
|
||||||
NewChainAsstesDB() (chainassets.DB, error)
|
NewChainAsstesDB() (chainassets.DB, error)
|
||||||
|
NewChainWalletRoutesDB() (chainwalletroutes.DB, error)
|
||||||
|
|
||||||
NewAccountDB() (account.DB, error)
|
NewAccountDB() (account.DB, error)
|
||||||
NewOrganizationDB() (organization.DB, error)
|
NewOrganizationDB() (organization.DB, error)
|
||||||
|
|||||||
111
api/pkg/db/internal/mongo/chainwalletroutesdb/db.go
Normal file
111
api/pkg/db/internal/mongo/chainwalletroutesdb/db.go
Normal file
@@ -0,0 +1,111 @@
|
|||||||
|
package chainwalletroutesdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/tech/sendico/pkg/db/repository"
|
||||||
|
ri "github.com/tech/sendico/pkg/db/repository/index"
|
||||||
|
"github.com/tech/sendico/pkg/db/template"
|
||||||
|
"github.com/tech/sendico/pkg/merrors"
|
||||||
|
"github.com/tech/sendico/pkg/mlogger"
|
||||||
|
"github.com/tech/sendico/pkg/model"
|
||||||
|
"github.com/tech/sendico/pkg/mservice"
|
||||||
|
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ChainWalletRoutesDB struct {
|
||||||
|
template.DBImp[*model.ChainWalletRoute]
|
||||||
|
}
|
||||||
|
|
||||||
|
func Create(logger mlogger.Logger, db *mongo.Database) (*ChainWalletRoutesDB, error) {
|
||||||
|
p := &ChainWalletRoutesDB{
|
||||||
|
DBImp: *template.Create[*model.ChainWalletRoute](logger, mservice.WalletRoutes, db),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := p.Repository.CreateIndex(&ri.Definition{
|
||||||
|
Name: "idx_org_wallet_unique",
|
||||||
|
Unique: true,
|
||||||
|
Keys: []ri.Key{
|
||||||
|
{Field: "organizationRef", Sort: ri.Asc},
|
||||||
|
{Field: "walletRef", Sort: ri.Asc},
|
||||||
|
},
|
||||||
|
}); err != nil {
|
||||||
|
p.Logger.Error("Failed to create unique organization/wallet route index", zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := p.Repository.CreateIndex(&ri.Definition{
|
||||||
|
Name: "idx_wallet_ref",
|
||||||
|
Keys: []ri.Key{
|
||||||
|
{Field: "walletRef", Sort: ri.Asc},
|
||||||
|
},
|
||||||
|
}); err != nil {
|
||||||
|
p.Logger.Error("Failed to create wallet route lookup index", zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *ChainWalletRoutesDB) Get(ctx context.Context, organizationRef string, walletRef string) (*model.ChainWalletRoute, error) {
|
||||||
|
org := model.ChainWalletRoute{OrganizationRef: organizationRef, WalletRef: walletRef}
|
||||||
|
org.Normalize()
|
||||||
|
if org.OrganizationRef == "" || org.WalletRef == "" {
|
||||||
|
return nil, merrors.InvalidArgument("wallet route requires organizationRef and walletRef")
|
||||||
|
}
|
||||||
|
|
||||||
|
var route model.ChainWalletRoute
|
||||||
|
query := repository.Query().
|
||||||
|
Filter(repository.Field("organizationRef"), org.OrganizationRef).
|
||||||
|
Filter(repository.Field("walletRef"), org.WalletRef)
|
||||||
|
return &route, db.FindOne(ctx, query, &route)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *ChainWalletRoutesDB) Upsert(ctx context.Context, route *model.ChainWalletRoute) error {
|
||||||
|
if route == nil {
|
||||||
|
return merrors.InvalidArgument("wallet route is nil")
|
||||||
|
}
|
||||||
|
route.Normalize()
|
||||||
|
if route.OrganizationRef == "" || route.WalletRef == "" {
|
||||||
|
return merrors.InvalidArgument("wallet route requires organizationRef and walletRef")
|
||||||
|
}
|
||||||
|
if route.Network == "" && route.GatewayID == "" {
|
||||||
|
return merrors.InvalidArgument("wallet route requires network or gatewayId")
|
||||||
|
}
|
||||||
|
|
||||||
|
existing, err := db.Get(ctx, route.OrganizationRef, route.WalletRef)
|
||||||
|
if err != nil {
|
||||||
|
if !errors.Is(err, merrors.ErrNoData) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if createErr := db.Create(ctx, route); createErr != nil {
|
||||||
|
if errors.Is(createErr, merrors.ErrDataConflict) {
|
||||||
|
existing, err = db.Get(ctx, route.OrganizationRef, route.WalletRef)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return createErr
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
changed := false
|
||||||
|
if route.Network != "" && existing.Network != route.Network {
|
||||||
|
existing.Network = route.Network
|
||||||
|
changed = true
|
||||||
|
}
|
||||||
|
if route.GatewayID != "" && existing.GatewayID != route.GatewayID {
|
||||||
|
existing.GatewayID = route.GatewayID
|
||||||
|
changed = true
|
||||||
|
}
|
||||||
|
if !changed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return db.Update(ctx, existing)
|
||||||
|
}
|
||||||
@@ -11,8 +11,10 @@ import (
|
|||||||
"github.com/tech/sendico/pkg/auth"
|
"github.com/tech/sendico/pkg/auth"
|
||||||
"github.com/tech/sendico/pkg/db/account"
|
"github.com/tech/sendico/pkg/db/account"
|
||||||
"github.com/tech/sendico/pkg/db/chainassets"
|
"github.com/tech/sendico/pkg/db/chainassets"
|
||||||
|
"github.com/tech/sendico/pkg/db/chainwalletroutes"
|
||||||
"github.com/tech/sendico/pkg/db/internal/mongo/accountdb"
|
"github.com/tech/sendico/pkg/db/internal/mongo/accountdb"
|
||||||
"github.com/tech/sendico/pkg/db/internal/mongo/chainassetsdb"
|
"github.com/tech/sendico/pkg/db/internal/mongo/chainassetsdb"
|
||||||
|
"github.com/tech/sendico/pkg/db/internal/mongo/chainwalletroutesdb"
|
||||||
"github.com/tech/sendico/pkg/db/internal/mongo/invitationdb"
|
"github.com/tech/sendico/pkg/db/internal/mongo/invitationdb"
|
||||||
"github.com/tech/sendico/pkg/db/internal/mongo/organizationdb"
|
"github.com/tech/sendico/pkg/db/internal/mongo/organizationdb"
|
||||||
"github.com/tech/sendico/pkg/db/internal/mongo/policiesdb"
|
"github.com/tech/sendico/pkg/db/internal/mongo/policiesdb"
|
||||||
@@ -308,6 +310,10 @@ func (db *DB) NewChainAsstesDB() (chainassets.DB, error) {
|
|||||||
return chainassetsdb.Create(db.logger, db.db())
|
return chainassetsdb.Create(db.logger, db.db())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *DB) NewChainWalletRoutesDB() (chainwalletroutes.DB, error) {
|
||||||
|
return chainwalletroutesdb.Create(db.logger, db.db())
|
||||||
|
}
|
||||||
|
|
||||||
func (db *DB) Permissions() auth.Provider {
|
func (db *DB) Permissions() auth.Provider {
|
||||||
return db
|
return db
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -35,6 +35,41 @@ type envConfig struct {
|
|||||||
|
|
||||||
const defaultConsumerBufferSize = 1024
|
const defaultConsumerBufferSize = 1024
|
||||||
|
|
||||||
|
func sanitizeNATSURL(rawURL string) string {
|
||||||
|
if rawURL == "" {
|
||||||
|
return rawURL
|
||||||
|
}
|
||||||
|
|
||||||
|
parts := strings.Split(rawURL, ",")
|
||||||
|
sanitized := make([]string, 0, len(parts))
|
||||||
|
for _, part := range parts {
|
||||||
|
trimmed := strings.TrimSpace(part)
|
||||||
|
if trimmed == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !strings.Contains(trimmed, "://") {
|
||||||
|
sanitized = append(sanitized, trimmed)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
parsed, err := url.Parse(trimmed)
|
||||||
|
if err != nil {
|
||||||
|
sanitized = append(sanitized, trimmed)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if parsed.User == nil {
|
||||||
|
sanitized = append(sanitized, trimmed)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
sanitized = append(sanitized, parsed.Redacted())
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(sanitized) == 0 {
|
||||||
|
return strings.TrimSpace(rawURL)
|
||||||
|
}
|
||||||
|
return strings.Join(sanitized, ",")
|
||||||
|
}
|
||||||
|
|
||||||
// loadEnv gathers and validates connection details from environment variables
|
// loadEnv gathers and validates connection details from environment variables
|
||||||
// listed in the Settings struct. Invalid or missing values surface as a typed
|
// listed in the Settings struct. Invalid or missing values surface as a typed
|
||||||
// InvalidArgument error so callers can decide how to handle them.
|
// InvalidArgument error so callers can decide how to handle them.
|
||||||
@@ -109,6 +144,7 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e
|
|||||||
}
|
}
|
||||||
natsURL = u.String()
|
natsURL = u.String()
|
||||||
}
|
}
|
||||||
|
sanitizedNATSURL := sanitizeNATSURL(natsURL)
|
||||||
|
|
||||||
opts := []nats.Option{
|
opts := []nats.Option{
|
||||||
nats.Name(settings.NATSName),
|
nats.Name(settings.NATSName),
|
||||||
@@ -120,7 +156,7 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e
|
|||||||
zap.String("broker", settings.NATSName),
|
zap.String("broker", settings.NATSName),
|
||||||
}
|
}
|
||||||
if conn != nil {
|
if conn != nil {
|
||||||
fields = append(fields, zap.String("connected_url", conn.ConnectedUrl()))
|
fields = append(fields, zap.String("connected_url", sanitizeNATSURL(conn.ConnectedUrl())))
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fields = append(fields, zap.Error(err))
|
fields = append(fields, zap.Error(err))
|
||||||
@@ -132,7 +168,7 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e
|
|||||||
zap.String("broker", settings.NATSName),
|
zap.String("broker", settings.NATSName),
|
||||||
}
|
}
|
||||||
if conn != nil {
|
if conn != nil {
|
||||||
fields = append(fields, zap.String("connected_url", conn.ConnectedUrl()))
|
fields = append(fields, zap.String("connected_url", sanitizeNATSURL(conn.ConnectedUrl())))
|
||||||
}
|
}
|
||||||
l.Info("Reconnected to NATS", fields...)
|
l.Info("Reconnected to NATS", fields...)
|
||||||
}),
|
}),
|
||||||
@@ -142,7 +178,7 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e
|
|||||||
}
|
}
|
||||||
if conn != nil {
|
if conn != nil {
|
||||||
if url := conn.ConnectedUrl(); url != "" {
|
if url := conn.ConnectedUrl(); url != "" {
|
||||||
fields = append(fields, zap.String("connected_url", url))
|
fields = append(fields, zap.String("connected_url", sanitizeNATSURL(url)))
|
||||||
}
|
}
|
||||||
if err := conn.LastError(); err != nil {
|
if err := conn.LastError(); err != nil {
|
||||||
fields = append(fields, zap.Error(err))
|
fields = append(fields, zap.Error(err))
|
||||||
@@ -172,7 +208,7 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e
|
|||||||
}
|
}
|
||||||
|
|
||||||
if res.nc, err = nats.Connect(natsURL, opts...); err != nil {
|
if res.nc, err = nats.Connect(natsURL, opts...); err != nil {
|
||||||
l.Error("Failed to connect to NATS", zap.String("url", natsURL), zap.Error(err))
|
l.Error("Failed to connect to NATS", zap.String("url", sanitizedNATSURL), zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if res.js, err = res.nc.JetStream(); err != nil {
|
if res.js, err = res.nc.JetStream(); err != nil {
|
||||||
@@ -180,7 +216,7 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e
|
|||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("Connected to NATS", zap.String("broker", settings.NATSName),
|
logger.Info("Connected to NATS", zap.String("broker", settings.NATSName),
|
||||||
zap.String("url", natsURL))
|
zap.String("url", sanitizedNATSURL))
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
58
api/pkg/messaging/internal/natsb/broker_test.go
Normal file
58
api/pkg/messaging/internal/natsb/broker_test.go
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
package natsb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSanitizeNATSURL(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
t.Run("redacts single URL credentials", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
raw := "nats://alice:supersecret@localhost:4222"
|
||||||
|
sanitized := sanitizeNATSURL(raw)
|
||||||
|
|
||||||
|
if strings.Contains(sanitized, "supersecret") {
|
||||||
|
t.Fatalf("expected password to be redacted, got %q", sanitized)
|
||||||
|
}
|
||||||
|
if !strings.Contains(sanitized, "alice:xxxxx@") {
|
||||||
|
t.Fatalf("expected redacted URL to keep username, got %q", sanitized)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("keeps URL without credentials unchanged", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
raw := "nats://localhost:4222"
|
||||||
|
sanitized := sanitizeNATSURL(raw)
|
||||||
|
if sanitized != raw {
|
||||||
|
t.Fatalf("expected URL without credentials to remain unchanged, got %q", sanitized)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("redacts each URL in server list", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
raw := " nats://alice:one@localhost:4222, nats://bob:two@localhost:4223 "
|
||||||
|
sanitized := sanitizeNATSURL(raw)
|
||||||
|
|
||||||
|
if strings.Contains(sanitized, "one") || strings.Contains(sanitized, "two") {
|
||||||
|
t.Fatalf("expected passwords to be redacted, got %q", sanitized)
|
||||||
|
}
|
||||||
|
if !strings.Contains(sanitized, "alice:xxxxx@") || !strings.Contains(sanitized, "bob:xxxxx@") {
|
||||||
|
t.Fatalf("expected both URLs to be redacted, got %q", sanitized)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("returns invalid URL as-is", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
raw := "not a url"
|
||||||
|
sanitized := sanitizeNATSURL(raw)
|
||||||
|
if sanitized != raw {
|
||||||
|
t.Fatalf("expected invalid URL to remain unchanged, got %q", sanitized)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
29
api/pkg/model/chainwalletroute.go
Normal file
29
api/pkg/model/chainwalletroute.go
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
package model
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/tech/sendico/pkg/db/storable"
|
||||||
|
"github.com/tech/sendico/pkg/mservice"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ChainWalletRoute stores authoritative wallet-to-gateway routing metadata.
|
||||||
|
type ChainWalletRoute struct {
|
||||||
|
storable.Base `bson:",inline" json:",inline"`
|
||||||
|
|
||||||
|
OrganizationRef string `bson:"organizationRef" json:"organizationRef"`
|
||||||
|
WalletRef string `bson:"walletRef" json:"walletRef"`
|
||||||
|
Network string `bson:"network" json:"network"`
|
||||||
|
GatewayID string `bson:"gatewayId,omitempty" json:"gatewayId,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*ChainWalletRoute) Collection() string {
|
||||||
|
return mservice.WalletRoutes
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ChainWalletRoute) Normalize() {
|
||||||
|
r.OrganizationRef = strings.TrimSpace(r.OrganizationRef)
|
||||||
|
r.WalletRef = strings.TrimSpace(r.WalletRef)
|
||||||
|
r.Network = strings.ToLower(strings.TrimSpace(r.Network))
|
||||||
|
r.GatewayID = strings.TrimSpace(r.GatewayID)
|
||||||
|
}
|
||||||
@@ -52,12 +52,13 @@ const (
|
|||||||
Tenants Type = "tenants" // Represents tenants managed in the system
|
Tenants Type = "tenants" // Represents tenants managed in the system
|
||||||
VerificationTokens Type = "verification_tokens" //Represents verification tokens managed in the system
|
VerificationTokens Type = "verification_tokens" //Represents verification tokens managed in the system
|
||||||
Wallets Type = "wallets" // Represents workflows for tasks or projects
|
Wallets Type = "wallets" // Represents workflows for tasks or projects
|
||||||
|
WalletRoutes Type = "wallet_routes" // Represents authoritative chain wallet gateway routing
|
||||||
Workflows Type = "workflows" // Represents workflows for tasks or projects
|
Workflows Type = "workflows" // Represents workflows for tasks or projects
|
||||||
)
|
)
|
||||||
|
|
||||||
func StringToSType(s string) (Type, error) {
|
func StringToSType(s string) (Type, error) {
|
||||||
switch Type(s) {
|
switch Type(s) {
|
||||||
case Accounts, Verification, Amplitude, Site, Changes, Clients, ChainGateway, ChainWallets, ChainWalletBalances,
|
case Accounts, Verification, Amplitude, Site, Changes, Clients, ChainGateway, ChainWallets, WalletRoutes, ChainWalletBalances,
|
||||||
ChainTransfers, ChainDeposits, MntxGateway, PaymentGateway, FXOracle, FeePlans, BillingDocuments, FilterProjects, Invitations, Invoices, Logo, Ledger,
|
ChainTransfers, ChainDeposits, MntxGateway, PaymentGateway, FXOracle, FeePlans, BillingDocuments, FilterProjects, Invitations, Invoices, Logo, Ledger,
|
||||||
LedgerAccounts, LedgerBalances, LedgerEntries, LedgerOutbox, LedgerParties, LedgerPlines, Notifications,
|
LedgerAccounts, LedgerBalances, LedgerEntries, LedgerOutbox, LedgerParties, LedgerPlines, Notifications,
|
||||||
Organizations, Payments, PaymentRoutes, PaymentPlanTemplates, PaymentOrchestrator, PaymentMethods, Permissions, Policies, PolicyAssignements,
|
Organizations, Payments, PaymentRoutes, PaymentPlanTemplates, PaymentOrchestrator, PaymentMethods, Permissions, Policies, PolicyAssignements,
|
||||||
|
|||||||
@@ -64,9 +64,83 @@ func (a *WalletAPI) getWalletBalance(r *http.Request, account *model.Account, to
|
|||||||
a.logger.Debug("No CRYPTO rail gateways found in discovery")
|
a.logger.Debug("No CRYPTO rail gateways found in discovery")
|
||||||
return response.Auto(a.logger, a.Name(), merrors.NoData("no crypto gateways available"))
|
return response.Auto(a.logger, a.Name(), merrors.NoData("no crypto gateways available"))
|
||||||
}
|
}
|
||||||
|
a.logger.Debug("Resolved CRYPTO gateways for wallet balance lookup",
|
||||||
|
zap.String("organization_ref", orgRef.Hex()),
|
||||||
|
zap.String("wallet_ref", walletRef),
|
||||||
|
zap.Int("gateway_count", len(cryptoGateways)))
|
||||||
|
|
||||||
// Query all gateways in parallel to find the wallet balance
|
route, routeErr := a.walletRoute(ctx, orgRef.Hex(), walletRef)
|
||||||
bal, err := a.queryBalanceFromGateways(ctx, cryptoGateways, walletRef)
|
if routeErr != nil {
|
||||||
|
a.logger.Warn("Failed to resolve wallet route", zap.Error(routeErr), zap.String("wallet_ref", walletRef), zap.String("organization_ref", orgRef.Hex()))
|
||||||
|
}
|
||||||
|
if route != nil {
|
||||||
|
a.logger.Debug("Resolved stored wallet route",
|
||||||
|
zap.String("organization_ref", orgRef.Hex()),
|
||||||
|
zap.String("wallet_ref", walletRef),
|
||||||
|
zap.String("route_network", route.Network),
|
||||||
|
zap.String("route_gateway_id", route.GatewayID))
|
||||||
|
preferred := findGatewayForRoute(cryptoGateways, route)
|
||||||
|
if preferred != nil {
|
||||||
|
a.logger.Debug("Using preferred gateway from stored wallet route",
|
||||||
|
zap.String("organization_ref", orgRef.Hex()),
|
||||||
|
zap.String("wallet_ref", walletRef),
|
||||||
|
zap.String("gateway_id", preferred.ID),
|
||||||
|
zap.String("network", preferred.Network),
|
||||||
|
zap.String("invoke_uri", preferred.InvokeURI))
|
||||||
|
bal, preferredErr := a.queryGatewayBalance(ctx, *preferred, walletRef)
|
||||||
|
if preferredErr == nil && bal != nil {
|
||||||
|
a.rememberWalletRoute(ctx, orgRef.Hex(), walletRef, preferred.Network, preferred.ID)
|
||||||
|
a.logger.Debug("Wallet balance resolved via preferred gateway",
|
||||||
|
zap.String("organization_ref", orgRef.Hex()),
|
||||||
|
zap.String("wallet_ref", walletRef),
|
||||||
|
zap.String("gateway_id", preferred.ID),
|
||||||
|
zap.String("network", preferred.Network))
|
||||||
|
return sresponse.WalletBalanceFromConnector(a.logger, bal, token)
|
||||||
|
}
|
||||||
|
|
||||||
|
if preferredErr != nil {
|
||||||
|
a.logger.Debug("Preferred gateway balance lookup failed, falling back to fan-out",
|
||||||
|
zap.String("wallet_ref", walletRef),
|
||||||
|
zap.String("network", route.Network),
|
||||||
|
zap.String("gateway_id", preferred.ID),
|
||||||
|
zap.String("invoke_uri", preferred.InvokeURI),
|
||||||
|
zap.Error(preferredErr))
|
||||||
|
} else {
|
||||||
|
a.logger.Debug("Preferred gateway returned empty balance, falling back to fan-out",
|
||||||
|
zap.String("wallet_ref", walletRef),
|
||||||
|
zap.String("network", route.Network),
|
||||||
|
zap.String("gateway_id", preferred.ID),
|
||||||
|
zap.String("invoke_uri", preferred.InvokeURI))
|
||||||
|
}
|
||||||
|
|
||||||
|
cryptoGateways = dropGatewayByInvokeURI(cryptoGateways, preferred.InvokeURI)
|
||||||
|
if len(cryptoGateways) == 0 {
|
||||||
|
if preferredErr != nil {
|
||||||
|
a.logger.Warn("Failed to fetch wallet balance from preferred gateway", zap.Error(preferredErr), zap.String("wallet_ref", walletRef))
|
||||||
|
return response.Auto(a.logger, a.Name(), preferredErr)
|
||||||
|
}
|
||||||
|
a.logger.Warn("Wallet balance not found on preferred gateway", zap.String("wallet_ref", walletRef))
|
||||||
|
return response.Auto(a.logger, a.Name(), merrors.NoData("wallet not found"))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
a.logger.Warn("Stored wallet route did not match any healthy discovery gateway",
|
||||||
|
zap.String("organization_ref", orgRef.Hex()),
|
||||||
|
zap.String("wallet_ref", walletRef),
|
||||||
|
zap.String("route_network", route.Network),
|
||||||
|
zap.String("route_gateway_id", route.GatewayID))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
a.logger.Debug("Stored wallet route not found; using gateway fallback",
|
||||||
|
zap.String("organization_ref", orgRef.Hex()),
|
||||||
|
zap.String("wallet_ref", walletRef))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fall back to querying remaining gateways in parallel.
|
||||||
|
a.logger.Debug("Starting fallback wallet balance fan-out",
|
||||||
|
zap.String("organization_ref", orgRef.Hex()),
|
||||||
|
zap.String("wallet_ref", walletRef),
|
||||||
|
zap.Int("gateway_count", len(cryptoGateways)))
|
||||||
|
bal, err := a.queryBalanceFromGateways(ctx, cryptoGateways, orgRef.Hex(), walletRef)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.logger.Warn("Failed to fetch wallet balance from gateways", zap.Error(err), zap.String("wallet_ref", walletRef))
|
a.logger.Warn("Failed to fetch wallet balance from gateways", zap.Error(err), zap.String("wallet_ref", walletRef))
|
||||||
return response.Auto(a.logger, a.Name(), err)
|
return response.Auto(a.logger, a.Name(), err)
|
||||||
@@ -80,11 +154,18 @@ func (a *WalletAPI) getWalletBalance(r *http.Request, account *model.Account, to
|
|||||||
return sresponse.WalletBalanceFromConnector(a.logger, bal, token)
|
return sresponse.WalletBalanceFromConnector(a.logger, bal, token)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *WalletAPI) queryBalanceFromGateways(ctx context.Context, gateways []discovery.GatewaySummary, walletRef string) (*connectorv1.Balance, error) {
|
func (a *WalletAPI) queryBalanceFromGateways(ctx context.Context, gateways []discovery.GatewaySummary, organizationRef string, walletRef string) (*connectorv1.Balance, error) {
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
var result *connectorv1.Balance
|
var result *connectorv1.Balance
|
||||||
var lastErr error
|
var lastErr error
|
||||||
|
selectedGatewayID := ""
|
||||||
|
selectedNetwork := ""
|
||||||
|
|
||||||
|
a.logger.Debug("Querying wallet balance across gateways",
|
||||||
|
zap.String("organization_ref", organizationRef),
|
||||||
|
zap.String("wallet_ref", walletRef),
|
||||||
|
zap.Int("gateway_count", len(gateways)))
|
||||||
|
|
||||||
for _, gw := range gateways {
|
for _, gw := range gateways {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@@ -108,6 +189,9 @@ func (a *WalletAPI) queryBalanceFromGateways(ctx context.Context, gateways []dis
|
|||||||
mu.Lock()
|
mu.Lock()
|
||||||
if result == nil {
|
if result == nil {
|
||||||
result = bal
|
result = bal
|
||||||
|
a.rememberWalletRoute(ctx, organizationRef, walletRef, gateway.Network, gateway.ID)
|
||||||
|
selectedGatewayID = gateway.ID
|
||||||
|
selectedNetwork = gateway.Network
|
||||||
a.logger.Debug("Found wallet balance on gateway",
|
a.logger.Debug("Found wallet balance on gateway",
|
||||||
zap.String("gateway_id", gateway.ID),
|
zap.String("gateway_id", gateway.ID),
|
||||||
zap.String("network", gateway.Network),
|
zap.String("network", gateway.Network),
|
||||||
@@ -121,11 +205,23 @@ func (a *WalletAPI) queryBalanceFromGateways(ctx context.Context, gateways []dis
|
|||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
if result != nil {
|
if result != nil {
|
||||||
|
a.logger.Debug("Wallet balance fan-out completed with result",
|
||||||
|
zap.String("organization_ref", organizationRef),
|
||||||
|
zap.String("wallet_ref", walletRef),
|
||||||
|
zap.String("gateway_id", selectedGatewayID),
|
||||||
|
zap.String("network", selectedNetwork))
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
if lastErr != nil {
|
if lastErr != nil {
|
||||||
|
a.logger.Debug("Wallet balance fan-out completed with errors",
|
||||||
|
zap.String("organization_ref", organizationRef),
|
||||||
|
zap.String("wallet_ref", walletRef),
|
||||||
|
zap.Error(lastErr))
|
||||||
return nil, lastErr
|
return nil, lastErr
|
||||||
}
|
}
|
||||||
|
a.logger.Debug("Wallet balance fan-out completed without result",
|
||||||
|
zap.String("organization_ref", organizationRef),
|
||||||
|
zap.String("wallet_ref", walletRef))
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -80,6 +80,12 @@ func (a *WalletAPI) create(r *http.Request, account *model.Account, token *sresp
|
|||||||
zap.String("chain", string(sr.Asset.Chain)))
|
zap.String("chain", string(sr.Asset.Chain)))
|
||||||
return response.Auto(a.logger, a.Name(), merrors.InvalidArgument("no gateway available for network: "+networkName))
|
return response.Auto(a.logger, a.Name(), merrors.InvalidArgument("no gateway available for network: "+networkName))
|
||||||
}
|
}
|
||||||
|
a.logger.Debug("Selected gateway for wallet creation",
|
||||||
|
zap.String("organization_ref", orgRef.Hex()),
|
||||||
|
zap.String("network", networkName),
|
||||||
|
zap.String("gateway_id", gateway.ID),
|
||||||
|
zap.String("gateway_network", gateway.Network),
|
||||||
|
zap.String("invoke_uri", gateway.InvokeURI))
|
||||||
|
|
||||||
var ownerRef string
|
var ownerRef string
|
||||||
if sr.OwnerRef != nil && !sr.OwnerRef.IsZero() {
|
if sr.OwnerRef != nil && !sr.OwnerRef.IsZero() {
|
||||||
@@ -125,6 +131,13 @@ func (a *WalletAPI) create(r *http.Request, account *model.Account, token *sresp
|
|||||||
a.logger.Info("Managed wallet created for organization", mzap.ObjRef("organization_ref", orgRef),
|
a.logger.Info("Managed wallet created for organization", mzap.ObjRef("organization_ref", orgRef),
|
||||||
zap.String("wallet_ref", walletRef), mzap.StorableRef(account),
|
zap.String("wallet_ref", walletRef), mzap.StorableRef(account),
|
||||||
zap.String("gateway_id", gateway.ID), zap.String("network", gateway.Network))
|
zap.String("gateway_id", gateway.ID), zap.String("network", gateway.Network))
|
||||||
|
a.rememberWalletRoute(ctx, orgRef.Hex(), walletRef, networkName, gateway.ID)
|
||||||
|
a.rememberWalletRoute(ctx, orgRef.Hex(), walletRef, gateway.Network, gateway.ID)
|
||||||
|
a.logger.Debug("Persisted wallet route after wallet creation",
|
||||||
|
zap.String("organization_ref", orgRef.Hex()),
|
||||||
|
zap.String("wallet_ref", walletRef),
|
||||||
|
zap.String("network", networkName),
|
||||||
|
zap.String("gateway_id", gateway.ID))
|
||||||
|
|
||||||
return sresponse.Success(a.logger, token)
|
return sresponse.Success(a.logger, token)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -58,6 +58,9 @@ func (a *WalletAPI) listWallets(r *http.Request, account *model.Account, token *
|
|||||||
a.logger.Debug("No CRYPTO rail gateways found in discovery")
|
a.logger.Debug("No CRYPTO rail gateways found in discovery")
|
||||||
return sresponse.Wallets(a.logger, nil, token)
|
return sresponse.Wallets(a.logger, nil, token)
|
||||||
}
|
}
|
||||||
|
a.logger.Debug("Resolved CRYPTO gateways for wallet list",
|
||||||
|
zap.String("organization_ref", orgRef.Hex()),
|
||||||
|
zap.Int("gateway_count", len(cryptoGateways)))
|
||||||
|
|
||||||
// Build request
|
// Build request
|
||||||
req := &connectorv1.ListAccountsRequest{
|
req := &connectorv1.ListAccountsRequest{
|
||||||
@@ -76,11 +79,19 @@ func (a *WalletAPI) listWallets(r *http.Request, account *model.Account, token *
|
|||||||
// Query all gateways in parallel
|
// Query all gateways in parallel
|
||||||
allAccounts := a.queryAllGateways(ctx, cryptoGateways, req)
|
allAccounts := a.queryAllGateways(ctx, cryptoGateways, req)
|
||||||
dedupedAccounts := dedupeAccountsByWalletRef(allAccounts)
|
dedupedAccounts := dedupeAccountsByWalletRef(allAccounts)
|
||||||
|
a.logger.Debug("Wallet list fan-out completed",
|
||||||
|
zap.String("organization_ref", orgRef.Hex()),
|
||||||
|
zap.Int("accounts_raw", len(allAccounts)),
|
||||||
|
zap.Int("accounts_deduped", len(dedupedAccounts)),
|
||||||
|
zap.Int("gateway_count", len(cryptoGateways)))
|
||||||
if len(dedupedAccounts) != len(allAccounts) {
|
if len(dedupedAccounts) != len(allAccounts) {
|
||||||
a.logger.Debug("Deduplicated duplicate wallets from gateway fan-out",
|
a.logger.Debug("Deduplicated duplicate wallets from gateway fan-out",
|
||||||
zap.Int("before", len(allAccounts)),
|
zap.Int("before", len(allAccounts)),
|
||||||
zap.Int("after", len(dedupedAccounts)))
|
zap.Int("after", len(dedupedAccounts)))
|
||||||
}
|
}
|
||||||
|
for _, account := range dedupedAccounts {
|
||||||
|
a.rememberWalletRoute(ctx, orgRef.Hex(), accountWalletRef(account), accountNetwork(account), "")
|
||||||
|
}
|
||||||
|
|
||||||
return sresponse.WalletsFromAccounts(a.logger, dedupedAccounts, token)
|
return sresponse.WalletsFromAccounts(a.logger, dedupedAccounts, token)
|
||||||
}
|
}
|
||||||
@@ -162,6 +173,9 @@ func (a *WalletAPI) queryAllGateways(ctx context.Context, gateways []discovery.G
|
|||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
allAccounts := make([]*connectorv1.Account, 0)
|
allAccounts := make([]*connectorv1.Account, 0)
|
||||||
|
a.logger.Debug("Starting wallet list gateway fan-out",
|
||||||
|
zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())),
|
||||||
|
zap.Int("gateway_count", len(gateways)))
|
||||||
|
|
||||||
for _, gw := range gateways {
|
for _, gw := range gateways {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@@ -181,6 +195,9 @@ func (a *WalletAPI) queryAllGateways(ctx context.Context, gateways []discovery.G
|
|||||||
mu.Lock()
|
mu.Lock()
|
||||||
allAccounts = append(allAccounts, accounts...)
|
allAccounts = append(allAccounts, accounts...)
|
||||||
mu.Unlock()
|
mu.Unlock()
|
||||||
|
for _, account := range accounts {
|
||||||
|
a.rememberWalletRoute(ctx, strings.TrimSpace(req.GetOrganizationRef()), accountWalletRef(account), accountNetwork(account), gateway.ID)
|
||||||
|
}
|
||||||
|
|
||||||
a.logger.Debug("Queried gateway successfully",
|
a.logger.Debug("Queried gateway successfully",
|
||||||
zap.String("gateway_id", gateway.ID),
|
zap.String("gateway_id", gateway.ID),
|
||||||
@@ -190,6 +207,10 @@ func (a *WalletAPI) queryAllGateways(ctx context.Context, gateways []discovery.G
|
|||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
a.logger.Debug("Finished wallet list gateway fan-out",
|
||||||
|
zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())),
|
||||||
|
zap.Int("accounts_raw", len(allAccounts)),
|
||||||
|
zap.Int("gateway_count", len(gateways)))
|
||||||
return allAccounts
|
return allAccounts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
136
api/server/internal/server/walletapiimp/routing.go
Normal file
136
api/server/internal/server/walletapiimp/routing.go
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
package walletapiimp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/tech/sendico/pkg/discovery"
|
||||||
|
"github.com/tech/sendico/pkg/merrors"
|
||||||
|
"github.com/tech/sendico/pkg/model"
|
||||||
|
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
func normalizeNetworkName(raw string) string {
|
||||||
|
value := strings.TrimSpace(raw)
|
||||||
|
if value == "" {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
if idx := strings.Index(value, "-"); idx > 0 {
|
||||||
|
value = value[:idx]
|
||||||
|
}
|
||||||
|
|
||||||
|
value = strings.ToLower(value)
|
||||||
|
value = strings.TrimPrefix(value, "chain_network_")
|
||||||
|
return strings.TrimSpace(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *WalletAPI) rememberWalletRoute(ctx context.Context, organizationRef string, walletRef string, network string, gatewayID string) {
|
||||||
|
if a.routes == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
walletRef = strings.TrimSpace(walletRef)
|
||||||
|
organizationRef = strings.TrimSpace(organizationRef)
|
||||||
|
network = normalizeNetworkName(network)
|
||||||
|
gatewayID = strings.TrimSpace(gatewayID)
|
||||||
|
|
||||||
|
if walletRef == "" || organizationRef == "" || (network == "" && gatewayID == "") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := a.routes.Upsert(ctx, &model.ChainWalletRoute{
|
||||||
|
OrganizationRef: organizationRef,
|
||||||
|
WalletRef: walletRef,
|
||||||
|
Network: network,
|
||||||
|
GatewayID: gatewayID,
|
||||||
|
}); err != nil {
|
||||||
|
a.logger.Warn("Failed to persist wallet route",
|
||||||
|
zap.String("organization_ref", organizationRef),
|
||||||
|
zap.String("wallet_ref", walletRef),
|
||||||
|
zap.String("network", network),
|
||||||
|
zap.String("gateway_id", gatewayID),
|
||||||
|
zap.Error(err))
|
||||||
|
} else {
|
||||||
|
a.logger.Debug("Persisted wallet route",
|
||||||
|
zap.String("organization_ref", organizationRef),
|
||||||
|
zap.String("wallet_ref", walletRef),
|
||||||
|
zap.String("network", network),
|
||||||
|
zap.String("gateway_id", gatewayID))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *WalletAPI) walletRoute(ctx context.Context, organizationRef string, walletRef string) (*model.ChainWalletRoute, error) {
|
||||||
|
if a.routes == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
walletRef = strings.TrimSpace(walletRef)
|
||||||
|
organizationRef = strings.TrimSpace(organizationRef)
|
||||||
|
if walletRef == "" || organizationRef == "" {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
route, err := a.routes.Get(ctx, organizationRef, walletRef)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, merrors.ErrNoData) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return route, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func findGatewayForRoute(gateways []discovery.GatewaySummary, route *model.ChainWalletRoute) *discovery.GatewaySummary {
|
||||||
|
if route == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
gatewayID := strings.TrimSpace(route.GatewayID)
|
||||||
|
if gatewayID != "" {
|
||||||
|
for _, gw := range gateways {
|
||||||
|
if strings.EqualFold(strings.TrimSpace(gw.ID), gatewayID) &&
|
||||||
|
strings.EqualFold(gw.Rail, cryptoRail) &&
|
||||||
|
gw.Healthy &&
|
||||||
|
strings.TrimSpace(gw.InvokeURI) != "" {
|
||||||
|
return &gw
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return findGatewayForNetwork(gateways, route.Network)
|
||||||
|
}
|
||||||
|
|
||||||
|
func accountNetwork(account *connectorv1.Account) string {
|
||||||
|
if account == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
if details := account.GetProviderDetails(); details != nil {
|
||||||
|
if field, ok := details.GetFields()["network"]; ok && field != nil {
|
||||||
|
if network := normalizeNetworkName(field.GetStringValue()); network != "" {
|
||||||
|
return network
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return normalizeNetworkName(account.GetAsset())
|
||||||
|
}
|
||||||
|
|
||||||
|
func dropGatewayByInvokeURI(gateways []discovery.GatewaySummary, invokeURI string) []discovery.GatewaySummary {
|
||||||
|
invokeURI = strings.ToLower(strings.TrimSpace(invokeURI))
|
||||||
|
if invokeURI == "" || len(gateways) == 0 {
|
||||||
|
return gateways
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make([]discovery.GatewaySummary, 0, len(gateways))
|
||||||
|
for _, gw := range gateways {
|
||||||
|
if strings.ToLower(strings.TrimSpace(gw.InvokeURI)) == invokeURI {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
result = append(result, gw)
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
93
api/server/internal/server/walletapiimp/routing_test.go
Normal file
93
api/server/internal/server/walletapiimp/routing_test.go
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
package walletapiimp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/tech/sendico/pkg/discovery"
|
||||||
|
"github.com/tech/sendico/pkg/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFindGatewayForRoute_PrefersGatewayID(t *testing.T) {
|
||||||
|
gateways := []discovery.GatewaySummary{
|
||||||
|
{
|
||||||
|
ID: "gw-fallback",
|
||||||
|
Rail: "CRYPTO",
|
||||||
|
Healthy: true,
|
||||||
|
InvokeURI: "chain-gw:50070",
|
||||||
|
Network: "ethereum_mainnet",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "gw-route",
|
||||||
|
Rail: "CRYPTO",
|
||||||
|
Healthy: true,
|
||||||
|
InvokeURI: "tron-gw:50071",
|
||||||
|
Network: "tron_mainnet",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
route := &model.ChainWalletRoute{
|
||||||
|
WalletRef: "wallet-1",
|
||||||
|
Network: "ethereum_mainnet",
|
||||||
|
GatewayID: "gw-route",
|
||||||
|
}
|
||||||
|
|
||||||
|
selected := findGatewayForRoute(gateways, route)
|
||||||
|
if selected == nil {
|
||||||
|
t.Fatal("expected selected gateway")
|
||||||
|
}
|
||||||
|
if selected.ID != "gw-route" {
|
||||||
|
t.Fatalf("expected gw-route, got %q", selected.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFindGatewayForRoute_FallsBackToNetwork(t *testing.T) {
|
||||||
|
gateways := []discovery.GatewaySummary{
|
||||||
|
{
|
||||||
|
ID: "gw-chain",
|
||||||
|
Rail: "CRYPTO",
|
||||||
|
Healthy: true,
|
||||||
|
InvokeURI: "chain-gw:50070",
|
||||||
|
Network: "ethereum_mainnet",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "gw-tron",
|
||||||
|
Rail: "CRYPTO",
|
||||||
|
Healthy: true,
|
||||||
|
InvokeURI: "tron-gw:50071",
|
||||||
|
Network: "tron_mainnet",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
route := &model.ChainWalletRoute{
|
||||||
|
WalletRef: "wallet-1",
|
||||||
|
Network: "tron_mainnet",
|
||||||
|
GatewayID: "unknown",
|
||||||
|
}
|
||||||
|
|
||||||
|
selected := findGatewayForRoute(gateways, route)
|
||||||
|
if selected == nil {
|
||||||
|
t.Fatal("expected selected gateway")
|
||||||
|
}
|
||||||
|
if selected.ID != "gw-tron" {
|
||||||
|
t.Fatalf("expected gw-tron, got %q", selected.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNormalizeNetworkName(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
in string
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{in: "CHAIN_NETWORK_TRON_MAINNET", want: "tron_mainnet"},
|
||||||
|
{in: "tron_mainnet-USDT", want: "tron_mainnet"},
|
||||||
|
{in: " ethereum_mainnet ", want: "ethereum_mainnet"},
|
||||||
|
{in: "", want: ""},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
got := normalizeNetworkName(tc.in)
|
||||||
|
if got != tc.want {
|
||||||
|
t.Fatalf("normalizeNetworkName(%q) = %q, want %q", tc.in, got, tc.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
api "github.com/tech/sendico/pkg/api/http"
|
api "github.com/tech/sendico/pkg/api/http"
|
||||||
"github.com/tech/sendico/pkg/auth"
|
"github.com/tech/sendico/pkg/auth"
|
||||||
"github.com/tech/sendico/pkg/db/chainassets"
|
"github.com/tech/sendico/pkg/db/chainassets"
|
||||||
|
"github.com/tech/sendico/pkg/db/chainwalletroutes"
|
||||||
"github.com/tech/sendico/pkg/discovery"
|
"github.com/tech/sendico/pkg/discovery"
|
||||||
"github.com/tech/sendico/pkg/merrors"
|
"github.com/tech/sendico/pkg/merrors"
|
||||||
msg "github.com/tech/sendico/pkg/messaging"
|
msg "github.com/tech/sendico/pkg/messaging"
|
||||||
@@ -34,6 +35,7 @@ type WalletAPI struct {
|
|||||||
walletsPermissionRef bson.ObjectID
|
walletsPermissionRef bson.ObjectID
|
||||||
balancesPermissionRef bson.ObjectID
|
balancesPermissionRef bson.ObjectID
|
||||||
assets chainassets.DB
|
assets chainassets.DB
|
||||||
|
routes chainwalletroutes.DB
|
||||||
|
|
||||||
// Gateway connection settings
|
// Gateway connection settings
|
||||||
dialTimeout time.Duration
|
dialTimeout time.Duration
|
||||||
@@ -66,6 +68,10 @@ func CreateAPI(apiCtx eapi.API) (*WalletAPI, error) {
|
|||||||
p.logger.Warn("Failed to create asstes db", zap.Error(err))
|
p.logger.Warn("Failed to create asstes db", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if p.routes, err = apiCtx.DBFactory().NewChainWalletRoutesDB(); err != nil {
|
||||||
|
p.logger.Warn("Failed to create chain wallet routes db", zap.Error(err))
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
walletsPolicy, err := apiCtx.Permissions().GetPolicyDescription(context.Background(), mservice.ChainWallets)
|
walletsPolicy, err := apiCtx.Permissions().GetPolicyDescription(context.Background(), mservice.ChainWallets)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user