Files
sendico/api/ledger/internal/service/ledger/posting_transfer.go
Stephan D 2ee17b0c46
Some checks failed
ci/woodpecker/push/db Pipeline was successful
ci/woodpecker/push/fx/1 Pipeline failed
ci/woodpecker/push/fx/2 Pipeline failed
ci/woodpecker/push/nats Pipeline was successful
fx build fix
2025-11-07 23:50:48 +01:00

239 lines
8.0 KiB
Go

package ledger
import (
"context"
"fmt"
"time"
"github.com/tech/sendico/ledger/storage"
"github.com/tech/sendico/ledger/storage/model"
storageMongo "github.com/tech/sendico/ledger/storage/mongo"
"github.com/tech/sendico/pkg/api/routers/gsresponse"
"github.com/tech/sendico/pkg/merrors"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
)
// transferResponder implements internal transfer between accounts
func (s *Service) transferResponder(_ context.Context, req *ledgerv1.TransferRequest) gsresponse.Responder[ledgerv1.PostResponse] {
return func(ctx context.Context) (*ledgerv1.PostResponse, error) {
// Validate request
if req.IdempotencyKey == "" {
return nil, merrors.InvalidArgument("idempotency_key is required")
}
if req.OrganizationRef == "" {
return nil, merrors.InvalidArgument("organization_ref is required")
}
if req.FromLedgerAccountRef == "" {
return nil, merrors.InvalidArgument("from_ledger_account_ref is required")
}
if req.ToLedgerAccountRef == "" {
return nil, merrors.InvalidArgument("to_ledger_account_ref is required")
}
if req.FromLedgerAccountRef == req.ToLedgerAccountRef {
return nil, merrors.InvalidArgument("cannot transfer to same account")
}
if err := validateMoney(req.Money, "money"); err != nil {
return nil, err
}
if err := validatePostingLines(req.Charges); err != nil {
return nil, err
}
orgRef, err := parseObjectID(req.OrganizationRef)
if err != nil {
return nil, err
}
fromAccountRef, err := parseObjectID(req.FromLedgerAccountRef)
if err != nil {
return nil, err
}
toAccountRef, err := parseObjectID(req.ToLedgerAccountRef)
if err != nil {
return nil, err
}
// Check for duplicate idempotency key
existingEntry, err := s.storage.JournalEntries().GetByIdempotencyKey(ctx, orgRef, req.IdempotencyKey)
if err == nil && existingEntry != nil {
recordDuplicateRequest("transfer")
s.logger.Info("duplicate transfer request (idempotency)",
zap.String("idempotencyKey", req.IdempotencyKey),
zap.String("existingEntryID", existingEntry.GetID().Hex()))
return &ledgerv1.PostResponse{
JournalEntryRef: existingEntry.GetID().Hex(),
Version: existingEntry.Version,
EntryType: ledgerv1.EntryType_ENTRY_TRANSFER,
}, nil
}
if err != nil && err != storage.ErrJournalEntryNotFound {
s.logger.Warn("failed to check idempotency", zap.Error(err))
return nil, merrors.Internal("failed to check idempotency")
}
// Verify both accounts exist and are active
fromAccount, err := s.storage.Accounts().Get(ctx, fromAccountRef)
if err != nil {
if err == storage.ErrAccountNotFound {
return nil, merrors.NoData("from_account not found")
}
s.logger.Warn("failed to get from_account", zap.Error(err))
return nil, merrors.Internal("failed to get from_account")
}
if err := validateAccountForOrg(fromAccount, orgRef, req.Money.Currency); err != nil {
return nil, merrors.InvalidArgument(fmt.Sprintf("from_account: %s", err.Error()))
}
toAccount, err := s.storage.Accounts().Get(ctx, toAccountRef)
if err != nil {
if err == storage.ErrAccountNotFound {
return nil, merrors.NoData("to_account not found")
}
s.logger.Warn("failed to get to_account", zap.Error(err))
return nil, merrors.Internal("failed to get to_account")
}
if err := validateAccountForOrg(toAccount, orgRef, req.Money.Currency); err != nil {
return nil, merrors.InvalidArgument(fmt.Sprintf("to_account: %s", err.Error()))
}
accountsByRef := map[primitive.ObjectID]*model.Account{
fromAccountRef: fromAccount,
toAccountRef: toAccount,
}
eventTime := getEventTime(req.EventTime)
transferAmount, _ := parseDecimal(req.Money.Amount)
// Create posting lines for transfer
// Dr From Account (debit = negative)
// Cr To Account (credit = positive)
postingLines := make([]*model.PostingLine, 0, 2+len(req.Charges))
// Debit from account
fromLine := &model.PostingLine{
JournalEntryRef: primitive.NilObjectID,
AccountRef: fromAccountRef,
Amount: transferAmount.Neg().String(), // negative = debit
Currency: req.Money.Currency,
LineType: model.LineTypeMain,
}
fromLine.OrganizationRef = orgRef
postingLines = append(postingLines, fromLine)
// Credit to account
toLine := &model.PostingLine{
JournalEntryRef: primitive.NilObjectID,
AccountRef: toAccountRef,
Amount: transferAmount.String(), // positive = credit
Currency: req.Money.Currency,
LineType: model.LineTypeMain,
}
toLine.OrganizationRef = orgRef
postingLines = append(postingLines, toLine)
// Process charges (fees/spreads)
for i, charge := range req.Charges {
chargeAccountRef, err := parseObjectID(charge.LedgerAccountRef)
if err != nil {
return nil, err
}
if charge.Money.Currency != req.Money.Currency {
return nil, merrors.InvalidArgument(fmt.Sprintf("charges[%d]: currency mismatch", i))
}
chargeAccount, err := s.getAccount(ctx, accountsByRef, chargeAccountRef)
if err != nil {
if err == storage.ErrAccountNotFound {
return nil, merrors.NoData(fmt.Sprintf("charges[%d]: account not found", i))
}
s.logger.Warn("failed to get charge account", zap.Error(err), zap.String("chargeAccountRef", chargeAccountRef.Hex()))
return nil, merrors.Internal("failed to get charge account")
}
if err := validateAccountForOrg(chargeAccount, orgRef, charge.Money.Currency); err != nil {
return nil, merrors.InvalidArgument(fmt.Sprintf("charges[%d]: %s", i, err.Error()))
}
chargeAmount, err := parseDecimal(charge.Money.Amount)
if err != nil {
return nil, err
}
chargeLine := &model.PostingLine{
JournalEntryRef: primitive.NilObjectID,
AccountRef: chargeAccountRef,
Amount: chargeAmount.String(),
Currency: charge.Money.Currency,
LineType: protoLineTypeToModel(charge.LineType),
}
chargeLine.OrganizationRef = orgRef
postingLines = append(postingLines, chargeLine)
}
// Execute in transaction
mongoStore, ok := s.storage.(*storageMongo.Store)
if !ok {
return nil, merrors.Internal("storage does not support transactions")
}
result, err := mongoStore.TransactionFactory().CreateTransaction().Execute(ctx, func(txCtx context.Context) (any, error) {
entry := &model.JournalEntry{
IdempotencyKey: req.IdempotencyKey,
EventTime: eventTime,
EntryType: model.EntryTypeTransfer,
Description: req.Description,
Metadata: req.Metadata,
Version: time.Now().UnixNano(),
}
entry.OrganizationRef = orgRef
if err := s.storage.JournalEntries().Create(txCtx, entry); err != nil {
s.logger.Warn("failed to create journal entry", zap.Error(err))
return nil, merrors.Internal("failed to create journal entry")
}
entryRef := entry.GetID()
if entryRef == nil {
return nil, merrors.Internal("journal entry missing identifier")
}
for _, line := range postingLines {
line.JournalEntryRef = *entryRef
}
if err := validateBalanced(postingLines); err != nil {
return nil, err
}
if err := s.storage.PostingLines().CreateMany(txCtx, postingLines); err != nil {
s.logger.Warn("failed to create posting lines", zap.Error(err))
return nil, merrors.Internal("failed to create posting lines")
}
if err := s.upsertBalances(txCtx, postingLines, accountsByRef); err != nil {
return nil, err
}
if err := s.enqueueOutbox(txCtx, entry, postingLines); err != nil {
return nil, err
}
return &ledgerv1.PostResponse{
JournalEntryRef: entryRef.Hex(),
Version: entry.Version,
EntryType: ledgerv1.EntryType_ENTRY_TRANSFER,
}, nil
})
if err != nil {
recordJournalEntryError("transfer", "failed")
return nil, err
}
amountFloat, _ := transferAmount.Float64()
recordTransactionAmount(req.Money.Currency, "transfer", amountFloat)
recordJournalEntry("transfer", "success", 0)
return result.(*ledgerv1.PostResponse), nil
}
}