improved ledger account discovery #307

Merged
tech merged 1 commits from ledger-306 into main 2026-01-22 19:06:11 +00:00
23 changed files with 480 additions and 53 deletions

View File

@@ -52,10 +52,6 @@ func (i *Imp) startMetrics(cfg *metricsConfig) {
ReadHeaderTimeout: 5 * time.Second,
}
if healthRouter != nil {
healthRouter.SetStatus(health.SSRunning)
}
go func() {
i.logger.Info("Prometheus endpoint listening", zap.String("address", address))
if err := i.metricsSrv.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
@@ -83,3 +79,10 @@ func (i *Imp) shutdownMetrics(ctx context.Context) {
}
i.metricsSrv = nil
}
func (i *Imp) setMetricsStatus(status health.ServiceStatus) {
if i == nil || i.metricsHealth == nil {
return
}
i.metricsHealth.SetStatus(status)
}

View File

@@ -5,6 +5,7 @@ import (
"strings"
"time"
"github.com/tech/sendico/pkg/api/routers/health"
"github.com/tech/sendico/pkg/mlogger"
"go.uber.org/zap"
)
@@ -46,12 +47,14 @@ func (i *Imp) Start() error {
if err := i.startDiscovery(cfg); err != nil {
i.stopDiscovery()
i.setMetricsStatus(health.SSTerminating)
ctx, cancel := context.WithTimeout(context.Background(), i.shutdownTimeout())
i.shutdownMetrics(ctx)
cancel()
return err
}
i.setMetricsStatus(health.SSRunning)
i.logger.Info("Discovery service ready", zap.String("messaging_driver", messagingDriver))
<-i.stopCh

View File

@@ -176,7 +176,7 @@ func buildQuoteMeta(meta *oraclev1.RequestMeta) *model.QuoteMeta {
if trace != nil {
qm.RequestRef = trace.GetRequestRef()
qm.TraceRef = trace.GetTraceRef()
qm.IdempotencyKey = trace.GetIdempotencyKey()
qm.IdempotencyKey = strings.TrimSpace(trace.GetIdempotencyKey())
}
if org := strings.TrimSpace(meta.GetOrganizationRef()); org != "" {
if objID, err := primitive.ObjectIDFromHex(org); err == nil {

View File

@@ -32,6 +32,17 @@ func NewQuotes(logger mlogger.Logger, db *mongo.Database, txFactory transaction.
},
Unique: true,
},
{
Keys: []ri.Key{
{Field: "meta.organizationRef", Sort: ri.Asc},
{Field: "meta.idempotencyKey", Sort: ri.Asc},
},
Unique: true,
Name: "quotes_meta_org_idempotency_key",
PartialFilter: repository.Query().
Comparison(repository.Field("meta.idempotencyKey"), builder.Ne, "").
Comparison(repository.Field("meta.organizationRef"), builder.Exists, true),
},
{
Keys: []ri.Key{
{Field: "status", Sort: ri.Asc},

View File

@@ -31,6 +31,7 @@ type Client interface {
CreateAccount(ctx context.Context, req *ledgerv1.CreateAccountRequest) (*ledgerv1.CreateAccountResponse, error)
ListAccounts(ctx context.Context, req *ledgerv1.ListAccountsRequest) (*ledgerv1.ListAccountsResponse, error)
ListConnectorAccounts(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error)
PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error)
PostDebitWithCharges(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error)
TransferInternal(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error)
@@ -231,7 +232,7 @@ func (c *ledgerClient) ListAccounts(ctx context.Context, req *ledgerv1.ListAccou
if req == nil || strings.TrimSpace(req.GetOrganizationRef()) == "" {
return nil, merrors.InvalidArgument("ledger: organization_ref is required")
}
resp, err := c.client.ListAccounts(ctx, &connectorv1.ListAccountsRequest{OwnerRef: strings.TrimSpace(req.GetOrganizationRef())})
resp, err := c.client.ListAccounts(ctx, &connectorv1.ListAccountsRequest{OrganizationRef: strings.TrimSpace(req.GetOrganizationRef())})
if err != nil {
return nil, err
}
@@ -242,6 +243,15 @@ func (c *ledgerClient) ListAccounts(ctx context.Context, req *ledgerv1.ListAccou
return &ledgerv1.ListAccountsResponse{Accounts: accounts}, nil
}
func (c *ledgerClient) ListConnectorAccounts(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) {
ctx, cancel := c.callContext(ctx)
defer cancel()
if req == nil {
return nil, merrors.InvalidArgument("ledger: request is required")
}
return c.client.ListAccounts(ctx, req)
}
func (c *ledgerClient) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
return c.submitLedgerOperation(ctx, connectorv1.OperationType_CREDIT, "", req.GetLedgerAccountRef(), req.GetMoney(), req)
}
@@ -481,6 +491,10 @@ func ledgerAccountFromConnector(account *connectorv1.Account) *ledgerv1.LedgerAc
if ref := account.GetRef(); ref != nil {
accountID = strings.TrimSpace(ref.GetAccountId())
}
organizationRef := strings.TrimSpace(account.GetOwnerRef())
if v := strings.TrimSpace(fmt.Sprint(details["organization_ref"])); v != "" {
organizationRef = v
}
describable := account.GetDescribable()
label := strings.TrimSpace(account.GetLabel())
if describable == nil && label != "" {
@@ -495,7 +509,7 @@ func ledgerAccountFromConnector(account *connectorv1.Account) *ledgerv1.LedgerAc
}
return &ledgerv1.LedgerAccount{
LedgerAccountRef: accountID,
OrganizationRef: strings.TrimSpace(account.GetOwnerRef()),
OrganizationRef: organizationRef,
AccountCode: accountCode,
AccountType: accountType,
Currency: strings.TrimSpace(account.GetAsset()),

View File

@@ -4,6 +4,7 @@ import (
"context"
"github.com/tech/sendico/pkg/payments/rail"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
)
@@ -15,6 +16,7 @@ type Fake struct {
HoldBalanceFn func(ctx context.Context, accountID string, amount string) error
CreateAccountFn func(ctx context.Context, req *ledgerv1.CreateAccountRequest) (*ledgerv1.CreateAccountResponse, error)
ListAccountsFn func(ctx context.Context, req *ledgerv1.ListAccountsRequest) (*ledgerv1.ListAccountsResponse, error)
ListConnectorAccountsFn func(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error)
PostCreditWithChargesFn func(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error)
PostDebitWithChargesFn func(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error)
TransferInternalFn func(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error)
@@ -60,6 +62,13 @@ func (f *Fake) ListAccounts(ctx context.Context, req *ledgerv1.ListAccountsReque
return &ledgerv1.ListAccountsResponse{}, nil
}
func (f *Fake) ListConnectorAccounts(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) {
if f.ListConnectorAccountsFn != nil {
return f.ListConnectorAccountsFn(ctx, req)
}
return &connectorv1.ListAccountsResponse{}, nil
}
func (f *Fake) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
if f.PostCreditWithChargesFn != nil {
return f.PostCreditWithChargesFn(ctx, req)

View File

@@ -121,10 +121,17 @@ func (c *connectorAdapter) GetAccount(ctx context.Context, req *connectorv1.GetA
}
func (c *connectorAdapter) ListAccounts(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) {
if req == nil || strings.TrimSpace(req.GetOwnerRef()) == "" {
return nil, merrors.InvalidArgument("list_accounts: owner_ref is required")
if req == nil {
return nil, merrors.InvalidArgument("list_accounts: request is required")
}
resp, err := c.svc.ListAccounts(ctx, &ledgerv1.ListAccountsRequest{OrganizationRef: strings.TrimSpace(req.GetOwnerRef())})
orgRef := strings.TrimSpace(req.GetOrganizationRef())
if orgRef == "" {
orgRef = strings.TrimSpace(req.GetOwnerRef())
}
if orgRef == "" {
return nil, merrors.InvalidArgument("list_accounts: organization_ref is required")
}
resp, err := c.svc.ListAccounts(ctx, &ledgerv1.ListAccountsRequest{OrganizationRef: orgRef})
if err != nil {
return nil, err
}
@@ -343,6 +350,7 @@ func ledgerAccountToConnector(account *ledgerv1.LedgerAccount) *connectorv1.Acco
"status": account.GetStatus().String(),
"allow_negative": account.GetAllowNegative(),
"is_settlement": account.GetIsSettlement(),
"organization_ref": strings.TrimSpace(account.GetOrganizationRef()),
})
describable := ledgerAccountDescribable(account)
return &connectorv1.Account{
@@ -354,7 +362,7 @@ func ledgerAccountToConnector(account *ledgerv1.LedgerAccount) *connectorv1.Acco
Asset: strings.TrimSpace(account.GetCurrency()),
State: ledgerAccountState(account.GetStatus()),
Label: strings.TrimSpace(account.GetAccountCode()),
OwnerRef: strings.TrimSpace(account.GetOrganizationRef()),
OwnerRef: "",
ProviderDetails: details,
CreatedAt: account.GetCreatedAt(),
UpdatedAt: account.GetUpdatedAt(),

View File

@@ -35,6 +35,9 @@ messaging:
reconnect_wait: 5
buffer_size: 1024
# Retain quote records after expiry to allow long-running payments to complete.
quote_retention_hours: 72
# Service endpoints are sourced from discovery; no static overrides.
card_gateways:
monetix:

View File

@@ -22,6 +22,7 @@ type config struct {
CardGateways map[string]cardGatewayRouteConfig `yaml:"card_gateways"`
FeeAccounts map[string]string `yaml:"fee_ledger_accounts"`
GatewayInstances []gatewayInstanceConfig `yaml:"gateway_instances"`
QuoteRetentionHrs int `yaml:"quote_retention_hours"`
}
type clientConfig struct {
@@ -84,6 +85,13 @@ func (c clientConfig) callTimeout() time.Duration {
return time.Duration(c.CallTimeoutSecs) * time.Second
}
func (c *config) quoteRetention() time.Duration {
if c == nil || c.QuoteRetentionHrs <= 0 {
return 72 * time.Hour
}
return time.Duration(c.QuoteRetentionHrs) * time.Hour
}
func (i *Imp) loadConfig() (*config, error) {
data, err := os.ReadFile(i.file)
if err != nil {

View File

@@ -9,6 +9,7 @@ import (
"github.com/tech/sendico/pkg/payments/rail"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
@@ -109,6 +110,14 @@ func (c *discoveryLedgerClient) ListAccounts(ctx context.Context, req *ledgerv1.
return client.ListAccounts(ctx, req)
}
func (c *discoveryLedgerClient) ListConnectorAccounts(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.ListConnectorAccounts(ctx, req)
}
func (c *discoveryLedgerClient) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {

View File

@@ -38,8 +38,9 @@ func (i *Imp) Start() error {
i.initDiscovery(cfg)
quoteRetention := cfg.quoteRetention()
repoFactory := func(logger mlogger.Logger, conn *db.MongoConnection) (storage.Repository, error) {
return mongostorage.New(logger, conn)
return mongostorage.New(logger, conn, mongostorage.WithQuoteRetention(quoteRetention))
}
var broker mb.Broker

View File

@@ -122,7 +122,7 @@ func (h *quotePaymentCommand) quotePayment(
return quote, nil
}
existing, err := quotesStore.GetByIdempotencyKey(ctx, qc.idempotencyKey)
existing, err := quotesStore.GetByIdempotencyKey(ctx, qc.orgRef, qc.idempotencyKey)
if err != nil && !errors.Is(err, storage.ErrQuoteNotFound) {
h.logger.Warn(
"Failed to lookup quote by idempotency key",
@@ -172,7 +172,7 @@ func (h *quotePaymentCommand) quotePayment(
if err := quotesStore.Create(ctx, record); err != nil {
if errors.Is(err, storage.ErrDuplicateQuote) {
existing, getErr := quotesStore.GetByIdempotencyKey(ctx, qc.idempotencyKey)
existing, getErr := quotesStore.GetByIdempotencyKey(ctx, qc.orgRef, qc.idempotencyKey)
if getErr == nil && existing != nil {
if existing.Hash != qc.hash {
return nil, errIdempotencyParamMismatch
@@ -372,7 +372,7 @@ func (h *quotePaymentsCommand) tryReuse(
qc *quotePaymentsCtx,
) (*model.PaymentQuoteRecord, bool, error) {
rec, err := quotesStore.GetByIdempotencyKey(ctx, qc.idempotencyKey)
rec, err := quotesStore.GetByIdempotencyKey(ctx, qc.orgRef, qc.idempotencyKey)
if err != nil {
if errors.Is(err, storage.ErrQuoteNotFound) {
return nil, false, nil

View File

@@ -8,6 +8,7 @@ import (
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/payments/rail"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -23,7 +24,7 @@ func (p *paymentExecutor) postLedgerDebit(ctx context.Context, payment *model.Pa
p.logger.Error("Ledger client unavailable", zap.String("action", "debit"), zap.String("payment_ref", paymentRef))
return "", merrors.Internal("ledger_client_unavailable")
}
tx, err := p.ledgerTxForAction(payment, amount, charges, idempotencyKey, idx, model.RailOperationDebit, quote)
tx, err := p.ledgerTxForAction(ctx, payment, amount, charges, idempotencyKey, idx, model.RailOperationDebit, quote)
if err != nil {
p.logger.Warn("Ledger debit preparation failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err
@@ -45,7 +46,7 @@ func (p *paymentExecutor) postLedgerCredit(ctx context.Context, payment *model.P
p.logger.Error("Ledger client unavailable", zap.String("action", "credit"), zap.String("payment_ref", paymentRef))
return "", merrors.Internal("ledger_client_unavailable")
}
tx, err := p.ledgerTxForAction(payment, amount, nil, idempotencyKey, idx, model.RailOperationCredit, quote)
tx, err := p.ledgerTxForAction(ctx, payment, amount, nil, idempotencyKey, idx, model.RailOperationCredit, quote)
if err != nil {
p.logger.Warn("Ledger credit preparation failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err
@@ -174,7 +175,7 @@ func (p *paymentExecutor) postLedgerRelease(ctx context.Context, payment *model.
return entryRef, nil
}
func (p *paymentExecutor) ledgerTxForAction(payment *model.Payment, amount *moneyv1.Money, charges []*ledgerv1.PostingLine, idempotencyKey string, idx int, action model.RailOperation, quote *orchestratorv1.PaymentQuote) (rail.LedgerTx, error) {
func (p *paymentExecutor) ledgerTxForAction(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, charges []*ledgerv1.PostingLine, idempotencyKey string, idx int, action model.RailOperation, quote *orchestratorv1.PaymentQuote) (rail.LedgerTx, error) {
if payment == nil {
return rail.LedgerTx{}, merrors.InvalidArgument("ledger: payment is required")
}
@@ -205,6 +206,9 @@ func (p *paymentExecutor) ledgerTxForAction(payment *model.Payment, amount *mone
fromRail = model.RailLedger
toRail = ledgerStepToRail(payment.PaymentPlan, idx, destRail)
accountRef, contraRef, err = ledgerDebitAccount(payment)
if err != nil {
accountRef, contraRef, err = p.resolveLedgerAccountRef(ctx, payment, amount, action)
}
if err == nil {
if blockRef := ledgerBlockAccountIfConfirmed(payment); blockRef != "" {
accountRef = blockRef
@@ -215,6 +219,9 @@ func (p *paymentExecutor) ledgerTxForAction(payment *model.Payment, amount *mone
fromRail = ledgerStepFromRail(payment.PaymentPlan, idx, sourceRail)
toRail = model.RailLedger
accountRef, contraRef, err = ledgerCreditAccount(payment)
if err != nil {
accountRef, contraRef, err = p.resolveLedgerAccountRef(ctx, payment, amount, action)
}
externalRef = ledgerExternalReference(payment.ExecutionPlan, idx)
default:
return rail.LedgerTx{}, merrors.InvalidArgument("ledger: unsupported action")
@@ -321,6 +328,116 @@ func ledgerExternalReference(plan *model.ExecutionPlan, idx int) string {
return ""
}
func (p *paymentExecutor) resolveLedgerAccountRef(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, action model.RailOperation) (string, string, error) {
if payment == nil {
return "", "", merrors.InvalidArgument("ledger: payment is required")
}
if amount == nil || strings.TrimSpace(amount.GetCurrency()) == "" {
return "", "", merrors.InvalidArgument("ledger: amount is required")
}
switch action {
case model.RailOperationCredit:
if account, _, err := ledgerDebitAccount(payment); err == nil && strings.TrimSpace(account) != "" {
setLedgerAccountAttributes(payment, account)
return account, "", nil
}
case model.RailOperationDebit:
if account, _, err := ledgerCreditAccount(payment); err == nil && strings.TrimSpace(account) != "" {
setLedgerAccountAttributes(payment, account)
return account, "", nil
}
}
account, err := p.resolveOrgOwnedLedgerAccount(ctx, payment, amount)
if err != nil {
return "", "", err
}
setLedgerAccountAttributes(payment, account)
return account, "", nil
}
func (p *paymentExecutor) resolveOrgOwnedLedgerAccount(ctx context.Context, payment *model.Payment, amount *moneyv1.Money) (string, error) {
if payment == nil {
return "", merrors.InvalidArgument("ledger: payment is required")
}
if payment.OrganizationRef == primitive.NilObjectID {
return "", merrors.InvalidArgument("ledger: organization_ref is required")
}
if amount == nil || strings.TrimSpace(amount.GetCurrency()) == "" {
return "", merrors.InvalidArgument("ledger: amount is required")
}
if p == nil || p.deps == nil || p.deps.ledger.client == nil {
return "", merrors.Internal("ledger_client_unavailable")
}
currency := strings.TrimSpace(amount.GetCurrency())
resp, err := p.deps.ledger.client.ListConnectorAccounts(ctx, &connectorv1.ListAccountsRequest{
OrganizationRef: payment.OrganizationRef.Hex(),
Kind: connectorv1.AccountKind_LEDGER_ACCOUNT,
Asset: currency,
})
if err != nil {
return "", err
}
for _, account := range resp.GetAccounts() {
if account == nil {
continue
}
if account.GetKind() != connectorv1.AccountKind_LEDGER_ACCOUNT {
continue
}
asset := strings.TrimSpace(account.GetAsset())
if asset == "" || !strings.EqualFold(asset, currency) {
continue
}
if strings.TrimSpace(account.GetOwnerRef()) != "" {
continue
}
if connectorAccountIsSettlement(account) {
continue
}
if ref := account.GetRef(); ref != nil {
if accountID := strings.TrimSpace(ref.GetAccountId()); accountID != "" {
return accountID, nil
}
}
}
return "", merrors.InvalidArgument("ledger: org-owned account not found")
}
func connectorAccountIsSettlement(account *connectorv1.Account) bool {
if account == nil || account.GetProviderDetails() == nil {
return false
}
details := account.GetProviderDetails().AsMap()
val, ok := details["is_settlement"]
if !ok {
return false
}
switch v := val.(type) {
case bool:
return v
case string:
return strings.EqualFold(strings.TrimSpace(v), "true")
default:
return false
}
}
func setLedgerAccountAttributes(payment *model.Payment, accountRef string) {
if payment == nil || strings.TrimSpace(accountRef) == "" {
return
}
if payment.Intent.Attributes == nil {
payment.Intent.Attributes = map[string]string{}
}
if attributeLookup(payment.Intent.Attributes, "ledger_debit_account_ref", "ledgerDebitAccountRef") == "" {
payment.Intent.Attributes["ledger_debit_account_ref"] = accountRef
}
if attributeLookup(payment.Intent.Attributes, "ledger_credit_account_ref", "ledgerCreditAccountRef") == "" {
payment.Intent.Attributes["ledger_credit_account_ref"] = accountRef
}
}
func ledgerDebitAccount(payment *model.Payment) (string, string, error) {
if payment == nil {
return "", "", merrors.InvalidArgument("ledger: payment is required")

View File

@@ -0,0 +1,96 @@
package orchestrator
import (
"context"
"testing"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/payments/rail"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/structpb"
)
func TestLedgerAccountResolution_UsesOrgOwnedAccount(t *testing.T) {
ctx := context.Background()
accountID := "ledger:org:usd"
providerDetails, err := structpb.NewStruct(map[string]interface{}{
"is_settlement": false,
})
if err != nil {
t.Fatalf("provider details build error: %v", err)
}
listCalls := 0
ledgerAccountRefs := make([]string, 0, 2)
ledgerFake := &ledgerclient.Fake{
ListConnectorAccountsFn: func(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) {
listCalls++
return &connectorv1.ListAccountsResponse{
Accounts: []*connectorv1.Account{
{
Ref: &connectorv1.AccountRef{ConnectorId: "ledger", AccountId: accountID},
Kind: connectorv1.AccountKind_LEDGER_ACCOUNT,
Asset: "USD",
OwnerRef: "",
ProviderDetails: providerDetails,
},
},
}, nil
},
CreateTransactionFn: func(ctx context.Context, tx rail.LedgerTx) (string, error) {
ledgerAccountRefs = append(ledgerAccountRefs, tx.LedgerAccountRef)
return "entry-1", nil
},
}
svc := &Service{
logger: zap.NewNop(),
deps: serviceDependencies{
ledger: ledgerDependency{
client: ledgerFake,
internal: ledgerFake,
},
},
}
executor := newPaymentExecutor(&svc.deps, svc.logger, svc)
amount := &paymenttypes.Money{Currency: "USD", Amount: "10"}
payment := &model.Payment{
PaymentRef: "pay-1",
IdempotencyKey: "pay-1",
Intent: model.PaymentIntent{
Kind: model.PaymentKindPayout,
},
PaymentPlan: &model.PaymentPlan{
ID: "pay-1",
IdempotencyKey: "pay-1",
Steps: []*model.PaymentStep{
{StepID: "ledger_debit", Rail: model.RailLedger, Action: model.RailOperationDebit, Amount: cloneMoney(amount)},
{StepID: "ledger_credit", Rail: model.RailLedger, Action: model.RailOperationCredit, DependsOn: []string{"ledger_debit"}, Amount: cloneMoney(amount)},
},
},
}
payment.OrganizationRef = primitive.NewObjectID()
store := newStubPaymentsStore()
store.payments[payment.PaymentRef] = payment
if err := executor.executePaymentPlan(ctx, store, payment, &orchestratorv1.PaymentQuote{}); err != nil {
t.Fatalf("executePaymentPlan error: %v", err)
}
if listCalls == 0 {
t.Fatalf("expected ledger accounts lookup")
}
if len(ledgerAccountRefs) != 2 {
t.Fatalf("expected two ledger transactions, got %d", len(ledgerAccountRefs))
}
if ledgerAccountRefs[0] != accountID || ledgerAccountRefs[1] != accountID {
t.Fatalf("unexpected ledger account refs: %+v", ledgerAccountRefs)
}
}

View File

@@ -430,11 +430,14 @@ func (s *helperQuotesStore) GetByRef(_ context.Context, _ primitive.ObjectID, re
return nil, storage.ErrQuoteNotFound
}
func (s *helperQuotesStore) GetByIdempotencyKey(_ context.Context, ref string) (*model.PaymentQuoteRecord, error) {
func (s *helperQuotesStore) GetByIdempotencyKey(_ context.Context, orgRef primitive.ObjectID, ref string) (*model.PaymentQuoteRecord, error) {
if s.records == nil {
return nil, storage.ErrQuoteNotFound
}
for _, rec := range s.records {
if rec.OrganizationRef != orgRef {
continue
}
if rec.IdempotencyKey == ref {
return rec, nil
}

View File

@@ -423,11 +423,14 @@ func (s *stubQuotesStore) GetByRef(ctx context.Context, orgRef primitive.ObjectI
return nil, storage.ErrQuoteNotFound
}
func (s *stubQuotesStore) GetByIdempotencyKey(ctx context.Context, idempotencyKey string) (*model.PaymentQuoteRecord, error) {
func (s *stubQuotesStore) GetByIdempotencyKey(ctx context.Context, orgRef primitive.ObjectID, idempotencyKey string) (*model.PaymentQuoteRecord, error) {
if s.quotes == nil {
return nil, storage.ErrQuoteNotFound
}
for _, q := range s.quotes {
if q.OrganizationRef != orgRef {
continue
}
if q.IdempotencyKey == idempotencyKey {
return q, nil
}

View File

@@ -19,6 +19,7 @@ type PaymentQuoteRecord struct {
Quote *PaymentQuoteSnapshot `bson:"quote,omitempty" json:"quote,omitempty"`
Quotes []*PaymentQuoteSnapshot `bson:"quotes,omitempty" json:"quotes,omitempty"`
ExpiresAt time.Time `bson:"expiresAt" json:"expiresAt"`
PurgeAt time.Time `bson:"purgeAt,omitempty" json:"purgeAt,omitempty"`
Hash string `bson:"hash" json:"hash"`
}

View File

@@ -2,6 +2,7 @@ package mongo
import (
"context"
"time"
"github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/payments/orchestrator/storage/model"
@@ -23,8 +24,22 @@ type Store struct {
plans storage.PlanTemplatesStore
}
type options struct {
quoteRetention time.Duration
}
// Option configures the Mongo-backed payments repository.
type Option func(*options)
// WithQuoteRetention sets how long payment quote records are retained after expiry.
func WithQuoteRetention(retention time.Duration) Option {
return func(opts *options) {
opts.quoteRetention = retention
}
}
// New constructs a Mongo-backed payments repository from a Mongo connection.
func New(logger mlogger.Logger, conn *db.MongoConnection) (*Store, error) {
func New(logger mlogger.Logger, conn *db.MongoConnection, opts ...Option) (*Store, error) {
if conn == nil {
return nil, merrors.InvalidArgument("payments.storage.mongo: connection is nil")
}
@@ -32,11 +47,11 @@ func New(logger mlogger.Logger, conn *db.MongoConnection) (*Store, error) {
quotesRepo := repository.CreateMongoRepository(conn.Database(), (&model.PaymentQuoteRecord{}).Collection())
routesRepo := repository.CreateMongoRepository(conn.Database(), (&model.PaymentRoute{}).Collection())
plansRepo := repository.CreateMongoRepository(conn.Database(), (&model.PaymentPlanTemplate{}).Collection())
return NewWithRepository(logger, conn.Ping, paymentsRepo, quotesRepo, routesRepo, plansRepo)
return NewWithRepository(logger, conn.Ping, paymentsRepo, quotesRepo, routesRepo, plansRepo, opts...)
}
// NewWithRepository constructs a payments repository using the provided primitives.
func NewWithRepository(logger mlogger.Logger, ping func(context.Context) error, paymentsRepo repository.Repository, quotesRepo repository.Repository, routesRepo repository.Repository, plansRepo repository.Repository) (*Store, error) {
func NewWithRepository(logger mlogger.Logger, ping func(context.Context) error, paymentsRepo repository.Repository, quotesRepo repository.Repository, routesRepo repository.Repository, plansRepo repository.Repository, opts ...Option) (*Store, error) {
if ping == nil {
return nil, merrors.InvalidArgument("payments.storage.mongo: ping func is nil")
}
@@ -53,12 +68,19 @@ func NewWithRepository(logger mlogger.Logger, ping func(context.Context) error,
return nil, merrors.InvalidArgument("payments.storage.mongo: plan templates repository is nil")
}
cfg := options{}
for _, opt := range opts {
if opt != nil {
opt(&cfg)
}
}
childLogger := logger.Named("storage").Named("mongo")
paymentsStore, err := store.NewPayments(childLogger, paymentsRepo)
if err != nil {
return nil, err
}
quotesStore, err := store.NewQuotes(childLogger, quotesRepo)
quotesStore, err := store.NewQuotes(childLogger, quotesRepo, cfg.quoteRetention)
if err != nil {
return nil, err
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/db/repository"
"github.com/tech/sendico/pkg/db/repository/builder"
ri "github.com/tech/sendico/pkg/db/repository/index"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
@@ -19,25 +20,43 @@ import (
type Quotes struct {
logger mlogger.Logger
repo repository.Repository
retention time.Duration
}
const defaultPaymentQuoteRetention = 72 * time.Hour
// NewQuotes constructs a Mongo-backed quotes store.
func NewQuotes(logger mlogger.Logger, repo repository.Repository) (*Quotes, error) {
func NewQuotes(logger mlogger.Logger, repo repository.Repository, retention time.Duration) (*Quotes, error) {
if repo == nil {
return nil, merrors.InvalidArgument("quotesStore: repository is nil")
}
if retention <= 0 {
logger.Info("Using default retention duration", zap.Duration("default_retention", defaultPaymentQuoteRetention))
retention = defaultPaymentQuoteRetention
}
logger.Info("Using retention duration", zap.Duration("retention", retention))
indexes := []*ri.Definition{
{
Keys: []ri.Key{{Field: "quoteRef", Sort: ri.Asc}},
Unique: true,
},
{
Keys: []ri.Key{
{Field: "organizationRef", Sort: ri.Asc},
{Field: "idempotencyKey", Sort: ri.Asc},
},
Unique: true,
Name: "payment_quotes_org_idempotency_key",
PartialFilter: repository.Query().Comparison(repository.Field("idempotencyKey"), builder.Ne, ""),
},
{
Keys: []ri.Key{{Field: "organizationRef", Sort: ri.Asc}},
},
{
Keys: []ri.Key{{Field: "expiresAt", Sort: ri.Asc}},
Keys: []ri.Key{{Field: "purgeAt", Sort: ri.Asc}},
TTL: int32Ptr(0),
Name: "payment_quotes_purge_at_ttl",
},
}
@@ -51,6 +70,7 @@ func NewQuotes(logger mlogger.Logger, repo repository.Repository) (*Quotes, erro
return &Quotes{
logger: logger.Named("quotes"),
repo: repo,
retention: retention,
}, nil
}
@@ -65,12 +85,16 @@ func (q *Quotes) Create(ctx context.Context, quote *model.PaymentQuoteRecord) er
if quote.OrganizationRef == primitive.NilObjectID {
return merrors.InvalidArgument("quotesStore: organization_ref is required")
}
quote.IdempotencyKey = strings.TrimSpace(quote.IdempotencyKey)
if quote.IdempotencyKey == "" {
return merrors.InvalidArgument("quotesStore: idempotency key is required")
}
if quote.ExpiresAt.IsZero() {
return merrors.InvalidArgument("quotesStore: expires_at is required")
}
if quote.PurgeAt.IsZero() || quote.PurgeAt.Before(quote.ExpiresAt) {
quote.PurgeAt = quote.ExpiresAt.Add(q.retention)
}
if quote.Intent.Attributes != nil {
for k, v := range quote.Intent.Attributes {
quote.Intent.Attributes[k] = strings.TrimSpace(v)
@@ -123,13 +147,16 @@ func (q *Quotes) GetByRef(ctx context.Context, orgRef primitive.ObjectID, quoteR
return entity, nil
}
func (q *Quotes) GetByIdempotencyKey(ctx context.Context, idempotencyKey string) (*model.PaymentQuoteRecord, error) {
func (q *Quotes) GetByIdempotencyKey(ctx context.Context, orgRef primitive.ObjectID, idempotencyKey string) (*model.PaymentQuoteRecord, error) {
idempotencyKey = strings.TrimSpace(idempotencyKey)
if idempotencyKey == "" {
return nil, merrors.InvalidArgument("quotesStore: empty idempotency key")
}
if orgRef == primitive.NilObjectID {
return nil, merrors.InvalidArgument("quotesStore: organization_ref is required")
}
entity := &model.PaymentQuoteRecord{}
query := repository.Filter("idempotencyKey", idempotencyKey)
query := repository.OrgFilter(orgRef).And(repository.Filter("idempotencyKey", idempotencyKey))
if err := q.repo.FindOneByFilter(ctx, query, entity); err != nil {
if errors.Is(err, merrors.ErrNoData) {
return nil, storage.ErrQuoteNotFound

View File

@@ -55,7 +55,7 @@ type PaymentsStore interface {
type QuotesStore interface {
Create(ctx context.Context, quote *model.PaymentQuoteRecord) error
GetByRef(ctx context.Context, orgRef primitive.ObjectID, quoteRef string) (*model.PaymentQuoteRecord, error)
GetByIdempotencyKey(ctx context.Context, idempotencyKey string) (*model.PaymentQuoteRecord, error)
GetByIdempotencyKey(ctx context.Context, orgRef primitive.ObjectID, idempotencyKey string) (*model.PaymentQuoteRecord, error)
}
// RoutesStore manages allowed routing transitions.

View File

@@ -0,0 +1,65 @@
package discovery
import (
"errors"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/tech/sendico/pkg/merrors"
)
var (
metricsOnce sync.Once
eventLatency *prometheus.HistogramVec
eventStatus *prometheus.CounterVec
)
func initMetrics() {
metricsOnce.Do(func() {
eventLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "sendico",
Subsystem: "discovery",
Name: "event_latency_seconds",
Help: "Latency distribution for discovery event handling.",
Buckets: prometheus.DefBuckets,
}, []string{"event"})
eventStatus = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "sendico",
Subsystem: "discovery",
Name: "event_requests_total",
Help: "Total number of discovery events handled, grouped by event and status.",
}, []string{"event", "status"})
})
}
func observeEvent(event string, err error, duration time.Duration) {
if eventLatency != nil {
eventLatency.WithLabelValues(event).Observe(duration.Seconds())
}
if eventStatus != nil {
eventStatus.WithLabelValues(event, statusLabel(err)).Inc()
}
}
func statusLabel(err error) string {
switch {
case err == nil:
return "ok"
case errors.Is(err, merrors.ErrInvalidArg):
return "invalid_argument"
case errors.Is(err, merrors.ErrNoData):
return "not_found"
case errors.Is(err, merrors.ErrDataConflict):
return "conflict"
case errors.Is(err, merrors.ErrAccessDenied):
return "denied"
case errors.Is(err, merrors.ErrInternal):
return "internal"
default:
return "error"
}
}

View File

@@ -59,6 +59,7 @@ func NewRegistryService(logger mlogger.Logger, msgBroker mb.Broker, producer msg
return nil, merrors.InvalidArgument("discovery registry: no logger provided", "logger")
}
logger = logger.Named("discovery_registry")
initMetrics()
sender = strings.TrimSpace(sender)
if sender == "" {
sender = "discovery"
@@ -147,9 +148,14 @@ func (s *RegistryService) Stop() {
})
}
func (s *RegistryService) handleAnnounce(_ context.Context, env me.Envelope) error {
func (s *RegistryService) handleAnnounce(_ context.Context, env me.Envelope) (err error) {
start := time.Now()
defer func() {
observeEvent("announce", err, time.Since(start))
}()
var payload Announcement
if err := json.Unmarshal(env.GetData(), &payload); err != nil {
if err = json.Unmarshal(env.GetData(), &payload); err != nil {
fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
s.logWarn("Failed to decode discovery announce payload", fields...)
return err
@@ -174,9 +180,14 @@ func (s *RegistryService) handleAnnounce(_ context.Context, env me.Envelope) err
return nil
}
func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) error {
func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) (err error) {
start := time.Now()
defer func() {
observeEvent("heartbeat", err, time.Since(start))
}()
var payload Heartbeat
if err := json.Unmarshal(env.GetData(), &payload); err != nil {
if err = json.Unmarshal(env.GetData(), &payload); err != nil {
fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
s.logWarn("Failed to decode discovery heartbeat payload", fields...)
return err
@@ -208,13 +219,18 @@ func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) er
return nil
}
func (s *RegistryService) handleLookup(_ context.Context, env me.Envelope) error {
func (s *RegistryService) handleLookup(_ context.Context, env me.Envelope) (err error) {
start := time.Now()
defer func() {
observeEvent("lookup", err, time.Since(start))
}()
if s.producer == nil {
s.logWarn("Discovery lookup request ignored: producer not configured", envelopeFields(env)...)
return nil
}
var payload LookupRequest
if err := json.Unmarshal(env.GetData(), &payload); err != nil {
if err = json.Unmarshal(env.GetData(), &payload); err != nil {
fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
s.logWarn("Failed to decode discovery lookup payload", fields...)
return err
@@ -222,7 +238,7 @@ func (s *RegistryService) handleLookup(_ context.Context, env me.Envelope) error
resp := s.registry.Lookup(time.Now())
resp.RequestID = strings.TrimSpace(payload.RequestID)
s.logDebug("Discovery lookup prepared", zap.String("request_id", resp.RequestID), zap.Int("services", len(resp.Services)), zap.Int("gateways", len(resp.Gateways)))
if err := s.producer.SendMessage(NewLookupResponseEnvelope(s.sender, resp)); err != nil {
if err = s.producer.SendMessage(NewLookupResponseEnvelope(s.sender, resp)); err != nil {
fields := []zap.Field{zap.String("request_id", resp.RequestID), zap.Error(err)}
s.logWarn("Failed to publish discovery lookup response", fields...)
return err

View File

@@ -6,17 +6,25 @@ import 'package:pshared/models/payment/type.dart';
IconData iconForPaymentType(PaymentType type) {
switch (type) {
case PaymentType.bankAccount:
return Icons.account_balance;
return Icons.account_balance; // bank / institution
case PaymentType.iban:
return Icons.language;
return Icons.public; // cross-border / international account
case PaymentType.ledger:
return Icons.account_balance; // internal ledger account (best fit)
case PaymentType.wallet:
return Icons.account_balance_wallet;
return Icons.account_balance_wallet; // user wallet / stored funds
case PaymentType.card:
return Icons.credit_card;
case PaymentType.cardToken:
return Icons.credit_card; // card rail
case PaymentType.externalChain:
return Icons.currency_bitcoin;
//TODO: define new payment methods
default:
return Icons.question_mark;
return Icons.hub; // network / blockchain, not "bitcoin"
case PaymentType.managedWallet:
return Icons.currency_bitcoin; // custodial / managed / controlled
}
}