package ledger import ( "context" "encoding/json" "errors" "fmt" "time" "github.com/shopspring/decimal" "github.com/tech/sendico/ledger/storage" "github.com/tech/sendico/ledger/storage/model" "github.com/tech/sendico/pkg/merrors" pmodel "github.com/tech/sendico/pkg/model" "github.com/tech/sendico/pkg/model/account_role" "github.com/tech/sendico/pkg/mutil/mzap" "go.mongodb.org/mongo-driver/v2/bson" "go.uber.org/zap" ) type outboxLinePayload struct { AccountRef string `json:"accountRef"` Amount string `json:"amount"` Currency string `json:"currency"` LineType string `json:"lineType"` } type outboxJournalPayload struct { JournalEntryRef string `json:"journalEntryRef"` EntryType string `json:"entryType"` OrganizationRef string `json:"organizationRef"` Version int64 `json:"version"` EventTime time.Time `json:"eventTime"` Lines []outboxLinePayload `json:"lines"` } func validateAccountRole(account *pmodel.LedgerAccount, expected account_role.AccountRole, label string) error { if expected == "" { return nil } if account.Role != expected { return merrors.InvalidArgument(fmt.Sprintf("%s: expected role %s, got %s", label, expected, account.Role)) } return nil } // resolveAccount returns an account either by explicit ref or by role lookup. // If accountRefStr is non-empty, it fetches by ID and optionally asserts the role. // If accountRefStr is empty and role is set, it resolves via GetByRole(orgRef, currency, role). // Returns the account and its ObjectID, or an error. func (s *Service) resolveAccount(ctx context.Context, accountRefStr string, role account_role.AccountRole, orgRef bson.ObjectID, currency, label string) (*pmodel.LedgerAccount, bson.ObjectID, error) { if accountRefStr != "" { ref, err := parseObjectID(accountRefStr) if err != nil { return nil, bson.NilObjectID, err } account, err := s.storage.Accounts().Get(ctx, ref) if err != nil { if errors.Is(err, storage.ErrAccountNotFound) { return nil, bson.NilObjectID, merrors.NoData(label + " not found") } return nil, bson.NilObjectID, merrors.Internal("failed to get " + label) } // If role is also specified, assert it matches if role != "" { if err := validateAccountRole(account, role, label); err != nil { return nil, bson.NilObjectID, err } } return account, ref, nil } // No ref provided — resolve by role if role == "" { return nil, bson.NilObjectID, merrors.InvalidArgument(label + ": ledger_account_ref or role is required") } if orgRef.IsZero() { return nil, bson.NilObjectID, merrors.InvalidArgument(label + ": organization_ref is required for role resolution") } if currency == "" { return nil, bson.NilObjectID, merrors.InvalidArgument(label + ": currency is required for role resolution") } account, err := s.storage.Accounts().GetByRole(ctx, orgRef, currency, role) if err != nil { if errors.Is(err, storage.ErrAccountNotFound) { return nil, bson.NilObjectID, merrors.NoData(fmt.Sprintf("%s: no account found with role %s", label, role)) } return nil, bson.NilObjectID, merrors.Internal("failed to resolve " + label + " by role") } return account, *account.GetID(), nil } func validateAccountForOrg(account *pmodel.LedgerAccount, orgRef bson.ObjectID, currency string) error { if account == nil { return merrors.InvalidArgument("account is required") } if account.OrganizationRef == nil || account.OrganizationRef.IsZero() { return merrors.InvalidArgument("account organization reference is required") } if account.Scope != "" && account.Scope != pmodel.LedgerAccountScopeOrganization { return merrors.InvalidArgument("account scope mismatch: expected organization") } if *account.OrganizationRef != orgRef { return merrors.InvalidArgument("account does not belong to organization") } if account.Status != pmodel.LedgerAccountStatusActive { return merrors.InvalidArgument(fmt.Sprintf("account is %s", account.Status)) } if currency != "" && account.Currency != currency { return merrors.InvalidArgument(fmt.Sprintf("account currency mismatch: account=%s, expected=%s", account.Currency, currency)) } return nil } func (s *Service) getAccount(ctx context.Context, cache map[bson.ObjectID]*pmodel.LedgerAccount, accountRef bson.ObjectID) (*pmodel.LedgerAccount, error) { if accountRef.IsZero() { return nil, merrors.InvalidArgument("account reference is required") } if account, ok := cache[accountRef]; ok { return account, nil } account, err := s.storage.Accounts().Get(ctx, accountRef) if err != nil { return nil, err } cache[accountRef] = account return account, nil } func (s *Service) resolveSettlementAccount(ctx context.Context, orgRef bson.ObjectID, currency, override string, cache map[bson.ObjectID]*pmodel.LedgerAccount) (*pmodel.LedgerAccount, error) { if override != "" { overrideRef, err := parseObjectID(override) if err != nil { return nil, err } account, err := s.getAccount(ctx, cache, overrideRef) if err != nil { if errors.Is(err, storage.ErrAccountNotFound) { return nil, merrors.NoData("contra account not found") } s.logger.Warn("Failed to load override contra account", zap.Error(err), mzap.ObjRef("account_ref", overrideRef)) return nil, merrors.Internal("failed to load contra account") } if err := validateAccountForOrg(account, orgRef, currency); err != nil { return nil, merrors.InvalidArgument(fmt.Sprintf("contra account: %s", err.Error())) } return account, nil } account, err := s.storage.Accounts().GetDefaultSettlement(ctx, orgRef, currency) if err != nil { if errors.Is(err, storage.ErrAccountNotFound) { return nil, merrors.InvalidArgument("no default settlement account configured for currency") } s.logger.Warn("Failed to resolve default settlement account", zap.Error(err), mzap.ObjRef("organization_ref", orgRef), zap.String("currency", currency)) return nil, merrors.Internal("failed to resolve settlement account") } accountID := account.GetID() if accountID == nil { return nil, merrors.Internal("settlement account missing identifier") } cache[*accountID] = account if err := validateAccountForOrg(account, orgRef, currency); err != nil { return nil, merrors.InvalidArgument(fmt.Sprintf("settlement account: %s", err.Error())) } return account, nil } func (s *Service) upsertBalances(ctx context.Context, lines []*model.PostingLine, accounts map[bson.ObjectID]*pmodel.LedgerAccount) error { if len(lines) == 0 { return nil } balanceDeltas := make(map[bson.ObjectID]decimal.Decimal, len(lines)) for _, line := range lines { delta, err := parseDecimal(line.Amount) if err != nil { return err } if current, ok := balanceDeltas[line.AccountRef]; ok { balanceDeltas[line.AccountRef] = current.Add(delta) continue } balanceDeltas[line.AccountRef] = delta } balancesStore := s.storage.Balances() now := time.Now().UTC() for accountRef, delta := range balanceDeltas { account := accounts[accountRef] if account == nil { s.logger.Warn("Account cache missing for balance update", mzap.AccRef(accountRef)) return merrors.Internal("account cache missing for balance update") } currentBalance, err := balancesStore.Get(ctx, accountRef) if err != nil && !errors.Is(err, storage.ErrBalanceNotFound) { s.logger.Warn("Failed to fetch account balance", zap.Error(err), mzap.AccRef(accountRef)) return merrors.Internal("failed to update balance") } newAmount := delta version := int64(1) if currentBalance != nil { existing, err := parseDecimal(currentBalance.Balance) if err != nil { return err } newAmount = existing.Add(delta) version = currentBalance.Version + 1 } if !account.AllowNegative && newAmount.LessThan(decimal.Zero) { return merrors.InvalidArgument(fmt.Sprintf("account %s does not allow negative balances", accountRef.Hex())) } newBalance := &model.AccountBalance{ AccountRef: accountRef, Balance: newAmount.String(), Currency: account.Currency, Version: version, LastUpdated: now, } if account.OrganizationRef != nil { newBalance.OrganizationRef = *account.OrganizationRef } else { newBalance.OrganizationRef = bson.NilObjectID } if err := balancesStore.Upsert(ctx, newBalance); err != nil { s.logger.Warn("Failed to upsert account balance", zap.Error(err), mzap.AccRef(accountRef)) return merrors.Internal("failed to update balance") } } return nil } func (s *Service) enqueueOutbox(ctx context.Context, entry *model.JournalEntry, lines []*model.PostingLine) error { if entry == nil { return merrors.Internal("journal entry is required") } entryID := entry.GetID() if entryID == nil { return merrors.Internal("journal entry missing identifier") } payload := outboxJournalPayload{ JournalEntryRef: entryID.Hex(), EntryType: string(entry.EntryType), OrganizationRef: entry.OrganizationRef.Hex(), Version: entry.Version, EventTime: entry.EventTime, Lines: make([]outboxLinePayload, 0, len(lines)), } for _, line := range lines { payload.Lines = append(payload.Lines, outboxLinePayload{ AccountRef: line.AccountRef.Hex(), Amount: line.Amount, Currency: line.Currency, LineType: string(line.LineType), }) } body, err := json.Marshal(payload) if err != nil { s.logger.Warn("Failed to marshal ledger outbox payload", zap.Error(err)) return merrors.Internal("failed to marshal ledger event") } envelope, err := buildLedgerOutboxEnvelope(entryID.Hex(), body, 0, entry.OrganizationRef.Hex(), time.Now().UTC()) if err != nil { s.logger.Warn("Failed to build ledger outbox envelope", zap.Error(err)) return merrors.Internal("failed to prepare ledger event envelope") } if err := s.startOutboxReliableProducer(); err != nil { s.logger.Warn("Failed to initialise outbox reliable producer", zap.Error(err)) return merrors.Internal("failed to initialize reliable outbox") } if s.outbox.producer == nil { s.logger.Warn("Failed to enqueue ledger outbox event: reliable producer not configured") return merrors.Internal("failed to enqueue ledger event") } if err := s.outbox.producer.SendWithOutbox(ctx, envelope); err != nil { s.logger.Warn("Failed to enqueue ledger outbox event", zap.Error(err)) return merrors.Internal("failed to enqueue ledger event") } return nil }