229 lines
7.1 KiB
Go
229 lines
7.1 KiB
Go
package ledger
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/shopspring/decimal"
|
|
"github.com/tech/sendico/ledger/storage"
|
|
"github.com/tech/sendico/ledger/storage/model"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// outboxLinePayload is the JSON shape of one posting line inside a ledger
// outbox event. Every field is carried as a string; enqueueOutbox fills them
// from the corresponding model.PostingLine.
type outboxLinePayload struct {
	// AccountRef is the hex form of the posting line's account identifier.
	AccountRef string `json:"accountRef"`
	// Amount is the posting line amount, copied verbatim as a string.
	Amount string `json:"amount"`
	// Currency is the posting line currency, copied verbatim.
	Currency string `json:"currency"`
	// LineType is the string form of the posting line's type.
	LineType string `json:"lineType"`
}
|
|
|
|
type outboxJournalPayload struct {
|
|
JournalEntryRef string `json:"journalEntryRef"`
|
|
EntryType string `json:"entryType"`
|
|
OrganizationRef string `json:"organizationRef"`
|
|
Version int64 `json:"version"`
|
|
EventTime time.Time `json:"eventTime"`
|
|
Lines []outboxLinePayload `json:"lines"`
|
|
}
|
|
|
|
func validateAccountForOrg(account *model.Account, orgRef primitive.ObjectID, currency string) error {
|
|
if account == nil {
|
|
return merrors.InvalidArgument("account is required")
|
|
}
|
|
if account.OrganizationRef != orgRef {
|
|
return merrors.InvalidArgument("account does not belong to organization")
|
|
}
|
|
if account.Status != model.AccountStatusActive {
|
|
return merrors.InvalidArgument(fmt.Sprintf("account is %s", account.Status))
|
|
}
|
|
if currency != "" && account.Currency != currency {
|
|
return merrors.InvalidArgument(fmt.Sprintf("account currency mismatch: account=%s, expected=%s", account.Currency, currency))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) getAccount(ctx context.Context, cache map[primitive.ObjectID]*model.Account, accountRef primitive.ObjectID) (*model.Account, error) {
|
|
if accountRef.IsZero() {
|
|
return nil, merrors.InvalidArgument("account reference is required")
|
|
}
|
|
if account, ok := cache[accountRef]; ok {
|
|
return account, nil
|
|
}
|
|
|
|
account, err := s.storage.Accounts().Get(ctx, accountRef)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cache[accountRef] = account
|
|
return account, nil
|
|
}
|
|
|
|
func (s *Service) resolveSettlementAccount(ctx context.Context, orgRef primitive.ObjectID, currency, override string, cache map[primitive.ObjectID]*model.Account) (*model.Account, error) {
|
|
if override != "" {
|
|
overrideRef, err := parseObjectID(override)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
account, err := s.getAccount(ctx, cache, overrideRef)
|
|
if err != nil {
|
|
if errors.Is(err, storage.ErrAccountNotFound) {
|
|
return nil, merrors.NoData("contra account not found")
|
|
}
|
|
s.logger.Warn("failed to load override contra account", zap.Error(err), zap.String("accountRef", overrideRef.Hex()))
|
|
return nil, merrors.Internal("failed to load contra account")
|
|
}
|
|
if err := validateAccountForOrg(account, orgRef, currency); err != nil {
|
|
return nil, merrors.InvalidArgument(fmt.Sprintf("contra account: %s", err.Error()))
|
|
}
|
|
return account, nil
|
|
}
|
|
|
|
account, err := s.storage.Accounts().GetDefaultSettlement(ctx, orgRef, currency)
|
|
if err != nil {
|
|
if errors.Is(err, storage.ErrAccountNotFound) {
|
|
return nil, merrors.InvalidArgument("no default settlement account configured for currency")
|
|
}
|
|
s.logger.Warn("failed to resolve default settlement account",
|
|
zap.Error(err),
|
|
zap.String("organizationRef", orgRef.Hex()),
|
|
zap.String("currency", currency))
|
|
return nil, merrors.Internal("failed to resolve settlement account")
|
|
}
|
|
|
|
accountID := account.GetID()
|
|
if accountID == nil {
|
|
return nil, merrors.Internal("settlement account missing identifier")
|
|
}
|
|
cache[*accountID] = account
|
|
|
|
if err := validateAccountForOrg(account, orgRef, currency); err != nil {
|
|
return nil, merrors.InvalidArgument(fmt.Sprintf("settlement account: %s", err.Error()))
|
|
}
|
|
|
|
return account, nil
|
|
}
|
|
|
|
func (s *Service) upsertBalances(ctx context.Context, lines []*model.PostingLine, accounts map[primitive.ObjectID]*model.Account) error {
|
|
if len(lines) == 0 {
|
|
return nil
|
|
}
|
|
|
|
balanceDeltas := make(map[primitive.ObjectID]decimal.Decimal, len(lines))
|
|
for _, line := range lines {
|
|
delta, err := parseDecimal(line.Amount)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if current, ok := balanceDeltas[line.AccountRef]; ok {
|
|
balanceDeltas[line.AccountRef] = current.Add(delta)
|
|
continue
|
|
}
|
|
balanceDeltas[line.AccountRef] = delta
|
|
}
|
|
|
|
balancesStore := s.storage.Balances()
|
|
now := time.Now().UTC()
|
|
|
|
for accountRef, delta := range balanceDeltas {
|
|
account := accounts[accountRef]
|
|
if account == nil {
|
|
s.logger.Warn("account cache missing for balance update", zap.String("accountRef", accountRef.Hex()))
|
|
return merrors.Internal("account cache missing for balance update")
|
|
}
|
|
|
|
currentBalance, err := balancesStore.Get(ctx, accountRef)
|
|
if err != nil && !errors.Is(err, storage.ErrBalanceNotFound) {
|
|
s.logger.Warn("failed to fetch account balance",
|
|
zap.Error(err),
|
|
zap.String("accountRef", accountRef.Hex()))
|
|
return merrors.Internal("failed to update balance")
|
|
}
|
|
|
|
newAmount := delta
|
|
version := int64(1)
|
|
if currentBalance != nil {
|
|
existing, err := parseDecimal(currentBalance.Balance)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
newAmount = existing.Add(delta)
|
|
version = currentBalance.Version + 1
|
|
}
|
|
|
|
if !account.AllowNegative && newAmount.LessThan(decimal.Zero) {
|
|
return merrors.InvalidArgument(fmt.Sprintf("account %s does not allow negative balances", accountRef.Hex()))
|
|
}
|
|
|
|
newBalance := &model.AccountBalance{
|
|
AccountRef: accountRef,
|
|
Balance: newAmount.String(),
|
|
Currency: account.Currency,
|
|
Version: version,
|
|
LastUpdated: now,
|
|
}
|
|
newBalance.OrganizationRef = account.OrganizationRef
|
|
|
|
if err := balancesStore.Upsert(ctx, newBalance); err != nil {
|
|
s.logger.Warn("failed to upsert account balance", zap.Error(err), zap.String("accountRef", accountRef.Hex()))
|
|
return merrors.Internal("failed to update balance")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) enqueueOutbox(ctx context.Context, entry *model.JournalEntry, lines []*model.PostingLine) error {
|
|
if entry == nil {
|
|
return merrors.Internal("journal entry is required")
|
|
}
|
|
entryID := entry.GetID()
|
|
if entryID == nil {
|
|
return merrors.Internal("journal entry missing identifier")
|
|
}
|
|
|
|
payload := outboxJournalPayload{
|
|
JournalEntryRef: entryID.Hex(),
|
|
EntryType: string(entry.EntryType),
|
|
OrganizationRef: entry.OrganizationRef.Hex(),
|
|
Version: entry.Version,
|
|
EventTime: entry.EventTime,
|
|
Lines: make([]outboxLinePayload, 0, len(lines)),
|
|
}
|
|
|
|
for _, line := range lines {
|
|
payload.Lines = append(payload.Lines, outboxLinePayload{
|
|
AccountRef: line.AccountRef.Hex(),
|
|
Amount: line.Amount,
|
|
Currency: line.Currency,
|
|
LineType: string(line.LineType),
|
|
})
|
|
}
|
|
|
|
body, err := json.Marshal(payload)
|
|
if err != nil {
|
|
s.logger.Warn("failed to marshal ledger outbox payload", zap.Error(err))
|
|
return merrors.Internal("failed to marshal ledger event")
|
|
}
|
|
|
|
event := &model.OutboxEvent{
|
|
EventID: entryID.Hex(),
|
|
Subject: ledgerOutboxSubject,
|
|
Payload: body,
|
|
Status: model.OutboxStatusPending,
|
|
Attempts: 0,
|
|
}
|
|
event.OrganizationRef = entry.OrganizationRef
|
|
|
|
if err := s.storage.Outbox().Create(ctx, event); err != nil {
|
|
s.logger.Warn("failed to enqueue ledger outbox event", zap.Error(err))
|
|
return merrors.Internal("failed to enqueue ledger event")
|
|
}
|
|
|
|
return nil
|
|
}
|