package ledger

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/tech/sendico/ledger/storage"
	"github.com/tech/sendico/ledger/storage/model"
	storageMongo "github.com/tech/sendico/ledger/storage/mongo"
	"github.com/tech/sendico/pkg/api/routers/gsresponse"
	"github.com/tech/sendico/pkg/merrors"
	"github.com/tech/sendico/pkg/mutil/mzap"
	ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.uber.org/zap"
)

// transferResponder returns a responder that posts an internal transfer
// between two ledger accounts.
//
// The responder performs, in order:
//  1. request validation (required refs, distinct accounts, money, charges);
//  2. an idempotency lookup by (organization, idempotency key) — when an entry
//     already exists its reference is returned and nothing is written;
//  3. loading and validating the source, destination and charge accounts;
//  4. building the posting lines (negative amount on the source account,
//     positive amount on the destination account, one line per charge);
//  5. a storage transaction that creates the journal entry, checks the lines
//     with validateBalanced, stores the lines, upserts balances and enqueues
//     the outbox event.
//
// The context passed to transferResponder itself is unused; the context given
// to the returned responder drives all storage calls.
func (s *Service) transferResponder(_ context.Context, req *ledgerv1.TransferRequest) gsresponse.Responder[ledgerv1.PostResponse] {
	return func(ctx context.Context) (*ledgerv1.PostResponse, error) {
		// Validate request
		if req == nil {
			return nil, merrors.InvalidArgument("request is required")
		}
		if req.IdempotencyKey == "" {
			return nil, merrors.InvalidArgument("idempotency_key is required")
		}
		if req.OrganizationRef == "" {
			return nil, merrors.InvalidArgument("organization_ref is required")
		}
		if req.FromLedgerAccountRef == "" {
			return nil, merrors.InvalidArgument("from_ledger_account_ref is required")
		}
		if req.ToLedgerAccountRef == "" {
			return nil, merrors.InvalidArgument("to_ledger_account_ref is required")
		}
		if req.FromLedgerAccountRef == req.ToLedgerAccountRef {
			return nil, merrors.InvalidArgument("cannot transfer to same account")
		}
		if err := validateMoney(req.Money, "money"); err != nil {
			return nil, err
		}
		if err := validatePostingLines(req.Charges); err != nil {
			return nil, err
		}

		orgRef, err := parseObjectID(req.OrganizationRef)
		if err != nil {
			return nil, err
		}
		fromAccountRef, err := parseObjectID(req.FromLedgerAccountRef)
		if err != nil {
			return nil, err
		}
		toAccountRef, err := parseObjectID(req.ToLedgerAccountRef)
		if err != nil {
			return nil, err
		}

		logger := s.logger.With(
			zap.String("idempotency_key", req.IdempotencyKey),
			mzap.ObjRef("organization_ref", orgRef),
			mzap.ObjRef("from_account_ref", fromAccountRef),
			mzap.ObjRef("to_account_ref", toAccountRef),
			zap.String("currency", req.Money.Currency),
		)

		// Check for duplicate idempotency key.
		// NOTE(review): this lookup runs outside the transaction below, so two
		// concurrent requests with the same key can both pass it — presumably a
		// unique index in storage rejects the second write; confirm.
		existingEntry, err := s.storage.JournalEntries().GetByIdempotencyKey(ctx, orgRef, req.IdempotencyKey)
		if err == nil && existingEntry != nil {
			recordDuplicateRequest("transfer")
			logger.Info("duplicate transfer request (idempotency)",
				zap.String("existingEntryID", existingEntry.GetID().Hex()))
			return &ledgerv1.PostResponse{
				JournalEntryRef: existingEntry.GetID().Hex(),
				Version:         existingEntry.Version,
				EntryType:       ledgerv1.EntryType_ENTRY_TRANSFER,
			}, nil
		}
		// errors.Is also matches a wrapped sentinel, so a wrapped "not found"
		// is not mistaken for a storage failure.
		if err != nil && !errors.Is(err, storage.ErrJournalEntryNotFound) {
			logger.Warn("failed to check idempotency", zap.Error(err))
			return nil, merrors.Internal("failed to check idempotency")
		}

		// Verify both accounts exist and are active
		fromAccount, err := s.storage.Accounts().Get(ctx, fromAccountRef)
		if err != nil {
			if errors.Is(err, storage.ErrAccountNotFound) {
				return nil, merrors.NoData("from_account not found")
			}
			logger.Warn("failed to get from_account", zap.Error(err))
			return nil, merrors.Internal("failed to get from_account")
		}
		if err := validateAccountForOrg(fromAccount, orgRef, req.Money.Currency); err != nil {
			return nil, merrors.InvalidArgument(fmt.Sprintf("from_account: %s", err.Error()))
		}

		toAccount, err := s.storage.Accounts().Get(ctx, toAccountRef)
		if err != nil {
			if errors.Is(err, storage.ErrAccountNotFound) {
				return nil, merrors.NoData("to_account not found")
			}
			logger.Warn("failed to get to_account", zap.Error(err))
			return nil, merrors.Internal("failed to get to_account")
		}
		if err := validateAccountForOrg(toAccount, orgRef, req.Money.Currency); err != nil {
			return nil, merrors.InvalidArgument(fmt.Sprintf("to_account: %s", err.Error()))
		}

		// Accounts already loaded are handed to getAccount and upsertBalances
		// together with their refs.
		accountsByRef := map[primitive.ObjectID]*model.Account{
			fromAccountRef: fromAccount,
			toAccountRef:   toAccount,
		}

		eventTime := getEventTime(req.EventTime)

		// Surface a parse failure instead of silently posting a zero-value
		// amount; charge amounts below are handled the same way.
		transferAmount, err := parseDecimal(req.Money.Amount)
		if err != nil {
			return nil, err
		}

		// Create posting lines for transfer
		// Dr From Account (debit = negative)
		// Cr To Account (credit = positive)
		postingLines := make([]*model.PostingLine, 0, 2+len(req.Charges))

		// Debit from account
		fromLine := &model.PostingLine{
			JournalEntryRef: primitive.NilObjectID, // filled in once the entry exists
			AccountRef:      fromAccountRef,
			Amount:          transferAmount.Neg().String(), // negative = debit
			Currency:        req.Money.Currency,
			LineType:        model.LineTypeMain,
		}
		fromLine.OrganizationRef = orgRef
		postingLines = append(postingLines, fromLine)

		// Credit to account
		toLine := &model.PostingLine{
			JournalEntryRef: primitive.NilObjectID, // filled in once the entry exists
			AccountRef:      toAccountRef,
			Amount:          transferAmount.String(), // positive = credit
			Currency:        req.Money.Currency,
			LineType:        model.LineTypeMain,
		}
		toLine.OrganizationRef = orgRef
		postingLines = append(postingLines, toLine)

		// Process charges (fees/spreads)
		for i, charge := range req.Charges {
			chargeAccountRef, err := parseObjectID(charge.LedgerAccountRef)
			if err != nil {
				return nil, err
			}
			if charge.Money.Currency != req.Money.Currency {
				return nil, merrors.InvalidArgument(fmt.Sprintf("charges[%d]: currency mismatch", i))
			}

			chargeAccount, err := s.getAccount(ctx, accountsByRef, chargeAccountRef)
			if err != nil {
				if errors.Is(err, storage.ErrAccountNotFound) {
					return nil, merrors.NoData(fmt.Sprintf("charges[%d]: account not found", i))
				}
				logger.Warn("failed to get charge account", zap.Error(err),
					zap.String("chargeAccountRef", chargeAccountRef.Hex()))
				return nil, merrors.Internal("failed to get charge account")
			}
			if err := validateAccountForOrg(chargeAccount, orgRef, charge.Money.Currency); err != nil {
				return nil, merrors.InvalidArgument(fmt.Sprintf("charges[%d]: %s", i, err.Error()))
			}

			chargeAmount, err := parseDecimal(charge.Money.Amount)
			if err != nil {
				return nil, err
			}

			chargeLine := &model.PostingLine{
				JournalEntryRef: primitive.NilObjectID, // filled in once the entry exists
				AccountRef:      chargeAccountRef,
				Amount:          chargeAmount.String(),
				Currency:        charge.Money.Currency,
				LineType:        protoLineTypeToModel(charge.LineType),
			}
			chargeLine.OrganizationRef = orgRef
			postingLines = append(postingLines, chargeLine)
		}

		// Execute in transaction
		mongoStore, ok := s.storage.(*storageMongo.Store)
		if !ok {
			return nil, merrors.Internal("storage does not support transactions")
		}

		result, err := mongoStore.TransactionFactory().CreateTransaction().Execute(ctx, func(txCtx context.Context) (any, error) {
			entry := &model.JournalEntry{
				IdempotencyKey: req.IdempotencyKey,
				EventTime:      eventTime,
				EntryType:      model.EntryTypeTransfer,
				Description:    req.Description,
				Metadata:       req.Metadata,
				Version:        time.Now().UnixNano(),
			}
			entry.OrganizationRef = orgRef

			if err := s.storage.JournalEntries().Create(txCtx, entry); err != nil {
				logger.Warn("failed to create journal entry", zap.Error(err))
				return nil, merrors.Internal("failed to create journal entry")
			}

			entryRef := entry.GetID()
			if entryRef == nil {
				return nil, merrors.Internal("journal entry missing identifier")
			}

			// Link every line to the freshly created entry.
			for _, line := range postingLines {
				line.JournalEntryRef = *entryRef
			}

			if err := validateBalanced(postingLines); err != nil {
				return nil, err
			}

			if err := s.storage.PostingLines().CreateMany(txCtx, postingLines); err != nil {
				logger.Warn("failed to create posting lines", zap.Error(err))
				return nil, merrors.Internal("failed to create posting lines")
			}

			if err := s.upsertBalances(txCtx, postingLines, accountsByRef); err != nil {
				return nil, err
			}

			if err := s.enqueueOutbox(txCtx, entry, postingLines); err != nil {
				return nil, err
			}

			return &ledgerv1.PostResponse{
				JournalEntryRef: entryRef.Hex(),
				Version:         entry.Version,
				EntryType:       ledgerv1.EntryType_ENTRY_TRANSFER,
			}, nil
		})
		if err != nil {
			recordJournalEntryError("transfer", "failed")
			return nil, err
		}

		// Checked assertion: an unexpected result type becomes an error
		// instead of a panic in the request path.
		resp, ok := result.(*ledgerv1.PostResponse)
		if !ok {
			logger.Warn("unexpected transaction result type", zap.String("type", fmt.Sprintf("%T", result)))
			return nil, merrors.Internal("unexpected transaction result type")
		}

		// The second return value of Float64 is deliberately dropped: the
		// float is only fed to the metrics recorder.
		amountFloat, _ := transferAmount.Float64()
		recordTransactionAmount(req.Money.Currency, "transfer", amountFloat)
		recordJournalEntry("transfer", "success", 0)

		return resp, nil
	}
}