interface refactoring

This commit is contained in:
Stephan D
2026-01-05 01:22:47 +01:00
parent fcd831902a
commit 7424ef751c
16 changed files with 3623 additions and 133 deletions

View File

@@ -0,0 +1,622 @@
package ledger
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/tech/sendico/ledger/internal/appversion"
"github.com/tech/sendico/pkg/connector/params"
"github.com/tech/sendico/pkg/merrors"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
const ledgerConnectorID = "ledger"
type connectorAdapter struct {
svc *Service
}
func newConnectorAdapter(svc *Service) *connectorAdapter {
return &connectorAdapter{svc: svc}
}
func (c *connectorAdapter) GetCapabilities(_ context.Context, _ *connectorv1.GetCapabilitiesRequest) (*connectorv1.GetCapabilitiesResponse, error) {
return &connectorv1.GetCapabilitiesResponse{
Capabilities: &connectorv1.ConnectorCapabilities{
ConnectorType: ledgerConnectorID,
Version: appversion.Create().Short(),
SupportedAccountKinds: []connectorv1.AccountKind{connectorv1.AccountKind_LEDGER_ACCOUNT},
SupportedOperationTypes: []connectorv1.OperationType{
connectorv1.OperationType_CREDIT,
connectorv1.OperationType_DEBIT,
connectorv1.OperationType_TRANSFER,
connectorv1.OperationType_FX,
},
OpenAccountParams: ledgerOpenAccountParams(),
OperationParams: ledgerOperationParams(),
},
}, nil
}
func (c *connectorAdapter) OpenAccount(ctx context.Context, req *connectorv1.OpenAccountRequest) (*connectorv1.OpenAccountResponse, error) {
if req == nil {
return &connectorv1.OpenAccountResponse{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "open_account: request is required", nil, "")}, nil
}
if req.GetKind() != connectorv1.AccountKind_LEDGER_ACCOUNT {
return &connectorv1.OpenAccountResponse{Error: connectorError(connectorv1.ErrorCode_UNSUPPORTED_ACCOUNT_KIND, "open_account: unsupported account kind", nil, "")}, nil
}
reader := params.New(req.GetParams())
orgRef := strings.TrimSpace(reader.String("organization_ref"))
accountCode := strings.TrimSpace(reader.String("account_code"))
if orgRef == "" || accountCode == "" {
return &connectorv1.OpenAccountResponse{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "open_account: organization_ref and account_code are required", nil, "")}, nil
}
accountType, err := parseLedgerAccountType(reader, "account_type")
if err != nil {
return &connectorv1.OpenAccountResponse{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, err.Error(), nil, "")}, nil
}
currency := strings.TrimSpace(req.GetAsset())
if currency == "" {
currency = strings.TrimSpace(reader.String("currency"))
}
if currency == "" {
return &connectorv1.OpenAccountResponse{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "open_account: asset is required", nil, "")}, nil
}
status := parseLedgerAccountStatus(reader, "status")
metadata := mergeMetadata(reader.StringMap("metadata"), req.GetLabel(), req.GetOwnerRef(), req.GetCorrelationId(), req.GetParentIntentId())
resp, err := c.svc.CreateAccount(ctx, &ledgerv1.CreateAccountRequest{
OrganizationRef: orgRef,
AccountCode: accountCode,
AccountType: accountType,
Currency: currency,
Status: status,
AllowNegative: reader.Bool("allow_negative"),
IsSettlement: reader.Bool("is_settlement"),
Metadata: metadata,
})
if err != nil {
return &connectorv1.OpenAccountResponse{Error: connectorError(mapErrorCode(err), err.Error(), nil, "")}, nil
}
return &connectorv1.OpenAccountResponse{
Account: ledgerAccountToConnector(resp.GetAccount()),
}, nil
}
func (c *connectorAdapter) GetAccount(ctx context.Context, req *connectorv1.GetAccountRequest) (*connectorv1.GetAccountResponse, error) {
if req == nil || req.GetAccountRef() == nil || strings.TrimSpace(req.GetAccountRef().GetAccountId()) == "" {
return nil, merrors.InvalidArgument("get_account: account_ref.account_id is required")
}
accountRef, err := parseObjectID(strings.TrimSpace(req.GetAccountRef().GetAccountId()))
if err != nil {
return nil, err
}
if c.svc.storage == nil || c.svc.storage.Accounts() == nil {
return nil, merrors.Internal("get_account: storage unavailable")
}
account, err := c.svc.storage.Accounts().Get(ctx, accountRef)
if err != nil {
return nil, err
}
return &connectorv1.GetAccountResponse{
Account: ledgerAccountToConnector(toProtoAccount(account)),
}, nil
}
func (c *connectorAdapter) ListAccounts(ctx context.Context, req *connectorv1.ListAccountsRequest) (*connectorv1.ListAccountsResponse, error) {
if req == nil || strings.TrimSpace(req.GetOwnerRef()) == "" {
return nil, merrors.InvalidArgument("list_accounts: owner_ref is required")
}
resp, err := c.svc.ListAccounts(ctx, &ledgerv1.ListAccountsRequest{OrganizationRef: strings.TrimSpace(req.GetOwnerRef())})
if err != nil {
return nil, err
}
accounts := make([]*connectorv1.Account, 0, len(resp.GetAccounts()))
for _, account := range resp.GetAccounts() {
accounts = append(accounts, ledgerAccountToConnector(account))
}
return &connectorv1.ListAccountsResponse{Accounts: accounts}, nil
}
func (c *connectorAdapter) GetBalance(ctx context.Context, req *connectorv1.GetBalanceRequest) (*connectorv1.GetBalanceResponse, error) {
if req == nil || req.GetAccountRef() == nil || strings.TrimSpace(req.GetAccountRef().GetAccountId()) == "" {
return nil, merrors.InvalidArgument("get_balance: account_ref.account_id is required")
}
resp, err := c.svc.GetBalance(ctx, &ledgerv1.GetBalanceRequest{
LedgerAccountRef: strings.TrimSpace(req.GetAccountRef().GetAccountId()),
})
if err != nil {
return nil, err
}
return &connectorv1.GetBalanceResponse{
Balance: &connectorv1.Balance{
AccountRef: req.GetAccountRef(),
Available: resp.GetBalance(),
CalculatedAt: resp.GetLastUpdated(),
PendingInbound: nil,
PendingOutbound: nil,
},
}, nil
}
func (c *connectorAdapter) SubmitOperation(ctx context.Context, req *connectorv1.SubmitOperationRequest) (*connectorv1.SubmitOperationResponse, error) {
if req == nil || req.GetOperation() == nil {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "submit_operation: operation is required", nil, "")}}, nil
}
op := req.GetOperation()
if strings.TrimSpace(op.GetIdempotencyKey()) == "" {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "submit_operation: idempotency_key is required", op, "")}}, nil
}
reader := params.New(op.GetParams())
orgRef := strings.TrimSpace(reader.String("organization_ref"))
if orgRef == "" {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "submit_operation: organization_ref is required", op, "")}}, nil
}
metadata := mergeMetadata(reader.StringMap("metadata"), "", "", op.GetCorrelationId(), op.GetParentIntentId())
description := strings.TrimSpace(reader.String("description"))
eventTime := parseEventTime(reader)
charges, err := parseLedgerCharges(reader)
if err != nil {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, err.Error(), op, "")}}, nil
}
switch op.GetType() {
case connectorv1.OperationType_CREDIT:
accountID := operationAccountID(op.GetTo())
if accountID == "" {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "credit: to.account is required", op, "")}}, nil
}
resp, err := c.svc.PostCreditWithCharges(ctx, &ledgerv1.PostCreditRequest{
IdempotencyKey: strings.TrimSpace(op.GetIdempotencyKey()),
OrganizationRef: orgRef,
LedgerAccountRef: accountID,
Money: op.GetMoney(),
Description: description,
Charges: charges,
Metadata: metadata,
EventTime: eventTime,
ContraLedgerAccountRef: strings.TrimSpace(reader.String("contra_ledger_account_ref")),
})
if err != nil {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(mapErrorCode(err), err.Error(), op, accountID)}}, nil
}
return &connectorv1.SubmitOperationResponse{Receipt: ledgerReceipt(resp.GetJournalEntryRef(), connectorv1.OperationStatus_CONFIRMED)}, nil
case connectorv1.OperationType_DEBIT:
accountID := operationAccountID(op.GetFrom())
if accountID == "" {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "debit: from.account is required", op, "")}}, nil
}
resp, err := c.svc.PostDebitWithCharges(ctx, &ledgerv1.PostDebitRequest{
IdempotencyKey: strings.TrimSpace(op.GetIdempotencyKey()),
OrganizationRef: orgRef,
LedgerAccountRef: accountID,
Money: op.GetMoney(),
Description: description,
Charges: charges,
Metadata: metadata,
EventTime: eventTime,
ContraLedgerAccountRef: strings.TrimSpace(reader.String("contra_ledger_account_ref")),
})
if err != nil {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(mapErrorCode(err), err.Error(), op, accountID)}}, nil
}
return &connectorv1.SubmitOperationResponse{Receipt: ledgerReceipt(resp.GetJournalEntryRef(), connectorv1.OperationStatus_CONFIRMED)}, nil
case connectorv1.OperationType_TRANSFER:
fromID := operationAccountID(op.GetFrom())
toID := operationAccountID(op.GetTo())
if fromID == "" || toID == "" {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "transfer: from.account and to.account are required", op, "")}}, nil
}
resp, err := c.svc.TransferInternal(ctx, &ledgerv1.TransferRequest{
IdempotencyKey: strings.TrimSpace(op.GetIdempotencyKey()),
OrganizationRef: orgRef,
FromLedgerAccountRef: fromID,
ToLedgerAccountRef: toID,
Money: op.GetMoney(),
Description: description,
Charges: charges,
Metadata: metadata,
EventTime: eventTime,
})
if err != nil {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(mapErrorCode(err), err.Error(), op, "")}}, nil
}
return &connectorv1.SubmitOperationResponse{Receipt: ledgerReceipt(resp.GetJournalEntryRef(), connectorv1.OperationStatus_CONFIRMED)}, nil
case connectorv1.OperationType_FX:
fromID := operationAccountID(op.GetFrom())
toID := operationAccountID(op.GetTo())
if fromID == "" || toID == "" {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, "fx: from.account and to.account are required", op, "")}}, nil
}
toMoney, err := parseMoneyFromMap(reader.Map("to_money"))
if err != nil {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_INVALID_PARAMS, err.Error(), op, "")}}, nil
}
resp, err := c.svc.ApplyFXWithCharges(ctx, &ledgerv1.FXRequest{
IdempotencyKey: strings.TrimSpace(op.GetIdempotencyKey()),
OrganizationRef: orgRef,
FromLedgerAccountRef: fromID,
ToLedgerAccountRef: toID,
FromMoney: op.GetMoney(),
ToMoney: toMoney,
Rate: strings.TrimSpace(reader.String("rate")),
Description: description,
Charges: charges,
Metadata: metadata,
EventTime: eventTime,
})
if err != nil {
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(mapErrorCode(err), err.Error(), op, "")}}, nil
}
return &connectorv1.SubmitOperationResponse{Receipt: ledgerReceipt(resp.GetJournalEntryRef(), connectorv1.OperationStatus_CONFIRMED)}, nil
default:
return &connectorv1.SubmitOperationResponse{Receipt: &connectorv1.OperationReceipt{Error: connectorError(connectorv1.ErrorCode_UNSUPPORTED_OPERATION, "submit_operation: unsupported operation type", op, "")}}, nil
}
}
func (c *connectorAdapter) GetOperation(ctx context.Context, req *connectorv1.GetOperationRequest) (*connectorv1.GetOperationResponse, error) {
if req == nil || strings.TrimSpace(req.GetOperationId()) == "" {
return nil, merrors.InvalidArgument("get_operation: operation_id is required")
}
entry, err := c.svc.GetJournalEntry(ctx, &ledgerv1.GetEntryRequest{EntryRef: strings.TrimSpace(req.GetOperationId())})
if err != nil {
return nil, err
}
return &connectorv1.GetOperationResponse{Operation: ledgerEntryToOperation(entry)}, nil
}
func (c *connectorAdapter) ListOperations(ctx context.Context, req *connectorv1.ListOperationsRequest) (*connectorv1.ListOperationsResponse, error) {
if req == nil || req.GetAccountRef() == nil || strings.TrimSpace(req.GetAccountRef().GetAccountId()) == "" {
return nil, merrors.InvalidArgument("list_operations: account_ref.account_id is required")
}
resp, err := c.svc.GetStatement(ctx, &ledgerv1.GetStatementRequest{
LedgerAccountRef: strings.TrimSpace(req.GetAccountRef().GetAccountId()),
Cursor: "",
Limit: 0,
})
if err != nil {
return nil, err
}
ops := make([]*connectorv1.Operation, 0, len(resp.GetEntries()))
for _, entry := range resp.GetEntries() {
ops = append(ops, ledgerEntryToOperation(entry))
}
return &connectorv1.ListOperationsResponse{Operations: ops}, nil
}
func ledgerOpenAccountParams() []*connectorv1.ParamSpec {
return []*connectorv1.ParamSpec{
{Key: "organization_ref", Type: connectorv1.ParamType_STRING, Required: true, Description: "Organization reference for the ledger account."},
{Key: "account_code", Type: connectorv1.ParamType_STRING, Required: true, Description: "Ledger account code."},
{Key: "account_type", Type: connectorv1.ParamType_STRING, Required: true, Description: "ASSET | LIABILITY | REVENUE | EXPENSE."},
{Key: "status", Type: connectorv1.ParamType_STRING, Required: false, Description: "ACTIVE | FROZEN."},
{Key: "allow_negative", Type: connectorv1.ParamType_BOOL, Required: false, Description: "Allow negative balance."},
{Key: "is_settlement", Type: connectorv1.ParamType_BOOL, Required: false, Description: "Mark account as settlement."},
{Key: "metadata", Type: connectorv1.ParamType_JSON, Required: false, Description: "Additional metadata map."},
}
}
func ledgerOperationParams() []*connectorv1.OperationParamSpec {
common := []*connectorv1.ParamSpec{
{Key: "organization_ref", Type: connectorv1.ParamType_STRING, Required: true, Description: "Organization reference."},
{Key: "description", Type: connectorv1.ParamType_STRING, Required: false, Description: "Ledger entry description."},
{Key: "metadata", Type: connectorv1.ParamType_JSON, Required: false, Description: "Entry metadata map."},
{Key: "charges", Type: connectorv1.ParamType_JSON, Required: false, Description: "Posting line charges."},
{Key: "event_time", Type: connectorv1.ParamType_STRING, Required: false, Description: "RFC3339 timestamp."},
}
return []*connectorv1.OperationParamSpec{
{OperationType: connectorv1.OperationType_CREDIT, Params: append(common, &connectorv1.ParamSpec{Key: "contra_ledger_account_ref", Type: connectorv1.ParamType_STRING, Required: false})},
{OperationType: connectorv1.OperationType_DEBIT, Params: append(common, &connectorv1.ParamSpec{Key: "contra_ledger_account_ref", Type: connectorv1.ParamType_STRING, Required: false})},
{OperationType: connectorv1.OperationType_TRANSFER, Params: common},
{OperationType: connectorv1.OperationType_FX, Params: append(common,
&connectorv1.ParamSpec{Key: "to_money", Type: connectorv1.ParamType_JSON, Required: true, Description: "Target amount {amount,currency}."},
&connectorv1.ParamSpec{Key: "rate", Type: connectorv1.ParamType_STRING, Required: false, Description: "FX rate snapshot."},
)},
}
}
func ledgerAccountToConnector(account *ledgerv1.LedgerAccount) *connectorv1.Account {
if account == nil {
return nil
}
details, _ := structpb.NewStruct(map[string]interface{}{
"account_code": account.GetAccountCode(),
"account_type": account.GetAccountType().String(),
"status": account.GetStatus().String(),
"allow_negative": account.GetAllowNegative(),
"is_settlement": account.GetIsSettlement(),
})
return &connectorv1.Account{
Ref: &connectorv1.AccountRef{
ConnectorId: ledgerConnectorID,
AccountId: strings.TrimSpace(account.GetLedgerAccountRef()),
},
Kind: connectorv1.AccountKind_LEDGER_ACCOUNT,
Asset: strings.TrimSpace(account.GetCurrency()),
State: ledgerAccountState(account.GetStatus()),
Label: strings.TrimSpace(account.GetAccountCode()),
OwnerRef: strings.TrimSpace(account.GetOrganizationRef()),
ProviderDetails: details,
CreatedAt: account.GetCreatedAt(),
UpdatedAt: account.GetUpdatedAt(),
}
}
func ledgerAccountState(status ledgerv1.AccountStatus) connectorv1.AccountState {
switch status {
case ledgerv1.AccountStatus_ACCOUNT_STATUS_ACTIVE:
return connectorv1.AccountState_ACCOUNT_ACTIVE
case ledgerv1.AccountStatus_ACCOUNT_STATUS_FROZEN:
return connectorv1.AccountState_ACCOUNT_SUSPENDED
default:
return connectorv1.AccountState_ACCOUNT_STATE_UNSPECIFIED
}
}
func ledgerReceipt(ref string, status connectorv1.OperationStatus) *connectorv1.OperationReceipt {
return &connectorv1.OperationReceipt{
OperationId: strings.TrimSpace(ref),
Status: status,
ProviderRef: strings.TrimSpace(ref),
}
}
func ledgerEntryToOperation(entry *ledgerv1.JournalEntryResponse) *connectorv1.Operation {
if entry == nil {
return nil
}
op := &connectorv1.Operation{
OperationId: strings.TrimSpace(entry.GetEntryRef()),
Type: ledgerEntryType(entry.GetEntryType()),
Status: connectorv1.OperationStatus_CONFIRMED,
CreatedAt: entry.GetEventTime(),
UpdatedAt: entry.GetEventTime(),
}
mainLines := ledgerMainLines(entry.GetLines())
if len(mainLines) > 0 {
op.Money = mainLines[0].GetMoney()
}
switch op.Type {
case connectorv1.OperationType_CREDIT:
if len(mainLines) > 0 {
op.To = &connectorv1.OperationParty{Ref: &connectorv1.OperationParty_Account{Account: &connectorv1.AccountRef{ConnectorId: ledgerConnectorID, AccountId: mainLines[0].GetLedgerAccountRef()}}}
}
case connectorv1.OperationType_DEBIT:
if len(mainLines) > 0 {
op.From = &connectorv1.OperationParty{Ref: &connectorv1.OperationParty_Account{Account: &connectorv1.AccountRef{ConnectorId: ledgerConnectorID, AccountId: mainLines[0].GetLedgerAccountRef()}}}
}
case connectorv1.OperationType_TRANSFER, connectorv1.OperationType_FX:
if len(mainLines) > 0 {
op.From = &connectorv1.OperationParty{Ref: &connectorv1.OperationParty_Account{Account: &connectorv1.AccountRef{ConnectorId: ledgerConnectorID, AccountId: mainLines[0].GetLedgerAccountRef()}}}
}
if len(mainLines) > 1 {
op.To = &connectorv1.OperationParty{Ref: &connectorv1.OperationParty_Account{Account: &connectorv1.AccountRef{ConnectorId: ledgerConnectorID, AccountId: mainLines[1].GetLedgerAccountRef()}}}
}
}
return op
}
func ledgerMainLines(lines []*ledgerv1.PostingLine) []*ledgerv1.PostingLine {
if len(lines) == 0 {
return nil
}
result := make([]*ledgerv1.PostingLine, 0, len(lines))
for _, line := range lines {
if line == nil {
continue
}
if line.GetLineType() == ledgerv1.LineType_LINE_MAIN {
result = append(result, line)
}
}
return result
}
func ledgerEntryType(entryType ledgerv1.EntryType) connectorv1.OperationType {
switch entryType {
case ledgerv1.EntryType_ENTRY_CREDIT:
return connectorv1.OperationType_CREDIT
case ledgerv1.EntryType_ENTRY_DEBIT:
return connectorv1.OperationType_DEBIT
case ledgerv1.EntryType_ENTRY_TRANSFER:
return connectorv1.OperationType_TRANSFER
case ledgerv1.EntryType_ENTRY_FX:
return connectorv1.OperationType_FX
default:
return connectorv1.OperationType_OPERATION_TYPE_UNSPECIFIED
}
}
func operationAccountID(party *connectorv1.OperationParty) string {
if party == nil {
return ""
}
if account := party.GetAccount(); account != nil {
return strings.TrimSpace(account.GetAccountId())
}
return ""
}
func parseLedgerAccountType(reader params.Reader, key string) (ledgerv1.AccountType, error) {
value, ok := reader.Value(key)
if !ok {
return ledgerv1.AccountType_ACCOUNT_TYPE_UNSPECIFIED, fmt.Errorf("open_account: account_type is required")
}
switch v := value.(type) {
case string:
return parseLedgerAccountTypeString(v)
case float64:
return ledgerv1.AccountType(int32(v)), nil
case int:
return ledgerv1.AccountType(v), nil
case int64:
return ledgerv1.AccountType(v), nil
default:
return ledgerv1.AccountType_ACCOUNT_TYPE_UNSPECIFIED, fmt.Errorf("open_account: account_type is required")
}
}
func parseLedgerAccountTypeString(value string) (ledgerv1.AccountType, error) {
switch strings.ToUpper(strings.TrimSpace(value)) {
case "ACCOUNT_TYPE_ASSET", "ASSET":
return ledgerv1.AccountType_ACCOUNT_TYPE_ASSET, nil
case "ACCOUNT_TYPE_LIABILITY", "LIABILITY":
return ledgerv1.AccountType_ACCOUNT_TYPE_LIABILITY, nil
case "ACCOUNT_TYPE_REVENUE", "REVENUE":
return ledgerv1.AccountType_ACCOUNT_TYPE_REVENUE, nil
case "ACCOUNT_TYPE_EXPENSE", "EXPENSE":
return ledgerv1.AccountType_ACCOUNT_TYPE_EXPENSE, nil
default:
return ledgerv1.AccountType_ACCOUNT_TYPE_UNSPECIFIED, fmt.Errorf("open_account: invalid account_type")
}
}
func parseLedgerAccountStatus(reader params.Reader, key string) ledgerv1.AccountStatus {
value := strings.ToUpper(strings.TrimSpace(reader.String(key)))
switch value {
case "ACCOUNT_STATUS_ACTIVE", "ACTIVE":
return ledgerv1.AccountStatus_ACCOUNT_STATUS_ACTIVE
case "ACCOUNT_STATUS_FROZEN", "FROZEN":
return ledgerv1.AccountStatus_ACCOUNT_STATUS_FROZEN
default:
return ledgerv1.AccountStatus_ACCOUNT_STATUS_UNSPECIFIED
}
}
func parseEventTime(reader params.Reader) *timestamppb.Timestamp {
raw := strings.TrimSpace(reader.String("event_time"))
if raw == "" {
return nil
}
parsed, err := time.Parse(time.RFC3339Nano, raw)
if err != nil {
return nil
}
return timestamppb.New(parsed)
}
func parseLedgerCharges(reader params.Reader) ([]*ledgerv1.PostingLine, error) {
items := reader.List("charges")
if len(items) == 0 {
return nil, nil
}
result := make([]*ledgerv1.PostingLine, 0, len(items))
for i, item := range items {
raw, ok := item.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("charges[%d]: invalid charge entry", i)
}
accountRef := strings.TrimSpace(fmt.Sprint(raw["ledger_account_ref"]))
if accountRef == "" {
return nil, fmt.Errorf("charges[%d]: ledger_account_ref is required", i)
}
money, err := parseMoneyFromMap(raw)
if err != nil {
return nil, fmt.Errorf("charges[%d]: %w", i, err)
}
lineType := parseLedgerLineType(fmt.Sprint(raw["line_type"]))
result = append(result, &ledgerv1.PostingLine{
LedgerAccountRef: accountRef,
Money: money,
LineType: lineType,
})
}
return result, nil
}
func parseLedgerLineType(value string) ledgerv1.LineType {
switch strings.ToUpper(strings.TrimSpace(value)) {
case "LINE_TYPE_FEE", "FEE":
return ledgerv1.LineType_LINE_FEE
case "LINE_TYPE_SPREAD", "SPREAD":
return ledgerv1.LineType_LINE_SPREAD
case "LINE_TYPE_REVERSAL", "REVERSAL":
return ledgerv1.LineType_LINE_REVERSAL
default:
return ledgerv1.LineType_LINE_FEE
}
}
func parseMoneyFromMap(raw map[string]interface{}) (*moneyv1.Money, error) {
if raw == nil {
return nil, fmt.Errorf("money is required")
}
amount := strings.TrimSpace(fmt.Sprint(raw["amount"]))
currency := strings.TrimSpace(fmt.Sprint(raw["currency"]))
if amount == "" || currency == "" {
return nil, fmt.Errorf("money is required")
}
return &moneyv1.Money{
Amount: amount,
Currency: currency,
}, nil
}
func mergeMetadata(base map[string]string, label, ownerRef, correlationID, parentIntentID string) map[string]string {
metadata := map[string]string{}
for k, v := range base {
metadata[strings.TrimSpace(k)] = strings.TrimSpace(v)
}
if label != "" {
if _, ok := metadata["label"]; !ok {
metadata["label"] = label
}
}
if ownerRef != "" {
if _, ok := metadata["owner_ref"]; !ok {
metadata["owner_ref"] = ownerRef
}
}
if correlationID != "" {
metadata["correlation_id"] = correlationID
}
if parentIntentID != "" {
metadata["parent_intent_id"] = parentIntentID
}
if len(metadata) == 0 {
return nil
}
return metadata
}
func connectorError(code connectorv1.ErrorCode, message string, op *connectorv1.Operation, accountID string) *connectorv1.ConnectorError {
err := &connectorv1.ConnectorError{
Code: code,
Message: strings.TrimSpace(message),
AccountId: strings.TrimSpace(accountID),
}
if op != nil {
err.CorrelationId = strings.TrimSpace(op.GetCorrelationId())
err.ParentIntentId = strings.TrimSpace(op.GetParentIntentId())
err.OperationId = strings.TrimSpace(op.GetOperationId())
}
return err
}
func mapErrorCode(err error) connectorv1.ErrorCode {
switch {
case errors.Is(err, merrors.ErrInvalidArg):
return connectorv1.ErrorCode_INVALID_PARAMS
case errors.Is(err, merrors.ErrNoData):
return connectorv1.ErrorCode_NOT_FOUND
case errors.Is(err, merrors.ErrNotImplemented):
return connectorv1.ErrorCode_UNSUPPORTED_OPERATION
case errors.Is(err, merrors.ErrInternal):
return connectorv1.ErrorCode_TEMPORARY_UNAVAILABLE
default:
return connectorv1.ErrorCode_PROVIDER_ERROR
}
}

View File

@@ -24,7 +24,7 @@ import (
pmessaging "github.com/tech/sendico/pkg/messaging"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
unifiedv1 "github.com/tech/sendico/pkg/proto/gateway/unified/v1"
connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
ledgerv1 "github.com/tech/sendico/pkg/proto/ledger/v1"
)
@@ -50,7 +50,6 @@ type Service struct {
cancel context.CancelFunc
publisher *outboxPublisher
}
unifiedv1.UnimplementedUnifiedGatewayServiceServer
}
type feesDependency struct {
@@ -83,7 +82,7 @@ func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.
func (s *Service) Register(router routers.GRPC) error {
return router.Register(func(reg grpc.ServiceRegistrar) {
unifiedv1.RegisterUnifiedGatewayServiceServer(reg, s)
connectorv1.RegisterConnectorServiceServer(reg, newConnectorAdapter(s))
})
}