improved ledger account discovery #307

Merged
tech merged 1 commits from ledger-306 into main 2026-01-22 19:06:11 +00:00
23 changed files with 480 additions and 53 deletions

View File

@@ -52,10 +52,6 @@ func (i *Imp) startMetrics(cfg *metricsConfig) {
ReadHeaderTimeout: 5 * time.Second, ReadHeaderTimeout: 5 * time.Second,
} }
if healthRouter != nil {
healthRouter.SetStatus(health.SSRunning)
}
go func() { go func() {
i.logger.Info("Prometheus endpoint listening", zap.String("address", address)) i.logger.Info("Prometheus endpoint listening", zap.String("address", address))
if err := i.metricsSrv.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { if err := i.metricsSrv.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
@@ -83,3 +79,10 @@ func (i *Imp) shutdownMetrics(ctx context.Context) {
} }
i.metricsSrv = nil i.metricsSrv = nil
} }
func (i *Imp) setMetricsStatus(status health.ServiceStatus) {
if i == nil || i.metricsHealth == nil {
return
}
i.metricsHealth.SetStatus(status)
}

View File

@@ -5,6 +5,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/tech/sendico/pkg/api/routers/health"
"github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mlogger"
"go.uber.org/zap" "go.uber.org/zap"
) )
@@ -46,12 +47,14 @@ func (i *Imp) Start() error {
if err := i.startDiscovery(cfg); err != nil { if err := i.startDiscovery(cfg); err != nil {
i.stopDiscovery() i.stopDiscovery()
i.setMetricsStatus(health.SSTerminating)
ctx, cancel := context.WithTimeout(context.Background(), i.shutdownTimeout()) ctx, cancel := context.WithTimeout(context.Background(), i.shutdownTimeout())
i.shutdownMetrics(ctx) i.shutdownMetrics(ctx)
cancel() cancel()
return err return err
} }
i.setMetricsStatus(health.SSRunning)
i.logger.Info("Discovery service ready", zap.String("messaging_driver", messagingDriver)) i.logger.Info("Discovery service ready", zap.String("messaging_driver", messagingDriver))
<-i.stopCh <-i.stopCh

View File

@@ -176,7 +176,7 @@ func buildQuoteMeta(meta *oraclev1.RequestMeta) *model.QuoteMeta {
if trace != nil { if trace != nil {
qm.RequestRef = trace.GetRequestRef() qm.RequestRef = trace.GetRequestRef()
qm.TraceRef = trace.GetTraceRef() qm.TraceRef = trace.GetTraceRef()
qm.IdempotencyKey = trace.GetIdempotencyKey() qm.IdempotencyKey = strings.TrimSpace(trace.GetIdempotencyKey())
} }
if org := strings.TrimSpace(meta.GetOrganizationRef()); org != "" { if org := strings.TrimSpace(meta.GetOrganizationRef()); org != "" {
if objID, err := primitive.ObjectIDFromHex(org); err == nil { if objID, err := primitive.ObjectIDFromHex(org); err == nil {

View File

@@ -32,6 +32,17 @@ func NewQuotes(logger mlogger.Logger, db *mongo.Database, txFactory transaction.
}, },
Unique: true, Unique: true,
}, },
{
Keys: []ri.Key{
{Field: "meta.organizationRef", Sort: ri.Asc},
{Field: "meta.idempotencyKey", Sort: ri.Asc},
},
Unique: true,
Name: "quotes_meta_org_idempotency_key",
PartialFilter: repository.Query().
Comparison(repository.Field("meta.idempotencyKey"), builder.Ne, "").
Comparison(repository.Field("meta.organizationRef"), builder.Exists, true),
},
{ {
Keys: []ri.Key{ Keys: []ri.Key{
{Field: "status", Sort: ri.Asc}, {Field: "status", Sort: ri.Asc},

View File

@@ -31,6 +31,7 @@ type Client interface {
CreateAccount(ctx context.Context, req *ledgerv1.CreateAccountRequest) (*ledgerv1.CreateAccountResponse, error) CreateAccount(ctx context.Context, req *ledgerv1.CreateAccountRequest) (*ledgerv1.CreateAccountResponse, error)
ListAccounts(ctx context.Context, req *ledgerv1.ListAccountsRequest) (*ledgerv1.ListAccountsResponse, error) ListAccounts(ctx context.Context, req *ledgerv1.ListAccountsRequest) (*ledgerv1.ListAccountsResponse, error)
ListConnectorAccounts(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error)
PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error)
PostDebitWithCharges(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error) PostDebitWithCharges(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error)
TransferInternal(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) TransferInternal(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error)
@@ -231,7 +232,7 @@ func (c *ledgerClient) ListAccounts(ctx context.Context, req *ledgerv1.ListAccou
if req == nil || strings.TrimSpace(req.GetOrganizationRef()) == "" { if req == nil || strings.TrimSpace(req.GetOrganizationRef()) == "" {
return nil, merrors.InvalidArgument("ledger: organization_ref is required") return nil, merrors.InvalidArgument("ledger: organization_ref is required")
} }
resp, err := c.client.ListAccounts(ctx, &connectorv1.ListAccountsRequest{OwnerRef: strings.TrimSpace(req.GetOrganizationRef())}) resp, err := c.client.ListAccounts(ctx, &connectorv1.ListAccountsRequest{OrganizationRef: strings.TrimSpace(req.GetOrganizationRef())})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -242,6 +243,15 @@ func (c *ledgerClient) ListAccounts(ctx context.Context, req *ledgerv1.ListAccou
return &ledgerv1.ListAccountsResponse{Accounts: accounts}, nil return &ledgerv1.ListAccountsResponse{Accounts: accounts}, nil
} }
func (c *ledgerClient) ListConnectorAccounts(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) {
ctx, cancel := c.callContext(ctx)
defer cancel()
if req == nil {
return nil, merrors.InvalidArgument("ledger: request is required")
}
return c.client.ListAccounts(ctx, req)
}
func (c *ledgerClient) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) { func (c *ledgerClient) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
return c.submitLedgerOperation(ctx, connectorv1.OperationType_CREDIT, "", req.GetLedgerAccountRef(), req.GetMoney(), req) return c.submitLedgerOperation(ctx, connectorv1.OperationType_CREDIT, "", req.GetLedgerAccountRef(), req.GetMoney(), req)
} }
@@ -481,6 +491,10 @@ func ledgerAccountFromConnector(account *connectorv1.Account) *ledgerv1.LedgerAc
if ref := account.GetRef(); ref != nil { if ref := account.GetRef(); ref != nil {
accountID = strings.TrimSpace(ref.GetAccountId()) accountID = strings.TrimSpace(ref.GetAccountId())
} }
organizationRef := strings.TrimSpace(account.GetOwnerRef())
if v := strings.TrimSpace(fmt.Sprint(details["organization_ref"])); v != "" {
organizationRef = v
}
describable := account.GetDescribable() describable := account.GetDescribable()
label := strings.TrimSpace(account.GetLabel()) label := strings.TrimSpace(account.GetLabel())
if describable == nil && label != "" { if describable == nil && label != "" {
@@ -495,7 +509,7 @@ func ledgerAccountFromConnector(account *connectorv1.Account) *ledgerv1.LedgerAc
} }
return &ledgerv1.LedgerAccount{ return &ledgerv1.LedgerAccount{
LedgerAccountRef: accountID, LedgerAccountRef: accountID,
OrganizationRef: strings.TrimSpace(account.GetOwnerRef()), OrganizationRef: organizationRef,
AccountCode: accountCode, AccountCode: accountCode,
AccountType: accountType, AccountType: accountType,
Currency: strings.TrimSpace(account.GetAsset()), Currency: strings.TrimSpace(account.GetAsset()),

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/tech/sendico/pkg/payments/rail" "github.com/tech/sendico/pkg/payments/rail"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1" moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1" ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
) )
@@ -15,6 +16,7 @@ type Fake struct {
HoldBalanceFn func(ctx context.Context, accountID string, amount string) error HoldBalanceFn func(ctx context.Context, accountID string, amount string) error
CreateAccountFn func(ctx context.Context, req *ledgerv1.CreateAccountRequest) (*ledgerv1.CreateAccountResponse, error) CreateAccountFn func(ctx context.Context, req *ledgerv1.CreateAccountRequest) (*ledgerv1.CreateAccountResponse, error)
ListAccountsFn func(ctx context.Context, req *ledgerv1.ListAccountsRequest) (*ledgerv1.ListAccountsResponse, error) ListAccountsFn func(ctx context.Context, req *ledgerv1.ListAccountsRequest) (*ledgerv1.ListAccountsResponse, error)
ListConnectorAccountsFn func(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error)
PostCreditWithChargesFn func(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) PostCreditWithChargesFn func(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error)
PostDebitWithChargesFn func(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error) PostDebitWithChargesFn func(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error)
TransferInternalFn func(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) TransferInternalFn func(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error)
@@ -60,6 +62,13 @@ func (f *Fake) ListAccounts(ctx context.Context, req *ledgerv1.ListAccountsReque
return &ledgerv1.ListAccountsResponse{}, nil return &ledgerv1.ListAccountsResponse{}, nil
} }
func (f *Fake) ListConnectorAccounts(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) {
if f.ListConnectorAccountsFn != nil {
return f.ListConnectorAccountsFn(ctx, req)
}
return &connectorv1.ListAccountsResponse{}, nil
}
func (f *Fake) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) { func (f *Fake) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
if f.PostCreditWithChargesFn != nil { if f.PostCreditWithChargesFn != nil {
return f.PostCreditWithChargesFn(ctx, req) return f.PostCreditWithChargesFn(ctx, req)

View File

@@ -121,10 +121,17 @@ func (c *connectorAdapter) GetAccount(ctx context.Context, req *connectorv1.GetA
} }
func (c *connectorAdapter) ListAccounts(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) { func (c *connectorAdapter) ListAccounts(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) {
if req == nil || strings.TrimSpace(req.GetOwnerRef()) == "" { if req == nil {
return nil, merrors.InvalidArgument("list_accounts: owner_ref is required") return nil, merrors.InvalidArgument("list_accounts: request is required")
} }
resp, err := c.svc.ListAccounts(ctx, &ledgerv1.ListAccountsRequest{OrganizationRef: strings.TrimSpace(req.GetOwnerRef())}) orgRef := strings.TrimSpace(req.GetOrganizationRef())
if orgRef == "" {
orgRef = strings.TrimSpace(req.GetOwnerRef())
}
if orgRef == "" {
return nil, merrors.InvalidArgument("list_accounts: organization_ref is required")
}
resp, err := c.svc.ListAccounts(ctx, &ledgerv1.ListAccountsRequest{OrganizationRef: orgRef})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -343,6 +350,7 @@ func ledgerAccountToConnector(account *ledgerv1.LedgerAccount) *connectorv1.Acco
"status": account.GetStatus().String(), "status": account.GetStatus().String(),
"allow_negative": account.GetAllowNegative(), "allow_negative": account.GetAllowNegative(),
"is_settlement": account.GetIsSettlement(), "is_settlement": account.GetIsSettlement(),
"organization_ref": strings.TrimSpace(account.GetOrganizationRef()),
}) })
describable := ledgerAccountDescribable(account) describable := ledgerAccountDescribable(account)
return &connectorv1.Account{ return &connectorv1.Account{
@@ -354,7 +362,7 @@ func ledgerAccountToConnector(account *ledgerv1.LedgerAccount) *connectorv1.Acco
Asset: strings.TrimSpace(account.GetCurrency()), Asset: strings.TrimSpace(account.GetCurrency()),
State: ledgerAccountState(account.GetStatus()), State: ledgerAccountState(account.GetStatus()),
Label: strings.TrimSpace(account.GetAccountCode()), Label: strings.TrimSpace(account.GetAccountCode()),
OwnerRef: strings.TrimSpace(account.GetOrganizationRef()), OwnerRef: "",
ProviderDetails: details, ProviderDetails: details,
CreatedAt: account.GetCreatedAt(), CreatedAt: account.GetCreatedAt(),
UpdatedAt: account.GetUpdatedAt(), UpdatedAt: account.GetUpdatedAt(),

View File

@@ -35,6 +35,9 @@ messaging:
reconnect_wait: 5 reconnect_wait: 5
buffer_size: 1024 buffer_size: 1024
# Retain quote records after expiry to allow long-running payments to complete.
quote_retention_hours: 72
# Service endpoints are sourced from discovery; no static overrides. # Service endpoints are sourced from discovery; no static overrides.
card_gateways: card_gateways:
monetix: monetix:

View File

@@ -22,6 +22,7 @@ type config struct {
CardGateways map[string]cardGatewayRouteConfig `yaml:"card_gateways"` CardGateways map[string]cardGatewayRouteConfig `yaml:"card_gateways"`
FeeAccounts map[string]string `yaml:"fee_ledger_accounts"` FeeAccounts map[string]string `yaml:"fee_ledger_accounts"`
GatewayInstances []gatewayInstanceConfig `yaml:"gateway_instances"` GatewayInstances []gatewayInstanceConfig `yaml:"gateway_instances"`
QuoteRetentionHrs int `yaml:"quote_retention_hours"`
} }
type clientConfig struct { type clientConfig struct {
@@ -84,6 +85,13 @@ func (c clientConfig) callTimeout() time.Duration {
return time.Duration(c.CallTimeoutSecs) * time.Second return time.Duration(c.CallTimeoutSecs) * time.Second
} }
func (c *config) quoteRetention() time.Duration {
if c == nil || c.QuoteRetentionHrs <= 0 {
return 72 * time.Hour
}
return time.Duration(c.QuoteRetentionHrs) * time.Hour
}
func (i *Imp) loadConfig() (*config, error) { func (i *Imp) loadConfig() (*config, error) {
data, err := os.ReadFile(i.file) data, err := os.ReadFile(i.file)
if err != nil { if err != nil {

View File

@@ -9,6 +9,7 @@ import (
"github.com/tech/sendico/pkg/payments/rail" "github.com/tech/sendico/pkg/payments/rail"
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1" feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1" moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1" chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1" mntxv1 "github.com/tech/sendico/pkg/proto/gateway/mntx/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1" ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
@@ -109,6 +110,14 @@ func (c *discoveryLedgerClient) ListAccounts(ctx context.Context, req *ledgerv1.
return client.ListAccounts(ctx, req) return client.ListAccounts(ctx, req)
} }
func (c *discoveryLedgerClient) ListConnectorAccounts(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) {
client, err := c.resolver.LedgerClient(ctx)
if err != nil {
return nil, err
}
return client.ListConnectorAccounts(ctx, req)
}
func (c *discoveryLedgerClient) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) { func (c *discoveryLedgerClient) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
client, err := c.resolver.LedgerClient(ctx) client, err := c.resolver.LedgerClient(ctx)
if err != nil { if err != nil {

View File

@@ -38,8 +38,9 @@ func (i *Imp) Start() error {
i.initDiscovery(cfg) i.initDiscovery(cfg)
quoteRetention := cfg.quoteRetention()
repoFactory := func(logger mlogger.Logger, conn *db.MongoConnection) (storage.Repository, error) { repoFactory := func(logger mlogger.Logger, conn *db.MongoConnection) (storage.Repository, error) {
return mongostorage.New(logger, conn) return mongostorage.New(logger, conn, mongostorage.WithQuoteRetention(quoteRetention))
} }
var broker mb.Broker var broker mb.Broker

View File

@@ -122,7 +122,7 @@ func (h *quotePaymentCommand) quotePayment(
return quote, nil return quote, nil
} }
existing, err := quotesStore.GetByIdempotencyKey(ctx, qc.idempotencyKey) existing, err := quotesStore.GetByIdempotencyKey(ctx, qc.orgRef, qc.idempotencyKey)
if err != nil && !errors.Is(err, storage.ErrQuoteNotFound) { if err != nil && !errors.Is(err, storage.ErrQuoteNotFound) {
h.logger.Warn( h.logger.Warn(
"Failed to lookup quote by idempotency key", "Failed to lookup quote by idempotency key",
@@ -172,7 +172,7 @@ func (h *quotePaymentCommand) quotePayment(
if err := quotesStore.Create(ctx, record); err != nil { if err := quotesStore.Create(ctx, record); err != nil {
if errors.Is(err, storage.ErrDuplicateQuote) { if errors.Is(err, storage.ErrDuplicateQuote) {
existing, getErr := quotesStore.GetByIdempotencyKey(ctx, qc.idempotencyKey) existing, getErr := quotesStore.GetByIdempotencyKey(ctx, qc.orgRef, qc.idempotencyKey)
if getErr == nil && existing != nil { if getErr == nil && existing != nil {
if existing.Hash != qc.hash { if existing.Hash != qc.hash {
return nil, errIdempotencyParamMismatch return nil, errIdempotencyParamMismatch
@@ -372,7 +372,7 @@ func (h *quotePaymentsCommand) tryReuse(
qc *quotePaymentsCtx, qc *quotePaymentsCtx,
) (*model.PaymentQuoteRecord, bool, error) { ) (*model.PaymentQuoteRecord, bool, error) {
rec, err := quotesStore.GetByIdempotencyKey(ctx, qc.idempotencyKey) rec, err := quotesStore.GetByIdempotencyKey(ctx, qc.orgRef, qc.idempotencyKey)
if err != nil { if err != nil {
if errors.Is(err, storage.ErrQuoteNotFound) { if errors.Is(err, storage.ErrQuoteNotFound) {
return nil, false, nil return nil, false, nil

View File

@@ -8,6 +8,7 @@ import (
"github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/payments/rail" "github.com/tech/sendico/pkg/payments/rail"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1" moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1" ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1" orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
@@ -23,7 +24,7 @@ func (p *paymentExecutor) postLedgerDebit(ctx context.Context, payment *model.Pa
p.logger.Error("Ledger client unavailable", zap.String("action", "debit"), zap.String("payment_ref", paymentRef)) p.logger.Error("Ledger client unavailable", zap.String("action", "debit"), zap.String("payment_ref", paymentRef))
return "", merrors.Internal("ledger_client_unavailable") return "", merrors.Internal("ledger_client_unavailable")
} }
tx, err := p.ledgerTxForAction(payment, amount, charges, idempotencyKey, idx, model.RailOperationDebit, quote) tx, err := p.ledgerTxForAction(ctx, payment, amount, charges, idempotencyKey, idx, model.RailOperationDebit, quote)
if err != nil { if err != nil {
p.logger.Warn("Ledger debit preparation failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err)) p.logger.Warn("Ledger debit preparation failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err return "", err
@@ -45,7 +46,7 @@ func (p *paymentExecutor) postLedgerCredit(ctx context.Context, payment *model.P
p.logger.Error("Ledger client unavailable", zap.String("action", "credit"), zap.String("payment_ref", paymentRef)) p.logger.Error("Ledger client unavailable", zap.String("action", "credit"), zap.String("payment_ref", paymentRef))
return "", merrors.Internal("ledger_client_unavailable") return "", merrors.Internal("ledger_client_unavailable")
} }
tx, err := p.ledgerTxForAction(payment, amount, nil, idempotencyKey, idx, model.RailOperationCredit, quote) tx, err := p.ledgerTxForAction(ctx, payment, amount, nil, idempotencyKey, idx, model.RailOperationCredit, quote)
if err != nil { if err != nil {
p.logger.Warn("Ledger credit preparation failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err)) p.logger.Warn("Ledger credit preparation failed", zap.String("payment_ref", paymentRef), zap.Int("step_index", idx), zap.Error(err))
return "", err return "", err
@@ -174,7 +175,7 @@ func (p *paymentExecutor) postLedgerRelease(ctx context.Context, payment *model.
return entryRef, nil return entryRef, nil
} }
func (p *paymentExecutor) ledgerTxForAction(payment *model.Payment, amount *moneyv1.Money, charges []*ledgerv1.PostingLine, idempotencyKey string, idx int, action model.RailOperation, quote *orchestratorv1.PaymentQuote) (rail.LedgerTx, error) { func (p *paymentExecutor) ledgerTxForAction(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, charges []*ledgerv1.PostingLine, idempotencyKey string, idx int, action model.RailOperation, quote *orchestratorv1.PaymentQuote) (rail.LedgerTx, error) {
if payment == nil { if payment == nil {
return rail.LedgerTx{}, merrors.InvalidArgument("ledger: payment is required") return rail.LedgerTx{}, merrors.InvalidArgument("ledger: payment is required")
} }
@@ -205,6 +206,9 @@ func (p *paymentExecutor) ledgerTxForAction(payment *model.Payment, amount *mone
fromRail = model.RailLedger fromRail = model.RailLedger
toRail = ledgerStepToRail(payment.PaymentPlan, idx, destRail) toRail = ledgerStepToRail(payment.PaymentPlan, idx, destRail)
accountRef, contraRef, err = ledgerDebitAccount(payment) accountRef, contraRef, err = ledgerDebitAccount(payment)
if err != nil {
accountRef, contraRef, err = p.resolveLedgerAccountRef(ctx, payment, amount, action)
}
if err == nil { if err == nil {
if blockRef := ledgerBlockAccountIfConfirmed(payment); blockRef != "" { if blockRef := ledgerBlockAccountIfConfirmed(payment); blockRef != "" {
accountRef = blockRef accountRef = blockRef
@@ -215,6 +219,9 @@ func (p *paymentExecutor) ledgerTxForAction(payment *model.Payment, amount *mone
fromRail = ledgerStepFromRail(payment.PaymentPlan, idx, sourceRail) fromRail = ledgerStepFromRail(payment.PaymentPlan, idx, sourceRail)
toRail = model.RailLedger toRail = model.RailLedger
accountRef, contraRef, err = ledgerCreditAccount(payment) accountRef, contraRef, err = ledgerCreditAccount(payment)
if err != nil {
accountRef, contraRef, err = p.resolveLedgerAccountRef(ctx, payment, amount, action)
}
externalRef = ledgerExternalReference(payment.ExecutionPlan, idx) externalRef = ledgerExternalReference(payment.ExecutionPlan, idx)
default: default:
return rail.LedgerTx{}, merrors.InvalidArgument("ledger: unsupported action") return rail.LedgerTx{}, merrors.InvalidArgument("ledger: unsupported action")
@@ -321,6 +328,116 @@ func ledgerExternalReference(plan *model.ExecutionPlan, idx int) string {
return "" return ""
} }
func (p *paymentExecutor) resolveLedgerAccountRef(ctx context.Context, payment *model.Payment, amount *moneyv1.Money, action model.RailOperation) (string, string, error) {
if payment == nil {
return "", "", merrors.InvalidArgument("ledger: payment is required")
}
if amount == nil || strings.TrimSpace(amount.GetCurrency()) == "" {
return "", "", merrors.InvalidArgument("ledger: amount is required")
}
switch action {
case model.RailOperationCredit:
if account, _, err := ledgerDebitAccount(payment); err == nil && strings.TrimSpace(account) != "" {
setLedgerAccountAttributes(payment, account)
return account, "", nil
}
case model.RailOperationDebit:
if account, _, err := ledgerCreditAccount(payment); err == nil && strings.TrimSpace(account) != "" {
setLedgerAccountAttributes(payment, account)
return account, "", nil
}
}
account, err := p.resolveOrgOwnedLedgerAccount(ctx, payment, amount)
if err != nil {
return "", "", err
}
setLedgerAccountAttributes(payment, account)
return account, "", nil
}
func (p *paymentExecutor) resolveOrgOwnedLedgerAccount(ctx context.Context, payment *model.Payment, amount *moneyv1.Money) (string, error) {
if payment == nil {
return "", merrors.InvalidArgument("ledger: payment is required")
}
if payment.OrganizationRef == primitive.NilObjectID {
return "", merrors.InvalidArgument("ledger: organization_ref is required")
}
if amount == nil || strings.TrimSpace(amount.GetCurrency()) == "" {
return "", merrors.InvalidArgument("ledger: amount is required")
}
if p == nil || p.deps == nil || p.deps.ledger.client == nil {
return "", merrors.Internal("ledger_client_unavailable")
}
currency := strings.TrimSpace(amount.GetCurrency())
resp, err := p.deps.ledger.client.ListConnectorAccounts(ctx, &connectorv1.ListAccountsRequest{
OrganizationRef: payment.OrganizationRef.Hex(),
Kind: connectorv1.AccountKind_LEDGER_ACCOUNT,
Asset: currency,
})
if err != nil {
return "", err
}
for _, account := range resp.GetAccounts() {
if account == nil {
continue
}
if account.GetKind() != connectorv1.AccountKind_LEDGER_ACCOUNT {
continue
}
asset := strings.TrimSpace(account.GetAsset())
if asset == "" || !strings.EqualFold(asset, currency) {
continue
}
if strings.TrimSpace(account.GetOwnerRef()) != "" {
continue
}
if connectorAccountIsSettlement(account) {
continue
}
if ref := account.GetRef(); ref != nil {
if accountID := strings.TrimSpace(ref.GetAccountId()); accountID != "" {
return accountID, nil
}
}
}
return "", merrors.InvalidArgument("ledger: org-owned account not found")
}
func connectorAccountIsSettlement(account *connectorv1.Account) bool {
if account == nil || account.GetProviderDetails() == nil {
return false
}
details := account.GetProviderDetails().AsMap()
val, ok := details["is_settlement"]
if !ok {
return false
}
switch v := val.(type) {
case bool:
return v
case string:
return strings.EqualFold(strings.TrimSpace(v), "true")
default:
return false
}
}
func setLedgerAccountAttributes(payment *model.Payment, accountRef string) {
if payment == nil || strings.TrimSpace(accountRef) == "" {
return
}
if payment.Intent.Attributes == nil {
payment.Intent.Attributes = map[string]string{}
}
if attributeLookup(payment.Intent.Attributes, "ledger_debit_account_ref", "ledgerDebitAccountRef") == "" {
payment.Intent.Attributes["ledger_debit_account_ref"] = accountRef
}
if attributeLookup(payment.Intent.Attributes, "ledger_credit_account_ref", "ledgerCreditAccountRef") == "" {
payment.Intent.Attributes["ledger_credit_account_ref"] = accountRef
}
}
func ledgerDebitAccount(payment *model.Payment) (string, string, error) { func ledgerDebitAccount(payment *model.Payment) (string, string, error) {
if payment == nil { if payment == nil {
return "", "", merrors.InvalidArgument("ledger: payment is required") return "", "", merrors.InvalidArgument("ledger: payment is required")

View File

@@ -0,0 +1,96 @@
package orchestrator
import (
"context"
"testing"
ledgerclient "github.com/tech/sendico/ledger/client"
"github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/payments/rail"
paymenttypes "github.com/tech/sendico/pkg/payments/types"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/structpb"
)
func TestLedgerAccountResolution_UsesOrgOwnedAccount(t *testing.T) {
ctx := context.Background()
accountID := "ledger:org:usd"
providerDetails, err := structpb.NewStruct(map[string]interface{}{
"is_settlement": false,
})
if err != nil {
t.Fatalf("provider details build error: %v", err)
}
listCalls := 0
ledgerAccountRefs := make([]string, 0, 2)
ledgerFake := &ledgerclient.Fake{
ListConnectorAccountsFn: func(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) {
listCalls++
return &connectorv1.ListAccountsResponse{
Accounts: []*connectorv1.Account{
{
Ref: &connectorv1.AccountRef{ConnectorId: "ledger", AccountId: accountID},
Kind: connectorv1.AccountKind_LEDGER_ACCOUNT,
Asset: "USD",
OwnerRef: "",
ProviderDetails: providerDetails,
},
},
}, nil
},
CreateTransactionFn: func(ctx context.Context, tx rail.LedgerTx) (string, error) {
ledgerAccountRefs = append(ledgerAccountRefs, tx.LedgerAccountRef)
return "entry-1", nil
},
}
svc := &Service{
logger: zap.NewNop(),
deps: serviceDependencies{
ledger: ledgerDependency{
client: ledgerFake,
internal: ledgerFake,
},
},
}
executor := newPaymentExecutor(&svc.deps, svc.logger, svc)
amount := &paymenttypes.Money{Currency: "USD", Amount: "10"}
payment := &model.Payment{
PaymentRef: "pay-1",
IdempotencyKey: "pay-1",
Intent: model.PaymentIntent{
Kind: model.PaymentKindPayout,
},
PaymentPlan: &model.PaymentPlan{
ID: "pay-1",
IdempotencyKey: "pay-1",
Steps: []*model.PaymentStep{
{StepID: "ledger_debit", Rail: model.RailLedger, Action: model.RailOperationDebit, Amount: cloneMoney(amount)},
{StepID: "ledger_credit", Rail: model.RailLedger, Action: model.RailOperationCredit, DependsOn: []string{"ledger_debit"}, Amount: cloneMoney(amount)},
},
},
}
payment.OrganizationRef = primitive.NewObjectID()
store := newStubPaymentsStore()
store.payments[payment.PaymentRef] = payment
if err := executor.executePaymentPlan(ctx, store, payment, &orchestratorv1.PaymentQuote{}); err != nil {
t.Fatalf("executePaymentPlan error: %v", err)
}
if listCalls == 0 {
t.Fatalf("expected ledger accounts lookup")
}
if len(ledgerAccountRefs) != 2 {
t.Fatalf("expected two ledger transactions, got %d", len(ledgerAccountRefs))
}
if ledgerAccountRefs[0] != accountID || ledgerAccountRefs[1] != accountID {
t.Fatalf("unexpected ledger account refs: %+v", ledgerAccountRefs)
}
}

View File

@@ -430,11 +430,14 @@ func (s *helperQuotesStore) GetByRef(_ context.Context, _ primitive.ObjectID, re
return nil, storage.ErrQuoteNotFound return nil, storage.ErrQuoteNotFound
} }
func (s *helperQuotesStore) GetByIdempotencyKey(_ context.Context, ref string) (*model.PaymentQuoteRecord, error) { func (s *helperQuotesStore) GetByIdempotencyKey(_ context.Context, orgRef primitive.ObjectID, ref string) (*model.PaymentQuoteRecord, error) {
if s.records == nil { if s.records == nil {
return nil, storage.ErrQuoteNotFound return nil, storage.ErrQuoteNotFound
} }
for _, rec := range s.records { for _, rec := range s.records {
if rec.OrganizationRef != orgRef {
continue
}
if rec.IdempotencyKey == ref { if rec.IdempotencyKey == ref {
return rec, nil return rec, nil
} }

View File

@@ -423,11 +423,14 @@ func (s *stubQuotesStore) GetByRef(ctx context.Context, orgRef primitive.ObjectI
return nil, storage.ErrQuoteNotFound return nil, storage.ErrQuoteNotFound
} }
func (s *stubQuotesStore) GetByIdempotencyKey(ctx context.Context, idempotencyKey string) (*model.PaymentQuoteRecord, error) { func (s *stubQuotesStore) GetByIdempotencyKey(ctx context.Context, orgRef primitive.ObjectID, idempotencyKey string) (*model.PaymentQuoteRecord, error) {
if s.quotes == nil { if s.quotes == nil {
return nil, storage.ErrQuoteNotFound return nil, storage.ErrQuoteNotFound
} }
for _, q := range s.quotes { for _, q := range s.quotes {
if q.OrganizationRef != orgRef {
continue
}
if q.IdempotencyKey == idempotencyKey { if q.IdempotencyKey == idempotencyKey {
return q, nil return q, nil
} }

View File

@@ -19,6 +19,7 @@ type PaymentQuoteRecord struct {
Quote *PaymentQuoteSnapshot `bson:"quote,omitempty" json:"quote,omitempty"` Quote *PaymentQuoteSnapshot `bson:"quote,omitempty" json:"quote,omitempty"`
Quotes []*PaymentQuoteSnapshot `bson:"quotes,omitempty" json:"quotes,omitempty"` Quotes []*PaymentQuoteSnapshot `bson:"quotes,omitempty" json:"quotes,omitempty"`
ExpiresAt time.Time `bson:"expiresAt" json:"expiresAt"` ExpiresAt time.Time `bson:"expiresAt" json:"expiresAt"`
PurgeAt time.Time `bson:"purgeAt,omitempty" json:"purgeAt,omitempty"`
Hash string `bson:"hash" json:"hash"` Hash string `bson:"hash" json:"hash"`
} }

View File

@@ -2,6 +2,7 @@ package mongo
import ( import (
"context" "context"
"time"
"github.com/tech/sendico/payments/orchestrator/storage" "github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/payments/orchestrator/storage/model" "github.com/tech/sendico/payments/orchestrator/storage/model"
@@ -23,8 +24,22 @@ type Store struct {
plans storage.PlanTemplatesStore plans storage.PlanTemplatesStore
} }
type options struct {
quoteRetention time.Duration
}
// Option configures the Mongo-backed payments repository.
type Option func(*options)
// WithQuoteRetention sets how long payment quote records are retained after expiry.
func WithQuoteRetention(retention time.Duration) Option {
return func(opts *options) {
opts.quoteRetention = retention
}
}
// New constructs a Mongo-backed payments repository from a Mongo connection. // New constructs a Mongo-backed payments repository from a Mongo connection.
func New(logger mlogger.Logger, conn *db.MongoConnection) (*Store, error) { func New(logger mlogger.Logger, conn *db.MongoConnection, opts ...Option) (*Store, error) {
if conn == nil { if conn == nil {
return nil, merrors.InvalidArgument("payments.storage.mongo: connection is nil") return nil, merrors.InvalidArgument("payments.storage.mongo: connection is nil")
} }
@@ -32,11 +47,11 @@ func New(logger mlogger.Logger, conn *db.MongoConnection) (*Store, error) {
quotesRepo := repository.CreateMongoRepository(conn.Database(), (&model.PaymentQuoteRecord{}).Collection()) quotesRepo := repository.CreateMongoRepository(conn.Database(), (&model.PaymentQuoteRecord{}).Collection())
routesRepo := repository.CreateMongoRepository(conn.Database(), (&model.PaymentRoute{}).Collection()) routesRepo := repository.CreateMongoRepository(conn.Database(), (&model.PaymentRoute{}).Collection())
plansRepo := repository.CreateMongoRepository(conn.Database(), (&model.PaymentPlanTemplate{}).Collection()) plansRepo := repository.CreateMongoRepository(conn.Database(), (&model.PaymentPlanTemplate{}).Collection())
return NewWithRepository(logger, conn.Ping, paymentsRepo, quotesRepo, routesRepo, plansRepo) return NewWithRepository(logger, conn.Ping, paymentsRepo, quotesRepo, routesRepo, plansRepo, opts...)
} }
// NewWithRepository constructs a payments repository using the provided primitives. // NewWithRepository constructs a payments repository using the provided primitives.
func NewWithRepository(logger mlogger.Logger, ping func(context.Context) error, paymentsRepo repository.Repository, quotesRepo repository.Repository, routesRepo repository.Repository, plansRepo repository.Repository) (*Store, error) { func NewWithRepository(logger mlogger.Logger, ping func(context.Context) error, paymentsRepo repository.Repository, quotesRepo repository.Repository, routesRepo repository.Repository, plansRepo repository.Repository, opts ...Option) (*Store, error) {
if ping == nil { if ping == nil {
return nil, merrors.InvalidArgument("payments.storage.mongo: ping func is nil") return nil, merrors.InvalidArgument("payments.storage.mongo: ping func is nil")
} }
@@ -53,12 +68,19 @@ func NewWithRepository(logger mlogger.Logger, ping func(context.Context) error,
return nil, merrors.InvalidArgument("payments.storage.mongo: plan templates repository is nil") return nil, merrors.InvalidArgument("payments.storage.mongo: plan templates repository is nil")
} }
cfg := options{}
for _, opt := range opts {
if opt != nil {
opt(&cfg)
}
}
childLogger := logger.Named("storage").Named("mongo") childLogger := logger.Named("storage").Named("mongo")
paymentsStore, err := store.NewPayments(childLogger, paymentsRepo) paymentsStore, err := store.NewPayments(childLogger, paymentsRepo)
if err != nil { if err != nil {
return nil, err return nil, err
} }
quotesStore, err := store.NewQuotes(childLogger, quotesRepo) quotesStore, err := store.NewQuotes(childLogger, quotesRepo, cfg.quoteRetention)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -9,6 +9,7 @@ import (
"github.com/tech/sendico/payments/orchestrator/storage" "github.com/tech/sendico/payments/orchestrator/storage"
"github.com/tech/sendico/payments/orchestrator/storage/model" "github.com/tech/sendico/payments/orchestrator/storage/model"
"github.com/tech/sendico/pkg/db/repository" "github.com/tech/sendico/pkg/db/repository"
"github.com/tech/sendico/pkg/db/repository/builder"
ri "github.com/tech/sendico/pkg/db/repository/index" ri "github.com/tech/sendico/pkg/db/repository/index"
"github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mlogger"
@@ -19,25 +20,43 @@ import (
type Quotes struct { type Quotes struct {
logger mlogger.Logger logger mlogger.Logger
repo repository.Repository repo repository.Repository
retention time.Duration
} }
const defaultPaymentQuoteRetention = 72 * time.Hour
// NewQuotes constructs a Mongo-backed quotes store. // NewQuotes constructs a Mongo-backed quotes store.
func NewQuotes(logger mlogger.Logger, repo repository.Repository) (*Quotes, error) { func NewQuotes(logger mlogger.Logger, repo repository.Repository, retention time.Duration) (*Quotes, error) {
if repo == nil { if repo == nil {
return nil, merrors.InvalidArgument("quotesStore: repository is nil") return nil, merrors.InvalidArgument("quotesStore: repository is nil")
} }
if retention <= 0 {
logger.Info("Using default retention duration", zap.Duration("default_retention", defaultPaymentQuoteRetention))
retention = defaultPaymentQuoteRetention
}
logger.Info("Using retention duration", zap.Duration("retention", retention))
indexes := []*ri.Definition{ indexes := []*ri.Definition{
{ {
Keys: []ri.Key{{Field: "quoteRef", Sort: ri.Asc}}, Keys: []ri.Key{{Field: "quoteRef", Sort: ri.Asc}},
Unique: true, Unique: true,
}, },
{
Keys: []ri.Key{
{Field: "organizationRef", Sort: ri.Asc},
{Field: "idempotencyKey", Sort: ri.Asc},
},
Unique: true,
Name: "payment_quotes_org_idempotency_key",
PartialFilter: repository.Query().Comparison(repository.Field("idempotencyKey"), builder.Ne, ""),
},
{ {
Keys: []ri.Key{{Field: "organizationRef", Sort: ri.Asc}}, Keys: []ri.Key{{Field: "organizationRef", Sort: ri.Asc}},
}, },
{ {
Keys: []ri.Key{{Field: "expiresAt", Sort: ri.Asc}}, Keys: []ri.Key{{Field: "purgeAt", Sort: ri.Asc}},
TTL: int32Ptr(0), TTL: int32Ptr(0),
Name: "payment_quotes_purge_at_ttl",
}, },
} }
@@ -51,6 +70,7 @@ func NewQuotes(logger mlogger.Logger, repo repository.Repository) (*Quotes, erro
return &Quotes{ return &Quotes{
logger: logger.Named("quotes"), logger: logger.Named("quotes"),
repo: repo, repo: repo,
retention: retention,
}, nil }, nil
} }
@@ -65,12 +85,16 @@ func (q *Quotes) Create(ctx context.Context, quote *model.PaymentQuoteRecord) er
if quote.OrganizationRef == primitive.NilObjectID { if quote.OrganizationRef == primitive.NilObjectID {
return merrors.InvalidArgument("quotesStore: organization_ref is required") return merrors.InvalidArgument("quotesStore: organization_ref is required")
} }
quote.IdempotencyKey = strings.TrimSpace(quote.IdempotencyKey)
if quote.IdempotencyKey == "" { if quote.IdempotencyKey == "" {
return merrors.InvalidArgument("quotesStore: idempotency key is required") return merrors.InvalidArgument("quotesStore: idempotency key is required")
} }
if quote.ExpiresAt.IsZero() { if quote.ExpiresAt.IsZero() {
return merrors.InvalidArgument("quotesStore: expires_at is required") return merrors.InvalidArgument("quotesStore: expires_at is required")
} }
if quote.PurgeAt.IsZero() || quote.PurgeAt.Before(quote.ExpiresAt) {
quote.PurgeAt = quote.ExpiresAt.Add(q.retention)
}
if quote.Intent.Attributes != nil { if quote.Intent.Attributes != nil {
for k, v := range quote.Intent.Attributes { for k, v := range quote.Intent.Attributes {
quote.Intent.Attributes[k] = strings.TrimSpace(v) quote.Intent.Attributes[k] = strings.TrimSpace(v)
@@ -123,13 +147,16 @@ func (q *Quotes) GetByRef(ctx context.Context, orgRef primitive.ObjectID, quoteR
return entity, nil return entity, nil
} }
func (q *Quotes) GetByIdempotencyKey(ctx context.Context, idempotencyKey string) (*model.PaymentQuoteRecord, error) { func (q *Quotes) GetByIdempotencyKey(ctx context.Context, orgRef primitive.ObjectID, idempotencyKey string) (*model.PaymentQuoteRecord, error) {
idempotencyKey = strings.TrimSpace(idempotencyKey) idempotencyKey = strings.TrimSpace(idempotencyKey)
if idempotencyKey == "" { if idempotencyKey == "" {
return nil, merrors.InvalidArgument("quotesStore: empty idempotency key") return nil, merrors.InvalidArgument("quotesStore: empty idempotency key")
} }
if orgRef == primitive.NilObjectID {
return nil, merrors.InvalidArgument("quotesStore: organization_ref is required")
}
entity := &model.PaymentQuoteRecord{} entity := &model.PaymentQuoteRecord{}
query := repository.Filter("idempotencyKey", idempotencyKey) query := repository.OrgFilter(orgRef).And(repository.Filter("idempotencyKey", idempotencyKey))
if err := q.repo.FindOneByFilter(ctx, query, entity); err != nil { if err := q.repo.FindOneByFilter(ctx, query, entity); err != nil {
if errors.Is(err, merrors.ErrNoData) { if errors.Is(err, merrors.ErrNoData) {
return nil, storage.ErrQuoteNotFound return nil, storage.ErrQuoteNotFound

View File

@@ -55,7 +55,7 @@ type PaymentsStore interface {
type QuotesStore interface { type QuotesStore interface {
Create(ctx context.Context, quote *model.PaymentQuoteRecord) error Create(ctx context.Context, quote *model.PaymentQuoteRecord) error
GetByRef(ctx context.Context, orgRef primitive.ObjectID, quoteRef string) (*model.PaymentQuoteRecord, error) GetByRef(ctx context.Context, orgRef primitive.ObjectID, quoteRef string) (*model.PaymentQuoteRecord, error)
GetByIdempotencyKey(ctx context.Context, idempotencyKey string) (*model.PaymentQuoteRecord, error) GetByIdempotencyKey(ctx context.Context, orgRef primitive.ObjectID, idempotencyKey string) (*model.PaymentQuoteRecord, error)
} }
// RoutesStore manages allowed routing transitions. // RoutesStore manages allowed routing transitions.

View File

@@ -0,0 +1,65 @@
package discovery
import (
"errors"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/tech/sendico/pkg/merrors"
)
var (
metricsOnce sync.Once
eventLatency *prometheus.HistogramVec
eventStatus *prometheus.CounterVec
)
func initMetrics() {
metricsOnce.Do(func() {
eventLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "sendico",
Subsystem: "discovery",
Name: "event_latency_seconds",
Help: "Latency distribution for discovery event handling.",
Buckets: prometheus.DefBuckets,
}, []string{"event"})
eventStatus = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "sendico",
Subsystem: "discovery",
Name: "event_requests_total",
Help: "Total number of discovery events handled, grouped by event and status.",
}, []string{"event", "status"})
})
}
func observeEvent(event string, err error, duration time.Duration) {
if eventLatency != nil {
eventLatency.WithLabelValues(event).Observe(duration.Seconds())
}
if eventStatus != nil {
eventStatus.WithLabelValues(event, statusLabel(err)).Inc()
}
}
func statusLabel(err error) string {
switch {
case err == nil:
return "ok"
case errors.Is(err, merrors.ErrInvalidArg):
return "invalid_argument"
case errors.Is(err, merrors.ErrNoData):
return "not_found"
case errors.Is(err, merrors.ErrDataConflict):
return "conflict"
case errors.Is(err, merrors.ErrAccessDenied):
return "denied"
case errors.Is(err, merrors.ErrInternal):
return "internal"
default:
return "error"
}
}

View File

@@ -59,6 +59,7 @@ func NewRegistryService(logger mlogger.Logger, msgBroker mb.Broker, producer msg
return nil, merrors.InvalidArgument("discovery registry: no logger provided", "logger") return nil, merrors.InvalidArgument("discovery registry: no logger provided", "logger")
} }
logger = logger.Named("discovery_registry") logger = logger.Named("discovery_registry")
initMetrics()
sender = strings.TrimSpace(sender) sender = strings.TrimSpace(sender)
if sender == "" { if sender == "" {
sender = "discovery" sender = "discovery"
@@ -147,9 +148,14 @@ func (s *RegistryService) Stop() {
}) })
} }
func (s *RegistryService) handleAnnounce(_ context.Context, env me.Envelope) error { func (s *RegistryService) handleAnnounce(_ context.Context, env me.Envelope) (err error) {
start := time.Now()
defer func() {
observeEvent("announce", err, time.Since(start))
}()
var payload Announcement var payload Announcement
if err := json.Unmarshal(env.GetData(), &payload); err != nil { if err = json.Unmarshal(env.GetData(), &payload); err != nil {
fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err)) fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
s.logWarn("Failed to decode discovery announce payload", fields...) s.logWarn("Failed to decode discovery announce payload", fields...)
return err return err
@@ -174,9 +180,14 @@ func (s *RegistryService) handleAnnounce(_ context.Context, env me.Envelope) err
return nil return nil
} }
func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) error { func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) (err error) {
start := time.Now()
defer func() {
observeEvent("heartbeat", err, time.Since(start))
}()
var payload Heartbeat var payload Heartbeat
if err := json.Unmarshal(env.GetData(), &payload); err != nil { if err = json.Unmarshal(env.GetData(), &payload); err != nil {
fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err)) fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
s.logWarn("Failed to decode discovery heartbeat payload", fields...) s.logWarn("Failed to decode discovery heartbeat payload", fields...)
return err return err
@@ -208,13 +219,18 @@ func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) er
return nil return nil
} }
func (s *RegistryService) handleLookup(_ context.Context, env me.Envelope) error { func (s *RegistryService) handleLookup(_ context.Context, env me.Envelope) (err error) {
start := time.Now()
defer func() {
observeEvent("lookup", err, time.Since(start))
}()
if s.producer == nil { if s.producer == nil {
s.logWarn("Discovery lookup request ignored: producer not configured", envelopeFields(env)...) s.logWarn("Discovery lookup request ignored: producer not configured", envelopeFields(env)...)
return nil return nil
} }
var payload LookupRequest var payload LookupRequest
if err := json.Unmarshal(env.GetData(), &payload); err != nil { if err = json.Unmarshal(env.GetData(), &payload); err != nil {
fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err)) fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
s.logWarn("Failed to decode discovery lookup payload", fields...) s.logWarn("Failed to decode discovery lookup payload", fields...)
return err return err
@@ -222,7 +238,7 @@ func (s *RegistryService) handleLookup(_ context.Context, env me.Envelope) error
resp := s.registry.Lookup(time.Now()) resp := s.registry.Lookup(time.Now())
resp.RequestID = strings.TrimSpace(payload.RequestID) resp.RequestID = strings.TrimSpace(payload.RequestID)
s.logDebug("Discovery lookup prepared", zap.String("request_id", resp.RequestID), zap.Int("services", len(resp.Services)), zap.Int("gateways", len(resp.Gateways))) s.logDebug("Discovery lookup prepared", zap.String("request_id", resp.RequestID), zap.Int("services", len(resp.Services)), zap.Int("gateways", len(resp.Gateways)))
if err := s.producer.SendMessage(NewLookupResponseEnvelope(s.sender, resp)); err != nil { if err = s.producer.SendMessage(NewLookupResponseEnvelope(s.sender, resp)); err != nil {
fields := []zap.Field{zap.String("request_id", resp.RequestID), zap.Error(err)} fields := []zap.Field{zap.String("request_id", resp.RequestID), zap.Error(err)}
s.logWarn("Failed to publish discovery lookup response", fields...) s.logWarn("Failed to publish discovery lookup response", fields...)
return err return err

View File

@@ -6,17 +6,25 @@ import 'package:pshared/models/payment/type.dart';
IconData iconForPaymentType(PaymentType type) { IconData iconForPaymentType(PaymentType type) {
switch (type) { switch (type) {
case PaymentType.bankAccount: case PaymentType.bankAccount:
return Icons.account_balance; return Icons.account_balance; // bank / institution
case PaymentType.iban: case PaymentType.iban:
return Icons.language; return Icons.public; // cross-border / international account
case PaymentType.ledger:
return Icons.account_balance; // internal ledger account (best fit)
case PaymentType.wallet: case PaymentType.wallet:
return Icons.account_balance_wallet; return Icons.account_balance_wallet; // user wallet / stored funds
case PaymentType.card: case PaymentType.card:
return Icons.credit_card; case PaymentType.cardToken:
return Icons.credit_card; // card rail
case PaymentType.externalChain: case PaymentType.externalChain:
return Icons.currency_bitcoin; return Icons.hub; // network / blockchain, not "bitcoin"
//TODO: define new payment methods
default: case PaymentType.managedWallet:
return Icons.question_mark; return Icons.currency_bitcoin; // custodial / managed / controlled
} }
} }