458 lines
14 KiB
Go
458 lines
14 KiB
Go
package treasury
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math/big"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/tech/sendico/gateway/tgsettle/internal/service/treasury/ledger"
|
|
"github.com/tech/sendico/gateway/tgsettle/storage"
|
|
storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"go.mongodb.org/mongo-driver/v2/bson"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// ErrActiveTreasuryRequest is returned by CreateRequest, together with the
// existing record, when the ledger account already has an active treasury
// request. Compare with errors.Is.
var ErrActiveTreasuryRequest = errors.New("active treasury request exists")
|
|
|
|
type CreateRequestInput struct {
|
|
OperationType storagemodel.TreasuryOperationType
|
|
TelegramUserID string
|
|
LedgerAccountID string
|
|
ChatID string
|
|
Amount string
|
|
}
|
|
|
|
// AccountProfile is the summary of a ledger account returned by
// Service.GetAccountProfile.
type AccountProfile struct {
	// AccountID is the trimmed ledger account ID that was looked up.
	AccountID string
	// AccountCode is the display code chosen by resolveAccountCode.
	AccountCode string
	// Currency is the account currency, trimmed and upper-cased.
	Currency string
}
|
|
|
|
type ExecutionResult struct {
|
|
Request *storagemodel.TreasuryRequest
|
|
NewBalance *ledger.Balance
|
|
ExecutionError error
|
|
}
|
|
|
|
type Service struct {
|
|
logger mlogger.Logger
|
|
repo storage.TreasuryRequestsStore
|
|
ledger ledger.Client
|
|
|
|
validator *Validator
|
|
executionDelay time.Duration
|
|
}
|
|
|
|
func NewService(
|
|
logger mlogger.Logger,
|
|
repo storage.TreasuryRequestsStore,
|
|
ledgerClient ledger.Client,
|
|
executionDelay time.Duration,
|
|
maxPerOperation string,
|
|
maxDaily string,
|
|
) (*Service, error) {
|
|
if logger == nil {
|
|
return nil, merrors.InvalidArgument("logger is required", "logger")
|
|
}
|
|
if repo == nil {
|
|
return nil, merrors.InvalidArgument("treasury repository is required", "repo")
|
|
}
|
|
if ledgerClient == nil {
|
|
return nil, merrors.InvalidArgument("ledger client is required", "ledger_client")
|
|
}
|
|
if executionDelay <= 0 {
|
|
executionDelay = 30 * time.Second
|
|
}
|
|
validator, err := NewValidator(repo, maxPerOperation, maxDaily)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Service{
|
|
logger: logger.Named("treasury_service"),
|
|
repo: repo,
|
|
ledger: ledgerClient,
|
|
validator: validator,
|
|
executionDelay: executionDelay,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) ExecutionDelay() time.Duration {
|
|
if s == nil {
|
|
return 0
|
|
}
|
|
return s.executionDelay
|
|
}
|
|
|
|
func (s *Service) MaxPerOperationLimit() string {
|
|
if s == nil || s.validator == nil {
|
|
return ""
|
|
}
|
|
return s.validator.MaxPerOperation()
|
|
}
|
|
|
|
func (s *Service) GetActiveRequestForAccount(ctx context.Context, ledgerAccountID string) (*storagemodel.TreasuryRequest, error) {
|
|
if s == nil || s.repo == nil {
|
|
return nil, merrors.Internal("treasury service unavailable")
|
|
}
|
|
return s.repo.FindActiveByLedgerAccountID(ctx, ledgerAccountID)
|
|
}
|
|
|
|
func (s *Service) GetRequest(ctx context.Context, requestID string) (*storagemodel.TreasuryRequest, error) {
|
|
if s == nil || s.repo == nil {
|
|
return nil, merrors.Internal("treasury service unavailable")
|
|
}
|
|
return s.repo.FindByRequestID(ctx, requestID)
|
|
}
|
|
|
|
func (s *Service) GetAccountProfile(ctx context.Context, ledgerAccountID string) (*AccountProfile, error) {
|
|
if s == nil || s.ledger == nil {
|
|
return nil, merrors.Internal("treasury service unavailable")
|
|
}
|
|
ledgerAccountID = strings.TrimSpace(ledgerAccountID)
|
|
if ledgerAccountID == "" {
|
|
return nil, merrors.InvalidArgument("ledger_account_id is required", "ledger_account_id")
|
|
}
|
|
|
|
account, err := s.ledger.GetAccount(ctx, ledgerAccountID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if account == nil {
|
|
return nil, merrors.NoData("ledger account not found")
|
|
}
|
|
return &AccountProfile{
|
|
AccountID: ledgerAccountID,
|
|
AccountCode: resolveAccountCode(account, ledgerAccountID),
|
|
Currency: strings.ToUpper(strings.TrimSpace(account.Currency)),
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) CreateRequest(ctx context.Context, input CreateRequestInput) (*storagemodel.TreasuryRequest, error) {
|
|
if s == nil || s.repo == nil || s.ledger == nil || s.validator == nil {
|
|
return nil, merrors.Internal("treasury service unavailable")
|
|
}
|
|
input.TelegramUserID = strings.TrimSpace(input.TelegramUserID)
|
|
input.LedgerAccountID = strings.TrimSpace(input.LedgerAccountID)
|
|
input.ChatID = strings.TrimSpace(input.ChatID)
|
|
input.Amount = strings.TrimSpace(input.Amount)
|
|
|
|
switch input.OperationType {
|
|
case storagemodel.TreasuryOperationFund, storagemodel.TreasuryOperationWithdraw:
|
|
default:
|
|
return nil, merrors.InvalidArgument("treasury operation is invalid", "operation_type")
|
|
}
|
|
if input.TelegramUserID == "" {
|
|
return nil, merrors.InvalidArgument("telegram_user_id is required", "telegram_user_id")
|
|
}
|
|
if input.LedgerAccountID == "" {
|
|
return nil, merrors.InvalidArgument("ledger_account_id is required", "ledger_account_id")
|
|
}
|
|
if input.ChatID == "" {
|
|
return nil, merrors.InvalidArgument("chat_id is required", "chat_id")
|
|
}
|
|
|
|
active, err := s.repo.FindActiveByLedgerAccountID(ctx, input.LedgerAccountID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if active != nil {
|
|
return active, ErrActiveTreasuryRequest
|
|
}
|
|
|
|
amountRat, normalizedAmount, err := s.validator.ValidateAmount(input.Amount)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := s.validator.ValidateDailyLimit(ctx, input.LedgerAccountID, amountRat, time.Now()); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
account, err := s.ledger.GetAccount(ctx, input.LedgerAccountID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if account == nil || strings.TrimSpace(account.Currency) == "" {
|
|
return nil, merrors.Internal("ledger account currency is unavailable")
|
|
}
|
|
if strings.TrimSpace(account.OrganizationRef) == "" {
|
|
return nil, merrors.Internal("ledger account organization is unavailable")
|
|
}
|
|
|
|
requestID := newRequestID()
|
|
record := &storagemodel.TreasuryRequest{
|
|
RequestID: requestID,
|
|
OperationType: input.OperationType,
|
|
TelegramUserID: input.TelegramUserID,
|
|
LedgerAccountID: input.LedgerAccountID,
|
|
LedgerAccountCode: resolveAccountCode(account, input.LedgerAccountID),
|
|
OrganizationRef: account.OrganizationRef,
|
|
ChatID: input.ChatID,
|
|
Amount: normalizedAmount,
|
|
Currency: strings.ToUpper(strings.TrimSpace(account.Currency)),
|
|
Status: storagemodel.TreasuryRequestStatusCreated,
|
|
IdempotencyKey: fmt.Sprintf("tgsettle:%s", requestID),
|
|
Active: true,
|
|
}
|
|
if err := s.repo.Create(ctx, record); err != nil {
|
|
if errors.Is(err, storage.ErrDuplicate) {
|
|
active, fetchErr := s.repo.FindActiveByLedgerAccountID(ctx, input.LedgerAccountID)
|
|
if fetchErr != nil {
|
|
return nil, fetchErr
|
|
}
|
|
if active != nil {
|
|
return active, ErrActiveTreasuryRequest
|
|
}
|
|
return nil, err
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
s.logRequest(record, "created", nil)
|
|
return record, nil
|
|
}
|
|
|
|
func (s *Service) ConfirmRequest(ctx context.Context, requestID string, telegramUserID string) (*storagemodel.TreasuryRequest, error) {
|
|
requestID = strings.TrimSpace(requestID)
|
|
telegramUserID = strings.TrimSpace(telegramUserID)
|
|
if requestID == "" {
|
|
return nil, merrors.InvalidArgument("request_id is required", "request_id")
|
|
}
|
|
record, err := s.repo.FindByRequestID(ctx, requestID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if record == nil {
|
|
return nil, merrors.NoData("treasury request not found")
|
|
}
|
|
if telegramUserID != "" && record.TelegramUserID != telegramUserID {
|
|
return nil, merrors.Unauthorized("treasury request ownership mismatch")
|
|
}
|
|
|
|
switch record.Status {
|
|
case storagemodel.TreasuryRequestStatusScheduled:
|
|
return record, nil
|
|
case storagemodel.TreasuryRequestStatusCreated, storagemodel.TreasuryRequestStatusConfirmed:
|
|
now := time.Now()
|
|
record.ConfirmedAt = now
|
|
record.ScheduledAt = now.Add(s.executionDelay)
|
|
record.Status = storagemodel.TreasuryRequestStatusScheduled
|
|
record.Active = true
|
|
record.ErrorMessage = ""
|
|
default:
|
|
return nil, merrors.InvalidArgument("treasury request cannot be confirmed in current status", "status")
|
|
}
|
|
if err := s.repo.Update(ctx, record); err != nil {
|
|
return nil, err
|
|
}
|
|
s.logRequest(record, "scheduled", nil)
|
|
return record, nil
|
|
}
|
|
|
|
func (s *Service) CancelRequest(ctx context.Context, requestID string, telegramUserID string) (*storagemodel.TreasuryRequest, error) {
|
|
requestID = strings.TrimSpace(requestID)
|
|
telegramUserID = strings.TrimSpace(telegramUserID)
|
|
if requestID == "" {
|
|
return nil, merrors.InvalidArgument("request_id is required", "request_id")
|
|
}
|
|
record, err := s.repo.FindByRequestID(ctx, requestID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if record == nil {
|
|
return nil, merrors.NoData("treasury request not found")
|
|
}
|
|
if telegramUserID != "" && record.TelegramUserID != telegramUserID {
|
|
return nil, merrors.Unauthorized("treasury request ownership mismatch")
|
|
}
|
|
|
|
switch record.Status {
|
|
case storagemodel.TreasuryRequestStatusCancelled:
|
|
return record, nil
|
|
case storagemodel.TreasuryRequestStatusCreated, storagemodel.TreasuryRequestStatusConfirmed, storagemodel.TreasuryRequestStatusScheduled:
|
|
record.Status = storagemodel.TreasuryRequestStatusCancelled
|
|
record.CancelledAt = time.Now()
|
|
record.Active = false
|
|
default:
|
|
return nil, merrors.InvalidArgument("treasury request cannot be cancelled in current status", "status")
|
|
}
|
|
|
|
if err := s.repo.Update(ctx, record); err != nil {
|
|
return nil, err
|
|
}
|
|
s.logRequest(record, "cancelled", nil)
|
|
return record, nil
|
|
}
|
|
|
|
func (s *Service) ExecuteRequest(ctx context.Context, requestID string) (*ExecutionResult, error) {
|
|
requestID = strings.TrimSpace(requestID)
|
|
if requestID == "" {
|
|
return nil, merrors.InvalidArgument("request_id is required", "request_id")
|
|
}
|
|
record, err := s.repo.FindByRequestID(ctx, requestID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if record == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
switch record.Status {
|
|
case storagemodel.TreasuryRequestStatusExecuted,
|
|
storagemodel.TreasuryRequestStatusCancelled,
|
|
storagemodel.TreasuryRequestStatusFailed:
|
|
return nil, nil
|
|
case storagemodel.TreasuryRequestStatusScheduled:
|
|
claimed, err := s.repo.ClaimScheduled(ctx, requestID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if !claimed {
|
|
return nil, nil
|
|
}
|
|
record, err = s.repo.FindByRequestID(ctx, requestID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if record == nil {
|
|
return nil, nil
|
|
}
|
|
}
|
|
|
|
if record.Status != storagemodel.TreasuryRequestStatusConfirmed {
|
|
return nil, nil
|
|
}
|
|
return s.executeClaimed(ctx, record)
|
|
}
|
|
|
|
func (s *Service) executeClaimed(ctx context.Context, record *storagemodel.TreasuryRequest) (*ExecutionResult, error) {
|
|
if record == nil {
|
|
return nil, merrors.InvalidArgument("treasury request is required", "request")
|
|
}
|
|
postReq := ledger.PostRequest{
|
|
AccountID: record.LedgerAccountID,
|
|
OrganizationRef: record.OrganizationRef,
|
|
Amount: record.Amount,
|
|
Currency: record.Currency,
|
|
Reference: record.RequestID,
|
|
IdempotencyKey: record.IdempotencyKey,
|
|
}
|
|
|
|
var (
|
|
opResult *ledger.OperationResult
|
|
err error
|
|
)
|
|
switch record.OperationType {
|
|
case storagemodel.TreasuryOperationFund:
|
|
opResult, err = s.ledger.ExternalCredit(ctx, postReq)
|
|
case storagemodel.TreasuryOperationWithdraw:
|
|
opResult, err = s.ledger.ExternalDebit(ctx, postReq)
|
|
default:
|
|
err = merrors.InvalidArgument("treasury operation is invalid", "operation_type")
|
|
}
|
|
now := time.Now()
|
|
if err != nil {
|
|
record.Status = storagemodel.TreasuryRequestStatusFailed
|
|
record.Active = false
|
|
record.ExecutedAt = now
|
|
record.ErrorMessage = strings.TrimSpace(err.Error())
|
|
if saveErr := s.repo.Update(ctx, record); saveErr != nil {
|
|
return nil, saveErr
|
|
}
|
|
s.logRequest(record, "failed", err)
|
|
return &ExecutionResult{
|
|
Request: record,
|
|
ExecutionError: err,
|
|
}, nil
|
|
}
|
|
|
|
if opResult != nil {
|
|
record.LedgerReference = strings.TrimSpace(opResult.Reference)
|
|
}
|
|
record.Status = storagemodel.TreasuryRequestStatusExecuted
|
|
record.Active = false
|
|
record.ExecutedAt = now
|
|
record.ErrorMessage = ""
|
|
|
|
balance, balanceErr := s.ledger.GetBalance(ctx, record.LedgerAccountID)
|
|
if balanceErr != nil {
|
|
record.ErrorMessage = strings.TrimSpace(balanceErr.Error())
|
|
}
|
|
|
|
if saveErr := s.repo.Update(ctx, record); saveErr != nil {
|
|
return nil, saveErr
|
|
}
|
|
s.logRequest(record, "executed", nil)
|
|
return &ExecutionResult{
|
|
Request: record,
|
|
NewBalance: balance,
|
|
ExecutionError: balanceErr,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) DueRequests(ctx context.Context, statuses []storagemodel.TreasuryRequestStatus, now time.Time, limit int64) ([]storagemodel.TreasuryRequest, error) {
|
|
if s == nil || s.repo == nil {
|
|
return nil, merrors.Internal("treasury service unavailable")
|
|
}
|
|
return s.repo.FindDueByStatus(ctx, statuses, now, limit)
|
|
}
|
|
|
|
func (s *Service) ScheduledRequests(ctx context.Context, limit int64) ([]storagemodel.TreasuryRequest, error) {
|
|
if s == nil || s.repo == nil {
|
|
return nil, merrors.Internal("treasury service unavailable")
|
|
}
|
|
return s.repo.FindDueByStatus(
|
|
ctx,
|
|
[]storagemodel.TreasuryRequestStatus{storagemodel.TreasuryRequestStatusScheduled},
|
|
time.Now().Add(10*365*24*time.Hour),
|
|
limit,
|
|
)
|
|
}
|
|
|
|
func (s *Service) ParseAmount(value string) (*big.Rat, error) {
|
|
return parseAmountRat(value)
|
|
}
|
|
|
|
func (s *Service) logRequest(record *storagemodel.TreasuryRequest, status string, err error) {
|
|
if s == nil || s.logger == nil || record == nil {
|
|
return
|
|
}
|
|
fields := []zap.Field{
|
|
zap.String("request_id", strings.TrimSpace(record.RequestID)),
|
|
zap.String("telegram_user_id", strings.TrimSpace(record.TelegramUserID)),
|
|
zap.String("ledger_account_id", strings.TrimSpace(record.LedgerAccountID)),
|
|
zap.String("ledger_account_code", strings.TrimSpace(record.LedgerAccountCode)),
|
|
zap.String("chat_id", strings.TrimSpace(record.ChatID)),
|
|
zap.String("operation_type", strings.TrimSpace(string(record.OperationType))),
|
|
zap.String("amount", strings.TrimSpace(record.Amount)),
|
|
zap.String("currency", strings.TrimSpace(record.Currency)),
|
|
zap.String("status", status),
|
|
zap.String("ledger_reference", strings.TrimSpace(record.LedgerReference)),
|
|
zap.String("error_message", strings.TrimSpace(record.ErrorMessage)),
|
|
}
|
|
if err != nil {
|
|
fields = append(fields, zap.Error(err))
|
|
}
|
|
s.logger.Info("treasury_request", fields...)
|
|
}
|
|
|
|
func newRequestID() string {
|
|
return "TG-TREASURY-" + strings.ToUpper(bson.NewObjectID().Hex())
|
|
}
|
|
|
|
func resolveAccountCode(account *ledger.Account, fallbackAccountID string) string {
|
|
if account != nil {
|
|
if code := strings.TrimSpace(account.AccountCode); code != "" {
|
|
return code
|
|
}
|
|
if code := strings.TrimSpace(account.AccountID); code != "" {
|
|
return code
|
|
}
|
|
}
|
|
return strings.TrimSpace(fallbackAccountID)
|
|
}
|