473 lines
16 KiB
Go
473 lines
16 KiB
Go
package ledger
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/tech/sendico/ledger/internal/appversion"
|
|
"github.com/tech/sendico/ledger/storage"
|
|
"github.com/tech/sendico/pkg/api/routers"
|
|
"github.com/tech/sendico/pkg/discovery"
|
|
pmessaging "github.com/tech/sendico/pkg/messaging"
|
|
pmessagingreliable "github.com/tech/sendico/pkg/messaging/reliable"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
pmodel "github.com/tech/sendico/pkg/model"
|
|
"github.com/tech/sendico/pkg/mservice"
|
|
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
|
|
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
|
|
)
|
|
|
|
type serviceError string
|
|
|
|
func (e serviceError) Error() string {
|
|
return string(e)
|
|
}
|
|
|
|
var (
|
|
errStorageNotInitialized = serviceError("ledger: storage not initialized")
|
|
)
|
|
|
|
type Service struct {
|
|
logger mlogger.Logger
|
|
storage storage.Repository
|
|
producer pmessaging.Producer
|
|
msgCfg pmodel.SettingsT
|
|
announcer *discovery.Announcer
|
|
invokeURI string
|
|
|
|
outbox struct {
|
|
once sync.Once
|
|
cancel context.CancelFunc
|
|
producer *pmessagingreliable.ReliableProducer
|
|
}
|
|
|
|
systemAccounts struct {
|
|
mu sync.RWMutex
|
|
externalSource map[string]*pmodel.LedgerAccount
|
|
externalSink map[string]*pmodel.LedgerAccount
|
|
}
|
|
}
|
|
|
|
func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer, msgCfg pmodel.SettingsT, invokeURI string) (*Service, error) {
|
|
// Initialize Prometheus metrics
|
|
initMetrics()
|
|
|
|
service := &Service{
|
|
logger: logger.Named("service"),
|
|
storage: repo,
|
|
producer: prod,
|
|
msgCfg: msgCfg,
|
|
invokeURI: strings.TrimSpace(invokeURI),
|
|
}
|
|
|
|
if err := service.startOutboxReliableProducer(); err != nil {
|
|
return nil, err
|
|
}
|
|
service.startDiscoveryAnnouncer()
|
|
return service, nil
|
|
}
|
|
|
|
func (s *Service) Register(router routers.GRPC) error {
|
|
return router.Register(func(reg grpc.ServiceRegistrar) {
|
|
connectorv1.RegisterConnectorServiceServer(reg, newConnectorAdapter(s))
|
|
})
|
|
}
|
|
|
|
// ListAccounts lists ledger accounts for an organization.
|
|
func (s *Service) ListAccounts(ctx context.Context, req *ledgerv1.ListAccountsRequest) (*ledgerv1.ListAccountsResponse, error) {
|
|
responder := s.listAccountsResponder(ctx, req)
|
|
return responder(ctx)
|
|
}
|
|
|
|
// CreateAccount provisions a new ledger account scoped to an organization.
|
|
func (s *Service) CreateAccount(ctx context.Context, req *ledgerv1.CreateAccountRequest) (*ledgerv1.CreateAccountResponse, error) {
|
|
responder := s.createAccountResponder(ctx, req)
|
|
return responder(ctx)
|
|
}
|
|
|
|
// PostCreditWithCharges handles credit posting with fees in one atomic journal entry
|
|
func (s *Service) PostCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
|
|
start := time.Now()
|
|
defer func() {
|
|
recordJournalEntry(journalEntryTypeCredit, journalEntryStatusAttempted, time.Since(start).Seconds())
|
|
}()
|
|
|
|
logger := s.logger.With(zap.String("operation", discovery.OperationLedgerCredit))
|
|
if req != nil {
|
|
logger = logger.With(
|
|
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
|
|
zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())),
|
|
zap.String("ledger_account_ref", strings.TrimSpace(req.GetLedgerAccountRef())),
|
|
)
|
|
if money := req.GetMoney(); money != nil {
|
|
logger = logger.With(
|
|
zap.String("currency", money.GetCurrency()),
|
|
zap.String("amount", money.GetAmount()),
|
|
)
|
|
}
|
|
if role := req.GetRole(); role != ledgerv1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED {
|
|
logger = logger.With(zap.String("role", role.String()))
|
|
}
|
|
if contra := strings.TrimSpace(req.GetContraLedgerAccountRef()); contra != "" {
|
|
logger = logger.With(zap.String("contra_ledger_account_ref", contra))
|
|
}
|
|
}
|
|
s.logLedgerOperationStart(discovery.OperationLedgerCredit, logger)
|
|
|
|
responder := s.postCreditResponder(ctx, req)
|
|
resp, err := responder(ctx)
|
|
|
|
if err != nil {
|
|
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorNotImplemented)
|
|
}
|
|
|
|
s.logLedgerOperation(discovery.OperationLedgerCredit, logger, resp, err, time.Since(start))
|
|
|
|
return resp, err
|
|
}
|
|
|
|
// PostExternalCreditWithCharges handles external credit posting (from outside the ledger).
|
|
func (s *Service) PostExternalCreditWithCharges(ctx context.Context, req *ledgerv1.PostCreditRequest) (*ledgerv1.PostResponse, error) {
|
|
start := time.Now()
|
|
defer func() {
|
|
recordJournalEntry(journalEntryTypeCredit, journalEntryStatusAttempted, time.Since(start).Seconds())
|
|
}()
|
|
|
|
logger := s.logger.With(zap.String("operation", discovery.OperationExternalCredit))
|
|
if req != nil {
|
|
logger = logger.With(
|
|
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
|
|
zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())),
|
|
zap.String("ledger_account_ref", strings.TrimSpace(req.GetLedgerAccountRef())),
|
|
)
|
|
if money := req.GetMoney(); money != nil {
|
|
logger = logger.With(
|
|
zap.String("currency", money.GetCurrency()),
|
|
zap.String("amount", money.GetAmount()),
|
|
)
|
|
}
|
|
if role := req.GetRole(); role != ledgerv1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED {
|
|
logger = logger.With(zap.String("role", role.String()))
|
|
}
|
|
}
|
|
s.logLedgerOperationStart(discovery.OperationExternalCredit, logger)
|
|
|
|
responder := s.postExternalCreditResponder(ctx, req)
|
|
resp, err := responder(ctx)
|
|
|
|
if err != nil {
|
|
recordJournalEntryError(journalEntryTypeCredit, journalEntryErrorFailed)
|
|
}
|
|
|
|
s.logLedgerOperation(discovery.OperationExternalCredit, logger, resp, err, time.Since(start))
|
|
|
|
return resp, err
|
|
}
|
|
|
|
// PostDebitWithCharges handles debit posting with fees in one atomic journal entry
|
|
func (s *Service) PostDebitWithCharges(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error) {
|
|
start := time.Now()
|
|
defer func() {
|
|
recordJournalEntry(journalEntryTypeDebit, journalEntryStatusAttempted, time.Since(start).Seconds())
|
|
}()
|
|
|
|
logger := s.logger.With(zap.String("operation", discovery.OperationLedgerDebit))
|
|
if req != nil {
|
|
logger = logger.With(
|
|
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
|
|
zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())),
|
|
zap.String("ledger_account_ref", strings.TrimSpace(req.GetLedgerAccountRef())),
|
|
)
|
|
if money := req.GetMoney(); money != nil {
|
|
logger = logger.With(
|
|
zap.String("currency", money.GetCurrency()),
|
|
zap.String("amount", money.GetAmount()),
|
|
)
|
|
}
|
|
if role := req.GetRole(); role != ledgerv1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED {
|
|
logger = logger.With(zap.String("role", role.String()))
|
|
}
|
|
if contra := strings.TrimSpace(req.GetContraLedgerAccountRef()); contra != "" {
|
|
logger = logger.With(zap.String("contra_ledger_account_ref", contra))
|
|
}
|
|
}
|
|
s.logLedgerOperationStart(discovery.OperationLedgerDebit, logger)
|
|
|
|
responder := s.postDebitResponder(ctx, req)
|
|
resp, err := responder(ctx)
|
|
|
|
if err != nil {
|
|
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorFailed)
|
|
}
|
|
|
|
s.logLedgerOperation(discovery.OperationLedgerDebit, logger, resp, err, time.Since(start))
|
|
|
|
return resp, err
|
|
}
|
|
|
|
// PostExternalDebitWithCharges handles external debit posting (to outside the ledger).
|
|
func (s *Service) PostExternalDebitWithCharges(ctx context.Context, req *ledgerv1.PostDebitRequest) (*ledgerv1.PostResponse, error) {
|
|
start := time.Now()
|
|
defer func() {
|
|
recordJournalEntry(journalEntryTypeDebit, journalEntryStatusAttempted, time.Since(start).Seconds())
|
|
}()
|
|
|
|
logger := s.logger.With(zap.String("operation", discovery.OperationExternalDebit))
|
|
if req != nil {
|
|
logger = logger.With(
|
|
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
|
|
zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())),
|
|
zap.String("ledger_account_ref", strings.TrimSpace(req.GetLedgerAccountRef())),
|
|
)
|
|
if money := req.GetMoney(); money != nil {
|
|
logger = logger.With(
|
|
zap.String("currency", money.GetCurrency()),
|
|
zap.String("amount", money.GetAmount()),
|
|
)
|
|
}
|
|
if role := req.GetRole(); role != ledgerv1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED {
|
|
logger = logger.With(zap.String("role", role.String()))
|
|
}
|
|
}
|
|
s.logLedgerOperationStart(discovery.OperationExternalDebit, logger)
|
|
|
|
responder := s.postExternalDebitResponder(ctx, req)
|
|
resp, err := responder(ctx)
|
|
|
|
if err != nil {
|
|
recordJournalEntryError(journalEntryTypeDebit, journalEntryErrorFailed)
|
|
}
|
|
|
|
s.logLedgerOperation(discovery.OperationExternalDebit, logger, resp, err, time.Since(start))
|
|
|
|
return resp, err
|
|
}
|
|
|
|
// TransferInternal handles internal transfer between accounts
|
|
func (s *Service) TransferInternal(ctx context.Context, req *ledgerv1.TransferRequest) (*ledgerv1.PostResponse, error) {
|
|
start := time.Now()
|
|
defer func() {
|
|
recordJournalEntry(journalEntryTypeTransfer, journalEntryStatusAttempted, time.Since(start).Seconds())
|
|
}()
|
|
|
|
logger := s.logger.With(zap.String("operation", discovery.OperationLedgerTransfer))
|
|
if req != nil {
|
|
logger = logger.With(
|
|
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
|
|
zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())),
|
|
zap.String("from_account_ref", strings.TrimSpace(req.GetFromLedgerAccountRef())),
|
|
zap.String("to_account_ref", strings.TrimSpace(req.GetToLedgerAccountRef())),
|
|
)
|
|
if money := req.GetMoney(); money != nil {
|
|
logger = logger.With(
|
|
zap.String("currency", money.GetCurrency()),
|
|
zap.String("amount", money.GetAmount()),
|
|
)
|
|
}
|
|
if role := req.GetFromRole(); role != ledgerv1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED {
|
|
logger = logger.With(zap.String("from_role", role.String()))
|
|
}
|
|
if role := req.GetToRole(); role != ledgerv1.AccountRole_ACCOUNT_ROLE_UNSPECIFIED {
|
|
logger = logger.With(zap.String("to_role", role.String()))
|
|
}
|
|
}
|
|
s.logLedgerOperationStart(discovery.OperationLedgerTransfer, logger)
|
|
|
|
responder := s.transferResponder(ctx, req)
|
|
resp, err := responder(ctx)
|
|
|
|
if err != nil {
|
|
recordJournalEntryError(journalEntryTypeTransfer, journalEntryErrorFailed)
|
|
}
|
|
|
|
s.logLedgerOperation(discovery.OperationLedgerTransfer, logger, resp, err, time.Since(start))
|
|
|
|
return resp, err
|
|
}
|
|
|
|
// ApplyFXWithCharges handles foreign exchange transaction with charges
|
|
func (s *Service) ApplyFXWithCharges(ctx context.Context, req *ledgerv1.FXRequest) (*ledgerv1.PostResponse, error) {
|
|
start := time.Now()
|
|
defer func() {
|
|
recordJournalEntry(journalEntryTypeFX, journalEntryStatusAttempted, time.Since(start).Seconds())
|
|
}()
|
|
|
|
logger := s.logger.With(zap.String("operation", discovery.OperationLedgerFX))
|
|
if req != nil {
|
|
logger = logger.With(
|
|
zap.String("idempotency_key", strings.TrimSpace(req.GetIdempotencyKey())),
|
|
zap.String("organization_ref", strings.TrimSpace(req.GetOrganizationRef())),
|
|
zap.String("from_account_ref", strings.TrimSpace(req.GetFromLedgerAccountRef())),
|
|
zap.String("to_account_ref", strings.TrimSpace(req.GetToLedgerAccountRef())),
|
|
)
|
|
if money := req.GetFromMoney(); money != nil {
|
|
logger = logger.With(
|
|
zap.String("from_currency", money.GetCurrency()),
|
|
zap.String("from_amount", money.GetAmount()),
|
|
)
|
|
}
|
|
if money := req.GetToMoney(); money != nil {
|
|
logger = logger.With(
|
|
zap.String("to_currency", money.GetCurrency()),
|
|
zap.String("to_amount", money.GetAmount()),
|
|
)
|
|
}
|
|
if rate := strings.TrimSpace(req.GetRate()); rate != "" {
|
|
logger = logger.With(zap.String("rate", rate))
|
|
}
|
|
}
|
|
s.logLedgerOperationStart(discovery.OperationLedgerFX, logger)
|
|
|
|
responder := s.fxResponder(ctx, req)
|
|
resp, err := responder(ctx)
|
|
|
|
if err != nil {
|
|
recordJournalEntryError(journalEntryTypeFX, journalEntryErrorFailed)
|
|
}
|
|
|
|
s.logLedgerOperation(discovery.OperationLedgerFX, logger, resp, err, time.Since(start))
|
|
|
|
return resp, err
|
|
}
|
|
|
|
// GetBalance queries current account balance
|
|
func (s *Service) GetBalance(ctx context.Context, req *ledgerv1.GetBalanceRequest) (*ledgerv1.BalanceResponse, error) {
|
|
start := time.Now()
|
|
defer func() {
|
|
recordBalanceQuery("attempted", time.Since(start).Seconds())
|
|
}()
|
|
|
|
responder := s.getBalanceResponder(ctx, req)
|
|
resp, err := responder(ctx)
|
|
|
|
return resp, err
|
|
}
|
|
|
|
// GetJournalEntry gets journal entry details
|
|
func (s *Service) GetJournalEntry(ctx context.Context, req *ledgerv1.GetEntryRequest) (*ledgerv1.JournalEntryResponse, error) {
|
|
responder := s.getJournalEntryResponder(ctx, req)
|
|
return responder(ctx)
|
|
}
|
|
|
|
func (s *Service) logLedgerOperationStart(op string, logger mlogger.Logger) {
|
|
if logger == nil {
|
|
return
|
|
}
|
|
logger.Debug("Ledger operation execution started", zap.String("operation_name", op))
|
|
}
|
|
|
|
func (s *Service) logLedgerOperation(op string, logger mlogger.Logger, resp *ledgerv1.PostResponse, err error, duration time.Duration) {
|
|
if logger == nil {
|
|
return
|
|
}
|
|
entryRef := ""
|
|
if resp != nil {
|
|
entryRef = strings.TrimSpace(resp.GetJournalEntryRef())
|
|
}
|
|
status := "succeeded"
|
|
fields := []zap.Field{
|
|
zap.String("operation_name", op),
|
|
zap.String("status", status),
|
|
zap.Int64("duration_ms", duration.Milliseconds()),
|
|
}
|
|
if entryRef != "" {
|
|
fields = append(fields, zap.String("journal_entry_ref", entryRef))
|
|
}
|
|
if err != nil {
|
|
fields[1] = zap.String("status", "failed")
|
|
logger.Debug("Ledger operation execution completed", append(fields, zap.Error(err))...)
|
|
logger.Warn("Ledger operation failed", zap.String("operation_name", op), zap.Error(err))
|
|
return
|
|
}
|
|
logger.Debug("Ledger operation execution completed", fields...)
|
|
if entryRef == "" {
|
|
logger.Info("Ledger operation posted", zap.String("operation_name", op))
|
|
return
|
|
}
|
|
logger.Info("Ledger operation posted", zap.String("operation_name", op), zap.String("journal_entry_ref", entryRef))
|
|
}
|
|
|
|
func (s *Service) Shutdown() {
|
|
if s == nil {
|
|
return
|
|
}
|
|
if s.announcer != nil {
|
|
s.announcer.Stop()
|
|
}
|
|
if s.outbox.cancel != nil {
|
|
s.outbox.cancel()
|
|
}
|
|
}
|
|
|
|
func (s *Service) startDiscoveryAnnouncer() {
|
|
if s == nil || s.producer == nil {
|
|
return
|
|
}
|
|
announce := discovery.Announcement{
|
|
Service: mservice.Ledger,
|
|
Rail: discovery.RailLedger,
|
|
Operations: discovery.LedgerServiceOperations(),
|
|
InvokeURI: s.invokeURI,
|
|
Version: appversion.Create().Short(),
|
|
}
|
|
s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.Ledger), announce)
|
|
s.announcer.Start()
|
|
}
|
|
|
|
func (s *Service) startOutboxReliableProducer() error {
|
|
if s.storage == nil {
|
|
return nil
|
|
}
|
|
|
|
var initErr error
|
|
s.outbox.once.Do(func() {
|
|
outboxStore := s.storage.Outbox()
|
|
if outboxStore == nil {
|
|
return
|
|
}
|
|
reliableProducer, settings, err := newLedgerReliableProducer(s.logger, s.producer, outboxStore, s.msgCfg)
|
|
if err != nil {
|
|
initErr = err
|
|
return
|
|
}
|
|
s.outbox.producer = reliableProducer
|
|
if s.outbox.producer == nil || s.producer == nil {
|
|
s.logger.Info("Outbox reliable publisher disabled", zap.Bool("enabled", settings.Enabled))
|
|
return
|
|
}
|
|
s.logger.Info("Outbox reliable publisher configured",
|
|
zap.Bool("enabled", settings.Enabled),
|
|
zap.Int("batch_size", settings.BatchSize),
|
|
zap.Int("poll_interval_seconds", settings.PollIntervalSeconds),
|
|
zap.Int("max_attempts", settings.MaxAttempts))
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
s.outbox.cancel = cancel
|
|
go s.outbox.producer.Run(ctx)
|
|
})
|
|
return initErr
|
|
}
|
|
|
|
// BlockAccount freezes a ledger account
|
|
func (s *Service) BlockAccount(ctx context.Context, req *ledgerv1.BlockAccountRequest) (*ledgerv1.BlockAccountResponse, error) {
|
|
responder := s.blockAccountResponder(ctx, req)
|
|
return responder(ctx)
|
|
}
|
|
|
|
// UnblockAccount activates a frozen ledger account
|
|
func (s *Service) UnblockAccount(ctx context.Context, req *ledgerv1.UnblockAccountRequest) (*ledgerv1.UnblockAccountResponse, error) {
|
|
responder := s.unblockAccountResponder(ctx, req)
|
|
return responder(ctx)
|
|
}
|
|
|
|
// GetStatement gets account statement with pagination
|
|
func (s *Service) GetStatement(ctx context.Context, req *ledgerv1.GetStatementRequest) (*ledgerv1.StatementResponse, error) {
|
|
responder := s.getStatementResponder(ctx, req)
|
|
return responder(ctx)
|
|
}
|