Fixed billing fees unreachable error propagation. Added USDT ledger creation. Fixed ledger boundaries operation types

This commit is contained in:
Stephan D
2026-02-26 16:25:52 +01:00
parent 54e5c799e8
commit 336f352858
37 changed files with 838 additions and 302 deletions

View File

@@ -9,6 +9,7 @@ import (
"github.com/tech/sendico/ledger/internal/appversion"
"github.com/tech/sendico/pkg/connector/params"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/ledgerconv"
"github.com/tech/sendico/pkg/merrors"
accountrolev1 "github.com/tech/sendico/pkg/proto/common/account_role/v1"
@@ -16,6 +17,7 @@ import (
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
@@ -222,7 +224,7 @@ func (c *connectorAdapter) SubmitOperation(ctx context.Context, req *connectorv1
if err != nil {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, err.Error(), op, "")}}, nil
}
operation := strings.ToLower(strings.TrimSpace(reader.String("operation")))
operation := discovery.NormalizeOperation(reader.String("operation"))
switch op.GetType() {
case connectorv1.OperationType_CREDIT:
@@ -230,11 +232,11 @@ func (c *connectorAdapter) SubmitOperation(ctx context.Context, req *connectorv1
if accountID == "" && op.GetToRole() == accountrolev1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "credit: to.account or to_role is required", op, "")}}, nil
}
if operation != "" && operation != "external.credit" {
if operation != "" && operation != discovery.OperationExternalCredit {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "credit: unsupported operation override", op, "")}}, nil
}
creditFn := c.svc.PostCreditWithCharges
if operation == "external.credit" {
if operation == discovery.OperationExternalCredit {
creditFn = c.svc.PostExternalCreditWithCharges
}
resp, err := creditFn(ctx, &ledgerv1.PostCreditRequest{
@@ -250,6 +252,10 @@ func (c *connectorAdapter) SubmitOperation(ctx context.Context, req *connectorv1
Role: accountRoleFromConnectorRole(op.GetToRole()),
})
if err != nil {
c.svc.logger.Warn("Operation failed", zap.Error(err), zap.String("operation", operation),
zap.String("idempotency_key", op.IdempotencyKey), zap.String("description", description),
zap.String("organization_ref", orgRef), zap.String("ledger_account_ref", accountID),
)
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(mapErrorCode(err), err.Error(), op, accountID)}}, nil
}
return &connectorv1.SubmitOperationResponse{Receipt: ledgerReceipt(resp.GetJournalEntryRef(), connectorv1.OperationStatus_OPERATION_SUCCESS)}, nil
@@ -258,11 +264,11 @@ func (c *connectorAdapter) SubmitOperation(ctx context.Context, req *connectorv1
if accountID == "" && op.GetFromRole() == accountrolev1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "debit: from.account or from_role is required", op, "")}}, nil
}
if operation != "" && operation != "external.debit" {
if operation != "" && operation != discovery.OperationExternalDebit {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "debit: unsupported operation override", op, "")}}, nil
}
debitFn := c.svc.PostDebitWithCharges
if operation == "external.debit" {
if operation == discovery.OperationExternalDebit {
debitFn = c.svc.PostExternalDebitWithCharges
}
resp, err := debitFn(ctx, &ledgerv1.PostDebitRequest{
@@ -393,14 +399,14 @@ func ledgerOperationParams() []*connectorv1.OperationParamSpec {
Type: connectorv1.ParamType_STRING,
Required: false,
Description: "Optional ledger operation override (external.credit).",
AllowedValues: []string{"external.credit"},
AllowedValues: []string{discovery.OperationExternalCredit},
}
externalDebit := &connectorv1.ParamSpec{
Key: "operation",
Type: connectorv1.ParamType_STRING,
Required: false,
Description: "Optional ledger operation override (external.debit).",
AllowedValues: []string{"external.debit"},
AllowedValues: []string{discovery.OperationExternalDebit},
}
return []*connectorv1.OperationParamSpec{
{OperationType: connectorv1.OperationType_CREDIT, Params: append(common, externalCredit, &connectorv1.ParamSpec{Key: "contra_ledger_account_ref", Type: connectorv1.ParamType_STRING, Required: false})},

View File

@@ -7,6 +7,40 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)
type journalEntryType string
const (
journalEntryTypeCredit journalEntryType = "credit"
journalEntryTypeDebit journalEntryType = "debit"
journalEntryTypeTransfer journalEntryType = "transfer"
journalEntryTypeFX journalEntryType = "fx"
)
type journalEntryStatus string
const (
journalEntryStatusAttempted journalEntryStatus = "attempted"
journalEntryStatusSuccess journalEntryStatus = "success"
journalEntryStatusError journalEntryStatus = "error"
)
type journalEntryErrorType string
const (
journalEntryErrorNotImplemented journalEntryErrorType = "not_implemented"
journalEntryErrorFailed journalEntryErrorType = "failed"
journalEntryErrorIdempotencyCheck journalEntryErrorType = "idempotency_check_failed"
journalEntryErrorAccountResolve journalEntryErrorType = "account_resolve_failed"
journalEntryErrorAccountInvalid journalEntryErrorType = "account_invalid"
journalEntryErrorContraResolve journalEntryErrorType = "contra_resolve_failed"
journalEntryErrorContraMissingID journalEntryErrorType = "contra_missing_id"
journalEntryErrorSystemAccountResolve journalEntryErrorType = "system_account_resolve_failed"
journalEntryErrorSystemAccountInvalid journalEntryErrorType = "system_account_invalid"
journalEntryErrorSystemAccountMissing journalEntryErrorType = "system_account_missing_id"
journalEntryErrorUnbalancedAfterContra journalEntryErrorType = "unbalanced_after_contra"
journalEntryErrorTransactionFailed journalEntryErrorType = "transaction_failed"
)
var (
metricsOnce sync.Once
@@ -110,16 +144,16 @@ func initMetrics() {
// Metric recording helpers
func recordJournalEntry(entryType, status string, durationSeconds float64) {
func recordJournalEntry(entryType journalEntryType, status journalEntryStatus, durationSeconds float64) {
initMetrics()
journalEntriesTotal.WithLabelValues(entryType, status).Inc()
journalEntryLatency.WithLabelValues(entryType).Observe(durationSeconds)
journalEntriesTotal.WithLabelValues(string(entryType), string(status)).Inc()
journalEntryLatency.WithLabelValues(string(entryType)).Observe(durationSeconds)
}
func recordJournalEntryError(entryType, errorType string) {
func recordJournalEntryError(entryType journalEntryType, errorType journalEntryErrorType) {
initMetrics()
journalEntryErrors.WithLabelValues(entryType, errorType).Inc()
journalEntriesTotal.WithLabelValues(entryType, "error").Inc()
journalEntryErrors.WithLabelValues(string(entryType), string(errorType)).Inc()
journalEntriesTotal.WithLabelValues(string(entryType), string(journalEntryStatusError)).Inc()
}
func recordBalanceQuery(status string, durationSeconds float64) {
@@ -128,9 +162,9 @@ func recordBalanceQuery(status string, durationSeconds float64) {
balanceQueryLatency.WithLabelValues(status).Observe(durationSeconds)
}
func recordTransactionAmount(currency, entryType string, amount float64) {
func recordTransactionAmount(currency string, entryType journalEntryType, amount float64) {
initMetrics()
transactionAmounts.WithLabelValues(currency, entryType).Observe(amount)
transactionAmounts.WithLabelValues(currency, string(entryType)).Observe(amount)
}
func recordAccountOperation(operation, status string) {
@@ -138,7 +172,7 @@ func recordAccountOperation(operation, status string) {
accountOperationsTotal.WithLabelValues(operation, status).Inc()
}
func recordDuplicateRequest(entryType string) {
func recordDuplicateRequest(entryType journalEntryType) {
initMetrics()
duplicateRequestsTotal.WithLabelValues(entryType).Inc()
duplicateRequestsTotal.WithLabelValues(string(entryType)).Inc()
}

View File

@@ -65,7 +65,7 @@ func (s *Service) postCreditResponder(_ context.Context, req *ledgerv1.PostCredi
existingEntry, err := s.storage.JournalEntries().GetByIdempotencyKey(ctx, orgRef, req.IdempotencyKey)
if err == nil && existingEntry != nil {
recordDuplicateRequest("credit")
recordDuplicateRequest(journalEntryTypeCredit)
logger.Info("Duplicate credit request (idempotency)",
zap.String("existingEntryID", existingEntry.GetID().Hex()))
return &ledgerv1.PostResponse{
@@ -75,18 +75,18 @@ func (s *Service) postCreditResponder(_ context.Context, req *ledgerv1.PostCredi
}, nil
}
if err != nil && err != storage.ErrJournalEntryNotFound {
recordJournalEntryError("credit", "idempotency_check_failed")
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorIdempotencyCheck)
logger.Warn("Failed to check idempotency", zap.Error(err))
return nil, merrors.Internal("failed to check idempotency")
}
account, accountRef, err := s.resolveAccount(ctx, strings.TrimSpace(req.LedgerAccountRef), roleModel, orgRef, req.Money.Currency, "account")
if err != nil {
recordJournalEntryError("credit", "account_resolve_failed")
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorAccountResolve)
return nil, err
}
if err := validateAccountForOrg(account, orgRef, req.Money.Currency); err != nil {
recordJournalEntryError("credit", "account_invalid")
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorAccountInvalid)
return nil, err
}
@@ -159,12 +159,12 @@ func (s *Service) postCreditResponder(_ context.Context, req *ledgerv1.PostCredi
contraAccount, err := s.resolveSettlementAccount(ctx, orgRef, req.Money.Currency, req.ContraLedgerAccountRef, accountsByRef)
if err != nil {
recordJournalEntryError("credit", "contra_resolve_failed")
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorContraResolve)
return nil, err
}
contraAccountID := contraAccount.GetID()
if contraAccountID == nil {
recordJournalEntryError("credit", "contra_missing_id")
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorContraMissingID)
return nil, merrors.Internal("contra account missing identifier")
}
@@ -183,7 +183,7 @@ func (s *Service) postCreditResponder(_ context.Context, req *ledgerv1.PostCredi
}
if !entryTotal.IsZero() {
recordJournalEntryError("credit", "unbalanced_after_contra")
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorUnbalancedAfterContra)
return nil, merrors.Internal("failed to balance journal entry")
}
@@ -237,13 +237,13 @@ func (s *Service) postCreditResponder(_ context.Context, req *ledgerv1.PostCredi
})
if err != nil {
recordJournalEntryError("credit", "transaction_failed")
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorTransactionFailed)
return nil, err
}
amountFloat, _ := creditAmount.Float64()
recordTransactionAmount(req.Money.Currency, "credit", amountFloat)
recordJournalEntry("credit", "success", 0)
recordTransactionAmount(req.Money.Currency, journalEntryTypeCredit, amountFloat)
recordJournalEntry(journalEntryTypeCredit, journalEntryStatusSuccess, 0)
return result.(*ledgerv1.PostResponse), nil
}
}

View File

@@ -63,7 +63,7 @@ func (s *Service) postDebitResponder(_ context.Context, req *ledgerv1.PostDebitR
existingEntry, err := s.storage.JournalEntries().GetByIdempotencyKey(ctx, orgRef, req.IdempotencyKey)
if err == nil && existingEntry != nil {
recordDuplicateRequest("debit")
recordDuplicateRequest(journalEntryTypeDebit)
logger.Info("Duplicate debit request (idempotency)",
zap.String("existingEntryID", existingEntry.GetID().Hex()))
return &ledgerv1.PostResponse{
@@ -79,11 +79,11 @@ func (s *Service) postDebitResponder(_ context.Context, req *ledgerv1.PostDebitR
account, accountRef, err := s.resolveAccount(ctx, strings.TrimSpace(req.LedgerAccountRef), roleModel, orgRef, req.Money.Currency, "account")
if err != nil {
recordJournalEntryError("debit", "account_resolve_failed")
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorAccountResolve)
return nil, err
}
if err := validateAccountForOrg(account, orgRef, req.Money.Currency); err != nil {
recordJournalEntryError("debit", "account_invalid")
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorAccountInvalid)
return nil, err
}
@@ -156,12 +156,12 @@ func (s *Service) postDebitResponder(_ context.Context, req *ledgerv1.PostDebitR
contraAccount, err := s.resolveSettlementAccount(ctx, orgRef, req.Money.Currency, req.ContraLedgerAccountRef, accountsByRef)
if err != nil {
recordJournalEntryError("debit", "contra_resolve_failed")
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorContraResolve)
return nil, err
}
contraAccountID := contraAccount.GetID()
if contraAccountID == nil {
recordJournalEntryError("debit", "contra_missing_id")
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorContraMissingID)
return nil, merrors.Internal("contra account missing identifier")
}
@@ -180,7 +180,7 @@ func (s *Service) postDebitResponder(_ context.Context, req *ledgerv1.PostDebitR
}
if !entryTotal.IsZero() {
recordJournalEntryError("debit", "unbalanced_after_contra")
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorUnbalancedAfterContra)
return nil, merrors.Internal("failed to balance journal entry")
}
@@ -234,13 +234,13 @@ func (s *Service) postDebitResponder(_ context.Context, req *ledgerv1.PostDebitR
})
if err != nil {
recordJournalEntryError("debit", "transaction_failed")
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorTransactionFailed)
return nil, err
}
amountFloat, _ := debitAmount.Float64()
recordTransactionAmount(req.Money.Currency, "debit", amountFloat)
recordJournalEntry("debit", "success", 0)
recordTransactionAmount(req.Money.Currency, journalEntryTypeDebit, amountFloat)
recordJournalEntry(journalEntryTypeDebit, journalEntryStatusSuccess, 0)
return result.(*ledgerv1.PostResponse), nil
}
}

View File

@@ -60,7 +60,7 @@ func (s *Service) postExternalCreditResponder(_ context.Context, req *ledgerv1.P
existingEntry, err := s.storage.JournalEntries().GetByIdempotencyKey(ctx, orgRef, req.IdempotencyKey)
if err == nil && existingEntry != nil {
recordDuplicateRequest("credit")
recordDuplicateRequest(journalEntryTypeCredit)
logger.Info("Duplicate external credit request (idempotency)",
zap.String("existingEntryID", existingEntry.GetID().Hex()))
return &ledgerv1.PostResponse{
@@ -70,34 +70,34 @@ func (s *Service) postExternalCreditResponder(_ context.Context, req *ledgerv1.P
}, nil
}
if err != nil && err != storage.ErrJournalEntryNotFound {
recordJournalEntryError("credit", "idempotency_check_failed")
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorIdempotencyCheck)
logger.Warn("Failed to check idempotency", zap.Error(err))
return nil, merrors.Internal("failed to check idempotency")
}
account, accountRef, err := s.resolveAccount(ctx, strings.TrimSpace(req.LedgerAccountRef), roleModel, orgRef, req.Money.Currency, "account")
if err != nil {
recordJournalEntryError("credit", "account_resolve_failed")
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorAccountResolve)
return nil, err
}
if err := validateAccountForOrg(account, orgRef, req.Money.Currency); err != nil {
recordJournalEntryError("credit", "account_invalid")
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorAccountInvalid)
return nil, err
}
systemAccount, err := s.systemAccount(ctx, pmodel.SystemAccountPurposeExternalSource, req.Money.Currency)
if err != nil {
recordJournalEntryError("credit", "system_account_resolve_failed")
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorSystemAccountResolve)
return nil, err
}
if err := validateSystemAccount(systemAccount, pmodel.SystemAccountPurposeExternalSource, req.Money.Currency); err != nil {
recordJournalEntryError("credit", "system_account_invalid")
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorSystemAccountInvalid)
return nil, err
}
systemAccountID := systemAccount.GetID()
if systemAccountID == nil {
recordJournalEntryError("credit", "system_account_missing_id")
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorSystemAccountMissing)
return nil, merrors.Internal("system account missing identifier")
}
@@ -186,7 +186,7 @@ func (s *Service) postExternalCreditResponder(_ context.Context, req *ledgerv1.P
}
if !entryTotal.IsZero() {
recordJournalEntryError("credit", "unbalanced_after_contra")
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorUnbalancedAfterContra)
return nil, merrors.Internal("failed to balance journal entry")
}
@@ -240,13 +240,13 @@ func (s *Service) postExternalCreditResponder(_ context.Context, req *ledgerv1.P
})
if err != nil {
recordJournalEntryError("credit", "transaction_failed")
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorTransactionFailed)
return nil, err
}
amountFloat, _ := creditAmount.Float64()
recordTransactionAmount(req.Money.Currency, "credit", amountFloat)
recordJournalEntry("credit", "success", 0)
recordTransactionAmount(req.Money.Currency, journalEntryTypeCredit, amountFloat)
recordJournalEntry(journalEntryTypeCredit, journalEntryStatusSuccess, 0)
return result.(*ledgerv1.PostResponse), nil
}
}
@@ -293,7 +293,7 @@ func (s *Service) postExternalDebitResponder(_ context.Context, req *ledgerv1.Po
existingEntry, err := s.storage.JournalEntries().GetByIdempotencyKey(ctx, orgRef, req.IdempotencyKey)
if err == nil && existingEntry != nil {
recordDuplicateRequest("debit")
recordDuplicateRequest(journalEntryTypeDebit)
logger.Info("Duplicate external debit request (idempotency)",
zap.String("existingEntryID", existingEntry.GetID().Hex()))
return &ledgerv1.PostResponse{
@@ -303,34 +303,34 @@ func (s *Service) postExternalDebitResponder(_ context.Context, req *ledgerv1.Po
}, nil
}
if err != nil && err != storage.ErrJournalEntryNotFound {
recordJournalEntryError("debit", "idempotency_check_failed")
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorIdempotencyCheck)
logger.Warn("Failed to check idempotency", zap.Error(err))
return nil, merrors.Internal("failed to check idempotency")
}
account, accountRef, err := s.resolveAccount(ctx, strings.TrimSpace(req.LedgerAccountRef), roleModel, orgRef, req.Money.Currency, "account")
if err != nil {
recordJournalEntryError("debit", "account_resolve_failed")
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorAccountResolve)
return nil, err
}
if err := validateAccountForOrg(account, orgRef, req.Money.Currency); err != nil {
recordJournalEntryError("debit", "account_invalid")
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorAccountInvalid)
return nil, err
}
systemAccount, err := s.systemAccount(ctx, pmodel.SystemAccountPurposeExternalSink, req.Money.Currency)
if err != nil {
recordJournalEntryError("debit", "system_account_resolve_failed")
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorSystemAccountResolve)
return nil, err
}
if err := validateSystemAccount(systemAccount, pmodel.SystemAccountPurposeExternalSink, req.Money.Currency); err != nil {
recordJournalEntryError("debit", "system_account_invalid")
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorSystemAccountInvalid)
return nil, err
}
systemAccountID := systemAccount.GetID()
if systemAccountID == nil {
recordJournalEntryError("debit", "system_account_missing_id")
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorSystemAccountMissing)
return nil, merrors.Internal("system account missing identifier")
}
@@ -419,7 +419,7 @@ func (s *Service) postExternalDebitResponder(_ context.Context, req *ledgerv1.Po
}
if !entryTotal.IsZero() {
recordJournalEntryError("debit", "unbalanced_after_contra")
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorUnbalancedAfterContra)
return nil, merrors.Internal("failed to balance journal entry")
}
@@ -473,13 +473,13 @@ func (s *Service) postExternalDebitResponder(_ context.Context, req *ledgerv1.Po
})
if err != nil {
recordJournalEntryError("debit", "transaction_failed")
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorTransactionFailed)
return nil, err
}
amountFloat, _ := debitAmount.Float64()
recordTransactionAmount(req.Money.Currency, "debit", amountFloat)
recordJournalEntry("debit", "success", 0)
recordTransactionAmount(req.Money.Currency, journalEntryTypeDebit, amountFloat)
recordJournalEntry(journalEntryTypeDebit, journalEntryStatusSuccess, 0)
return result.(*ledgerv1.PostResponse), nil
}
}

View File

@@ -76,7 +76,7 @@ func (s *Service) fxResponder(_ context.Context, req *ledgerv1.FXRequest) gsresp
// Check for duplicate idempotency key
existingEntry, err := s.storage.JournalEntries().GetByIdempotencyKey(ctx, orgRef, req.IdempotencyKey)
if err == nil && existingEntry != nil {
recordDuplicateRequest("fx")
recordDuplicateRequest(journalEntryTypeFX)
logger.Info("Duplicate FX request (idempotency)",
zap.String("existingEntryID", existingEntry.GetID().Hex()))
return &ledgerv1.PostResponse{
@@ -244,15 +244,15 @@ func (s *Service) fxResponder(_ context.Context, req *ledgerv1.FXRequest) gsresp
})
if err != nil {
recordJournalEntryError("fx", "transaction_failed")
recordJournalEntryError(journalEntryTypeFX, journalEntryErrorTransactionFailed)
return nil, err
}
fromAmountFloat, _ := fromAmount.Float64()
toAmountFloat, _ := toAmount.Float64()
recordTransactionAmount(req.FromMoney.Currency, "fx", fromAmountFloat)
recordTransactionAmount(req.ToMoney.Currency, "fx", toAmountFloat)
recordJournalEntry("fx", "success", 0)
recordTransactionAmount(req.FromMoney.Currency, journalEntryTypeFX, fromAmountFloat)
recordTransactionAmount(req.ToMoney.Currency, journalEntryTypeFX, toAmountFloat)
recordJournalEntry(journalEntryTypeFX, journalEntryStatusSuccess, 0)
return result.(*ledgerv1.PostResponse), nil
}
}

View File

@@ -86,7 +86,7 @@ func (s *Service) transferResponder(_ context.Context, req *ledgerv1.TransferReq
// Check for duplicate idempotency key
existingEntry, err := s.storage.JournalEntries().GetByIdempotencyKey(ctx, orgRef, req.IdempotencyKey)
if err == nil && existingEntry != nil {
recordDuplicateRequest("transfer")
recordDuplicateRequest(journalEntryTypeTransfer)
logger.Info("Duplicate transfer request (idempotency)",
zap.String("existingEntryID", existingEntry.GetID().Hex()))
return &ledgerv1.PostResponse{
@@ -246,13 +246,13 @@ func (s *Service) transferResponder(_ context.Context, req *ledgerv1.TransferReq
})
if err != nil {
recordJournalEntryError("transfer", "failed")
recordJournalEntryError(journalEntryTypeTransfer, journalEntryErrorFailed)
return nil, err
}
amountFloat, _ := transferAmount.Float64()
recordTransactionAmount(req.Money.Currency, "transfer", amountFloat)
recordJournalEntry("transfer", "success", 0)
recordTransactionAmount(req.Money.Currency, journalEntryTypeTransfer, amountFloat)
recordJournalEntry(journalEntryTypeTransfer, journalEntryStatusSuccess, 0)
return result.(*ledgerv1.PostResponse), nil
}
}

View File

@@ -77,7 +77,7 @@ func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.
initMetrics()
service := &Service{
logger: logger.Named("ledger"),
logger: logger.Named("service"),
storage: repo,
producer: prod,
msgCfg: msgCfg,
@@ -117,17 +117,10 @@ func (s *Service) CreateAccount(ctx context.Context, req *ledgerv1.CreateAccount
func (s *Service) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
start := time.Now()
defer func() {
recordJournalEntry("credit", "attempted", time.Since(start).Seconds())
recordJournalEntry(journalEntryTypeCredit, journalEntryStatusAttempted, time.Since(start).Seconds())
}()
responder := s.postCreditResponder(ctx, req)
resp, err := responder(ctx)
if err != nil {
recordJournalEntryError("credit", "not_implemented")
}
logger := s.logger.With(zap.String("operation", "credit"))
logger := s.logger.With(zap.String("operation", discovery.OperationLedgerCredit))
if req != nil {
logger = logger.With(
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
@@ -147,7 +140,16 @@ func (s *Service) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostC
logger = logger.With(zap.String("contra_ledger_account_ref", contra))
}
}
s.logLedgerOperation("credit", logger, resp, err)
s.logLedgerOperationStart(discovery.OperationLedgerCredit, logger)
responder := s.postCreditResponder(ctx, req)
resp, err := responder(ctx)
if err != nil {
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorNotImplemented)
}
s.logLedgerOperation(discovery.OperationLedgerCredit, logger, resp, err, time.Since(start))
return resp, err
}
@@ -156,17 +158,10 @@ func (s *Service) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostC
func (s *Service) PostExternalCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
start := time.Now()
defer func() {
recordJournalEntry("credit", "attempted", time.Since(start).Seconds())
recordJournalEntry(journalEntryTypeCredit, journalEntryStatusAttempted, time.Since(start).Seconds())
}()
responder := s.postExternalCreditResponder(ctx, req)
resp, err := responder(ctx)
if err != nil {
recordJournalEntryError("credit", "failed")
}
logger := s.logger.With(zap.String("operation", "external_credit"))
logger := s.logger.With(zap.String("operation", discovery.OperationExternalCredit))
if req != nil {
logger = logger.With(
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
@@ -183,7 +178,16 @@ func (s *Service) PostExternalCreditWithCharges(ctx context.Context, req *ledger
logger = logger.With(zap.String("role", role.String()))
}
}
s.logLedgerOperation("external_credit", logger, resp, err)
s.logLedgerOperationStart(discovery.OperationExternalCredit, logger)
responder := s.postExternalCreditResponder(ctx, req)
resp, err := responder(ctx)
if err != nil {
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorFailed)
}
s.logLedgerOperation(discovery.OperationExternalCredit, logger, resp, err, time.Since(start))
return resp, err
}
@@ -192,17 +196,10 @@ func (s *Service) PostExternalCreditWithCharges(ctx context.Context, req *ledger
func (s *Service) PostDebitWithCharges(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error) {
start := time.Now()
defer func() {
recordJournalEntry("debit", "attempted", time.Since(start).Seconds())
recordJournalEntry(journalEntryTypeDebit, journalEntryStatusAttempted, time.Since(start).Seconds())
}()
responder := s.postDebitResponder(ctx, req)
resp, err := responder(ctx)
if err != nil {
recordJournalEntryError("debit", "failed")
}
logger := s.logger.With(zap.String("operation", "debit"))
logger := s.logger.With(zap.String("operation", discovery.OperationLedgerDebit))
if req != nil {
logger = logger.With(
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
@@ -222,7 +219,16 @@ func (s *Service) PostDebitWithCharges(ctx context.Context, req *ledgerv1.PostDe
logger = logger.With(zap.String("contra_ledger_account_ref", contra))
}
}
s.logLedgerOperation("debit", logger, resp, err)
s.logLedgerOperationStart(discovery.OperationLedgerDebit, logger)
responder := s.postDebitResponder(ctx, req)
resp, err := responder(ctx)
if err != nil {
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorFailed)
}
s.logLedgerOperation(discovery.OperationLedgerDebit, logger, resp, err, time.Since(start))
return resp, err
}
@@ -231,17 +237,10 @@ func (s *Service) PostDebitWithCharges(ctx context.Context, req *ledgerv1.PostDe
func (s *Service) PostExternalDebitWithCharges(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error) {
start := time.Now()
defer func() {
recordJournalEntry("debit", "attempted", time.Since(start).Seconds())
recordJournalEntry(journalEntryTypeDebit, journalEntryStatusAttempted, time.Since(start).Seconds())
}()
responder := s.postExternalDebitResponder(ctx, req)
resp, err := responder(ctx)
if err != nil {
recordJournalEntryError("debit", "failed")
}
logger := s.logger.With(zap.String("operation", "external_debit"))
logger := s.logger.With(zap.String("operation", discovery.OperationExternalDebit))
if req != nil {
logger = logger.With(
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
@@ -258,7 +257,16 @@ func (s *Service) PostExternalDebitWithCharges(ctx context.Context, req *ledgerv
logger = logger.With(zap.String("role", role.String()))
}
}
s.logLedgerOperation("external_debit", logger, resp, err)
s.logLedgerOperationStart(discovery.OperationExternalDebit, logger)
responder := s.postExternalDebitResponder(ctx, req)
resp, err := responder(ctx)
if err != nil {
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorFailed)
}
s.logLedgerOperation(discovery.OperationExternalDebit, logger, resp, err, time.Since(start))
return resp, err
}
@@ -267,17 +275,10 @@ func (s *Service) PostExternalDebitWithCharges(ctx context.Context, req *ledgerv
func (s *Service) TransferInternal(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) {
start := time.Now()
defer func() {
recordJournalEntry("transfer", "attempted", time.Since(start).Seconds())
recordJournalEntry(journalEntryTypeTransfer, journalEntryStatusAttempted, time.Since(start).Seconds())
}()
responder := s.transferResponder(ctx, req)
resp, err := responder(ctx)
if err != nil {
recordJournalEntryError("transfer", "failed")
}
logger := s.logger.With(zap.String("operation", "transfer"))
logger := s.logger.With(zap.String("operation", discovery.OperationLedgerTransfer))
if req != nil {
logger = logger.With(
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
@@ -298,7 +299,16 @@ func (s *Service) TransferInternal(ctx context.Context, req *ledgerv1.TransferRe
logger = logger.With(zap.String("to_role", role.String()))
}
}
s.logLedgerOperation("transfer", logger, resp, err)
s.logLedgerOperationStart(discovery.OperationLedgerTransfer, logger)
responder := s.transferResponder(ctx, req)
resp, err := responder(ctx)
if err != nil {
recordJournalEntryError(journalEntryTypeTransfer, journalEntryErrorFailed)
}
s.logLedgerOperation(discovery.OperationLedgerTransfer, logger, resp, err, time.Since(start))
return resp, err
}
@@ -307,17 +317,10 @@ func (s *Service) TransferInternal(ctx context.Context, req *ledgerv1.TransferRe
func (s *Service) ApplyFXWithCharges(ctx context.Context, req *ledgerv1.FXRequest) (*ledgerv1.PostResponse, error) {
start := time.Now()
defer func() {
recordJournalEntry("fx", "attempted", time.Since(start).Seconds())
recordJournalEntry(journalEntryTypeFX, journalEntryStatusAttempted, time.Since(start).Seconds())
}()
responder := s.fxResponder(ctx, req)
resp, err := responder(ctx)
if err != nil {
recordJournalEntryError("fx", "failed")
}
logger := s.logger.With(zap.String("operation", "fx"))
logger := s.logger.With(zap.String("operation", discovery.OperationLedgerFX))
if req != nil {
logger = logger.With(
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
@@ -341,7 +344,16 @@ func (s *Service) ApplyFXWithCharges(ctx context.Context, req *ledgerv1.FXReques
logger = logger.With(zap.String("rate", rate))
}
}
s.logLedgerOperation("fx", logger, resp, err)
s.logLedgerOperationStart(discovery.OperationLedgerFX, logger)
responder := s.fxResponder(ctx, req)
resp, err := responder(ctx)
if err != nil {
recordJournalEntryError(journalEntryTypeFX, journalEntryErrorFailed)
}
s.logLedgerOperation(discovery.OperationLedgerFX, logger, resp, err, time.Since(start))
return resp, err
}
@@ -365,23 +377,42 @@ func (s *Service) GetJournalEntry(ctx context.Context, req *ledgerv1.GetEntryReq
return responder(ctx)
}
func (s *Service) logLedgerOperation(op string, logger mlogger.Logger, resp *ledgerv1.PostResponse, err error) {
func (s *Service) logLedgerOperationStart(op string, logger mlogger.Logger) {
if logger == nil {
return
}
if err != nil {
logger.Warn(fmt.Sprintf("ledger %s failed", op), zap.Error(err))
logger.Debug("Ledger operation execution started", zap.String("operation_name", op))
}
func (s *Service) logLedgerOperation(op string, logger mlogger.Logger, resp *ledgerv1.PostResponse, err error, duration time.Duration) {
if logger == nil {
return
}
entryRef := ""
if resp != nil {
entryRef = strings.TrimSpace(resp.GetJournalEntryRef())
}
if entryRef == "" {
logger.Info(fmt.Sprintf("ledger %s posted", op))
status := "succeeded"
fields := []zap.Field{
zap.String("operation_name", op),
zap.String("status", status),
zap.Int64("duration_ms", duration.Milliseconds()),
}
if entryRef != "" {
fields = append(fields, zap.String("journal_entry_ref", entryRef))
}
if err != nil {
fields[1] = zap.String("status", "failed")
logger.Debug("Ledger operation execution completed", append(fields, zap.Error(err))...)
logger.Warn("Ledger operation failed", zap.String("operation_name", op), zap.Error(err))
return
}
logger.Info(fmt.Sprintf("ledger %s posted", op), zap.String("journal_entry_ref", entryRef))
logger.Debug("Ledger operation execution completed", fields...)
if entryRef == "" {
logger.Info("Ledger operation posted", zap.String("operation_name", op))
return
}
logger.Info("Ledger operation posted", zap.String("operation_name", op), zap.String("journal_entry_ref", entryRef))
}
func (s *Service) Shutdown() {
@@ -402,7 +433,7 @@ func (s *Service) startDiscoveryAnnouncer() {
}
announce := discovery.Announcement{
Service: "LEDGER",
Operations: []string{"balance.read", "ledger.debit", "ledger.credit", "external.credit", "external.debit"},
Operations: discovery.LedgerServiceOperations(),
InvokeURI: s.invokeURI,
Version: appversion.Create().Short(),
}
@@ -428,8 +459,7 @@ func (s *Service) startOutboxReliableProducer() error {
}
s.outbox.producer = reliableProducer
if s.outbox.producer == nil || s.producer == nil {
s.logger.Info("Outbox reliable publisher disabled",
zap.Bool("enabled", settings.Enabled))
s.logger.Info("Outbox reliable publisher disabled", zap.Bool("enabled", settings.Enabled))
return
}
s.logger.Info("Outbox reliable publisher configured",