297 lines
9.9 KiB
Go
297 lines
9.9 KiB
Go
package ledger
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/shopspring/decimal"
|
|
"github.com/tech/sendico/ledger/storage"
|
|
"github.com/tech/sendico/ledger/storage/model"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
pmodel "github.com/tech/sendico/pkg/model"
|
|
"github.com/tech/sendico/pkg/mutil/mzap"
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type outboxLinePayload struct {
|
|
AccountRef string `json:"accountRef"`
|
|
Amount string `json:"amount"`
|
|
Currency string `json:"currency"`
|
|
LineType string `json:"lineType"`
|
|
}
|
|
|
|
type outboxJournalPayload struct {
|
|
JournalEntryRef string `json:"journalEntryRef"`
|
|
EntryType string `json:"entryType"`
|
|
OrganizationRef string `json:"organizationRef"`
|
|
Version int64 `json:"version"`
|
|
EventTime time.Time `json:"eventTime"`
|
|
Lines []outboxLinePayload `json:"lines"`
|
|
}
|
|
|
|
func validateAccountRole(account *pmodel.LedgerAccount, expected pmodel.AccountRole, label string) error {
|
|
if expected == "" {
|
|
return nil
|
|
}
|
|
if account.Role != expected {
|
|
return merrors.InvalidArgument(fmt.Sprintf("%s: expected role %s, got %s", label, expected, account.Role))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// resolveAccount returns an account either by explicit ref or by role lookup.
|
|
// If accountRefStr is non-empty, it fetches by ID and optionally asserts the role.
|
|
// If accountRefStr is empty and role is set, it resolves via GetByRole(orgRef, currency, role).
|
|
// Returns the account and its ObjectID, or an error.
|
|
func (s *Service) resolveAccount(ctx context.Context, accountRefStr string, role pmodel.AccountRole, orgRef primitive.ObjectID, currency, label string) (*pmodel.LedgerAccount, primitive.ObjectID, error) {
|
|
if accountRefStr != "" {
|
|
ref, err := parseObjectID(accountRefStr)
|
|
if err != nil {
|
|
return nil, primitive.NilObjectID, err
|
|
}
|
|
account, err := s.storage.Accounts().Get(ctx, ref)
|
|
if err != nil {
|
|
if errors.Is(err, storage.ErrAccountNotFound) {
|
|
return nil, primitive.NilObjectID, merrors.NoData(label + " not found")
|
|
}
|
|
return nil, primitive.NilObjectID, merrors.Internal("failed to get " + label)
|
|
}
|
|
// If role is also specified, assert it matches
|
|
if role != "" {
|
|
if err := validateAccountRole(account, role, label); err != nil {
|
|
return nil, primitive.NilObjectID, err
|
|
}
|
|
}
|
|
return account, ref, nil
|
|
}
|
|
|
|
// No ref provided — resolve by role
|
|
if role == "" {
|
|
return nil, primitive.NilObjectID, merrors.InvalidArgument(label + ": ledger_account_ref or role is required")
|
|
}
|
|
if orgRef.IsZero() {
|
|
return nil, primitive.NilObjectID, merrors.InvalidArgument(label + ": organization_ref is required for role resolution")
|
|
}
|
|
if currency == "" {
|
|
return nil, primitive.NilObjectID, merrors.InvalidArgument(label + ": currency is required for role resolution")
|
|
}
|
|
account, err := s.storage.Accounts().GetByRole(ctx, orgRef, currency, role)
|
|
if err != nil {
|
|
if errors.Is(err, storage.ErrAccountNotFound) {
|
|
return nil, primitive.NilObjectID, merrors.NoData(fmt.Sprintf("%s: no account found with role %s", label, role))
|
|
}
|
|
return nil, primitive.NilObjectID, merrors.Internal("failed to resolve " + label + " by role")
|
|
}
|
|
return account, *account.GetID(), nil
|
|
}
|
|
|
|
func validateAccountForOrg(account *pmodel.LedgerAccount, orgRef primitive.ObjectID, currency string) error {
|
|
if account == nil {
|
|
return merrors.InvalidArgument("account is required")
|
|
}
|
|
if account.OrganizationRef == nil || account.OrganizationRef.IsZero() {
|
|
return merrors.InvalidArgument("account organization reference is required")
|
|
}
|
|
if account.Scope != "" && account.Scope != pmodel.LedgerAccountScopeOrganization {
|
|
return merrors.InvalidArgument("account scope mismatch: expected organization")
|
|
}
|
|
if *account.OrganizationRef != orgRef {
|
|
return merrors.InvalidArgument("account does not belong to organization")
|
|
}
|
|
if account.Status != pmodel.LedgerAccountStatusActive {
|
|
return merrors.InvalidArgument(fmt.Sprintf("account is %s", account.Status))
|
|
}
|
|
if currency != "" && account.Currency != currency {
|
|
return merrors.InvalidArgument(fmt.Sprintf("account currency mismatch: account=%s, expected=%s", account.Currency, currency))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) getAccount(ctx context.Context, cache map[primitive.ObjectID]*pmodel.LedgerAccount, accountRef primitive.ObjectID) (*pmodel.LedgerAccount, error) {
|
|
if accountRef.IsZero() {
|
|
return nil, merrors.InvalidArgument("account reference is required")
|
|
}
|
|
if account, ok := cache[accountRef]; ok {
|
|
return account, nil
|
|
}
|
|
|
|
account, err := s.storage.Accounts().Get(ctx, accountRef)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cache[accountRef] = account
|
|
return account, nil
|
|
}
|
|
|
|
func (s *Service) resolveSettlementAccount(ctx context.Context, orgRef primitive.ObjectID, currency, override string, cache map[primitive.ObjectID]*pmodel.LedgerAccount) (*pmodel.LedgerAccount, error) {
|
|
if override != "" {
|
|
overrideRef, err := parseObjectID(override)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
account, err := s.getAccount(ctx, cache, overrideRef)
|
|
if err != nil {
|
|
if errors.Is(err, storage.ErrAccountNotFound) {
|
|
return nil, merrors.NoData("contra account not found")
|
|
}
|
|
s.logger.Warn("failed to load override contra account", zap.Error(err), zap.String("accountRef", overrideRef.Hex()))
|
|
return nil, merrors.Internal("failed to load contra account")
|
|
}
|
|
if err := validateAccountForOrg(account, orgRef, currency); err != nil {
|
|
return nil, merrors.InvalidArgument(fmt.Sprintf("contra account: %s", err.Error()))
|
|
}
|
|
return account, nil
|
|
}
|
|
|
|
account, err := s.storage.Accounts().GetDefaultSettlement(ctx, orgRef, currency)
|
|
if err != nil {
|
|
if errors.Is(err, storage.ErrAccountNotFound) {
|
|
return nil, merrors.InvalidArgument("no default settlement account configured for currency")
|
|
}
|
|
s.logger.Warn("failed to resolve default settlement account",
|
|
zap.Error(err),
|
|
mzap.ObjRef("organization_ref", orgRef),
|
|
zap.String("currency", currency))
|
|
return nil, merrors.Internal("failed to resolve settlement account")
|
|
}
|
|
|
|
accountID := account.GetID()
|
|
if accountID == nil {
|
|
return nil, merrors.Internal("settlement account missing identifier")
|
|
}
|
|
cache[*accountID] = account
|
|
|
|
if err := validateAccountForOrg(account, orgRef, currency); err != nil {
|
|
return nil, merrors.InvalidArgument(fmt.Sprintf("settlement account: %s", err.Error()))
|
|
}
|
|
|
|
return account, nil
|
|
}
|
|
|
|
func (s *Service) upsertBalances(ctx context.Context, lines []*model.PostingLine, accounts map[primitive.ObjectID]*pmodel.LedgerAccount) error {
|
|
if len(lines) == 0 {
|
|
return nil
|
|
}
|
|
|
|
balanceDeltas := make(map[primitive.ObjectID]decimal.Decimal, len(lines))
|
|
for _, line := range lines {
|
|
delta, err := parseDecimal(line.Amount)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if current, ok := balanceDeltas[line.AccountRef]; ok {
|
|
balanceDeltas[line.AccountRef] = current.Add(delta)
|
|
continue
|
|
}
|
|
balanceDeltas[line.AccountRef] = delta
|
|
}
|
|
|
|
balancesStore := s.storage.Balances()
|
|
now := time.Now().UTC()
|
|
|
|
for accountRef, delta := range balanceDeltas {
|
|
account := accounts[accountRef]
|
|
if account == nil {
|
|
s.logger.Warn("account cache missing for balance update", mzap.ObjRef("account_ref", accountRef))
|
|
return merrors.Internal("account cache missing for balance update")
|
|
}
|
|
|
|
currentBalance, err := balancesStore.Get(ctx, accountRef)
|
|
if err != nil && !errors.Is(err, storage.ErrBalanceNotFound) {
|
|
s.logger.Warn("failed to fetch account balance",
|
|
zap.Error(err),
|
|
mzap.ObjRef("account_ref", accountRef))
|
|
return merrors.Internal("failed to update balance")
|
|
}
|
|
|
|
newAmount := delta
|
|
version := int64(1)
|
|
if currentBalance != nil {
|
|
existing, err := parseDecimal(currentBalance.Balance)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
newAmount = existing.Add(delta)
|
|
version = currentBalance.Version + 1
|
|
}
|
|
|
|
if !account.AllowNegative && newAmount.LessThan(decimal.Zero) {
|
|
return merrors.InvalidArgument(fmt.Sprintf("account %s does not allow negative balances", accountRef.Hex()))
|
|
}
|
|
|
|
newBalance := &model.AccountBalance{
|
|
AccountRef: accountRef,
|
|
Balance: newAmount.String(),
|
|
Currency: account.Currency,
|
|
Version: version,
|
|
LastUpdated: now,
|
|
}
|
|
if account.OrganizationRef != nil {
|
|
newBalance.OrganizationRef = *account.OrganizationRef
|
|
} else {
|
|
newBalance.OrganizationRef = primitive.NilObjectID
|
|
}
|
|
|
|
if err := balancesStore.Upsert(ctx, newBalance); err != nil {
|
|
s.logger.Warn("failed to upsert account balance", zap.Error(err), mzap.ObjRef("account_ref", accountRef))
|
|
return merrors.Internal("failed to update balance")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) enqueueOutbox(ctx context.Context, entry *model.JournalEntry, lines []*model.PostingLine) error {
|
|
if entry == nil {
|
|
return merrors.Internal("journal entry is required")
|
|
}
|
|
entryID := entry.GetID()
|
|
if entryID == nil {
|
|
return merrors.Internal("journal entry missing identifier")
|
|
}
|
|
|
|
payload := outboxJournalPayload{
|
|
JournalEntryRef: entryID.Hex(),
|
|
EntryType: string(entry.EntryType),
|
|
OrganizationRef: entry.OrganizationRef.Hex(),
|
|
Version: entry.Version,
|
|
EventTime: entry.EventTime,
|
|
Lines: make([]outboxLinePayload, 0, len(lines)),
|
|
}
|
|
|
|
for _, line := range lines {
|
|
payload.Lines = append(payload.Lines, outboxLinePayload{
|
|
AccountRef: line.AccountRef.Hex(),
|
|
Amount: line.Amount,
|
|
Currency: line.Currency,
|
|
LineType: string(line.LineType),
|
|
})
|
|
}
|
|
|
|
body, err := json.Marshal(payload)
|
|
if err != nil {
|
|
s.logger.Warn("failed to marshal ledger outbox payload", zap.Error(err))
|
|
return merrors.Internal("failed to marshal ledger event")
|
|
}
|
|
|
|
event := &model.OutboxEvent{
|
|
EventID: entryID.Hex(),
|
|
Subject: ledgerOutboxSubject,
|
|
Payload: body,
|
|
Status: model.OutboxStatusPending,
|
|
Attempts: 0,
|
|
}
|
|
event.OrganizationRef = entry.OrganizationRef
|
|
|
|
if err := s.storage.Outbox().Create(ctx, event); err != nil {
|
|
s.logger.Warn("failed to enqueue ledger outbox event", zap.Error(err))
|
|
return merrors.Internal("failed to enqueue ledger event")
|
|
}
|
|
|
|
return nil
|
|
}
|