package ledger

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/shopspring/decimal"
	feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/timestamppb"

	accountingv1 "github.com/tech/sendico/pkg/proto/common/accounting/v1"
	moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
	tracev1 "github.com/tech/sendico/pkg/proto/common/trace/v1"

	"github.com/tech/sendico/ledger/internal/appversion"
	"github.com/tech/sendico/ledger/storage"
	"github.com/tech/sendico/pkg/api/routers"
	"github.com/tech/sendico/pkg/discovery"
	"github.com/tech/sendico/pkg/merrors"
	pmessaging "github.com/tech/sendico/pkg/messaging"
	pmessagingreliable "github.com/tech/sendico/pkg/messaging/reliable"
	"github.com/tech/sendico/pkg/mlogger"
	pmodel "github.com/tech/sendico/pkg/model"
	"github.com/tech/sendico/pkg/mservice"
	connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
	ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
)

// serviceError is a string-backed error type used for the package's
// sentinel errors.
type serviceError string

// Error returns the error text, satisfying the error interface.
func (e serviceError) Error() string {
	return string(e)
}

var (
	// errStorageNotInitialized is the sentinel for a missing storage repository.
	errStorageNotInitialized = serviceError("ledger: storage not initialized")
)

// Service is the ledger service. It bundles the logger, storage repository,
// message producer and messaging settings, the optional fee engine client,
// the discovery announcer, and the state of the outbox publisher.
type Service struct {
	logger    mlogger.Logger
	storage   storage.Repository
	producer  pmessaging.Producer
	msgCfg    pmodel.SettingsT
	fees      feesDependency
	announcer *discovery.Announcer
	invokeURI string

	// outbox holds the reliable producer, the cancel function of the context
	// it runs under, and the Once that guards its one-time start.
	outbox struct {
		once     sync.Once
		cancel   context.CancelFunc
		producer *pmessagingreliable.ReliableProducer
	}

	// systemAccounts holds ledger accounts in two string-keyed maps.
	// NOTE(review): mu presumably guards both maps and the keys are presumably
	// currencies — confirm against the code that uses them (not visible here).
	systemAccounts struct {
		mu             sync.RWMutex
		externalSource map[string]*pmodel.LedgerAccount
		externalSink   map[string]*pmodel.LedgerAccount
	}
}

// feesDependency groups the fee engine client with the timeout applied to
// calls made through it.
type feesDependency struct {
	client  feesv1.FeeEngineClient
	timeout time.Duration
}

// available reports whether a fee engine client has been configured.
func (f feesDependency) available() bool {
	return f.client != nil
}

// NewService builds a Service from its dependencies, initializes metrics,
// starts the outbox reliable producer and then the discovery announcer.
//
// invokeURI is stored with surrounding whitespace trimmed. An error is
// returned only when starting the outbox reliable producer fails; in that
// case the returned service is nil.
func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer, msgCfg pmodel.SettingsT, feesClient feesv1.FeeEngineClient, feesTimeout time.Duration, invokeURI string) (*Service, error) {
	// Initialize Prometheus metrics
	initMetrics()

	service := &Service{
		logger:    logger.Named("service"),
		storage:   repo,
		producer:  prod,
		msgCfg:    msgCfg,
		invokeURI: strings.TrimSpace(invokeURI),
		fees: feesDependency{
			client:  feesClient,
			timeout: feesTimeout,
		},
	}

	if err := service.startOutboxReliableProducer(); err != nil {
		return nil, err
	}
	service.startDiscoveryAnnouncer()

	return service, nil
}

// Register registers the service on the given gRPC router, exposing it as a
// connector service through the connector adapter.
func (s *Service) Register(router routers.GRPC) error {
	return router.Register(func(reg grpc.ServiceRegistrar) {
		connectorv1.RegisterConnectorServiceServer(reg, newConnectorAdapter(s))
	})
}

// ListAccounts lists ledger accounts for an organization.
func (s *Service) ListAccounts(ctx context.Context, req *ledgerv1.ListAccountsRequest) (*ledgerv1.ListAccountsResponse, error) {
	responder := s.listAccountsResponder(ctx, req)
	return responder(ctx)
}

// CreateAccount provisions a new ledger account scoped to an organization.
func (s *Service) CreateAccount(ctx context.Context, req *ledgerv1.CreateAccountRequest) (*ledgerv1.CreateAccountResponse, error) {
	responder := s.createAccountResponder(ctx, req)
	return responder(ctx)
}

// PostCreditWithCharges handles credit posting with fees in one atomic journal entry.
//
// Every call records an "attempted" credit journal-entry metric with its
// duration; a failing call additionally records a "failed" error metric, the
// same label the other posting operations use. Request details are attached
// to the operation logger only when req is non-nil.
func (s *Service) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
	start := time.Now()
	defer func() {
		recordJournalEntry(journalEntryTypeCredit, journalEntryStatusAttempted, time.Since(start).Seconds())
	}()

	logger := s.logger.With(zap.String("operation", discovery.OperationLedgerCredit))
	if req != nil {
		logger = logger.With(
			zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
			zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())),
			zap.String("ledger_account_ref", strings.TrimSpace(req.GetLedgerAccountRef())),
		)
		if money := req.GetMoney(); money != nil {
			logger = logger.With(
				zap.String("currency", money.GetCurrency()),
				zap.String("amount", money.GetAmount()),
			)
		}
		if role := req.GetRole(); role != ledgerv1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED {
			logger = logger.With(zap.String("role", role.String()))
		}
		if contra := strings.TrimSpace(req.GetContraLedgerAccountRef()); contra != "" {
			logger = logger.With(zap.String("contra_ledger_account_ref", contra))
		}
	}
	s.logLedgerOperationStart(discovery.OperationLedgerCredit, logger)

	responder := s.postCreditResponder(ctx, req)
	resp, err := responder(ctx)
	if err != nil {
		// A failed posting is reported as "failed", consistent with the debit,
		// transfer, FX and external posting operations.
		recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorFailed)
	}
	s.logLedgerOperation(discovery.OperationLedgerCredit, logger, resp, err, time.Since(start))
	return resp, err
}

// PostExternalCreditWithCharges handles external credit posting (from outside the ledger).
//
// Metrics and logging follow the same pattern as PostCreditWithCharges, under
// the external-credit operation name.
func (s *Service) PostExternalCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
	start := time.Now()
	defer func() {
		recordJournalEntry(journalEntryTypeCredit, journalEntryStatusAttempted, time.Since(start).Seconds())
	}()

	logger := s.logger.With(zap.String("operation", discovery.OperationExternalCredit))
	if req != nil {
		logger = logger.With(
			zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
			zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())),
			zap.String("ledger_account_ref", strings.TrimSpace(req.GetLedgerAccountRef())),
		)
		if money := req.GetMoney(); money != nil {
			logger = logger.With(
				zap.String("currency", money.GetCurrency()),
				zap.String("amount", money.GetAmount()),
			)
		}
		if role := req.GetRole(); role != ledgerv1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED {
			logger = logger.With(zap.String("role", role.String()))
		}
	}
	s.logLedgerOperationStart(discovery.OperationExternalCredit, logger)

	responder := s.postExternalCreditResponder(ctx, req)
	resp, err := responder(ctx)
	if err != nil {
		recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorFailed)
	}
	s.logLedgerOperation(discovery.OperationExternalCredit, logger, resp, err, time.Since(start))
	return resp, err
}
PostDebitWithCharges handles debit posting with fees in one atomic journal entry func (s *Service) PostDebitWithCharges(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error) { start := time.Now() defer func() { recordJournalEntry(journalEntryTypeDebit, journalEntryStatusAttempted, time.Since(start).Seconds()) }() logger := s.logger.With(zap.String("operation", discovery.OperationLedgerDebit)) if req != nil { logger = logger.With( zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())), zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())), zap.String("ledger_account_ref", strings.TrimSpace(req.GetLedgerAccountRef())), ) if money := req.GetMoney(); money != nil { logger = logger.With( zap.String("currency", money.GetCurrency()), zap.String("amount", money.GetAmount()), ) } if role := req.GetRole(); role != ledgerv1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED { logger = logger.With(zap.String("role", role.String())) } if contra := strings.TrimSpace(req.GetContraLedgerAccountRef()); contra != "" { logger = logger.With(zap.String("contra_ledger_account_ref", contra)) } } s.logLedgerOperationStart(discovery.OperationLedgerDebit, logger) responder := s.postDebitResponder(ctx, req) resp, err := responder(ctx) if err != nil { recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorFailed) } s.logLedgerOperation(discovery.OperationLedgerDebit, logger, resp, err, time.Since(start)) return resp, err } // PostExternalDebitWithCharges handles external debit posting (to outside the ledger). 
func (s *Service) PostExternalDebitWithCharges(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error) { start := time.Now() defer func() { recordJournalEntry(journalEntryTypeDebit, journalEntryStatusAttempted, time.Since(start).Seconds()) }() logger := s.logger.With(zap.String("operation", discovery.OperationExternalDebit)) if req != nil { logger = logger.With( zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())), zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())), zap.String("ledger_account_ref", strings.TrimSpace(req.GetLedgerAccountRef())), ) if money := req.GetMoney(); money != nil { logger = logger.With( zap.String("currency", money.GetCurrency()), zap.String("amount", money.GetAmount()), ) } if role := req.GetRole(); role != ledgerv1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED { logger = logger.With(zap.String("role", role.String())) } } s.logLedgerOperationStart(discovery.OperationExternalDebit, logger) responder := s.postExternalDebitResponder(ctx, req) resp, err := responder(ctx) if err != nil { recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorFailed) } s.logLedgerOperation(discovery.OperationExternalDebit, logger, resp, err, time.Since(start)) return resp, err } // TransferInternal handles internal transfer between accounts func (s *Service) TransferInternal(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) { start := time.Now() defer func() { recordJournalEntry(journalEntryTypeTransfer, journalEntryStatusAttempted, time.Since(start).Seconds()) }() logger := s.logger.With(zap.String("operation", discovery.OperationLedgerTransfer)) if req != nil { logger = logger.With( zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())), zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())), zap.String("from_account_ref", strings.TrimSpace(req.GetFromLedgerAccountRef())), zap.String("to_account_ref", 
strings.TrimSpace(req.GetToLedgerAccountRef())), ) if money := req.GetMoney(); money != nil { logger = logger.With( zap.String("currency", money.GetCurrency()), zap.String("amount", money.GetAmount()), ) } if role := req.GetFromRole(); role != ledgerv1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED { logger = logger.With(zap.String("from_role", role.String())) } if role := req.GetToRole(); role != ledgerv1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED { logger = logger.With(zap.String("to_role", role.String())) } } s.logLedgerOperationStart(discovery.OperationLedgerTransfer, logger) responder := s.transferResponder(ctx, req) resp, err := responder(ctx) if err != nil { recordJournalEntryError(journalEntryTypeTransfer, journalEntryErrorFailed) } s.logLedgerOperation(discovery.OperationLedgerTransfer, logger, resp, err, time.Since(start)) return resp, err } // ApplyFXWithCharges handles foreign exchange transaction with charges func (s *Service) ApplyFXWithCharges(ctx context.Context, req *ledgerv1.FXRequest) (*ledgerv1.PostResponse, error) { start := time.Now() defer func() { recordJournalEntry(journalEntryTypeFX, journalEntryStatusAttempted, time.Since(start).Seconds()) }() logger := s.logger.With(zap.String("operation", discovery.OperationLedgerFX)) if req != nil { logger = logger.With( zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())), zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())), zap.String("from_account_ref", strings.TrimSpace(req.GetFromLedgerAccountRef())), zap.String("to_account_ref", strings.TrimSpace(req.GetToLedgerAccountRef())), ) if money := req.GetFromMoney(); money != nil { logger = logger.With( zap.String("from_currency", money.GetCurrency()), zap.String("from_amount", money.GetAmount()), ) } if money := req.GetToMoney(); money != nil { logger = logger.With( zap.String("to_currency", money.GetCurrency()), zap.String("to_amount", money.GetAmount()), ) } if rate := strings.TrimSpace(req.GetRate()); rate != "" { logger 
= logger.With(zap.String("rate", rate)) } } s.logLedgerOperationStart(discovery.OperationLedgerFX, logger) responder := s.fxResponder(ctx, req) resp, err := responder(ctx) if err != nil { recordJournalEntryError(journalEntryTypeFX, journalEntryErrorFailed) } s.logLedgerOperation(discovery.OperationLedgerFX, logger, resp, err, time.Since(start)) return resp, err } // GetBalance queries current account balance func (s *Service) GetBalance(ctx context.Context, req *ledgerv1.GetBalanceRequest) (*ledgerv1.BalanceResponse, error) { start := time.Now() defer func() { recordBalanceQuery("attempted", time.Since(start).Seconds()) }() responder := s.getBalanceResponder(ctx, req) resp, err := responder(ctx) return resp, err } // GetJournalEntry gets journal entry details func (s *Service) GetJournalEntry(ctx context.Context, req *ledgerv1.GetEntryRequest) (*ledgerv1.JournalEntryResponse, error) { responder := s.getJournalEntryResponder(ctx, req) return responder(ctx) } func (s *Service) logLedgerOperationStart(op string, logger mlogger.Logger) { if logger == nil { return } logger.Debug("Ledger operation execution started", zap.String("operation_name", op)) } func (s *Service) logLedgerOperation(op string, logger mlogger.Logger, resp *ledgerv1.PostResponse, err error, duration time.Duration) { if logger == nil { return } entryRef := "" if resp != nil { entryRef = strings.TrimSpace(resp.GetJournalEntryRef()) } status := "succeeded" fields := []zap.Field{ zap.String("operation_name", op), zap.String("status", status), zap.Int64("duration_ms", duration.Milliseconds()), } if entryRef != "" { fields = append(fields, zap.String("journal_entry_ref", entryRef)) } if err != nil { fields[1] = zap.String("status", "failed") logger.Debug("Ledger operation execution completed", append(fields, zap.Error(err))...) logger.Warn("Ledger operation failed", zap.String("operation_name", op), zap.Error(err)) return } logger.Debug("Ledger operation execution completed", fields...) 
if entryRef == "" { logger.Info("Ledger operation posted", zap.String("operation_name", op)) return } logger.Info("Ledger operation posted", zap.String("operation_name", op), zap.String("journal_entry_ref", entryRef)) } func (s *Service) Shutdown() { if s == nil { return } if s.announcer != nil { s.announcer.Stop() } if s.outbox.cancel != nil { s.outbox.cancel() } } func (s *Service) startDiscoveryAnnouncer() { if s == nil || s.producer == nil { return } announce := discovery.Announcement{ Service: "LEDGER", Rail: discovery.RailLedger, Operations: discovery.LedgerServiceOperations(), InvokeURI: s.invokeURI, Version: appversion.Create().Short(), } s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.Ledger), announce) s.announcer.Start() } func (s *Service) startOutboxReliableProducer() error { if s.storage == nil { return nil } var initErr error s.outbox.once.Do(func() { outboxStore := s.storage.Outbox() if outboxStore == nil { return } reliableProducer, settings, err := newLedgerReliableProducer(s.logger, s.producer, outboxStore, s.msgCfg) if err != nil { initErr = err return } s.outbox.producer = reliableProducer if s.outbox.producer == nil || s.producer == nil { s.logger.Info("Outbox reliable publisher disabled", zap.Bool("enabled", settings.Enabled)) return } s.logger.Info("Outbox reliable publisher configured", zap.Bool("enabled", settings.Enabled), zap.Int("batch_size", settings.BatchSize), zap.Int("poll_interval_seconds", settings.PollIntervalSeconds), zap.Int("max_attempts", settings.MaxAttempts)) ctx, cancel := context.WithCancel(context.Background()) s.outbox.cancel = cancel go s.outbox.producer.Run(ctx) }) return initErr } // BlockAccount freezes a ledger account func (s *Service) BlockAccount(ctx context.Context, req *ledgerv1.BlockAccountRequest) (*ledgerv1.BlockAccountResponse, error) { responder := s.blockAccountResponder(ctx, req) return responder(ctx) } // UnblockAccount activates a frozen ledger account func (s *Service) 
UnblockAccount(ctx context.Context, req *ledgerv1.UnblockAccountRequest) (*ledgerv1.UnblockAccountResponse, error) { responder := s.unblockAccountResponder(ctx, req) return responder(ctx) } // GetStatement gets account statement with pagination func (s *Service) GetStatement(ctx context.Context, req *ledgerv1.GetStatementRequest) (*ledgerv1.StatementResponse, error) { responder := s.getStatementResponder(ctx, req) return responder(ctx) } func (s *Service) quoteFeesForCredit(ctx context.Context, req *ledgerv1.PostCreditRequest) ([]*ledgerv1.PostingLine, error) { if !s.fees.available() { return nil, nil } attrs := map[string]string{} if strings.TrimSpace(req.GetDescription()) != "" { attrs["description"] = req.GetDescription() } return s.quoteFees(ctx, feesv1.Trigger_TRIGGER_CAPTURE, req.GetOrganizationRef(), req.GetIdempotencyKey(), req.GetLedgerAccountRef(), "ledger.post_credit", req.GetIdempotencyKey(), req.GetEventTime(), req.Money, attrs) } func (s *Service) quoteFeesForDebit(ctx context.Context, req *ledgerv1.PostDebitRequest) ([]*ledgerv1.PostingLine, error) { if !s.fees.available() { return nil, nil } attrs := map[string]string{} if strings.TrimSpace(req.GetDescription()) != "" { attrs["description"] = req.GetDescription() } return s.quoteFees(ctx, feesv1.Trigger_TRIGGER_REFUND, req.GetOrganizationRef(), req.GetIdempotencyKey(), req.GetLedgerAccountRef(), "ledger.post_debit", req.GetIdempotencyKey(), req.GetEventTime(), req.Money, attrs) } func (s *Service) quoteFees(ctx context.Context, trigger feesv1.Trigger, organizationRef, idempotencyKey, ledgerAccountRef, originType, originRef string, eventTime *timestamppb.Timestamp, baseAmount *moneyv1.Money, attributes map[string]string) ([]*ledgerv1.PostingLine, error) { if !s.fees.available() { return nil, nil } if strings.TrimSpace(organizationRef) == "" { return nil, merrors.InvalidArgument("organization reference is required to quote fees") } if baseAmount == nil { return nil, merrors.InvalidArgument("base 
amount is required to quote fees") } amountCopy := &moneyv1.Money{Amount: baseAmount.GetAmount(), Currency: baseAmount.GetCurrency()} bookedAt := eventTime if bookedAt == nil { bookedAt = timestamppb.Now() } trace := &tracev1.TraceContext{ RequestRef: idempotencyKey, IdempotencyKey: idempotencyKey, } req := &feesv1.QuoteFeesRequest{ Meta: &feesv1.RequestMeta{ OrganizationRef: organizationRef, Trace: trace, }, Intent: &feesv1.Intent{ Trigger: trigger, BaseAmount: amountCopy, BookedAt: bookedAt, OriginType: originType, OriginRef: originRef, Attributes: map[string]string{}, }, } setFeeAttributeIfMissing(req.Intent.Attributes, "product", "ledger") setFeeAttributeIfMissing(req.Intent.Attributes, "operation", ledgerOperation(originType, trigger)) setFeeAttributeIfMissing(req.Intent.Attributes, "currency", strings.TrimSpace(baseAmount.GetCurrency())) if ledgerAccountRef != "" { req.Intent.Attributes["ledger_account_ref"] = ledgerAccountRef } for k, v := range attributes { if strings.TrimSpace(k) == "" { continue } req.Intent.Attributes[k] = v } callCtx := ctx if s.fees.timeout > 0 { var cancel context.CancelFunc callCtx, cancel = context.WithTimeout(ctx, s.fees.timeout) defer cancel() } resp, err := s.fees.client.QuoteFees(callCtx, req) if err != nil { return nil, err } lines, err := convertFeeDerivedLines(resp.GetLines()) if err != nil { return nil, err } return lines, nil } func convertFeeDerivedLines(lines []*feesv1.DerivedPostingLine) ([]*ledgerv1.PostingLine, error) { result := make([]*ledgerv1.PostingLine, 0, len(lines)) for idx, line := range lines { if line == nil { continue } if line.GetMoney() == nil { return nil, merrors.Internal(fmt.Sprintf("fee line %d missing money", idx)) } dec, err := decimal.NewFromString(line.GetMoney().GetAmount()) if err != nil { return nil, merrors.InternalWrap(err, fmt.Sprintf("fee line %d invalid amount", idx)) } dec = ensureAmountForSide(dec, line.GetSide()) posting := &ledgerv1.PostingLine{ LedgerAccountRef: 
line.GetLedgerAccountRef(), Money: &moneyv1.Money{ Amount: dec.String(), Currency: line.GetMoney().GetCurrency(), }, LineType: mapFeeLineType(line.GetLineType()), } result = append(result, posting) } return result, nil } func ensureAmountForSide(amount decimal.Decimal, side accountingv1.EntrySide) decimal.Decimal { switch side { case accountingv1.EntrySide_ENTRY_SIDE_DEBIT: if amount.Sign() > 0 { return amount.Neg() } case accountingv1.EntrySide_ENTRY_SIDE_CREDIT: if amount.Sign() < 0 { return amount.Neg() } } return amount } func setFeeAttributeIfMissing(attrs map[string]string, key, value string) { if attrs == nil { return } if strings.TrimSpace(key) == "" { return } value = strings.TrimSpace(value) if value == "" { return } if _, exists := attrs[key]; exists { return } attrs[key] = value } func ledgerOperation(originType string, trigger feesv1.Trigger) string { originType = strings.TrimSpace(originType) if originType != "" { parts := strings.SplitN(originType, ".", 2) if len(parts) == 2 && strings.TrimSpace(parts[1]) != "" { return strings.TrimSpace(parts[1]) } } switch trigger { case feesv1.Trigger_TRIGGER_CAPTURE: return "credit" case feesv1.Trigger_TRIGGER_REFUND: return "debit" case feesv1.Trigger_TRIGGER_PAYOUT: return "payout" case feesv1.Trigger_TRIGGER_DISPUTE: return "dispute" case feesv1.Trigger_TRIGGER_FX_CONVERSION: return "fx_conversion" default: return "" } } func mapFeeLineType(lineType accountingv1.PostingLineType) ledgerv1.LineType { switch lineType { case accountingv1.PostingLineType_POSTING_LINE_FEE: return ledgerv1.LineType_LINE_FEE case accountingv1.PostingLineType_POSTING_LINE_SPREAD: return ledgerv1.LineType_LINE_SPREAD case accountingv1.PostingLineType_POSTING_LINE_REVERSAL: return ledgerv1.LineType_LINE_REVERSAL default: return ledgerv1.LineType_LINE_FEE } }