package ledger

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/tech/sendico/ledger/storage"
	"github.com/tech/sendico/ledger/storage/model"
	storageMongo "github.com/tech/sendico/ledger/storage/mongo"
	"github.com/tech/sendico/pkg/api/routers/gsresponse"
	"github.com/tech/sendico/pkg/merrors"
	ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.uber.org/zap"
)

// fxResponder returns a responder that posts a foreign-exchange journal entry
// with optional charges.
//
// The context passed to fxResponder itself is ignored; all work is done with
// the context handed to the returned closure. The closure:
//
//  1. validates the request fields and parses the organization/account refs;
//  2. returns the already-stored entry when the idempotency key is known;
//  3. loads and validates the source, destination and charge accounts;
//  4. builds one debit line (negated from-amount), one credit line (to-amount)
//     and one line per charge;
//  5. inside a storage transaction creates the journal entry and posting
//     lines, upserts balances and enqueues the outbox message.
//
// Validation failures are returned as merrors.InvalidArgument, missing
// accounts as merrors.NoData, and storage failures as merrors.Internal.
func (s *Service) fxResponder(_ context.Context, req *ledgerv1.FXRequest) gsresponse.Responder[ledgerv1.PostResponse] {
	return func(ctx context.Context) (*ledgerv1.PostResponse, error) {
		// Validate request.
		if req.IdempotencyKey == "" {
			return nil, merrors.InvalidArgument("idempotency_key is required")
		}
		if req.OrganizationRef == "" {
			return nil, merrors.InvalidArgument("organization_ref is required")
		}
		if req.FromLedgerAccountRef == "" {
			return nil, merrors.InvalidArgument("from_ledger_account_ref is required")
		}
		if req.ToLedgerAccountRef == "" {
			return nil, merrors.InvalidArgument("to_ledger_account_ref is required")
		}
		if req.FromLedgerAccountRef == req.ToLedgerAccountRef {
			return nil, merrors.InvalidArgument("cannot exchange to same account")
		}
		if err := validateMoney(req.FromMoney, "from_money"); err != nil {
			return nil, err
		}
		if err := validateMoney(req.ToMoney, "to_money"); err != nil {
			return nil, err
		}
		if req.FromMoney.Currency == req.ToMoney.Currency {
			return nil, merrors.InvalidArgument("from_money and to_money must have different currencies")
		}
		if req.Rate == "" {
			return nil, merrors.InvalidArgument("rate is required")
		}
		if err := validatePostingLines(req.Charges); err != nil {
			return nil, err
		}

		orgRef, err := parseObjectID(req.OrganizationRef)
		if err != nil {
			return nil, err
		}
		fromAccountRef, err := parseObjectID(req.FromLedgerAccountRef)
		if err != nil {
			return nil, err
		}
		toAccountRef, err := parseObjectID(req.ToLedgerAccountRef)
		if err != nil {
			return nil, err
		}

		// Check for a duplicate idempotency key. A "not found" error is the
		// normal case for a fresh request; errors.Is also matches a wrapped
		// sentinel.
		existingEntry, err := s.storage.JournalEntries().GetByIdempotencyKey(ctx, orgRef, req.IdempotencyKey)
		switch {
		case err == nil && existingEntry != nil:
			recordDuplicateRequest("fx")
			s.logger.Info("duplicate FX request (idempotency)",
				zap.String("idempotencyKey", req.IdempotencyKey),
				zap.String("existingEntryID", existingEntry.GetID().Hex()))
			return &ledgerv1.PostResponse{
				JournalEntryRef: existingEntry.GetID().Hex(),
				Version:         existingEntry.Version,
				EntryType:       ledgerv1.EntryType_ENTRY_FX,
			}, nil
		case err != nil && !errors.Is(err, storage.ErrJournalEntryNotFound):
			s.logger.Warn("failed to check idempotency", zap.Error(err))
			return nil, merrors.Internal("failed to check idempotency")
		}

		// Verify both accounts exist and are valid for this organization and
		// the respective currency.
		fromAccount, err := s.storage.Accounts().Get(ctx, fromAccountRef)
		if err != nil {
			if errors.Is(err, storage.ErrAccountNotFound) {
				return nil, merrors.NoData("from_account not found")
			}
			s.logger.Warn("failed to get from_account", zap.Error(err))
			return nil, merrors.Internal("failed to get from_account")
		}
		if err := validateAccountForOrg(fromAccount, orgRef, req.FromMoney.Currency); err != nil {
			return nil, merrors.InvalidArgument(fmt.Sprintf("from_account: %s", err.Error()))
		}

		toAccount, err := s.storage.Accounts().Get(ctx, toAccountRef)
		if err != nil {
			if errors.Is(err, storage.ErrAccountNotFound) {
				return nil, merrors.NoData("to_account not found")
			}
			s.logger.Warn("failed to get to_account", zap.Error(err))
			return nil, merrors.Internal("failed to get to_account")
		}
		if err := validateAccountForOrg(toAccount, orgRef, req.ToMoney.Currency); err != nil {
			return nil, merrors.InvalidArgument(fmt.Sprintf("to_account: %s", err.Error()))
		}

		// Accounts already loaded; passed to getAccount and upsertBalances.
		accountsByRef := map[primitive.ObjectID]*model.Account{
			fromAccountRef: fromAccount,
			toAccountRef:   toAccount,
		}

		eventTime := getEventTime(req.EventTime)

		// Propagate parse failures instead of silently posting a zero amount,
		// matching how charge amounts are handled below.
		fromAmount, err := parseDecimal(req.FromMoney.Amount)
		if err != nil {
			return nil, err
		}
		toAmount, err := parseDecimal(req.ToMoney.Amount)
		if err != nil {
			return nil, err
		}

		// Create posting lines for FX:
		//   Dr From Account in fromCurrency (debit = negative)
		//   Cr To Account in toCurrency (credit = positive)
		postingLines := make([]*model.PostingLine, 0, 2+len(req.Charges))

		// Debit from account.
		fromLine := &model.PostingLine{
			JournalEntryRef: primitive.NilObjectID,
			AccountRef:      fromAccountRef,
			Amount:          fromAmount.Neg().String(), // negative = debit
			Currency:        req.FromMoney.Currency,
			LineType:        model.LineTypeMain,
		}
		fromLine.OrganizationRef = orgRef
		postingLines = append(postingLines, fromLine)

		// Credit to account.
		toLine := &model.PostingLine{
			JournalEntryRef: primitive.NilObjectID,
			AccountRef:      toAccountRef,
			Amount:          toAmount.String(), // positive = credit
			Currency:        req.ToMoney.Currency,
			LineType:        model.LineTypeMain,
		}
		toLine.OrganizationRef = orgRef
		postingLines = append(postingLines, toLine)

		// One posting line per charge, each against its own validated account.
		for i, charge := range req.Charges {
			chargeAccountRef, err := parseObjectID(charge.LedgerAccountRef)
			if err != nil {
				return nil, err
			}

			chargeAccount, err := s.getAccount(ctx, accountsByRef, chargeAccountRef)
			if err != nil {
				if errors.Is(err, storage.ErrAccountNotFound) {
					return nil, merrors.NoData(fmt.Sprintf("charges[%d]: account not found", i))
				}
				s.logger.Warn("failed to get FX charge account",
					zap.Error(err),
					zap.String("chargeAccountRef", chargeAccountRef.Hex()))
				return nil, merrors.Internal("failed to get charge account")
			}
			if err := validateAccountForOrg(chargeAccount, orgRef, charge.Money.Currency); err != nil {
				return nil, merrors.InvalidArgument(fmt.Sprintf("charges[%d]: %s", i, err.Error()))
			}

			chargeAmount, err := parseDecimal(charge.Money.Amount)
			if err != nil {
				return nil, err
			}

			chargeLine := &model.PostingLine{
				JournalEntryRef: primitive.NilObjectID,
				AccountRef:      chargeAccountRef,
				Amount:          chargeAmount.String(),
				Currency:        charge.Money.Currency,
				LineType:        protoLineTypeToModel(charge.LineType),
			}
			chargeLine.OrganizationRef = orgRef
			postingLines = append(postingLines, chargeLine)
		}

		// Execute in transaction; this requires the Mongo-backed store.
		mongoStore, ok := s.storage.(*storageMongo.Store)
		if !ok {
			return nil, merrors.Internal("storage does not support transactions")
		}

		result, err := mongoStore.TransactionFactory().CreateTransaction().Execute(ctx, func(txCtx context.Context) (any, error) {
			// Copy caller metadata, then overlay the FX details (five fixed
			// keys) so they always reflect this request.
			metadata := make(map[string]string, len(req.Metadata)+5)
			for k, v := range req.Metadata {
				metadata[k] = v
			}
			metadata["fx_rate"] = req.Rate
			metadata["from_currency"] = req.FromMoney.Currency
			metadata["to_currency"] = req.ToMoney.Currency
			metadata["from_amount"] = req.FromMoney.Amount
			metadata["to_amount"] = req.ToMoney.Amount

			entry := &model.JournalEntry{
				IdempotencyKey: req.IdempotencyKey,
				EventTime:      eventTime,
				EntryType:      model.EntryTypeFX,
				Description:    req.Description,
				Metadata:       metadata,
				Version:        time.Now().UnixNano(),
			}
			entry.OrganizationRef = orgRef

			if err := s.storage.JournalEntries().Create(txCtx, entry); err != nil {
				s.logger.Warn("failed to create journal entry", zap.Error(err))
				return nil, merrors.Internal("failed to create journal entry")
			}

			entryRef := entry.GetID()
			if entryRef == nil {
				return nil, merrors.Internal("journal entry missing identifier")
			}

			// Posting lines were built with a nil entry ref; bind them to the
			// entry that was just created.
			for _, line := range postingLines {
				line.JournalEntryRef = *entryRef
			}

			if err := s.storage.PostingLines().CreateMany(txCtx, postingLines); err != nil {
				s.logger.Warn("failed to create posting lines", zap.Error(err))
				return nil, merrors.Internal("failed to create posting lines")
			}

			if err := s.upsertBalances(txCtx, postingLines, accountsByRef); err != nil {
				return nil, err
			}

			if err := s.enqueueOutbox(txCtx, entry, postingLines); err != nil {
				return nil, err
			}

			return &ledgerv1.PostResponse{
				JournalEntryRef: entryRef.Hex(),
				Version:         entry.Version,
				EntryType:       ledgerv1.EntryType_ENTRY_FX,
			}, nil
		})
		if err != nil {
			recordJournalEntryError("fx", "transaction_failed")
			return nil, err
		}

		// Checked assertion: an unexpected result type becomes an error
		// instead of a panic.
		resp, ok := result.(*ledgerv1.PostResponse)
		if !ok || resp == nil {
			return nil, merrors.Internal("unexpected transaction result")
		}

		// The float conversions feed metrics only, so the second return value
		// of Float64 is deliberately ignored.
		fromAmountFloat, _ := fromAmount.Float64()
		toAmountFloat, _ := toAmount.Float64()
		recordTransactionAmount(req.FromMoney.Currency, "fx", fromAmountFloat)
		recordTransactionAmount(req.ToMoney.Currency, "fx", toAmountFloat)
		// NOTE(review): the last argument is a literal 0; if it is meant to be
		// a duration, the elapsed time should be passed — confirm against
		// recordJournalEntry.
		recordJournalEntry("fx", "success", 0)

		return resp, nil
	}
}