Files
sendico/api/ledger/internal/service/ledger/queries.go
2025-11-18 00:20:25 +01:00

270 lines
8.2 KiB
Go

package ledger
import (
"context"
"encoding/base64"
"fmt"
"strconv"
"strings"
"github.com/tech/sendico/ledger/storage"
"github.com/tech/sendico/pkg/api/routers/gsresponse"
"github.com/tech/sendico/pkg/merrors"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
)
// getBalanceResponder builds the responder that answers a balance query for
// one ledger account.
//
// The responder validates that req.LedgerAccountRef is set and parseable,
// loads the account (its currency is used in the response), and then loads
// the balance record. storage.ErrAccountNotFound is reported as a no-data
// error; any other storage failure is logged at warn level and reported as an
// internal error. When the account exists but has no balance record yet, a
// zero balance with version 0 and the current time is returned.
//
// Every successful response, including the zero-balance one, is reported
// through recordBalanceQuery.
func (s *Service) getBalanceResponder(_ context.Context, req *ledgerv1.GetBalanceRequest) gsresponse.Responder[ledgerv1.BalanceResponse] {
	return func(ctx context.Context) (*ledgerv1.BalanceResponse, error) {
		if req.LedgerAccountRef == "" {
			return nil, merrors.InvalidArgument("ledger_account_ref is required")
		}

		accountRef, err := parseObjectID(req.LedgerAccountRef)
		if err != nil {
			return nil, err
		}

		// Load the account first: it proves the reference is valid and
		// supplies the currency for the response.
		account, err := s.storage.Accounts().Get(ctx, accountRef)
		if err != nil {
			if err == storage.ErrAccountNotFound {
				return nil, merrors.NoData("account not found")
			}
			s.logger.Warn("failed to get account", zap.Error(err))
			return nil, merrors.Internal("failed to get account")
		}

		balance, err := s.storage.Balances().Get(ctx, accountRef)
		if err != nil {
			if err == storage.ErrBalanceNotFound {
				// The account exists but has no balance record yet; this is
				// still a successful query, so it is counted like one.
				recordBalanceQuery("success", 0)
				return &ledgerv1.BalanceResponse{
					LedgerAccountRef: req.LedgerAccountRef,
					Balance: &moneyv1.Money{
						Amount:   "0",
						Currency: account.Currency,
					},
					Version:     0,
					LastUpdated: timestamppb.Now(),
				}, nil
			}
			s.logger.Warn("failed to get balance", zap.Error(err))
			return nil, merrors.Internal("failed to get balance")
		}

		recordBalanceQuery("success", 0)
		return &ledgerv1.BalanceResponse{
			LedgerAccountRef: req.LedgerAccountRef,
			Balance: &moneyv1.Money{
				Amount:   balance.Balance,
				Currency: account.Currency,
			},
			Version:     balance.Version,
			LastUpdated: timestamppb.New(balance.UpdatedAt),
		}, nil
	}
}
// getJournalEntryResponder builds the responder that looks up a single journal
// entry together with its posting lines.
//
// The responder rejects an empty req.EntryRef, returns the parseObjectID error
// for an unparseable one, maps storage.ErrJournalEntryNotFound to a no-data
// error, and logs any other storage failure at warn level before returning an
// internal error. On success the response carries the entry fields, one proto
// posting line per stored line, and the account reference of each line in the
// same order.
func (s *Service) getJournalEntryResponder(_ context.Context, req *ledgerv1.GetEntryRequest) gsresponse.Responder[ledgerv1.JournalEntryResponse] {
	return func(ctx context.Context) (*ledgerv1.JournalEntryResponse, error) {
		if req.EntryRef == "" {
			return nil, merrors.InvalidArgument("entry_ref is required")
		}

		id, err := parseObjectID(req.EntryRef)
		if err != nil {
			return nil, err
		}

		// Fetch the entry header.
		journalEntry, err := s.storage.JournalEntries().Get(ctx, id)
		switch {
		case err == nil:
			// found, carry on
		case err == storage.ErrJournalEntryNotFound:
			return nil, merrors.NoData("journal entry not found")
		default:
			s.logger.Warn("failed to get journal entry", zap.Error(err))
			return nil, merrors.Internal("failed to get journal entry")
		}

		// Fetch the lines that belong to the entry.
		postings, err := s.storage.PostingLines().ListByJournalEntry(ctx, id)
		if err != nil {
			s.logger.Warn("failed to get posting lines", zap.Error(err))
			return nil, merrors.Internal("failed to get posting lines")
		}

		// Translate storage lines into their proto form, collecting the
		// account reference of every line alongside.
		pbLines := make([]*ledgerv1.PostingLine, 0, len(postings))
		refs := make([]string, 0, len(postings))
		for _, posting := range postings {
			accountHex := posting.AccountRef.Hex()
			money := &moneyv1.Money{
				Amount:   posting.Amount,
				Currency: posting.Currency,
			}
			pbLines = append(pbLines, &ledgerv1.PostingLine{
				LedgerAccountRef: accountHex,
				Money:            money,
				LineType:         modelLineTypeToProto(posting.LineType),
			})
			refs = append(refs, accountHex)
		}

		resp := &ledgerv1.JournalEntryResponse{
			EntryRef:          req.EntryRef,
			IdempotencyKey:    journalEntry.IdempotencyKey,
			EntryType:         modelEntryTypeToProto(journalEntry.EntryType),
			Description:       journalEntry.Description,
			EventTime:         timestamppb.New(journalEntry.EventTime),
			Version:           journalEntry.Version,
			Lines:             pbLines,
			Metadata:          journalEntry.Metadata,
			LedgerAccountRefs: refs,
		}
		return resp, nil
	}
}
// getStatementResponder builds the responder that returns a page of journal
// entries touching one ledger account.
//
// Pagination is offset based over the account's posting lines: req.Limit
// defaults to 50 and is capped at 100, and req.Cursor (when set) is decoded
// with parseCursor into the starting offset. One extra posting line is
// requested so the responder can tell whether another page exists; if so,
// NextCursor encodes offset+limit, otherwise it is empty.
//
// Entries are returned in the order in which their journal entry reference
// first appears in the page of posting lines, each entry at most once. An
// entry whose reference cannot be parsed, or whose header or lines cannot be
// loaded, is logged at warn level and left out of the page rather than
// failing the whole request.
func (s *Service) getStatementResponder(_ context.Context, req *ledgerv1.GetStatementRequest) gsresponse.Responder[ledgerv1.StatementResponse] {
	return func(ctx context.Context) (*ledgerv1.StatementResponse, error) {
		if req.LedgerAccountRef == "" {
			return nil, merrors.InvalidArgument("ledger_account_ref is required")
		}

		accountRef, err := parseObjectID(req.LedgerAccountRef)
		if err != nil {
			return nil, err
		}

		// Verify account exists
		if _, err = s.storage.Accounts().Get(ctx, accountRef); err != nil {
			if err == storage.ErrAccountNotFound {
				return nil, merrors.NoData("account not found")
			}
			s.logger.Warn("failed to get account", zap.Error(err))
			return nil, merrors.Internal("failed to get account")
		}

		// Parse pagination
		limit := int(req.Limit)
		if limit <= 0 {
			limit = 50 // default
		}
		if limit > 100 {
			limit = 100 // max
		}

		offset := 0
		if req.Cursor != "" {
			offset, err = parseCursor(req.Cursor)
			if err != nil {
				return nil, merrors.InvalidArgument(fmt.Sprintf("invalid cursor: %v", err))
			}
		}

		// Ask for one line more than the page size to detect a following page.
		postingLines, err := s.storage.PostingLines().ListByAccount(ctx, accountRef, limit+1, offset)
		if err != nil {
			s.logger.Warn("failed to get posting lines", zap.Error(err))
			return nil, merrors.Internal("failed to get posting lines")
		}

		hasMore := len(postingLines) > limit
		if hasMore {
			postingLines = postingLines[:limit]
		}

		// Collect the distinct journal entry references in first-seen order.
		// Ranging over a map here would randomize the order of the returned
		// entries from one call to the next.
		seen := make(map[string]struct{}, len(postingLines))
		entryRefs := make([]string, 0, len(postingLines))
		for _, line := range postingLines {
			refHex := line.JournalEntryRef.Hex()
			if _, dup := seen[refHex]; dup {
				continue
			}
			seen[refHex] = struct{}{}
			entryRefs = append(entryRefs, refHex)
		}

		entries := make([]*ledgerv1.JournalEntryResponse, 0, len(entryRefs))
		for _, entryRefHex := range entryRefs {
			entryRef, err := parseObjectID(entryRefHex)
			if err != nil {
				s.logger.Warn("failed to parse journal entry reference for statement", zap.Error(err), zap.String("entryRef", entryRefHex))
				continue
			}

			entry, err := s.storage.JournalEntries().Get(ctx, entryRef)
			if err != nil {
				s.logger.Warn("failed to get journal entry for statement", zap.Error(err), zap.String("entryRef", entryRefHex))
				continue
			}

			// Get all lines for this entry, not only the ones on this account.
			lines, err := s.storage.PostingLines().ListByJournalEntry(ctx, entryRef)
			if err != nil {
				s.logger.Warn("failed to get posting lines for entry", zap.Error(err), zap.String("entryRef", entryRefHex))
				continue
			}

			// Convert to proto
			protoLines := make([]*ledgerv1.PostingLine, 0, len(lines))
			accountRefs := make([]string, 0, len(lines))
			for _, line := range lines {
				protoLines = append(protoLines, &ledgerv1.PostingLine{
					LedgerAccountRef: line.AccountRef.Hex(),
					Money: &moneyv1.Money{
						Amount:   line.Amount,
						Currency: line.Currency,
					},
					LineType: modelLineTypeToProto(line.LineType),
				})
				accountRefs = append(accountRefs, line.AccountRef.Hex())
			}

			entries = append(entries, &ledgerv1.JournalEntryResponse{
				EntryRef:          entryRefHex,
				IdempotencyKey:    entry.IdempotencyKey,
				EntryType:         modelEntryTypeToProto(entry.EntryType),
				Description:       entry.Description,
				EventTime:         timestamppb.New(entry.EventTime),
				Version:           entry.Version,
				Lines:             protoLines,
				Metadata:          entry.Metadata,
				LedgerAccountRefs: accountRefs,
			})
		}

		// Generate next cursor
		nextCursor := ""
		if hasMore {
			nextCursor = encodeCursor(offset + limit)
		}

		return &ledgerv1.StatementResponse{
			Entries:    entries,
			NextCursor: nextCursor,
		}, nil
	}
}
// parseCursor decodes a pagination cursor into the offset it carries.
//
// A valid cursor is the standard base64 encoding of the text "offset:<n>",
// where <n> is a non-negative decimal integer. Anything else (bad base64, a
// different prefix, a missing or extra ':' separator, a non-numeric or
// negative offset) yields an invalid-argument error and an offset of 0.
func parseCursor(cursor string) (int, error) {
	decoded, err := base64.StdEncoding.DecodeString(cursor)
	if err != nil {
		return 0, merrors.InvalidArgumentWrap(err, "invalid cursor base64 encoding")
	}

	// Exactly one ':' must separate the "offset" tag from its value.
	tag, value, found := strings.Cut(string(decoded), ":")
	if !found || tag != "offset" || strings.Contains(value, ":") {
		return 0, merrors.InvalidArgument("invalid cursor format")
	}

	offset, err := strconv.Atoi(value)
	if err != nil {
		return 0, merrors.InvalidArgumentWrap(err, "invalid cursor offset")
	}
	// A negative offset is never produced by the service and must not be
	// forwarded to storage as a skip value.
	if offset < 0 {
		return 0, merrors.InvalidArgument("cursor offset must not be negative")
	}
	return offset, nil
}
// encodeCursor turns an offset into a pagination cursor: the text
// "offset:<n>" encoded with standard (padded) base64.
func encodeCursor(offset int) string {
	payload := []byte(fmt.Sprintf("offset:%d", offset))
	return base64.StdEncoding.EncodeToString(payload)
}