Files
sendico/api/ledger/internal/service/ledger/posting_support.go
Stephan D 2ee17b0c46
Some checks failed
ci/woodpecker/push/db Pipeline was successful
ci/woodpecker/push/fx/1 Pipeline failed
ci/woodpecker/push/fx/2 Pipeline failed
ci/woodpecker/push/nats Pipeline was successful
fx build fix
2025-11-07 23:50:48 +01:00

229 lines
7.1 KiB
Go

package ledger
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/shopspring/decimal"
"github.com/tech/sendico/ledger/storage"
"github.com/tech/sendico/ledger/storage/model"
"github.com/tech/sendico/pkg/merrors"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
)
// outboxLinePayload is the JSON representation of one posting line inside a
// ledger outbox event. All fields are plain strings; enqueueOutbox fills them
// from a model.PostingLine (object IDs as hex, the amount copied verbatim).
type outboxLinePayload struct {
	// AccountRef is the hex form of the posting line's account reference.
	AccountRef string `json:"accountRef"`
	// Amount is the posting amount, kept as the string stored on the line.
	Amount string `json:"amount"`
	// Currency is the currency recorded on the posting line.
	Currency string `json:"currency"`
	// LineType is the string form of the posting line's type.
	LineType string `json:"lineType"`
}
// outboxJournalPayload is the JSON body of a ledger outbox event. It describes
// one journal entry together with its posting lines; enqueueOutbox builds it
// and stores the marshalled bytes as the event payload.
type outboxJournalPayload struct {
	// JournalEntryRef is the hex form of the journal entry identifier.
	JournalEntryRef string `json:"journalEntryRef"`
	// EntryType is the string form of the journal entry's type.
	EntryType string `json:"entryType"`
	// OrganizationRef is the hex form of the owning organization reference.
	OrganizationRef string `json:"organizationRef"`
	// Version is copied from the journal entry's Version field.
	Version int64 `json:"version"`
	// EventTime is copied from the journal entry's EventTime field.
	EventTime time.Time `json:"eventTime"`
	// Lines holds one element per posting line, in the order they were given.
	Lines []outboxLinePayload `json:"lines"`
}
// validateAccountForOrg checks that account may be used for a posting made on
// behalf of orgRef in the given currency.
//
// The checks run in order and the first failure is returned as an
// InvalidArgument error:
//   - the account must be non-nil;
//   - it must belong to orgRef;
//   - its status must be model.AccountStatusActive;
//   - when currency is non-empty, the account currency must equal it.
//
// An empty currency disables the currency check. It returns nil when every
// check passes.
func validateAccountForOrg(account *model.Account, orgRef primitive.ObjectID, currency string) error {
	switch {
	case account == nil:
		return merrors.InvalidArgument("account is required")
	case account.OrganizationRef != orgRef:
		return merrors.InvalidArgument("account does not belong to organization")
	case account.Status != model.AccountStatusActive:
		return merrors.InvalidArgument(fmt.Sprintf("account is %s", account.Status))
	case currency != "" && account.Currency != currency:
		return merrors.InvalidArgument(fmt.Sprintf("account currency mismatch: account=%s, expected=%s", account.Currency, currency))
	}
	return nil
}
// getAccount returns the account identified by accountRef, consulting cache
// before falling back to the accounts store.
//
// A zero accountRef is rejected with an InvalidArgument error. On a cache hit
// the cached pointer is returned as-is. On a miss the account is loaded from
// storage; a storage error is returned unchanged (so callers can still match
// sentinels such as storage.ErrAccountNotFound), and a successful load is
// memoized in cache for later lookups.
//
// cache may be nil: lookups on a nil map simply miss, and the result is then
// not memoized.
func (s *Service) getAccount(ctx context.Context, cache map[primitive.ObjectID]*model.Account, accountRef primitive.ObjectID) (*model.Account, error) {
	if accountRef.IsZero() {
		return nil, merrors.InvalidArgument("account reference is required")
	}

	// Reading from a nil map is safe and always reports a miss.
	if account, ok := cache[accountRef]; ok {
		return account, nil
	}

	account, err := s.storage.Accounts().Get(ctx, accountRef)
	if err != nil {
		return nil, err
	}

	// Writing to a nil map panics, so only memoize when a cache was supplied.
	if cache != nil {
		cache[accountRef] = account
	}
	return account, nil
}
// resolveSettlementAccount picks the contra (settlement) account for a posting
// made by orgRef in the given currency.
//
// When override is non-empty it is parsed as an object ID and that account is
// loaded through getAccount (and therefore through cache):
//   - a parse error is returned unchanged;
//   - storage.ErrAccountNotFound becomes a NoData error;
//   - any other load error is logged and reported as an Internal error;
//   - a failed validateAccountForOrg check becomes an InvalidArgument error
//     prefixed with "contra account: ".
//
// When override is empty the organization's default settlement account for the
// currency is looked up in storage:
//   - storage.ErrAccountNotFound becomes an InvalidArgument error;
//   - any other lookup error is logged and reported as an Internal error;
//   - an account without an identifier yields an Internal error;
//   - a failed validateAccountForOrg check becomes an InvalidArgument error
//     prefixed with "settlement account: ".
//
// The default account is stored in cache (when cache is non-nil) before it is
// validated, so it stays cached even if validation then fails.
func (s *Service) resolveSettlementAccount(ctx context.Context, orgRef primitive.ObjectID, currency, override string, cache map[primitive.ObjectID]*model.Account) (*model.Account, error) {
	if override != "" {
		overrideRef, err := parseObjectID(override)
		if err != nil {
			return nil, err
		}
		account, err := s.getAccount(ctx, cache, overrideRef)
		if err != nil {
			if errors.Is(err, storage.ErrAccountNotFound) {
				return nil, merrors.NoData("contra account not found")
			}
			s.logger.Warn("failed to load override contra account", zap.Error(err), zap.String("accountRef", overrideRef.Hex()))
			return nil, merrors.Internal("failed to load contra account")
		}
		if err := validateAccountForOrg(account, orgRef, currency); err != nil {
			return nil, merrors.InvalidArgument(fmt.Sprintf("contra account: %s", err.Error()))
		}
		return account, nil
	}

	account, err := s.storage.Accounts().GetDefaultSettlement(ctx, orgRef, currency)
	if err != nil {
		if errors.Is(err, storage.ErrAccountNotFound) {
			return nil, merrors.InvalidArgument("no default settlement account configured for currency")
		}
		s.logger.Warn("failed to resolve default settlement account",
			zap.Error(err),
			zap.String("organizationRef", orgRef.Hex()),
			zap.String("currency", currency))
		return nil, merrors.Internal("failed to resolve settlement account")
	}

	accountID := account.GetID()
	if accountID == nil {
		return nil, merrors.Internal("settlement account missing identifier")
	}
	// Writing to a nil map panics, so only memoize when a cache was supplied.
	if cache != nil {
		cache[*accountID] = account
	}

	if err := validateAccountForOrg(account, orgRef, currency); err != nil {
		return nil, merrors.InvalidArgument(fmt.Sprintf("settlement account: %s", err.Error()))
	}
	return account, nil
}
// upsertBalances applies the net effect of lines to the stored balance of
// every account they touch.
//
// The line amounts are first summed per account. Then, for each account, the
// stored balance (if any) is read, the net amount is added to it and the
// result is written back through the balances store. A balance that does not
// exist yet starts from the net amount with version 1; otherwise the version
// is the stored version plus one.
//
// accounts must contain an entry for every account referenced by lines; it
// supplies the currency, the organization and the AllowNegative flag.
//
// Errors:
//   - a parseDecimal failure (line amount or stored balance) is returned as-is;
//   - a missing or nil accounts entry yields an Internal error;
//   - a balance read failure other than storage.ErrBalanceNotFound, or a write
//     failure, is logged and reported as an Internal error;
//   - a negative result on an account that does not allow negative balances
//     yields an InvalidArgument error.
//
// Accounts are visited in map iteration order, so an error may be returned
// after some balances have already been written.
func (s *Service) upsertBalances(ctx context.Context, lines []*model.PostingLine, accounts map[primitive.ObjectID]*model.Account) error {
	if len(lines) == 0 {
		return nil
	}

	// Net out all lines per account so each account is written at most once.
	netByAccount := make(map[primitive.ObjectID]decimal.Decimal, len(lines))
	for _, posting := range lines {
		amount, err := parseDecimal(posting.Amount)
		if err != nil {
			return err
		}
		if running, seen := netByAccount[posting.AccountRef]; seen {
			amount = running.Add(amount)
		}
		netByAccount[posting.AccountRef] = amount
	}

	store := s.storage.Balances()
	// One timestamp is shared by every balance written in this call.
	stamp := time.Now().UTC()

	for ref, net := range netByAccount {
		owner, known := accounts[ref]
		if !known || owner == nil {
			s.logger.Warn("account cache missing for balance update", zap.String("accountRef", ref.Hex()))
			return merrors.Internal("account cache missing for balance update")
		}

		stored, err := store.Get(ctx, ref)
		switch {
		case err == nil:
			// A balance record may already exist; handled below.
		case errors.Is(err, storage.ErrBalanceNotFound):
			// No balance yet is not a failure: it is created from the net amount.
		default:
			s.logger.Warn("failed to fetch account balance",
				zap.Error(err),
				zap.String("accountRef", ref.Hex()))
			return merrors.Internal("failed to update balance")
		}

		total, nextVersion := net, int64(1)
		if stored != nil {
			prior, err := parseDecimal(stored.Balance)
			if err != nil {
				return err
			}
			total = prior.Add(net)
			nextVersion = stored.Version + 1
		}

		if total.LessThan(decimal.Zero) && !owner.AllowNegative {
			return merrors.InvalidArgument(fmt.Sprintf("account %s does not allow negative balances", ref.Hex()))
		}

		updated := &model.AccountBalance{
			AccountRef:  ref,
			Balance:     total.String(),
			Currency:    owner.Currency,
			Version:     nextVersion,
			LastUpdated: stamp,
		}
		updated.OrganizationRef = owner.OrganizationRef

		if err := store.Upsert(ctx, updated); err != nil {
			s.logger.Warn("failed to upsert account balance", zap.Error(err), zap.String("accountRef", ref.Hex()))
			return merrors.Internal("failed to update balance")
		}
	}
	return nil
}
// enqueueOutbox records a pending outbox event describing entry and its
// posting lines.
//
// The event payload is the JSON encoding of an outboxJournalPayload built from
// entry and lines (object IDs rendered as hex). The event ID is the hex form
// of the journal entry identifier, the subject is ledgerOutboxSubject, and the
// event is created with pending status, zero attempts and the entry's
// organization reference.
//
// It returns an Internal error when entry is nil, when entry has no
// identifier, when the payload cannot be marshalled, or when the outbox store
// rejects the event; the last two cases are also logged.
func (s *Service) enqueueOutbox(ctx context.Context, entry *model.JournalEntry, lines []*model.PostingLine) error {
	if entry == nil {
		return merrors.Internal("journal entry is required")
	}
	id := entry.GetID()
	if id == nil {
		return merrors.Internal("journal entry missing identifier")
	}
	// The same hex string serves as the payload reference and the event ID.
	entryHex := id.Hex()

	// Always a non-nil slice, so an entry without lines encodes as [] not null.
	linePayloads := make([]outboxLinePayload, len(lines))
	for i, posting := range lines {
		linePayloads[i] = outboxLinePayload{
			AccountRef: posting.AccountRef.Hex(),
			Amount:     posting.Amount,
			Currency:   posting.Currency,
			LineType:   string(posting.LineType),
		}
	}

	body, err := json.Marshal(outboxJournalPayload{
		JournalEntryRef: entryHex,
		EntryType:       string(entry.EntryType),
		OrganizationRef: entry.OrganizationRef.Hex(),
		Version:         entry.Version,
		EventTime:       entry.EventTime,
		Lines:           linePayloads,
	})
	if err != nil {
		s.logger.Warn("failed to marshal ledger outbox payload", zap.Error(err))
		return merrors.Internal("failed to marshal ledger event")
	}

	event := &model.OutboxEvent{
		EventID:  entryHex,
		Subject:  ledgerOutboxSubject,
		Payload:  body,
		Status:   model.OutboxStatusPending,
		Attempts: 0,
	}
	event.OrganizationRef = entry.OrganizationRef

	if err := s.storage.Outbox().Create(ctx, event); err != nil {
		s.logger.Warn("failed to enqueue ledger outbox event", zap.Error(err))
		return merrors.Internal("failed to enqueue ledger event")
	}
	return nil
}