From 671ccc55a0f1137a1e12f17e8c968e0e60313999 Mon Sep 17 00:00:00 2001 From: Stephan D Date: Fri, 20 Feb 2026 15:38:22 +0100 Subject: [PATCH] cached gateway routing --- .../fees/internal/resolver/resolver_test.go | 14 ++ .../fees/internal/service/fees/service.go | 1 + .../internal/service/fees/service_test.go | 21 +++ .../service/gateway/commands/wallet/list.go | 13 +- .../internal/service/gateway/service_test.go | 37 ++++- .../chain/storage/mongo/store/wallets.go | 13 +- .../service/gateway/commands/wallet/list.go | 13 +- .../internal/service/gateway/service_test.go | 37 ++++- .../tron/storage/mongo/store/wallets.go | 13 +- api/pkg/db/chainwalletroutes/routes.go | 12 ++ api/pkg/db/factory.go | 2 + .../internal/mongo/chainwalletroutesdb/db.go | 111 ++++++++++++++ api/pkg/db/internal/mongo/db.go | 6 + api/pkg/messaging/internal/natsb/broker.go | 46 +++++- .../messaging/internal/natsb/broker_test.go | 58 ++++++++ api/pkg/model/chainwalletroute.go | 29 ++++ api/pkg/mservice/services.go | 3 +- .../internal/server/walletapiimp/balance.go | 102 ++++++++++++- .../internal/server/walletapiimp/create.go | 13 ++ .../internal/server/walletapiimp/list.go | 21 +++ .../internal/server/walletapiimp/routing.go | 136 ++++++++++++++++++ .../server/walletapiimp/routing_test.go | 93 ++++++++++++ .../internal/server/walletapiimp/service.go | 6 + 23 files changed, 777 insertions(+), 23 deletions(-) create mode 100644 api/pkg/db/chainwalletroutes/routes.go create mode 100644 api/pkg/db/internal/mongo/chainwalletroutesdb/db.go create mode 100644 api/pkg/messaging/internal/natsb/broker_test.go create mode 100644 api/pkg/model/chainwalletroute.go create mode 100644 api/server/internal/server/walletapiimp/routing.go create mode 100644 api/server/internal/server/walletapiimp/routing_test.go diff --git a/api/billing/fees/internal/service/fees/internal/resolver/resolver_test.go b/api/billing/fees/internal/service/fees/internal/resolver/resolver_test.go index 3f3487ad..2a0760e4 100644 --- a/api/billing/fees/internal/service/fees/internal/resolver/resolver_test.go +++ b/api/billing/fees/internal/service/fees/internal/resolver/resolver_test.go @@ -33,6 +33,7 @@ func TestResolver_GlobalFallbackWhenOrgMissing(t *testing.T) { if err != nil { t.Fatalf("expected fallback to global, got error: %v", err) } + if plan.OrganizationRef != nil && !plan.OrganizationRef.IsZero() { t.Fatalf("expected global plan, got orgRef %s", plan.OrganizationRef.Hex()) } @@ -158,6 +159,7 @@ func TestResolver_EffectiveDateFiltering(t *testing.T) { if err != nil { t.Fatalf("expected fallback to global, got error: %v", err) } + if rule.RuleID != "current" { t.Fatalf("expected current global rule, got %s", rule.RuleID) } @@ -182,6 +184,7 @@ func TestResolver_AppliesToFiltering(t *testing.T) { if err != nil { t.Fatalf("expected card rule, got error: %v", err) } + if rule.RuleID != "card" { t.Fatalf("expected card rule, got %s", rule.RuleID) } @@ -216,6 +219,7 @@ func TestResolver_AppliesToFilteringSupportsListsAndWildcard(t *testing.T) { if err != nil { t.Fatalf("expected list match rule, got error: %v", err) } + if rule.RuleID != "network_multi" { t.Fatalf("expected network list rule, got %s", rule.RuleID) } @@ -224,6 +228,7 @@ func TestResolver_AppliesToFilteringSupportsListsAndWildcard(t *testing.T) { if err != nil { t.Fatalf("expected wildcard rule, got error: %v", err) } + if rule.RuleID != "asset_any" { t.Fatalf("expected asset wildcard rule, got %s", rule.RuleID) } @@ -232,6 +237,7 @@ func TestResolver_AppliesToFilteringSupportsListsAndWildcard(t *testing.T) { if err != nil { t.Fatalf("expected default rule, got error: %v", err) } + if rule.RuleID != "default" { t.Fatalf("expected default rule, got %s", rule.RuleID) } @@ -315,9 +321,11 @@ func (m *memoryPlansStore) FindActiveOrgPlan(_ context.Context, orgRef bson.Obje if plan == nil || plan.OrganizationRef == nil || plan.OrganizationRef.IsZero() || (*plan.OrganizationRef != orgRef) { continue } + if !plan.Active { continue } + if plan.EffectiveFrom.After(asOf) { continue } @@ -328,9 +336,11 @@ func (m *memoryPlansStore) FindActiveOrgPlan(_ context.Context, orgRef bson.Obje matches = append(matches, plan) } + if len(matches) == 0 { return nil, storage.ErrFeePlanNotFound } + if len(matches) > 1 { return nil, storage.ErrConflictingFeePlans } @@ -349,18 +359,22 @@ func (m *memoryPlansStore) FindActiveGlobalPlan(_ context.Context, asOf time.Tim if !plan.Active { continue } + if plan.EffectiveFrom.After(asOf) { continue } + if plan.EffectiveTo != nil && !plan.EffectiveTo.After(asOf) { continue } matches = append(matches, plan) } + if len(matches) == 0 { return nil, storage.ErrFeePlanNotFound } + if len(matches) > 1 { return nil, storage.ErrConflictingFeePlans } diff --git a/api/billing/fees/internal/service/fees/service.go b/api/billing/fees/internal/service/fees/service.go index b13dabd3..aac2b5de 100644 --- a/api/billing/fees/internal/service/fees/service.go +++ b/api/billing/fees/internal/service/fees/service.go @@ -479,6 +479,7 @@ func (s *Service) observePrecomputeFees(logger mlogger.Logger, err error, resp * if !expiresAt.IsZero() { logFields = append(logFields, zap.Time("expires_at", expiresAt)) } + if err != nil { logger.Warn("PrecomputeFees finished", append(logFields, zap.Error(err))...) diff --git a/api/billing/fees/internal/service/fees/service_test.go b/api/billing/fees/internal/service/fees/service_test.go index 0b61b0cf..47bad274 100644 --- a/api/billing/fees/internal/service/fees/service_test.go +++ b/api/billing/fees/internal/service/fees/service_test.go @@ -93,9 +93,11 @@ func TestQuoteFees_ComputesDerivedLines(t *testing.T) { if got := line.GetMoney().GetAmount(); got != "3.20" { t.Fatalf("expected fee amount 3.20, got %s", got) } + if line.GetMoney().GetCurrency() != "USD" { t.Fatalf("expected currency USD, got %s", line.GetMoney().GetCurrency()) } + if line.GetLedgerAccountRef() != "acct:fees" { t.Fatalf("unexpected ledger account ref %s", line.GetLedgerAccountRef()) } @@ -111,9 +113,11 @@ func TestQuoteFees_ComputesDerivedLines(t *testing.T) { if applied.GetTaxCode() != "VAT" || applied.GetTaxRate() != "0.20" { t.Fatalf("applied rule metadata mismatch: %+v", applied) } + if applied.GetRounding() != moneyv1.RoundingMode_ROUND_HALF_UP { t.Fatalf("expected rounding HALF_UP, got %v", applied.GetRounding()) } + if applied.GetParameters()["scale"] != "2" { t.Fatalf("expected parameters to carry metadata scale, got %+v", applied.GetParameters()) } @@ -189,6 +193,7 @@ func TestQuoteFees_FiltersByAttributesAndDates(t *testing.T) { if err != nil { t.Fatalf("QuoteFees returned error: %v", err) } + if len(resp.GetLines()) != 1 { t.Fatalf("expected only base rule to fire, got %d lines", len(resp.GetLines())) } @@ -197,6 +202,7 @@ func TestQuoteFees_FiltersByAttributesAndDates(t *testing.T) { if line.GetLedgerAccountRef() != "acct:base" { t.Fatalf("expected base rule to apply, got %s", line.GetLedgerAccountRef()) } + if line.GetMoney().GetAmount() != "5.00" { t.Fatalf("expected 5.00 amount, got %s", line.GetMoney().GetAmount()) } @@ -250,9 +256,11 @@ func TestQuoteFees_RoundingDown(t *testing.T) { if err != nil { t.Fatalf("QuoteFees returned error: %v", err) } + if len(resp.GetLines()) != 1 { t.Fatalf("expected single derived line, got %d", len(resp.GetLines())) } + if resp.GetLines()[0].GetMoney().GetAmount() != "0.01" { t.Fatalf("expected rounding down to 0.01, got %s", resp.GetLines()[0].GetMoney().GetAmount()) } @@ -317,15 +325,19 @@ func TestQuoteFees_UsesInjectedCalculator(t *testing.T) { if err != nil { t.Fatalf("QuoteFees returned error: %v", err) } + if !calc.called { t.Fatalf("expected calculator to be invoked") } + if calc.gotPlan != plan { t.Fatalf("expected calculator to receive plan pointer") } + if len(resp.GetLines()) != len(result.Lines) { t.Fatalf("expected %d lines, got %d", len(result.Lines), len(resp.GetLines())) } + if resp.GetLines()[0].GetLedgerAccountRef() != "acct:stub" { t.Fatalf("unexpected ledger account in response: %s", resp.GetLines()[0].GetLedgerAccountRef()) } @@ -405,6 +417,7 @@ func TestQuoteFees_PopulatesFxUsed(t *testing.T) { if fx.GetProvider() != "TestProvider" || fx.GetRate().GetValue() != "1.2300" { t.Fatalf("unexpected FxUsed payload: %+v", fx) } + if fx.GetPair().GetBase() != "USD" || fx.GetPair().GetQuote() != "EUR" { t.Fatalf("unexpected currency pair: %+v", fx.GetPair()) } @@ -447,6 +460,7 @@ func (s *stubPlansStore) GetActivePlan(ctx context.Context, orgRef bson.ObjectID return nil, err } } + return s.FindActiveGlobalPlan(ctx, asOf) } @@ -454,15 +468,19 @@ func (s *stubPlansStore) FindActiveOrgPlan(_ context.Context, orgRef bson.Object if s.plan == nil { return nil, storage.ErrFeePlanNotFound } + if (s.plan.OrganizationRef != nil) && (*s.plan.OrganizationRef != orgRef) { return nil, storage.ErrFeePlanNotFound } + if !s.plan.Active { return nil, storage.ErrFeePlanNotFound } + if s.plan.EffectiveFrom.After(asOf) { return nil, storage.ErrFeePlanNotFound } + if s.plan.EffectiveTo != nil && !s.plan.EffectiveTo.After(asOf) { return nil, storage.ErrFeePlanNotFound } @@ -474,12 +492,15 @@ func (s *stubPlansStore) FindActiveGlobalPlan(_ context.Context, asOf time.Time) if s.globalPlan == nil { return nil, storage.ErrFeePlanNotFound } + if !s.globalPlan.Active { return nil, storage.ErrFeePlanNotFound } + if s.globalPlan.EffectiveFrom.After(asOf) { return nil, storage.ErrFeePlanNotFound } + if s.globalPlan.EffectiveTo != nil && !s.globalPlan.EffectiveTo.After(asOf) { return nil, storage.ErrFeePlanNotFound } diff --git a/api/gateway/chain/internal/service/gateway/commands/wallet/list.go b/api/gateway/chain/internal/service/gateway/commands/wallet/list.go index 2513a2b5..9506ab15 100644 --- a/api/gateway/chain/internal/service/gateway/commands/wallet/list.go +++ b/api/gateway/chain/internal/service/gateway/commands/wallet/list.go @@ -2,11 +2,13 @@ package wallet import ( "context" + "errors" "strings" "github.com/tech/sendico/gateway/chain/internal/service/gateway/shared" "github.com/tech/sendico/gateway/chain/storage/model" "github.com/tech/sendico/pkg/api/routers/gsresponse" + "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mservice" paginationv1 "github.com/tech/sendico/pkg/proto/common/pagination/v1" chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1" @@ -45,8 +47,15 @@ func (c *listManagedWalletsCommand) Execute(ctx context.Context, req *chainv1.Li result, err := c.deps.Storage.Wallets().List(ctx, filter) if err != nil { - c.deps.Logger.Warn("Storage list failed", zap.Error(err)) - return gsresponse.Auto[chainv1.ListManagedWalletsResponse](c.deps.Logger, mservice.ChainGateway, err) + if errors.Is(err, merrors.ErrNoData) { + result = &model.ManagedWalletList{} + } else { + c.deps.Logger.Warn("Storage list failed", zap.Error(err)) + return gsresponse.Auto[chainv1.ListManagedWalletsResponse](c.deps.Logger, mservice.ChainGateway, err) + } + } + if result == nil { + result = &model.ManagedWalletList{} } protoWallets := make([]*chainv1.ManagedWallet, 0, len(result.Items)) diff --git a/api/gateway/chain/internal/service/gateway/service_test.go b/api/gateway/chain/internal/service/gateway/service_test.go index eb501266..dfb06391 100644 --- a/api/gateway/chain/internal/service/gateway/service_test.go +++ b/api/gateway/chain/internal/service/gateway/service_test.go @@ -127,6 +127,20 @@ func TestListAccounts_OrganizationRefFilters(t *testing.T) { require.Equal(t, "org-1", orgField.GetStringValue()) } +func TestListAccounts_NoWalletsNotError(t *testing.T) { + baseRepo := newInMemoryRepository() + svc := newTestServiceWithRepository(t, &walletsNoDataRepository{inMemoryRepository: baseRepo}) + ctx := context.Background() + + resp, err := svc.ListAccounts(ctx, &connectorv1.ListAccountsRequest{ + OrganizationRef: "org-1", + Kind: connectorv1.AccountKind_CHAIN_MANAGED_WALLET, + }) + require.NoError(t, err) + require.NotNil(t, resp) + require.Empty(t, resp.GetAccounts()) +} + func TestSubmitTransfer_ManagedDestination(t *testing.T) { svc, repo := newTestService(t) ctx := context.Background() @@ -253,6 +267,22 @@ func newInMemoryRepository() *inMemoryRepository { } } +type walletsNoDataRepository struct { + *inMemoryRepository +} + +func (r *walletsNoDataRepository) Wallets() storage.WalletsStore { + return &walletsNoDataStore{WalletsStore: r.inMemoryRepository.wallets} +} + +type walletsNoDataStore struct { + storage.WalletsStore +} + +func (w *walletsNoDataStore) List(context.Context, model.ManagedWalletFilter) (*model.ManagedWalletList, error) { + return nil, merrors.NoData("no wallets") +} + func (r *inMemoryRepository) Ping(context.Context) error { return nil } func (r *inMemoryRepository) Wallets() storage.WalletsStore { return r.wallets } func (r *inMemoryRepository) Transfers() storage.TransfersStore { return r.transfers } @@ -625,6 +655,11 @@ func sanitizeLimit(requested int32, def, max int64) int64 { func newTestService(t *testing.T) (*Service, *inMemoryRepository) { repo := newInMemoryRepository() + svc := newTestServiceWithRepository(t, repo) + return svc, repo +} + +func newTestServiceWithRepository(t *testing.T, repo storage.Repository) *Service { logger := zap.NewNop() networks := []shared.Network{{ Name: "ethereum_mainnet", @@ -641,7 +676,7 @@ func newTestService(t *testing.T) (*Service, *inMemoryRepository) { WithServiceWallet(shared.ServiceWallet{Network: "ethereum_mainnet", Address: "0xservice"}), WithDriverRegistry(driverRegistry), ) - return svc, repo + return svc } type fakeKeyManager struct{} diff --git a/api/gateway/chain/storage/mongo/store/wallets.go b/api/gateway/chain/storage/mongo/store/wallets.go index f7c3fb15..d4b44f12 100644 --- a/api/gateway/chain/storage/mongo/store/wallets.go +++ b/api/gateway/chain/storage/mongo/store/wallets.go @@ -183,9 +183,14 @@ func (w *Wallets) List(ctx context.Context, filter model.ManagedWalletFilter) (* query = query.Sort(repository.IDField(), true).Limit(&fetchLimit) wallets, listErr := mutil.GetObjects[model.ManagedWallet](ctx, w.logger, query, nil, w.walletRepo) - if listErr != nil && !errors.Is(listErr, merrors.ErrNoData) { - w.logger.Warn("Wallet list failed", append(fields, zap.Error(listErr))...) - return nil, listErr + if listErr != nil { + if errors.Is(listErr, merrors.ErrNoData) { + wallets = make([]model.ManagedWallet, 0) + listErr = nil + } else { + w.logger.Warn("Wallet list failed", append(fields, zap.Error(listErr))...) + return nil, listErr + } } nextCursor := "" @@ -204,7 +209,7 @@ func (w *Wallets) List(ctx context.Context, filter model.ManagedWalletFilter) (* zap.Int("count", len(result.Items)), zap.String("next_cursor", result.NextCursor), ) - if errors.Is(listErr, merrors.ErrNoData) { + if len(result.Items) == 0 { w.logger.Debug("Wallet list empty", fields...) } else { w.logger.Debug("Wallet list fetched", fields...) diff --git a/api/gateway/tron/internal/service/gateway/commands/wallet/list.go b/api/gateway/tron/internal/service/gateway/commands/wallet/list.go index 3fdace22..f7fd1313 100644 --- a/api/gateway/tron/internal/service/gateway/commands/wallet/list.go +++ b/api/gateway/tron/internal/service/gateway/commands/wallet/list.go @@ -2,11 +2,13 @@ package wallet import ( "context" + "errors" "strings" "github.com/tech/sendico/gateway/tron/shared" "github.com/tech/sendico/gateway/tron/storage/model" "github.com/tech/sendico/pkg/api/routers/gsresponse" + "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mservice" paginationv1 "github.com/tech/sendico/pkg/proto/common/pagination/v1" chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1" @@ -45,8 +47,15 @@ func (c *listManagedWalletsCommand) Execute(ctx context.Context, req *chainv1.Li result, err := c.deps.Storage.Wallets().List(ctx, filter) if err != nil { - c.deps.Logger.Warn("Storage list failed", zap.Error(err)) - return gsresponse.Auto[chainv1.ListManagedWalletsResponse](c.deps.Logger, mservice.ChainGateway, err) + if errors.Is(err, merrors.ErrNoData) { + result = &model.ManagedWalletList{} + } else { + c.deps.Logger.Warn("Storage list failed", zap.Error(err)) + return gsresponse.Auto[chainv1.ListManagedWalletsResponse](c.deps.Logger, mservice.ChainGateway, err) + } + } + if result == nil { + result = &model.ManagedWalletList{} } protoWallets := make([]*chainv1.ManagedWallet, 0, len(result.Items)) diff --git a/api/gateway/tron/internal/service/gateway/service_test.go b/api/gateway/tron/internal/service/gateway/service_test.go index 59dc7b97..f1195b49 100644 --- a/api/gateway/tron/internal/service/gateway/service_test.go +++ b/api/gateway/tron/internal/service/gateway/service_test.go @@ -128,6 +128,20 @@ func TestListAccounts_OrganizationRefFilters(t *testing.T) { require.Equal(t, "org-1", orgField.GetStringValue()) } +func TestListAccounts_NoWalletsNotError(t *testing.T) { + baseRepo := newInMemoryRepository() + svc := newTestServiceWithRepository(t, &walletsNoDataRepository{inMemoryRepository: baseRepo}) + ctx := context.Background() + + resp, err := svc.ListAccounts(ctx, &connectorv1.ListAccountsRequest{ + OrganizationRef: "org-1", + Kind: connectorv1.AccountKind_CHAIN_MANAGED_WALLET, + }) + require.NoError(t, err) + require.NotNil(t, resp) + require.Empty(t, resp.GetAccounts()) +} + func TestSubmitTransfer_ManagedDestination(t *testing.T) { svc, repo := newTestService(t) ctx := context.Background() @@ -256,6 +270,22 @@ func newInMemoryRepository() *inMemoryRepository { } } +type walletsNoDataRepository struct { + *inMemoryRepository +} + +func (r *walletsNoDataRepository) Wallets() storage.WalletsStore { + return &walletsNoDataStore{WalletsStore: r.inMemoryRepository.wallets} +} + +type walletsNoDataStore struct { + storage.WalletsStore +} + +func (w *walletsNoDataStore) List(context.Context, model.ManagedWalletFilter) (*model.ManagedWalletList, error) { + return nil, merrors.NoData("no wallets") +} + func (r *inMemoryRepository) Ping(context.Context) error { return nil } func (r *inMemoryRepository) Wallets() storage.WalletsStore { return r.wallets } func (r *inMemoryRepository) Transfers() storage.TransfersStore { return r.transfers } @@ -628,6 +658,11 @@ func sanitizeLimit(requested int32, def, max int64) int64 { func newTestService(t *testing.T) (*Service, *inMemoryRepository) { repo := newInMemoryRepository() + svc := newTestServiceWithRepository(t, repo) + return svc, repo +} + +func newTestServiceWithRepository(t *testing.T, repo storage.Repository) *Service { logger := zap.NewNop() networks := []shared.Network{{ Name: pmodel.ChainNetworkTronMainnet, @@ -644,7 +679,7 @@ func newTestService(t *testing.T) (*Service, *inMemoryRepository) { WithServiceWallet(shared.ServiceWallet{Network: pmodel.ChainNetworkTronMainnet, Address: "TServiceWalletAddress"}), WithDriverRegistry(driverRegistry), ) - return svc, repo + return svc } type fakeKeyManager struct{} diff --git a/api/gateway/tron/storage/mongo/store/wallets.go b/api/gateway/tron/storage/mongo/store/wallets.go index 3ae97a59..e2e8aa57 100644 --- a/api/gateway/tron/storage/mongo/store/wallets.go +++ b/api/gateway/tron/storage/mongo/store/wallets.go @@ -187,9 +187,14 @@ func (w *Wallets) List(ctx context.Context, filter model.ManagedWalletFilter) (* query = query.Sort(repository.IDField(), true).Limit(&fetchLimit) wallets, listErr := mutil.GetObjects[model.ManagedWallet](ctx, w.logger, query, nil, w.walletRepo) - if listErr != nil && !errors.Is(listErr, merrors.ErrNoData) { - w.logger.Warn("Wallet list failed", append(fields, zap.Error(listErr))...) - return nil, listErr + if listErr != nil { + if errors.Is(listErr, merrors.ErrNoData) { + wallets = make([]model.ManagedWallet, 0) + listErr = nil + } else { + w.logger.Warn("Wallet list failed", append(fields, zap.Error(listErr))...) + return nil, listErr + } } nextCursor := "" @@ -208,7 +213,7 @@ func (w *Wallets) List(ctx context.Context, filter model.ManagedWalletFilter) (* zap.Int("count", len(result.Items)), zap.String("next_cursor", result.NextCursor), ) - if errors.Is(listErr, merrors.ErrNoData) { + if len(result.Items) == 0 { w.logger.Debug("Wallet list empty", fields...) } else { w.logger.Debug("Wallet list fetched", fields...) diff --git a/api/pkg/db/chainwalletroutes/routes.go b/api/pkg/db/chainwalletroutes/routes.go new file mode 100644 index 00000000..6fc4fb35 --- /dev/null +++ b/api/pkg/db/chainwalletroutes/routes.go @@ -0,0 +1,12 @@ +package chainwalletroutes + +import ( + "context" + + "github.com/tech/sendico/pkg/model" +) + +type DB interface { + Get(ctx context.Context, organizationRef string, walletRef string) (*model.ChainWalletRoute, error) + Upsert(ctx context.Context, route *model.ChainWalletRoute) error +} diff --git a/api/pkg/db/factory.go b/api/pkg/db/factory.go index 4026bab1..db45addb 100644 --- a/api/pkg/db/factory.go +++ b/api/pkg/db/factory.go @@ -4,6 +4,7 @@ import ( "github.com/tech/sendico/pkg/auth" "github.com/tech/sendico/pkg/db/account" "github.com/tech/sendico/pkg/db/chainassets" + "github.com/tech/sendico/pkg/db/chainwalletroutes" mongoimpl "github.com/tech/sendico/pkg/db/internal/mongo" "github.com/tech/sendico/pkg/db/invitation" "github.com/tech/sendico/pkg/db/organization" @@ -22,6 +23,7 @@ type Factory interface { NewRefreshTokensDB() (refreshtokens.DB, error) NewChainAsstesDB() (chainassets.DB, error) + NewChainWalletRoutesDB() (chainwalletroutes.DB, error) NewAccountDB() (account.DB, error) NewOrganizationDB() (organization.DB, error) diff --git a/api/pkg/db/internal/mongo/chainwalletroutesdb/db.go b/api/pkg/db/internal/mongo/chainwalletroutesdb/db.go new file mode 100644 index 00000000..541e28d3 --- /dev/null +++ b/api/pkg/db/internal/mongo/chainwalletroutesdb/db.go @@ -0,0 +1,111 @@ +package chainwalletroutesdb + +import ( + "context" + "errors" + + "github.com/tech/sendico/pkg/db/repository" + ri "github.com/tech/sendico/pkg/db/repository/index" + "github.com/tech/sendico/pkg/db/template" + "github.com/tech/sendico/pkg/merrors" + "github.com/tech/sendico/pkg/mlogger" + "github.com/tech/sendico/pkg/model" + "github.com/tech/sendico/pkg/mservice" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.uber.org/zap" +) + +type ChainWalletRoutesDB struct { + template.DBImp[*model.ChainWalletRoute] +} + +func Create(logger mlogger.Logger, db *mongo.Database) (*ChainWalletRoutesDB, error) { + p := &ChainWalletRoutesDB{ + DBImp: *template.Create[*model.ChainWalletRoute](logger, mservice.WalletRoutes, db), + } + + if err := p.Repository.CreateIndex(&ri.Definition{ + Name: "idx_org_wallet_unique", + Unique: true, + Keys: []ri.Key{ + {Field: "organizationRef", Sort: ri.Asc}, + {Field: "walletRef", Sort: ri.Asc}, + }, + }); err != nil { + p.Logger.Error("Failed to create unique organization/wallet route index", zap.Error(err)) + return nil, err + } + + if err := p.Repository.CreateIndex(&ri.Definition{ + Name: "idx_wallet_ref", + Keys: []ri.Key{ + {Field: "walletRef", Sort: ri.Asc}, + }, + }); err != nil { + p.Logger.Error("Failed to create wallet route lookup index", zap.Error(err)) + return nil, err + } + + return p, nil +} + +func (db *ChainWalletRoutesDB) Get(ctx context.Context, organizationRef string, walletRef string) (*model.ChainWalletRoute, error) { + org := model.ChainWalletRoute{OrganizationRef: organizationRef, WalletRef: walletRef} + org.Normalize() + if org.OrganizationRef == "" || org.WalletRef == "" { + return nil, merrors.InvalidArgument("wallet route requires organizationRef and walletRef") + } + + var route model.ChainWalletRoute + query := repository.Query(). + Filter(repository.Field("organizationRef"), org.OrganizationRef). + Filter(repository.Field("walletRef"), org.WalletRef) + return &route, db.FindOne(ctx, query, &route) +} + +func (db *ChainWalletRoutesDB) Upsert(ctx context.Context, route *model.ChainWalletRoute) error { + if route == nil { + return merrors.InvalidArgument("wallet route is nil") + } + route.Normalize() + if route.OrganizationRef == "" || route.WalletRef == "" { + return merrors.InvalidArgument("wallet route requires organizationRef and walletRef") + } + if route.Network == "" && route.GatewayID == "" { + return merrors.InvalidArgument("wallet route requires network or gatewayId") + } + + existing, err := db.Get(ctx, route.OrganizationRef, route.WalletRef) + if err != nil { + if !errors.Is(err, merrors.ErrNoData) { + return err + } + if createErr := db.Create(ctx, route); createErr != nil { + if errors.Is(createErr, merrors.ErrDataConflict) { + existing, err = db.Get(ctx, route.OrganizationRef, route.WalletRef) + if err != nil { + return err + } + } else { + return createErr + } + } else { + return nil + } + } + + changed := false + if route.Network != "" && existing.Network != route.Network { + existing.Network = route.Network + changed = true + } + if route.GatewayID != "" && existing.GatewayID != route.GatewayID { + existing.GatewayID = route.GatewayID + changed = true + } + if !changed { + return nil + } + + return db.Update(ctx, existing) +} diff --git a/api/pkg/db/internal/mongo/db.go b/api/pkg/db/internal/mongo/db.go index acd60767..414dfa1b 100755 --- a/api/pkg/db/internal/mongo/db.go +++ b/api/pkg/db/internal/mongo/db.go @@ -11,8 +11,10 @@ import ( "github.com/tech/sendico/pkg/auth" "github.com/tech/sendico/pkg/db/account" "github.com/tech/sendico/pkg/db/chainassets" + "github.com/tech/sendico/pkg/db/chainwalletroutes" "github.com/tech/sendico/pkg/db/internal/mongo/accountdb" "github.com/tech/sendico/pkg/db/internal/mongo/chainassetsdb" + "github.com/tech/sendico/pkg/db/internal/mongo/chainwalletroutesdb" "github.com/tech/sendico/pkg/db/internal/mongo/invitationdb" "github.com/tech/sendico/pkg/db/internal/mongo/organizationdb" "github.com/tech/sendico/pkg/db/internal/mongo/policiesdb" @@ -308,6 +310,10 @@ func (db *DB) NewChainAsstesDB() (chainassets.DB, error) { return chainassetsdb.Create(db.logger, db.db()) } +func (db *DB) NewChainWalletRoutesDB() (chainwalletroutes.DB, error) { + return chainwalletroutesdb.Create(db.logger, db.db()) +} + func (db *DB) Permissions() auth.Provider { return db } diff --git a/api/pkg/messaging/internal/natsb/broker.go b/api/pkg/messaging/internal/natsb/broker.go index 829de692..4ea7fef2 100644 --- a/api/pkg/messaging/internal/natsb/broker.go +++ b/api/pkg/messaging/internal/natsb/broker.go @@ -35,6 +35,41 @@ type envConfig struct { const defaultConsumerBufferSize = 1024 +func sanitizeNATSURL(rawURL string) string { + if rawURL == "" { + return rawURL + } + + parts := strings.Split(rawURL, ",") + sanitized := make([]string, 0, len(parts)) + for _, part := range parts { + trimmed := strings.TrimSpace(part) + if trimmed == "" { + continue + } + if !strings.Contains(trimmed, "://") { + sanitized = append(sanitized, trimmed) + continue + } + + parsed, err := url.Parse(trimmed) + if err != nil { + sanitized = append(sanitized, trimmed) + continue + } + if parsed.User == nil { + sanitized = append(sanitized, trimmed) + continue + } + sanitized = append(sanitized, parsed.Redacted()) + } + + if len(sanitized) == 0 { + return strings.TrimSpace(rawURL) + } + return strings.Join(sanitized, ",") +} + // loadEnv gathers and validates connection details from environment variables // listed in the Settings struct. Invalid or missing values surface as a typed // InvalidArgument error so callers can decide how to handle them. @@ -109,6 +144,7 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e } natsURL = u.String() } + sanitizedNATSURL := sanitizeNATSURL(natsURL) opts := []nats.Option{ nats.Name(settings.NATSName), @@ -120,7 +156,7 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e zap.String("broker", settings.NATSName), } if conn != nil { - fields = append(fields, zap.String("connected_url", conn.ConnectedUrl())) + fields = append(fields, zap.String("connected_url", sanitizeNATSURL(conn.ConnectedUrl()))) } if err != nil { fields = append(fields, zap.Error(err)) @@ -132,7 +168,7 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e zap.String("broker", settings.NATSName), } if conn != nil { - fields = append(fields, zap.String("connected_url", conn.ConnectedUrl())) + fields = append(fields, zap.String("connected_url", sanitizeNATSURL(conn.ConnectedUrl()))) } l.Info("Reconnected to NATS", fields...) }), @@ -142,7 +178,7 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e } if conn != nil { if url := conn.ConnectedUrl(); url != "" { - fields = append(fields, zap.String("connected_url", url)) + fields = append(fields, zap.String("connected_url", sanitizeNATSURL(url))) } if err := conn.LastError(); err != nil { fields = append(fields, zap.Error(err)) @@ -172,7 +208,7 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e } if res.nc, err = nats.Connect(natsURL, opts...); err != nil { - l.Error("Failed to connect to NATS", zap.String("url", natsURL), zap.Error(err)) + l.Error("Failed to connect to NATS", zap.String("url", sanitizedNATSURL), zap.Error(err)) return nil, err } if res.js, err = res.nc.JetStream(); err != nil { @@ -180,7 +216,7 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e } logger.Info("Connected to NATS", zap.String("broker", settings.NATSName), - zap.String("url", natsURL)) + zap.String("url", sanitizedNATSURL)) return res, nil } diff --git a/api/pkg/messaging/internal/natsb/broker_test.go b/api/pkg/messaging/internal/natsb/broker_test.go new file mode 100644 index 00000000..a1212363 --- /dev/null +++ b/api/pkg/messaging/internal/natsb/broker_test.go @@ -0,0 +1,58 @@ +package natsb + +import ( + "strings" + "testing" +) + +func TestSanitizeNATSURL(t *testing.T) { + t.Parallel() + + t.Run("redacts single URL credentials", func(t *testing.T) { + t.Parallel() + + raw := "nats://alice:supersecret@localhost:4222" + sanitized := sanitizeNATSURL(raw) + + if strings.Contains(sanitized, "supersecret") { + t.Fatalf("expected password to be redacted, got %q", sanitized) + } + if !strings.Contains(sanitized, "alice:xxxxx@") { + t.Fatalf("expected redacted URL to keep username, got %q", sanitized) + } + }) + + t.Run("keeps URL without credentials unchanged", func(t *testing.T) { + t.Parallel() + + raw := "nats://localhost:4222" + sanitized := sanitizeNATSURL(raw) + if sanitized != raw { + t.Fatalf("expected URL without credentials to remain unchanged, got %q", sanitized) + } + }) + + t.Run("redacts each URL in server list", func(t *testing.T) { + t.Parallel() + + raw := " nats://alice:one@localhost:4222, nats://bob:two@localhost:4223 " + sanitized := sanitizeNATSURL(raw) + + if strings.Contains(sanitized, "one") || strings.Contains(sanitized, "two") { + t.Fatalf("expected passwords to be redacted, got %q", sanitized) + } + if !strings.Contains(sanitized, "alice:xxxxx@") || !strings.Contains(sanitized, "bob:xxxxx@") { + t.Fatalf("expected both URLs to be redacted, got %q", sanitized) + } + }) + + t.Run("returns invalid URL as-is", func(t *testing.T) { + t.Parallel() + + raw := "not a url" + sanitized := sanitizeNATSURL(raw) + if sanitized != raw { + t.Fatalf("expected invalid URL to remain unchanged, got %q", sanitized) + } + }) +} diff --git a/api/pkg/model/chainwalletroute.go b/api/pkg/model/chainwalletroute.go new file mode 100644 index 00000000..d5f16f43 --- /dev/null +++ b/api/pkg/model/chainwalletroute.go @@ -0,0 +1,29 @@ +package model + +import ( + "strings" + + "github.com/tech/sendico/pkg/db/storable" + "github.com/tech/sendico/pkg/mservice" +) + +// ChainWalletRoute stores authoritative wallet-to-gateway routing metadata. +type ChainWalletRoute struct { + storable.Base `bson:",inline" json:",inline"` + + OrganizationRef string `bson:"organizationRef" json:"organizationRef"` + WalletRef string `bson:"walletRef" json:"walletRef"` + Network string `bson:"network" json:"network"` + GatewayID string `bson:"gatewayId,omitempty" json:"gatewayId,omitempty"` +} + +func (*ChainWalletRoute) Collection() string { + return mservice.WalletRoutes +} + +func (r *ChainWalletRoute) Normalize() { + r.OrganizationRef = strings.TrimSpace(r.OrganizationRef) + r.WalletRef = strings.TrimSpace(r.WalletRef) + r.Network = strings.ToLower(strings.TrimSpace(r.Network)) + r.GatewayID = strings.TrimSpace(r.GatewayID) +} diff --git a/api/pkg/mservice/services.go b/api/pkg/mservice/services.go index eb6a7177..5ac96793 100644 --- a/api/pkg/mservice/services.go +++ b/api/pkg/mservice/services.go @@ -52,12 +52,13 @@ const ( Tenants Type = "tenants" // Represents tenants managed in the system VerificationTokens Type = "verification_tokens" //Represents verification tokens managed in the system Wallets Type = "wallets" // Represents workflows for tasks or projects + WalletRoutes Type = "wallet_routes" // Represents authoritative chain wallet gateway routing Workflows Type = "workflows" // Represents workflows for tasks or projects ) func StringToSType(s string) (Type, error) { switch Type(s) { - case Accounts, Verification, Amplitude, Site, Changes, Clients, ChainGateway, ChainWallets, ChainWalletBalances, + case Accounts, Verification, Amplitude, Site, Changes, Clients, ChainGateway, ChainWallets, WalletRoutes, ChainWalletBalances, ChainTransfers, ChainDeposits, MntxGateway, PaymentGateway, FXOracle, FeePlans, BillingDocuments, FilterProjects, Invitations, Invoices, Logo, Ledger, LedgerAccounts, LedgerBalances, LedgerEntries, LedgerOutbox, LedgerParties, LedgerPlines, Notifications, Organizations, Payments, PaymentRoutes, PaymentPlanTemplates, PaymentOrchestrator, PaymentMethods, Permissions, Policies, PolicyAssignements, diff --git a/api/server/internal/server/walletapiimp/balance.go b/api/server/internal/server/walletapiimp/balance.go index 11101921..fc45960a 100644 --- a/api/server/internal/server/walletapiimp/balance.go +++ b/api/server/internal/server/walletapiimp/balance.go @@ -64,9 +64,83 @@ func (a *WalletAPI) getWalletBalance(r *http.Request, account *model.Account, to a.logger.Debug("No CRYPTO rail gateways found in discovery") return response.Auto(a.logger, a.Name(), merrors.NoData("no crypto gateways available")) } + a.logger.Debug("Resolved CRYPTO gateways for wallet balance lookup", + zap.String("organization_ref", orgRef.Hex()), + zap.String("wallet_ref", walletRef), + zap.Int("gateway_count", len(cryptoGateways))) - // Query all gateways in parallel to find the wallet balance - bal, err := a.queryBalanceFromGateways(ctx, cryptoGateways, walletRef) + route, routeErr := a.walletRoute(ctx, orgRef.Hex(), walletRef) + if routeErr != nil { + a.logger.Warn("Failed to resolve wallet route", zap.Error(routeErr), zap.String("wallet_ref", walletRef), zap.String("organization_ref", orgRef.Hex())) + } + if route != nil { + a.logger.Debug("Resolved stored wallet route", + zap.String("organization_ref", orgRef.Hex()), + zap.String("wallet_ref", walletRef), + zap.String("route_network", route.Network), + zap.String("route_gateway_id", route.GatewayID)) + preferred := findGatewayForRoute(cryptoGateways, route) + if preferred != nil { + a.logger.Debug("Using preferred gateway from stored wallet route", + zap.String("organization_ref", orgRef.Hex()), + zap.String("wallet_ref", walletRef), + zap.String("gateway_id", preferred.ID), + zap.String("network", preferred.Network), + zap.String("invoke_uri", preferred.InvokeURI)) + bal, preferredErr := a.queryGatewayBalance(ctx, *preferred, walletRef) + if preferredErr == nil && bal != nil { + a.rememberWalletRoute(ctx, orgRef.Hex(), walletRef, preferred.Network, preferred.ID) + a.logger.Debug("Wallet balance resolved via preferred gateway", + zap.String("organization_ref", orgRef.Hex()), + zap.String("wallet_ref", walletRef), + zap.String("gateway_id", preferred.ID), + zap.String("network", preferred.Network)) + return sresponse.WalletBalanceFromConnector(a.logger, bal, token) + } + + if preferredErr != nil { + a.logger.Debug("Preferred gateway balance lookup failed, falling back to fan-out", + zap.String("wallet_ref", walletRef), + zap.String("network", route.Network), + zap.String("gateway_id", preferred.ID), + zap.String("invoke_uri", preferred.InvokeURI), + zap.Error(preferredErr)) + } else { + a.logger.Debug("Preferred gateway returned empty balance, falling back to fan-out", + zap.String("wallet_ref", walletRef), + zap.String("network", route.Network), + zap.String("gateway_id", preferred.ID), + zap.String("invoke_uri", preferred.InvokeURI)) + } + + cryptoGateways = dropGatewayByInvokeURI(cryptoGateways, preferred.InvokeURI) + if len(cryptoGateways) == 0 { + if preferredErr != nil { + a.logger.Warn("Failed to fetch wallet balance from preferred gateway", zap.Error(preferredErr), zap.String("wallet_ref", walletRef)) + return response.Auto(a.logger, a.Name(), preferredErr) + } + a.logger.Warn("Wallet balance not found on preferred gateway", zap.String("wallet_ref", walletRef)) + return response.Auto(a.logger, a.Name(), merrors.NoData("wallet not found")) + } + } else { + a.logger.Warn("Stored wallet route did not match any healthy discovery gateway", + zap.String("organization_ref", orgRef.Hex()), + zap.String("wallet_ref", walletRef), + zap.String("route_network", route.Network), + zap.String("route_gateway_id", route.GatewayID)) + } + } else { + a.logger.Debug("Stored wallet route not found; using gateway fallback", + zap.String("organization_ref", orgRef.Hex()), + zap.String("wallet_ref", walletRef)) + } + + // Fall back to querying remaining gateways in parallel. + a.logger.Debug("Starting fallback wallet balance fan-out", + zap.String("organization_ref", orgRef.Hex()), + zap.String("wallet_ref", walletRef), + zap.Int("gateway_count", len(cryptoGateways))) + bal, err := a.queryBalanceFromGateways(ctx, cryptoGateways, orgRef.Hex(), walletRef) if err != nil { a.logger.Warn("Failed to fetch wallet balance from gateways", zap.Error(err), zap.String("wallet_ref", walletRef)) return response.Auto(a.logger, a.Name(), err) @@ -80,11 +154,18 @@ func (a *WalletAPI) getWalletBalance(r *http.Request, account *model.Account, to return sresponse.WalletBalanceFromConnector(a.logger, bal, token) } -func (a *WalletAPI) queryBalanceFromGateways(ctx context.Context, gateways []discovery.GatewaySummary, walletRef string) (*connectorv1.Balance, error) { +func (a *WalletAPI) queryBalanceFromGateways(ctx context.Context, gateways []discovery.GatewaySummary, organizationRef string, walletRef string) (*connectorv1.Balance, error) { var mu sync.Mutex var wg sync.WaitGroup var result *connectorv1.Balance var lastErr error + selectedGatewayID := "" + selectedNetwork := "" + + a.logger.Debug("Querying wallet balance across gateways", + zap.String("organization_ref", organizationRef), + zap.String("wallet_ref", walletRef), + zap.Int("gateway_count", len(gateways))) for _, gw := range gateways { wg.Add(1) @@ -108,6 +189,9 @@ func (a *WalletAPI) queryBalanceFromGateways(ctx context.Context, gateways []dis mu.Lock() if result == nil { result = bal + a.rememberWalletRoute(ctx, organizationRef, walletRef, gateway.Network, gateway.ID) + selectedGatewayID = gateway.ID + selectedNetwork = gateway.Network a.logger.Debug("Found wallet balance on gateway", zap.String("gateway_id", gateway.ID), zap.String("network", gateway.Network), @@ -121,11 +205,23 @@ func (a *WalletAPI) queryBalanceFromGateways(ctx context.Context, gateways []dis wg.Wait() if result != nil { + a.logger.Debug("Wallet balance fan-out completed with result", + zap.String("organization_ref", organizationRef), + zap.String("wallet_ref", walletRef), + zap.String("gateway_id", selectedGatewayID), + zap.String("network", selectedNetwork)) return result, nil } if lastErr != nil { + a.logger.Debug("Wallet balance fan-out completed with errors", + zap.String("organization_ref", organizationRef), + zap.String("wallet_ref", walletRef), + zap.Error(lastErr)) return nil, lastErr } + a.logger.Debug("Wallet balance fan-out completed without result", + zap.String("organization_ref", organizationRef), + zap.String("wallet_ref", walletRef)) return nil, nil } diff --git a/api/server/internal/server/walletapiimp/create.go b/api/server/internal/server/walletapiimp/create.go index 3e7e5ab9..8d22f7c0 100644 --- a/api/server/internal/server/walletapiimp/create.go +++ b/api/server/internal/server/walletapiimp/create.go @@ -80,6 +80,12 @@ func (a *WalletAPI) create(r *http.Request, account *model.Account, token *sresp zap.String("chain", string(sr.Asset.Chain))) return response.Auto(a.logger, a.Name(), merrors.InvalidArgument("no gateway available for network: "+networkName)) } + a.logger.Debug("Selected gateway for wallet creation", + zap.String("organization_ref", orgRef.Hex()), + zap.String("network", networkName), + zap.String("gateway_id", gateway.ID), + zap.String("gateway_network", gateway.Network), + zap.String("invoke_uri", gateway.InvokeURI)) var ownerRef string if sr.OwnerRef != nil && !sr.OwnerRef.IsZero() { @@ -125,6 +131,13 @@ func (a *WalletAPI) create(r *http.Request, account *model.Account, token *sresp a.logger.Info("Managed wallet created for organization", mzap.ObjRef("organization_ref", orgRef), zap.String("wallet_ref", walletRef), mzap.StorableRef(account), zap.String("gateway_id", gateway.ID), zap.String("network", gateway.Network)) + a.rememberWalletRoute(ctx, orgRef.Hex(), walletRef, networkName, gateway.ID) + a.rememberWalletRoute(ctx, orgRef.Hex(), walletRef, gateway.Network, gateway.ID) + a.logger.Debug("Persisted wallet route after wallet creation", + zap.String("organization_ref", orgRef.Hex()), + zap.String("wallet_ref", walletRef), + zap.String("network", networkName), + zap.String("gateway_id", gateway.ID)) return sresponse.Success(a.logger, token) } diff --git a/api/server/internal/server/walletapiimp/list.go b/api/server/internal/server/walletapiimp/list.go index 195d2a0e..07c3d6eb 100644 --- a/api/server/internal/server/walletapiimp/list.go +++ b/api/server/internal/server/walletapiimp/list.go @@ -58,6 +58,9 @@ func (a *WalletAPI) listWallets(r *http.Request, account *model.Account, token * a.logger.Debug("No CRYPTO rail gateways found in discovery") return sresponse.Wallets(a.logger, nil, token) } + a.logger.Debug("Resolved CRYPTO gateways for wallet list", + zap.String("organization_ref", orgRef.Hex()), + zap.Int("gateway_count", len(cryptoGateways))) // Build request req := &connectorv1.ListAccountsRequest{ @@ -76,11 +79,19 @@ func (a *WalletAPI) listWallets(r *http.Request, account *model.Account, token * // Query all gateways in parallel allAccounts := a.queryAllGateways(ctx, cryptoGateways, req) dedupedAccounts := dedupeAccountsByWalletRef(allAccounts) + a.logger.Debug("Wallet list fan-out completed", + zap.String("organization_ref", orgRef.Hex()), + zap.Int("accounts_raw", len(allAccounts)), + zap.Int("accounts_deduped", len(dedupedAccounts)), + zap.Int("gateway_count", len(cryptoGateways))) if len(dedupedAccounts) != len(allAccounts) { a.logger.Debug("Deduplicated duplicate wallets from gateway fan-out", zap.Int("before", len(allAccounts)), zap.Int("after", len(dedupedAccounts))) } + for _, account := range dedupedAccounts { + a.rememberWalletRoute(ctx, orgRef.Hex(), accountWalletRef(account), accountNetwork(account), "") + } return sresponse.WalletsFromAccounts(a.logger, dedupedAccounts, token) } @@ -162,6 +173,9 @@ func (a *WalletAPI) queryAllGateways(ctx context.Context, gateways []discovery.G var mu sync.Mutex var wg sync.WaitGroup allAccounts := make([]*connectorv1.Account, 0) + a.logger.Debug("Starting wallet list gateway fan-out", + zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())), + zap.Int("gateway_count", len(gateways))) for _, gw := range gateways { wg.Add(1) @@ -181,6 +195,9 @@ func (a *WalletAPI) queryAllGateways(ctx context.Context, gateways []discovery.G mu.Lock() allAccounts = append(allAccounts, accounts...) mu.Unlock() + for _, account := range accounts { + a.rememberWalletRoute(ctx, strings.TrimSpace(req.GetOrganizationRef()), accountWalletRef(account), accountNetwork(account), gateway.ID) + } a.logger.Debug("Queried gateway successfully", zap.String("gateway_id", gateway.ID), @@ -190,6 +207,10 @@ func (a *WalletAPI) queryAllGateways(ctx context.Context, gateways []discovery.G } wg.Wait() + a.logger.Debug("Finished wallet list gateway fan-out", + zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())), + zap.Int("accounts_raw", len(allAccounts)), + zap.Int("gateway_count", len(gateways))) return allAccounts } diff --git a/api/server/internal/server/walletapiimp/routing.go b/api/server/internal/server/walletapiimp/routing.go new file mode 100644 index 00000000..f9072f74 --- /dev/null +++ b/api/server/internal/server/walletapiimp/routing.go @@ -0,0 +1,136 @@ +package walletapiimp + +import ( + "context" + "errors" + "strings" + + "github.com/tech/sendico/pkg/discovery" + "github.com/tech/sendico/pkg/merrors" + "github.com/tech/sendico/pkg/model" + connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1" + "go.uber.org/zap" +) + +func normalizeNetworkName(raw string) string { + value := strings.TrimSpace(raw) + if value == "" { + return "" + } + + if idx := strings.Index(value, "-"); idx > 0 { + value = value[:idx] + } + + value = strings.ToLower(value) + value = strings.TrimPrefix(value, "chain_network_") + return strings.TrimSpace(value) +} + +func (a *WalletAPI) rememberWalletRoute(ctx context.Context, organizationRef string, walletRef string, network string, gatewayID string) { + if a.routes == nil { + return + } + + walletRef = strings.TrimSpace(walletRef) + organizationRef = strings.TrimSpace(organizationRef) + network = normalizeNetworkName(network) + gatewayID = strings.TrimSpace(gatewayID) + + if walletRef == "" || organizationRef == "" || (network == "" && gatewayID == "") { + return + } + + if err := a.routes.Upsert(ctx, &model.ChainWalletRoute{ + OrganizationRef: organizationRef, + WalletRef: walletRef, + Network: network, + GatewayID: gatewayID, + }); err != nil { + a.logger.Warn("Failed to persist wallet route", + zap.String("organization_ref", organizationRef), + zap.String("wallet_ref", walletRef), + zap.String("network", network), + zap.String("gateway_id", gatewayID), + zap.Error(err)) + } else { + a.logger.Debug("Persisted wallet route", + zap.String("organization_ref", organizationRef), + zap.String("wallet_ref", walletRef), + zap.String("network", network), + zap.String("gateway_id", gatewayID)) + } +} + +func (a *WalletAPI) walletRoute(ctx context.Context, organizationRef string, walletRef string) (*model.ChainWalletRoute, error) { + if a.routes == nil { + return nil, nil + } + + walletRef = strings.TrimSpace(walletRef) + organizationRef = strings.TrimSpace(organizationRef) + if walletRef == "" || organizationRef == "" { + return nil, nil + } + + route, err := a.routes.Get(ctx, organizationRef, walletRef) + if err != nil { + if errors.Is(err, merrors.ErrNoData) { + return nil, nil + } + return nil, err + } + return route, nil +} + +func findGatewayForRoute(gateways []discovery.GatewaySummary, route *model.ChainWalletRoute) *discovery.GatewaySummary { + if route == nil { + return nil + } + + gatewayID := strings.TrimSpace(route.GatewayID) + if gatewayID != "" { + for _, gw := range gateways { + if strings.EqualFold(strings.TrimSpace(gw.ID), gatewayID) && + strings.EqualFold(gw.Rail, cryptoRail) && + gw.Healthy && + strings.TrimSpace(gw.InvokeURI) != "" { + return &gw + } + } + } + + return findGatewayForNetwork(gateways, route.Network) +} + +func accountNetwork(account *connectorv1.Account) string { + if account == nil { + return "" + } + + if details := account.GetProviderDetails(); details != nil { + if field, ok := details.GetFields()["network"]; ok && field != nil { + if network := normalizeNetworkName(field.GetStringValue()); network != "" { + return network + } + } + } + + return normalizeNetworkName(account.GetAsset()) +} + +func dropGatewayByInvokeURI(gateways []discovery.GatewaySummary, invokeURI string) []discovery.GatewaySummary { + invokeURI = strings.ToLower(strings.TrimSpace(invokeURI)) + if invokeURI == "" || len(gateways) == 0 { + return gateways + } + + result := make([]discovery.GatewaySummary, 0, len(gateways)) + for _, gw := range gateways { + if strings.ToLower(strings.TrimSpace(gw.InvokeURI)) == invokeURI { + continue + } + result = append(result, gw) + } + return result +} diff --git a/api/server/internal/server/walletapiimp/routing_test.go b/api/server/internal/server/walletapiimp/routing_test.go new file mode 100644 index 00000000..32913f95 --- /dev/null +++ b/api/server/internal/server/walletapiimp/routing_test.go @@ -0,0 +1,93 @@ +package walletapiimp + +import ( + "testing" + + "github.com/tech/sendico/pkg/discovery" + "github.com/tech/sendico/pkg/model" +) + +func TestFindGatewayForRoute_PrefersGatewayID(t *testing.T) { + gateways := []discovery.GatewaySummary{ + { + ID: "gw-fallback", + Rail: "CRYPTO", + Healthy: true, + InvokeURI: "chain-gw:50070", + Network: "ethereum_mainnet", + }, + { + ID: "gw-route", + Rail: "CRYPTO", + Healthy: true, + InvokeURI: "tron-gw:50071", + Network: "tron_mainnet", + }, + } + + route := &model.ChainWalletRoute{ + WalletRef: "wallet-1", + Network: "ethereum_mainnet", + GatewayID: "gw-route", + } + + selected := findGatewayForRoute(gateways, route) + if selected == nil { + t.Fatal("expected selected gateway") + } + if selected.ID != "gw-route" { + t.Fatalf("expected gw-route, got %q", selected.ID) + } +} + +func TestFindGatewayForRoute_FallsBackToNetwork(t *testing.T) { + gateways := []discovery.GatewaySummary{ + { + ID: "gw-chain", + Rail: "CRYPTO", + Healthy: true, + InvokeURI: "chain-gw:50070", + Network: "ethereum_mainnet", + }, + { + ID: "gw-tron", + Rail: "CRYPTO", + Healthy: true, + InvokeURI: "tron-gw:50071", + Network: "tron_mainnet", + }, + } + + route := &model.ChainWalletRoute{ + WalletRef: "wallet-1", + Network: "tron_mainnet", + GatewayID: "unknown", + } + + selected := findGatewayForRoute(gateways, route) + if selected == nil { + t.Fatal("expected selected gateway") + } + if selected.ID != "gw-tron" { + t.Fatalf("expected gw-tron, got %q", selected.ID) + } +} + +func TestNormalizeNetworkName(t *testing.T) { + tests := []struct { + in string + want string + }{ + {in: "CHAIN_NETWORK_TRON_MAINNET", want: "tron_mainnet"}, + {in: "tron_mainnet-USDT", want: "tron_mainnet"}, + {in: " ethereum_mainnet ", want: "ethereum_mainnet"}, + {in: "", want: ""}, + } + + for _, tc := range tests { + got := normalizeNetworkName(tc.in) + if got != tc.want { + t.Fatalf("normalizeNetworkName(%q) = %q, want %q", tc.in, got, tc.want) + } + } +} diff --git a/api/server/internal/server/walletapiimp/service.go b/api/server/internal/server/walletapiimp/service.go index 451a2af7..4007acdd 100644 --- a/api/server/internal/server/walletapiimp/service.go +++ b/api/server/internal/server/walletapiimp/service.go @@ -7,6 +7,7 @@ import ( api "github.com/tech/sendico/pkg/api/http" "github.com/tech/sendico/pkg/auth" "github.com/tech/sendico/pkg/db/chainassets" + "github.com/tech/sendico/pkg/db/chainwalletroutes" "github.com/tech/sendico/pkg/discovery" "github.com/tech/sendico/pkg/merrors" msg "github.com/tech/sendico/pkg/messaging" @@ -34,6 +35,7 @@ type WalletAPI struct { walletsPermissionRef bson.ObjectID balancesPermissionRef bson.ObjectID assets chainassets.DB + routes chainwalletroutes.DB // Gateway connection settings dialTimeout time.Duration @@ -66,6 +68,10 @@ func CreateAPI(apiCtx eapi.API) (*WalletAPI, error) { p.logger.Warn("Failed to create asstes db", zap.Error(err)) return nil, err } + if p.routes, err = apiCtx.DBFactory().NewChainWalletRoutesDB(); err != nil { + p.logger.Warn("Failed to create chain wallet routes db", zap.Error(err)) + return nil, err + } walletsPolicy, err := apiCtx.Permissions().GetPolicyDescription(context.Background(), mservice.ChainWallets) if err != nil { -- 2.49.1