255 lines
8.6 KiB
Go
255 lines
8.6 KiB
Go
package ledger
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/tech/sendico/ledger/storage"
|
|
"github.com/tech/sendico/ledger/storage/model"
|
|
storageMongo "github.com/tech/sendico/ledger/storage/mongo"
|
|
"github.com/tech/sendico/pkg/api/routers/gsresponse"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// fxResponder implements foreign exchange transactions with charges
|
|
func (s *Service) fxResponder(_ context.Context, req *ledgerv1.FXRequest) gsresponse.Responder[ledgerv1.PostResponse] {
|
|
return func(ctx context.Context) (*ledgerv1.PostResponse, error) {
|
|
// Validate request
|
|
if req.IdempotencyKey == "" {
|
|
return nil, merrors.InvalidArgument("idempotency_key is required")
|
|
}
|
|
if req.OrganizationRef == "" {
|
|
return nil, merrors.InvalidArgument("organization_ref is required")
|
|
}
|
|
if req.FromLedgerAccountRef == "" {
|
|
return nil, merrors.InvalidArgument("from_ledger_account_ref is required")
|
|
}
|
|
if req.ToLedgerAccountRef == "" {
|
|
return nil, merrors.InvalidArgument("to_ledger_account_ref is required")
|
|
}
|
|
if req.FromLedgerAccountRef == req.ToLedgerAccountRef {
|
|
return nil, merrors.InvalidArgument("cannot exchange to same account")
|
|
}
|
|
if err := validateMoney(req.FromMoney, "from_money"); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := validateMoney(req.ToMoney, "to_money"); err != nil {
|
|
return nil, err
|
|
}
|
|
if req.FromMoney.Currency == req.ToMoney.Currency {
|
|
return nil, merrors.InvalidArgument("from_money and to_money must have different currencies")
|
|
}
|
|
if req.Rate == "" {
|
|
return nil, merrors.InvalidArgument("rate is required")
|
|
}
|
|
if err := validatePostingLines(req.Charges); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
orgRef, err := parseObjectID(req.OrganizationRef)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
fromAccountRef, err := parseObjectID(req.FromLedgerAccountRef)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
toAccountRef, err := parseObjectID(req.ToLedgerAccountRef)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Check for duplicate idempotency key
|
|
existingEntry, err := s.storage.JournalEntries().GetByIdempotencyKey(ctx, orgRef, req.IdempotencyKey)
|
|
if err == nil && existingEntry != nil {
|
|
recordDuplicateRequest("fx")
|
|
s.logger.Info("duplicate FX request (idempotency)",
|
|
zap.String("idempotencyKey", req.IdempotencyKey),
|
|
zap.String("existingEntryID", existingEntry.GetID().Hex()))
|
|
return &ledgerv1.PostResponse{
|
|
JournalEntryRef: existingEntry.GetID().Hex(),
|
|
Version: existingEntry.Version,
|
|
EntryType: ledgerv1.EntryType_ENTRY_FX,
|
|
}, nil
|
|
}
|
|
if err != nil && err != storage.ErrJournalEntryNotFound {
|
|
s.logger.Warn("failed to check idempotency", zap.Error(err))
|
|
return nil, merrors.Internal("failed to check idempotency")
|
|
}
|
|
|
|
// Verify both accounts exist and are active
|
|
fromAccount, err := s.storage.Accounts().Get(ctx, fromAccountRef)
|
|
if err != nil {
|
|
if err == storage.ErrAccountNotFound {
|
|
return nil, merrors.NoData("from_account not found")
|
|
}
|
|
s.logger.Warn("failed to get from_account", zap.Error(err))
|
|
return nil, merrors.Internal("failed to get from_account")
|
|
}
|
|
if err := validateAccountForOrg(fromAccount, orgRef, req.FromMoney.Currency); err != nil {
|
|
return nil, merrors.InvalidArgument(fmt.Sprintf("from_account: %s", err.Error()))
|
|
}
|
|
|
|
toAccount, err := s.storage.Accounts().Get(ctx, toAccountRef)
|
|
if err != nil {
|
|
if err == storage.ErrAccountNotFound {
|
|
return nil, merrors.NoData("to_account not found")
|
|
}
|
|
s.logger.Warn("failed to get to_account", zap.Error(err))
|
|
return nil, merrors.Internal("failed to get to_account")
|
|
}
|
|
if err := validateAccountForOrg(toAccount, orgRef, req.ToMoney.Currency); err != nil {
|
|
return nil, merrors.InvalidArgument(fmt.Sprintf("to_account: %s", err.Error()))
|
|
}
|
|
|
|
accountsByRef := map[primitive.ObjectID]*model.Account{
|
|
fromAccountRef: fromAccount,
|
|
toAccountRef: toAccount,
|
|
}
|
|
|
|
eventTime := getEventTime(req.EventTime)
|
|
fromAmount, _ := parseDecimal(req.FromMoney.Amount)
|
|
toAmount, _ := parseDecimal(req.ToMoney.Amount)
|
|
|
|
// Create posting lines for FX
|
|
// Dr From Account in fromCurrency (debit = negative)
|
|
// Cr To Account in toCurrency (credit = positive)
|
|
postingLines := make([]*model.PostingLine, 0, 2+len(req.Charges))
|
|
|
|
// Debit from account
|
|
fromLine := &model.PostingLine{
|
|
JournalEntryRef: primitive.NilObjectID,
|
|
AccountRef: fromAccountRef,
|
|
Amount: fromAmount.Neg().String(), // negative = debit
|
|
Currency: req.FromMoney.Currency,
|
|
LineType: model.LineTypeMain,
|
|
}
|
|
fromLine.OrganizationRef = orgRef
|
|
postingLines = append(postingLines, fromLine)
|
|
|
|
// Credit to account
|
|
toLine := &model.PostingLine{
|
|
JournalEntryRef: primitive.NilObjectID,
|
|
AccountRef: toAccountRef,
|
|
Amount: toAmount.String(), // positive = credit
|
|
Currency: req.ToMoney.Currency,
|
|
LineType: model.LineTypeMain,
|
|
}
|
|
toLine.OrganizationRef = orgRef
|
|
postingLines = append(postingLines, toLine)
|
|
|
|
for i, charge := range req.Charges {
|
|
chargeAccountRef, err := parseObjectID(charge.LedgerAccountRef)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
chargeAccount, err := s.getAccount(ctx, accountsByRef, chargeAccountRef)
|
|
if err != nil {
|
|
if err == storage.ErrAccountNotFound {
|
|
return nil, merrors.NoData(fmt.Sprintf("charges[%d]: account not found", i))
|
|
}
|
|
s.logger.Warn("failed to get FX charge account", zap.Error(err), zap.String("chargeAccountRef", chargeAccountRef.Hex()))
|
|
return nil, merrors.Internal("failed to get charge account")
|
|
}
|
|
if err := validateAccountForOrg(chargeAccount, orgRef, charge.Money.Currency); err != nil {
|
|
return nil, merrors.InvalidArgument(fmt.Sprintf("charges[%d]: %s", i, err.Error()))
|
|
}
|
|
|
|
chargeAmount, err := parseDecimal(charge.Money.Amount)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
chargeLine := &model.PostingLine{
|
|
JournalEntryRef: primitive.NilObjectID,
|
|
AccountRef: chargeAccountRef,
|
|
Amount: chargeAmount.String(),
|
|
Currency: charge.Money.Currency,
|
|
LineType: protoLineTypeToModel(charge.LineType),
|
|
}
|
|
chargeLine.OrganizationRef = orgRef
|
|
postingLines = append(postingLines, chargeLine)
|
|
}
|
|
|
|
// Execute in transaction
|
|
mongoStore, ok := s.storage.(*storageMongo.Store)
|
|
if !ok {
|
|
return nil, merrors.Internal("storage does not support transactions")
|
|
}
|
|
|
|
result, err := mongoStore.TransactionFactory().CreateTransaction().Execute(ctx, func(txCtx context.Context) (any, error) {
|
|
metadata := make(map[string]string)
|
|
if req.Metadata != nil {
|
|
for k, v := range req.Metadata {
|
|
metadata[k] = v
|
|
}
|
|
}
|
|
metadata["fx_rate"] = req.Rate
|
|
metadata["from_currency"] = req.FromMoney.Currency
|
|
metadata["to_currency"] = req.ToMoney.Currency
|
|
metadata["from_amount"] = req.FromMoney.Amount
|
|
metadata["to_amount"] = req.ToMoney.Amount
|
|
|
|
entry := &model.JournalEntry{
|
|
IdempotencyKey: req.IdempotencyKey,
|
|
EventTime: eventTime,
|
|
EntryType: model.EntryTypeFX,
|
|
Description: req.Description,
|
|
Metadata: metadata,
|
|
Version: time.Now().UnixNano(),
|
|
}
|
|
entry.OrganizationRef = orgRef
|
|
|
|
if err := s.storage.JournalEntries().Create(txCtx, entry); err != nil {
|
|
s.logger.Warn("failed to create journal entry", zap.Error(err))
|
|
return nil, merrors.Internal("failed to create journal entry")
|
|
}
|
|
|
|
entryRef := entry.GetID()
|
|
if entryRef == nil {
|
|
return nil, merrors.Internal("journal entry missing identifier")
|
|
}
|
|
|
|
for _, line := range postingLines {
|
|
line.JournalEntryRef = *entryRef
|
|
}
|
|
|
|
if err := s.storage.PostingLines().CreateMany(txCtx, postingLines); err != nil {
|
|
s.logger.Warn("failed to create posting lines", zap.Error(err))
|
|
return nil, merrors.Internal("failed to create posting lines")
|
|
}
|
|
|
|
if err := s.upsertBalances(txCtx, postingLines, accountsByRef); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.enqueueOutbox(txCtx, entry, postingLines); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &ledgerv1.PostResponse{
|
|
JournalEntryRef: entryRef.Hex(),
|
|
Version: entry.Version,
|
|
EntryType: ledgerv1.EntryType_ENTRY_FX,
|
|
}, nil
|
|
})
|
|
|
|
if err != nil {
|
|
recordJournalEntryError("fx", "transaction_failed")
|
|
return nil, err
|
|
}
|
|
|
|
fromAmountFloat, _ := fromAmount.Float64()
|
|
toAmountFloat, _ := toAmount.Float64()
|
|
recordTransactionAmount(req.FromMoney.Currency, "fx", fromAmountFloat)
|
|
recordTransactionAmount(req.ToMoney.Currency, "fx", toAmountFloat)
|
|
recordJournalEntry("fx", "success", 0)
|
|
return result.(*ledgerv1.PostResponse), nil
|
|
}
|
|
}
|