From b6f05f52dcfef0cec15e577c7597443479c229a3 Mon Sep 17 00:00:00 2001 From: Stephan D Date: Wed, 4 Mar 2026 20:01:37 +0100 Subject: [PATCH] Treasury bot + ledger fix --- api/gateway/tgsettle/config.dev.yml | 12 + api/gateway/tgsettle/config.yml | 14 + .../internal/server/internal/serverimp.go | 122 +++++- .../service/gateway/confirmation_flow.go | 11 + .../internal/service/gateway/service.go | 116 +++++ .../internal/service/gateway/service_test.go | 5 + .../internal/service/treasury/bot/dialogs.go | 73 ++++ .../internal/service/treasury/bot/router.go | 366 ++++++++++++++++ .../service/treasury/bot/router_test.go | 158 +++++++ .../internal/service/treasury/config.go | 19 + .../service/treasury/ledger/client.go | 287 ++++++++++++ .../treasury/ledger/discovery_client.go | 235 ++++++++++ .../internal/service/treasury/module.go | 148 +++++++ .../internal/service/treasury/scheduler.go | 261 +++++++++++ .../internal/service/treasury/service.go | 411 ++++++++++++++++++ .../internal/service/treasury/validator.go | 181 ++++++++ .../tgsettle/storage/model/storable.go | 5 + .../tgsettle/storage/model/treasury.go | 50 +++ .../tgsettle/storage/mongo/repository.go | 11 + .../storage/mongo/store/treasury_requests.go | 311 +++++++++++++ api/gateway/tgsettle/storage/storage.go | 11 + api/ledger/storage/mongo/store/accounts.go | 55 ++- 22 files changed, 2844 insertions(+), 18 deletions(-) create mode 100644 api/gateway/tgsettle/internal/service/treasury/bot/dialogs.go create mode 100644 api/gateway/tgsettle/internal/service/treasury/bot/router.go create mode 100644 api/gateway/tgsettle/internal/service/treasury/bot/router_test.go create mode 100644 api/gateway/tgsettle/internal/service/treasury/config.go create mode 100644 api/gateway/tgsettle/internal/service/treasury/ledger/client.go create mode 100644 api/gateway/tgsettle/internal/service/treasury/ledger/discovery_client.go create mode 100644 api/gateway/tgsettle/internal/service/treasury/module.go create mode 100644 api/gateway/tgsettle/internal/service/treasury/scheduler.go create mode 100644 api/gateway/tgsettle/internal/service/treasury/service.go create mode 100644 api/gateway/tgsettle/internal/service/treasury/validator.go create mode 100644 api/gateway/tgsettle/storage/model/treasury.go create mode 100644 api/gateway/tgsettle/storage/mongo/store/treasury_requests.go diff --git a/api/gateway/tgsettle/config.dev.yml b/api/gateway/tgsettle/config.dev.yml index 050b9d5d..d2325f63 100644 --- a/api/gateway/tgsettle/config.dev.yml +++ b/api/gateway/tgsettle/config.dev.yml @@ -41,3 +41,15 @@ gateway: timeout_seconds: 345600 accepted_user_ids: [] success_reaction: "\U0001FAE1" + +treasury: + execution_delay: 60s + poll_interval: 60s + telegram: + allowed_chats: [] + users: [] + ledger: + timeout: 5s + limits: + max_amount_per_operation: "1000000" + max_daily_amount: "5000000" diff --git a/api/gateway/tgsettle/config.yml b/api/gateway/tgsettle/config.yml index 7e736332..7fbc8522 100644 --- a/api/gateway/tgsettle/config.yml +++ b/api/gateway/tgsettle/config.yml @@ -41,3 +41,17 @@ gateway: timeout_seconds: 345600 accepted_user_ids: [] success_reaction: "\U0001FAE1" + +treasury: + execution_delay: 60s + poll_interval: 60s + ledger: + timeout: 5s + limits: + max_amount_per_operation: "" + max_daily_amount: "" + telegram: + allowed_chats: [] + users: + - telegram_user_id: "8273799472" + - ledger_account: "6972c738949b91ea0395e5fb" diff --git a/api/gateway/tgsettle/internal/server/internal/serverimp.go b/api/gateway/tgsettle/internal/server/internal/serverimp.go index 2cfe94f2..cc9a2f72 100644 --- a/api/gateway/tgsettle/internal/server/internal/serverimp.go +++ b/api/gateway/tgsettle/internal/server/internal/serverimp.go @@ -3,6 +3,7 @@ package serverimp import ( "context" "os" + "strings" "time" "github.com/tech/sendico/gateway/tgsettle/internal/service/gateway" @@ -28,11 +29,17 @@ type Imp struct { config *config app *grpcapp.App[storage.Repository] service *gateway.Service + + discoveryWatcher *discovery.RegistryWatcher + discoveryReg *discovery.Registry } type config struct { *grpcapp.Config `yaml:",inline"` - Gateway gatewayConfig `yaml:"gateway"` + Gateway gatewayConfig `yaml:"gateway"` + Treasury treasuryConfig `yaml:"treasury"` + Ledger ledgerConfig `yaml:"ledger"` // deprecated: use treasury.ledger + Telegram telegramConfig `yaml:"telegram"` // deprecated: use treasury.telegram } type gatewayConfig struct { @@ -43,6 +50,33 @@ type gatewayConfig struct { SuccessReaction string `yaml:"success_reaction"` } +type telegramConfig struct { + AllowedChats []string `yaml:"allowed_chats"` + Users []telegramUserConfig `yaml:"users"` +} + +type telegramUserConfig struct { + TelegramUserID string `yaml:"telegram_user_id"` + LedgerAccount string `yaml:"ledger_account"` +} + +type treasuryConfig struct { + ExecutionDelay time.Duration `yaml:"execution_delay"` + PollInterval time.Duration `yaml:"poll_interval"` + Telegram telegramConfig `yaml:"telegram"` + Ledger ledgerConfig `yaml:"ledger"` + Limits treasuryLimitsConfig `yaml:"limits"` +} + +type treasuryLimitsConfig struct { + MaxAmountPerOperation string `yaml:"max_amount_per_operation"` + MaxDailyAmount string `yaml:"max_daily_amount"` +} + +type ledgerConfig struct { + Timeout time.Duration `yaml:"timeout"` +} + func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) { return &Imp{ logger: logger.Named("server"), @@ -62,6 +96,9 @@ func (i *Imp) Shutdown() { if i.service != nil { i.service.Shutdown() } + if i.discoveryWatcher != nil { + i.discoveryWatcher.Stop() + } ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() i.app.Shutdown(ctx) @@ -81,6 +118,19 @@ func (i *Imp) Start() error { i.logger.Warn("Failed to create messaging broker", zap.Error(err)) } } + if broker != nil { + registry := discovery.NewRegistry() + watcher, watcherErr := discovery.NewRegistryWatcher(i.logger, broker, registry) + if watcherErr != nil { + i.logger.Warn("Failed to initialise discovery registry watcher", zap.Error(watcherErr)) + } else if startErr := watcher.Start(); startErr != nil { + i.logger.Warn("Failed to start discovery registry watcher", zap.Error(startErr)) + } else { + i.discoveryWatcher = watcher + i.discoveryReg = registry + i.logger.Info("Discovery registry watcher started") + } + } repoFactory := func(logger mlogger.Logger, conn *db.MongoConnection) (storage.Repository, error) { return gatewaymongo.New(logger, conn) @@ -95,6 +145,8 @@ func (i *Imp) Start() error { if cfg.Messaging != nil { msgSettings = cfg.Messaging.Settings } + treasuryTelegram := treasuryTelegramConfig(cfg, i.logger) + treasuryLedger := treasuryLedgerConfig(cfg, i.logger) gwCfg := gateway.Config{ Rail: cfg.Gateway.Rail, TargetChatIDEnv: cfg.Gateway.TargetChatIDEnv, @@ -103,6 +155,22 @@ func (i *Imp) Start() error { SuccessReaction: cfg.Gateway.SuccessReaction, InvokeURI: invokeURI, MessagingSettings: msgSettings, + DiscoveryRegistry: i.discoveryReg, + Treasury: gateway.TreasuryConfig{ + ExecutionDelay: cfg.Treasury.ExecutionDelay, + PollInterval: cfg.Treasury.PollInterval, + Telegram: gateway.TelegramConfig{ + AllowedChats: treasuryTelegram.AllowedChats, + Users: telegramUsers(treasuryTelegram.Users), + }, + Ledger: gateway.LedgerConfig{ + Timeout: treasuryLedger.Timeout, + }, + Limits: gateway.TreasuryLimitsConfig{ + MaxAmountPerOperation: cfg.Treasury.Limits.MaxAmountPerOperation, + MaxDailyAmount: cfg.Treasury.Limits.MaxDailyAmount, + }, + }, } svc := gateway.NewService(logger, repo, producer, broker, gwCfg) i.service = svc @@ -142,6 +210,15 @@ func (i *Imp) loadConfig() (*config, error) { if cfg.Metrics == nil { cfg.Metrics = &grpcapp.MetricsConfig{Address: ":9406"} } + if cfg.Treasury.ExecutionDelay <= 0 { + cfg.Treasury.ExecutionDelay = 30 * time.Second + } + if cfg.Treasury.PollInterval <= 0 { + cfg.Treasury.PollInterval = 30 * time.Second + } + if cfg.Treasury.Ledger.Timeout <= 0 { + cfg.Treasury.Ledger.Timeout = 5 * time.Second + } cfg.Gateway.Rail = discovery.NormalizeRail(cfg.Gateway.Rail) if cfg.Gateway.Rail == "" { return nil, merrors.InvalidArgument("gateway rail is required", "gateway.rail") @@ -151,3 +228,46 @@ func (i *Imp) loadConfig() (*config, error) { } return cfg, nil } + +func telegramUsers(input []telegramUserConfig) []gateway.TelegramUserBinding { + result := make([]gateway.TelegramUserBinding, 0, len(input)) + for _, next := range input { + result = append(result, gateway.TelegramUserBinding{ + TelegramUserID: strings.TrimSpace(next.TelegramUserID), + LedgerAccount: strings.TrimSpace(next.LedgerAccount), + }) + } + return result +} + +func treasuryTelegramConfig(cfg *config, logger mlogger.Logger) telegramConfig { + if cfg == nil { + return telegramConfig{} + } + if len(cfg.Treasury.Telegram.Users) > 0 || len(cfg.Treasury.Telegram.AllowedChats) > 0 { + return cfg.Treasury.Telegram + } + if len(cfg.Telegram.Users) > 0 || len(cfg.Telegram.AllowedChats) > 0 { + if logger != nil { + logger.Warn("Deprecated config path used: telegram.*; move these settings to treasury.telegram.*") + } + return cfg.Telegram + } + return cfg.Treasury.Telegram +} + +func treasuryLedgerConfig(cfg *config, logger mlogger.Logger) ledgerConfig { + if cfg == nil { + return ledgerConfig{} + } + if cfg.Treasury.Ledger.Timeout > 0 { + return cfg.Treasury.Ledger + } + if cfg.Ledger.Timeout > 0 { + if logger != nil { + logger.Warn("Deprecated config path used: ledger.*; move these settings to treasury.ledger.*") + } + return cfg.Ledger + } + return cfg.Treasury.Ledger +} diff --git a/api/gateway/tgsettle/internal/service/gateway/confirmation_flow.go b/api/gateway/tgsettle/internal/service/gateway/confirmation_flow.go index 5750a20e..ba232be0 100644 --- a/api/gateway/tgsettle/internal/service/gateway/confirmation_flow.go +++ b/api/gateway/tgsettle/internal/service/gateway/confirmation_flow.go @@ -146,6 +146,7 @@ func (s *Service) onTelegramUpdate(ctx context.Context, update *model.TelegramWe message := update.Message replyToID := strings.TrimSpace(message.ReplyToMessageID) if replyToID == "" { + s.handleTreasuryTelegramUpdate(ctx, update) return nil } replyFields := telegramReplyLogFields(update) @@ -154,6 +155,9 @@ func (s *Service) onTelegramUpdate(ctx context.Context, update *model.TelegramWe return err } if pending == nil { + if s.handleTreasuryTelegramUpdate(ctx, update) { + return nil + } s.logger.Warn("Telegram confirmation reply dropped", append(replyFields, zap.String("outcome", "dropped"), @@ -272,6 +276,13 @@ func (s *Service) onTelegramUpdate(ctx context.Context, update *model.TelegramWe return nil } +func (s *Service) handleTreasuryTelegramUpdate(ctx context.Context, update *model.TelegramWebhookUpdate) bool { + if s == nil || s.treasury == nil || update == nil || update.Message == nil { + return false + } + return s.treasury.HandleUpdate(ctx, update) +} + func telegramReplyLogFields(update *model.TelegramWebhookUpdate) []zap.Field { if update == nil || update.Message == nil { return nil diff --git a/api/gateway/tgsettle/internal/service/gateway/service.go b/api/gateway/tgsettle/internal/service/gateway/service.go index 61cc96a8..3b18e7c7 100644 --- a/api/gateway/tgsettle/internal/service/gateway/service.go +++ b/api/gateway/tgsettle/internal/service/gateway/service.go @@ -9,6 +9,8 @@ import ( "time" gatewayoutbox "github.com/tech/sendico/gateway/common/outbox" + treasurysvc "github.com/tech/sendico/gateway/tgsettle/internal/service/treasury" + treasuryledger "github.com/tech/sendico/gateway/tgsettle/internal/service/treasury/ledger" "github.com/tech/sendico/gateway/tgsettle/storage" storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model" "github.com/tech/sendico/pkg/api/routers" @@ -40,6 +42,9 @@ const ( defaultConfirmationTimeoutSeconds = 345600 defaultTelegramSuccessReaction = "\U0001FAE1" defaultConfirmationSweepInterval = 5 * time.Second + defaultTreasuryExecutionDelay = 30 * time.Second + defaultTreasuryPollInterval = 30 * time.Second + defaultTreasuryLedgerTimeout = 5 * time.Second ) const ( @@ -59,6 +64,35 @@ type Config struct { SuccessReaction string InvokeURI string MessagingSettings pmodel.SettingsT + DiscoveryRegistry *discovery.Registry + Treasury TreasuryConfig +} + +type TelegramConfig struct { + AllowedChats []string + Users []TelegramUserBinding +} + +type TelegramUserBinding struct { + TelegramUserID string + LedgerAccount string +} + +type TreasuryConfig struct { + ExecutionDelay time.Duration + PollInterval time.Duration + Telegram TelegramConfig + Ledger LedgerConfig + Limits TreasuryLimitsConfig +} + +type TreasuryLimitsConfig struct { + MaxAmountPerOperation string + MaxDailyAmount string +} + +type LedgerConfig struct { + Timeout time.Duration } type Service struct { @@ -80,6 +114,8 @@ type Service struct { timeoutCancel context.CancelFunc timeoutWG sync.WaitGroup + treasury *treasurysvc.Module + connectorv1.UnimplementedConnectorServiceServer } @@ -112,6 +148,7 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro svc.startConsumers() svc.startAnnouncer() svc.startConfirmationTimeoutWatcher() + svc.startTreasuryModule() return svc } @@ -134,12 +171,91 @@ func (s *Service) Shutdown() { consumer.Close() } } + if s.treasury != nil { + s.treasury.Shutdown() + } if s.timeoutCancel != nil { s.timeoutCancel() } s.timeoutWG.Wait() } +func (s *Service) startTreasuryModule() { + if s == nil || s.repo == nil || s.repo.TreasuryRequests() == nil { + return + } + if s.cfg.DiscoveryRegistry == nil { + s.logger.Warn("Treasury module disabled: discovery registry is unavailable") + return + } + if len(s.cfg.Treasury.Telegram.Users) == 0 { + return + } + + ledgerTimeout := s.cfg.Treasury.Ledger.Timeout + if ledgerTimeout <= 0 { + ledgerTimeout = defaultTreasuryLedgerTimeout + } + ledgerClient, err := treasuryledger.NewDiscoveryClient(treasuryledger.DiscoveryConfig{ + Logger: s.logger, + Registry: s.cfg.DiscoveryRegistry, + Timeout: ledgerTimeout, + }) + if err != nil { + s.logger.Warn("Failed to initialise treasury ledger client", zap.Error(err)) + return + } + + executionDelay := s.cfg.Treasury.ExecutionDelay + if executionDelay <= 0 { + executionDelay = defaultTreasuryExecutionDelay + } + pollInterval := s.cfg.Treasury.PollInterval + if pollInterval <= 0 { + pollInterval = defaultTreasuryPollInterval + } + + users := make([]treasurysvc.UserBinding, 0, len(s.cfg.Treasury.Telegram.Users)) + for _, binding := range s.cfg.Treasury.Telegram.Users { + users = append(users, treasurysvc.UserBinding{ + TelegramUserID: binding.TelegramUserID, + LedgerAccount: binding.LedgerAccount, + }) + } + + module, err := treasurysvc.NewModule( + s.logger, + s.repo.TreasuryRequests(), + ledgerClient, + treasurysvc.Config{ + AllowedChats: s.cfg.Treasury.Telegram.AllowedChats, + Users: users, + ExecutionDelay: executionDelay, + PollInterval: pollInterval, + MaxAmountPerOperation: s.cfg.Treasury.Limits.MaxAmountPerOperation, + MaxDailyAmount: s.cfg.Treasury.Limits.MaxDailyAmount, + }, + func(ctx context.Context, chatID string, text string) error { + return s.sendTelegramText(ctx, &model.TelegramTextRequest{ + ChatID: chatID, + Text: text, + }) + }, + ) + if err != nil { + s.logger.Warn("Failed to initialise treasury module", zap.Error(err)) + _ = ledgerClient.Close() + return + } + if !module.Enabled() { + _ = ledgerClient.Close() + return + } + module.Start() + s.treasury = module + s.logger.Info("Treasury module started", zap.Duration("execution_delay", executionDelay), zap.Duration("poll_interval", pollInterval)) +} + func (s *Service) startConsumers() { if s == nil || s.broker == nil { if s != nil && s.logger != nil { diff --git a/api/gateway/tgsettle/internal/service/gateway/service_test.go b/api/gateway/tgsettle/internal/service/gateway/service_test.go index 3cf38757..b40f7833 100644 --- a/api/gateway/tgsettle/internal/service/gateway/service_test.go +++ b/api/gateway/tgsettle/internal/service/gateway/service_test.go @@ -80,6 +80,7 @@ type fakeRepo struct { payments *fakePaymentsStore tg *fakeTelegramStore pending *fakePendingStore + treasury storage.TreasuryRequestsStore } func (f *fakeRepo) Payments() storage.PaymentsStore { @@ -94,6 +95,10 @@ func (f *fakeRepo) PendingConfirmations() storage.PendingConfirmationsStore { return f.pending } +func (f *fakeRepo) TreasuryRequests() storage.TreasuryRequestsStore { + return f.treasury +} + type fakePendingStore struct { mu sync.Mutex records map[string]*storagemodel.PendingConfirmation diff --git a/api/gateway/tgsettle/internal/service/treasury/bot/dialogs.go b/api/gateway/tgsettle/internal/service/treasury/bot/dialogs.go new file mode 100644 index 00000000..64b62196 --- /dev/null +++ b/api/gateway/tgsettle/internal/service/treasury/bot/dialogs.go @@ -0,0 +1,73 @@ +package bot + +import ( + "strings" + "sync" + + storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model" +) + +type DialogState string + +const ( + DialogStateWaitingAmount DialogState = "waiting_amount" + DialogStateWaitingConfirmation DialogState = "waiting_confirmation" +) + +type DialogSession struct { + State DialogState + OperationType storagemodel.TreasuryOperationType + LedgerAccountID string + RequestID string +} + +type Dialogs struct { + mu sync.Mutex + sessions map[string]DialogSession +} + +func NewDialogs() *Dialogs { + return &Dialogs{ + sessions: map[string]DialogSession{}, + } +} + +func (d *Dialogs) Get(telegramUserID string) (DialogSession, bool) { + if d == nil { + return DialogSession{}, false + } + telegramUserID = strings.TrimSpace(telegramUserID) + if telegramUserID == "" { + return DialogSession{}, false + } + d.mu.Lock() + defer d.mu.Unlock() + session, ok := d.sessions[telegramUserID] + return session, ok +} + +func (d *Dialogs) Set(telegramUserID string, session DialogSession) { + if d == nil { + return + } + telegramUserID = strings.TrimSpace(telegramUserID) + if telegramUserID == "" { + return + } + d.mu.Lock() + defer d.mu.Unlock() + d.sessions[telegramUserID] = session +} + +func (d *Dialogs) Clear(telegramUserID string) { + if d == nil { + return + } + telegramUserID = strings.TrimSpace(telegramUserID) + if telegramUserID == "" { + return + } + d.mu.Lock() + defer d.mu.Unlock() + delete(d.sessions, telegramUserID) +} diff --git a/api/gateway/tgsettle/internal/service/treasury/bot/router.go b/api/gateway/tgsettle/internal/service/treasury/bot/router.go new file mode 100644 index 00000000..26784763 --- /dev/null +++ b/api/gateway/tgsettle/internal/service/treasury/bot/router.go @@ -0,0 +1,366 @@ +package bot + +import ( + "context" + "errors" + "strconv" + "strings" + "time" + + storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model" + "github.com/tech/sendico/pkg/merrors" + "github.com/tech/sendico/pkg/mlogger" + "github.com/tech/sendico/pkg/model" + "go.uber.org/zap" +) + +const unauthorizedMessage = "Access denied.\n\nYour Telegram account is not authorized to perform treasury operations." + +type SendTextFunc func(ctx context.Context, chatID string, text string) error + +type ScheduleTracker interface { + TrackScheduled(record *storagemodel.TreasuryRequest) + Untrack(requestID string) +} + +type CreateRequestInput struct { + OperationType storagemodel.TreasuryOperationType + TelegramUserID string + LedgerAccountID string + ChatID string + Amount string +} + +type TreasuryService interface { + ExecutionDelay() time.Duration + MaxPerOperationLimit() string + + GetActiveRequestForAccount(ctx context.Context, ledgerAccountID string) (*storagemodel.TreasuryRequest, error) + CreateRequest(ctx context.Context, input CreateRequestInput) (*storagemodel.TreasuryRequest, error) + ConfirmRequest(ctx context.Context, requestID string, telegramUserID string) (*storagemodel.TreasuryRequest, error) + CancelRequest(ctx context.Context, requestID string, telegramUserID string) (*storagemodel.TreasuryRequest, error) +} + +type limitError interface { + error + LimitKind() string + LimitMax() string +} + +type Router struct { + logger mlogger.Logger + + service TreasuryService + dialogs *Dialogs + send SendTextFunc + tracker ScheduleTracker + + allowedChats map[string]struct{} + userAccounts map[string]string + allowAnyChat bool +} + +func NewRouter( + logger mlogger.Logger, + service TreasuryService, + send SendTextFunc, + tracker ScheduleTracker, + allowedChats []string, + userAccounts map[string]string, +) *Router { + if logger != nil { + logger = logger.Named("treasury_router") + } + allowed := map[string]struct{}{} + for _, chatID := range allowedChats { + chatID = strings.TrimSpace(chatID) + if chatID == "" { + continue + } + allowed[chatID] = struct{}{} + } + users := map[string]string{} + for userID, accountID := range userAccounts { + userID = strings.TrimSpace(userID) + accountID = strings.TrimSpace(accountID) + if userID == "" || accountID == "" { + continue + } + users[userID] = accountID + } + return &Router{ + logger: logger, + service: service, + dialogs: NewDialogs(), + send: send, + tracker: tracker, + allowedChats: allowed, + userAccounts: users, + allowAnyChat: len(allowed) == 0, + } +} + +func (r *Router) Enabled() bool { + return r != nil && r.service != nil && len(r.userAccounts) > 0 +} + +func (r *Router) HandleUpdate(ctx context.Context, update *model.TelegramWebhookUpdate) bool { + if !r.Enabled() || update == nil || update.Message == nil { + return false + } + message := update.Message + chatID := strings.TrimSpace(message.ChatID) + userID := strings.TrimSpace(message.FromUserID) + text := strings.TrimSpace(message.Text) + + if chatID == "" || userID == "" { + return false + } + if !r.allowAnyChat { + if _, ok := r.allowedChats[chatID]; !ok { + return true + } + } + + accountID, ok := r.userAccounts[userID] + if !ok || strings.TrimSpace(accountID) == "" { + r.logUnauthorized(update) + _ = r.sendText(ctx, chatID, unauthorizedMessage) + return true + } + + command := parseCommand(text) + switch command { + case "fund": + r.startAmountDialog(ctx, userID, accountID, chatID, storagemodel.TreasuryOperationFund) + return true + case "withdraw": + r.startAmountDialog(ctx, userID, accountID, chatID, storagemodel.TreasuryOperationWithdraw) + return true + case "confirm": + r.confirm(ctx, userID, accountID, chatID) + return true + case "cancel": + r.cancel(ctx, userID, accountID, chatID) + return true + } + + session, hasSession := r.dialogs.Get(userID) + if hasSession { + switch session.State { + case DialogStateWaitingAmount: + r.captureAmount(ctx, userID, accountID, chatID, session.OperationType, text) + return true + case DialogStateWaitingConfirmation: + _ = r.sendText(ctx, chatID, "Confirm operation?\n\n/confirm\n/cancel") + return true + } + } + + if strings.HasPrefix(text, "/") { + _ = r.sendText(ctx, chatID, "Supported commands:\n/fund\n/withdraw\n/confirm\n/cancel") + return true + } + return false +} + +func (r *Router) startAmountDialog(ctx context.Context, userID, accountID, chatID string, operation storagemodel.TreasuryOperationType) { + active, err := r.service.GetActiveRequestForAccount(ctx, accountID) + if err != nil { + r.logger.Warn("Failed to check active treasury request", zap.Error(err), zap.String("telegram_user_id", userID), zap.String("ledger_account_id", accountID)) + return + } + if active != nil { + _ = r.sendText(ctx, chatID, pendingRequestMessage(active)) + r.dialogs.Set(userID, DialogSession{ + State: DialogStateWaitingConfirmation, + LedgerAccountID: accountID, + RequestID: active.RequestID, + }) + return + } + r.dialogs.Set(userID, DialogSession{ + State: DialogStateWaitingAmount, + OperationType: operation, + LedgerAccountID: accountID, + }) + _ = r.sendText(ctx, chatID, "Enter amount:") +} + +func (r *Router) captureAmount(ctx context.Context, userID, accountID, chatID string, operation storagemodel.TreasuryOperationType, amount string) { + record, err := r.service.CreateRequest(ctx, CreateRequestInput{ + OperationType: operation, + TelegramUserID: userID, + LedgerAccountID: accountID, + ChatID: chatID, + Amount: amount, + }) + if err != nil { + if record != nil { + _ = r.sendText(ctx, chatID, pendingRequestMessage(record)) + r.dialogs.Set(userID, DialogSession{ + State: DialogStateWaitingConfirmation, + LedgerAccountID: accountID, + RequestID: record.RequestID, + }) + return + } + if typed, ok := err.(limitError); ok { + switch typed.LimitKind() { + case "per_operation": + _ = r.sendText(ctx, chatID, "Amount exceeds allowed limit.\n\nMax per operation: "+typed.LimitMax()+"\n\nEnter another amount or /cancel") + return + case "daily": + _ = r.sendText(ctx, chatID, "Daily amount limit exceeded.\n\nMax per day: "+typed.LimitMax()+"\n\nEnter another amount or /cancel") + return + } + } + if errors.Is(err, merrors.ErrInvalidArg) { + _ = r.sendText(ctx, chatID, "Invalid amount.\n\nEnter another amount or /cancel") + return + } + _ = r.sendText(ctx, chatID, "Failed to create treasury request.\n\nEnter another amount or /cancel") + return + } + if record == nil { + _ = r.sendText(ctx, chatID, "Failed to create treasury request.\n\nEnter another amount or /cancel") + return + } + r.dialogs.Set(userID, DialogSession{ + State: DialogStateWaitingConfirmation, + LedgerAccountID: accountID, + RequestID: record.RequestID, + }) + _ = r.sendText(ctx, chatID, confirmationPrompt(record)) +} + +func (r *Router) confirm(ctx context.Context, userID string, accountID string, chatID string) { + requestID := "" + if session, ok := r.dialogs.Get(userID); ok && strings.TrimSpace(session.RequestID) != "" { + requestID = strings.TrimSpace(session.RequestID) + } else { + active, err := r.service.GetActiveRequestForAccount(ctx, accountID) + if err == nil && active != nil { + requestID = strings.TrimSpace(active.RequestID) + } + } + if requestID == "" { + _ = r.sendText(ctx, chatID, "No pending treasury operation.") + return + } + record, err := r.service.ConfirmRequest(ctx, requestID, userID) + if err != nil { + _ = r.sendText(ctx, chatID, "Unable to confirm treasury request.\n\nUse /cancel or create a new request with /fund or /withdraw.") + return + } + if r.tracker != nil { + r.tracker.TrackScheduled(record) + } + r.dialogs.Clear(userID) + delay := int64(r.service.ExecutionDelay().Seconds()) + if delay < 0 { + delay = 0 + } + _ = r.sendText(ctx, chatID, "Operation confirmed.\n\nExecution scheduled in "+formatSeconds(delay)+".\n\nRequest ID: "+strings.TrimSpace(record.RequestID)) +} + +func (r *Router) cancel(ctx context.Context, userID string, accountID string, chatID string) { + requestID := "" + if session, ok := r.dialogs.Get(userID); ok && strings.TrimSpace(session.RequestID) != "" { + requestID = strings.TrimSpace(session.RequestID) + } else { + active, err := r.service.GetActiveRequestForAccount(ctx, accountID) + if err == nil && active != nil { + requestID = strings.TrimSpace(active.RequestID) + } + } + if requestID == "" { + r.dialogs.Clear(userID) + _ = r.sendText(ctx, chatID, "No pending treasury operation.") + return + } + record, err := r.service.CancelRequest(ctx, requestID, userID) + if err != nil { + _ = r.sendText(ctx, chatID, "Unable to cancel treasury request.") + return + } + if r.tracker != nil { + r.tracker.Untrack(record.RequestID) + } + r.dialogs.Clear(userID) + _ = r.sendText(ctx, chatID, "Operation cancelled.\n\nRequest ID: "+strings.TrimSpace(record.RequestID)) +} + +func (r *Router) sendText(ctx context.Context, chatID string, text string) error { + if r == nil || r.send == nil { + return nil + } + chatID = strings.TrimSpace(chatID) + text = strings.TrimSpace(text) + if chatID == "" || text == "" { + return nil + } + return r.send(ctx, chatID, text) +} + +func (r *Router) logUnauthorized(update *model.TelegramWebhookUpdate) { + if r == nil || r.logger == nil || update == nil || update.Message == nil { + return + } + message := update.Message + r.logger.Warn("unauthorized_access", + zap.String("event", "unauthorized_access"), + zap.String("telegram_user_id", strings.TrimSpace(message.FromUserID)), + zap.String("chat_id", strings.TrimSpace(message.ChatID)), + zap.String("message_text", strings.TrimSpace(message.Text)), + zap.Time("timestamp", time.Now()), + ) +} + +func pendingRequestMessage(record *storagemodel.TreasuryRequest) string { + if record == nil { + return "You already have a pending treasury operation.\n\n/cancel" + } + return "You already have a pending treasury operation.\n\n" + + "Request ID: " + strings.TrimSpace(record.RequestID) + "\n" + + "Status: " + strings.TrimSpace(string(record.Status)) + "\n" + + "Amount: " + strings.TrimSpace(record.Amount) + " " + strings.TrimSpace(record.Currency) + "\n\n" + + "Wait for execution or cancel it.\n\n/cancel" +} + +func confirmationPrompt(record *storagemodel.TreasuryRequest) string { + if record == nil { + return "Request created.\n\n/confirm\n/cancel" + } + title := "Funding request created." + if record.OperationType == storagemodel.TreasuryOperationWithdraw { + title = "Withdrawal request created." + } + return title + "\n\n" + + "Account: " + strings.TrimSpace(record.LedgerAccountID) + "\n" + + "Amount: " + strings.TrimSpace(record.Amount) + " " + strings.TrimSpace(record.Currency) + "\n\n" + + "Confirm operation?\n\n/confirm\n/cancel" +} + +func parseCommand(text string) string { + text = strings.TrimSpace(text) + if !strings.HasPrefix(text, "/") { + return "" + } + token := text + if idx := strings.IndexAny(token, " \t\n\r"); idx >= 0 { + token = token[:idx] + } + token = strings.TrimPrefix(token, "/") + if idx := strings.Index(token, "@"); idx >= 0 { + token = token[:idx] + } + return strings.ToLower(strings.TrimSpace(token)) +} + +func formatSeconds(value int64) string { + if value == 1 { + return "1 second" + } + return strconv.FormatInt(value, 10) + " seconds" +} diff --git a/api/gateway/tgsettle/internal/service/treasury/bot/router_test.go b/api/gateway/tgsettle/internal/service/treasury/bot/router_test.go new file mode 100644 index 00000000..c5aa955d --- /dev/null +++ b/api/gateway/tgsettle/internal/service/treasury/bot/router_test.go @@ -0,0 +1,158 @@ +package bot + +import ( + "context" + "testing" + "time" + + storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model" + mloggerfactory "github.com/tech/sendico/pkg/mlogger/factory" + "github.com/tech/sendico/pkg/model" +) + +type fakeService struct{} + +func (fakeService) ExecutionDelay() time.Duration { + return 30 * time.Second +} + +func (fakeService) MaxPerOperationLimit() string { + return "1000000" +} + +func (fakeService) GetActiveRequestForAccount(context.Context, string) (*storagemodel.TreasuryRequest, error) { + return nil, nil +} + +func (fakeService) CreateRequest(context.Context, CreateRequestInput) (*storagemodel.TreasuryRequest, error) { + return nil, nil +} + +func (fakeService) ConfirmRequest(context.Context, string, string) (*storagemodel.TreasuryRequest, error) { + return nil, nil +} + +func (fakeService) CancelRequest(context.Context, string, string) (*storagemodel.TreasuryRequest, error) { + return nil, nil +} + +func TestRouterUnauthorizedInAllowedChatSendsAccessDenied(t *testing.T) { + var sent []string + router := NewRouter( + mloggerfactory.NewLogger(false), + fakeService{}, + func(_ context.Context, _ string, text string) error { + sent = append(sent, text) + return nil + }, + nil, + []string{"100"}, + map[string]string{"123": "acct-1"}, + ) + handled := router.HandleUpdate(context.Background(), &model.TelegramWebhookUpdate{ + Message: &model.TelegramMessage{ + ChatID: "100", + FromUserID: "999", + Text: "/fund", + }, + }) + if !handled { + t.Fatalf("expected update to be handled") + } + if len(sent) != 1 { + t.Fatalf("expected one message, got %d", len(sent)) + } + if sent[0] != unauthorizedMessage { + t.Fatalf("unexpected message: %q", sent[0]) + } +} + +func TestRouterUnknownChatIsIgnored(t *testing.T) { + var sent []string + router := NewRouter( + mloggerfactory.NewLogger(false), + fakeService{}, + func(_ context.Context, _ string, text string) error { + sent = append(sent, text) + return nil + }, + nil, + []string{"100"}, + map[string]string{"123": "acct-1"}, + ) + handled := router.HandleUpdate(context.Background(), &model.TelegramWebhookUpdate{ + Message: &model.TelegramMessage{ + ChatID: "999", + FromUserID: "123", + Text: "/fund", + }, + }) + if !handled { + t.Fatalf("expected update to be handled") + } + if len(sent) != 0 { + t.Fatalf("expected no messages, got %d", len(sent)) + } +} + +func TestRouterEmptyAllowedChats_AllowsAnyChatForAuthorizedUser(t *testing.T) { + var sent []string + router := NewRouter( + mloggerfactory.NewLogger(false), + fakeService{}, + func(_ context.Context, _ string, text string) error { + sent = append(sent, text) + return nil + }, + nil, + nil, + map[string]string{"123": "acct-1"}, + ) + handled := router.HandleUpdate(context.Background(), &model.TelegramWebhookUpdate{ + Message: &model.TelegramMessage{ + ChatID: "999", + FromUserID: "123", + Text: "/fund", + }, + }) + if !handled { + t.Fatalf("expected update to be handled") + } + if len(sent) != 1 { + t.Fatalf("expected one message, got %d", len(sent)) + } + if sent[0] != "Enter amount:" { + t.Fatalf("unexpected message: %q", sent[0]) + } +} + +func TestRouterEmptyAllowedChats_UnauthorizedUserGetsDenied(t *testing.T) { + var sent []string + router := NewRouter( + mloggerfactory.NewLogger(false), + fakeService{}, + func(_ context.Context, _ string, text string) error { + sent = append(sent, text) + return nil + }, + nil, + nil, + map[string]string{"123": "acct-1"}, + ) + handled := router.HandleUpdate(context.Background(), &model.TelegramWebhookUpdate{ + Message: &model.TelegramMessage{ + ChatID: "777", + FromUserID: "999", + Text: "/fund", + }, + }) + if !handled { + t.Fatalf("expected update to be handled") + } + if len(sent) != 1 { + t.Fatalf("expected one message, got %d", len(sent)) + } + if sent[0] != unauthorizedMessage { + t.Fatalf("unexpected message: %q", sent[0]) + } +} diff --git a/api/gateway/tgsettle/internal/service/treasury/config.go b/api/gateway/tgsettle/internal/service/treasury/config.go new file mode 100644 index 00000000..8b3208f0 --- /dev/null +++ b/api/gateway/tgsettle/internal/service/treasury/config.go @@ -0,0 +1,19 @@ +package treasury + +import "time" + +type UserBinding struct { + TelegramUserID string + LedgerAccount string +} + +type Config struct { + AllowedChats []string + Users []UserBinding + + ExecutionDelay time.Duration + PollInterval time.Duration + + MaxAmountPerOperation string + MaxDailyAmount string +} diff --git a/api/gateway/tgsettle/internal/service/treasury/ledger/client.go b/api/gateway/tgsettle/internal/service/treasury/ledger/client.go new file mode 100644 index 00000000..9a256143 --- /dev/null +++ b/api/gateway/tgsettle/internal/service/treasury/ledger/client.go @@ -0,0 +1,287 @@ +package ledger + +import ( + "context" + "crypto/tls" + "fmt" + "net/url" + "strings" + "time" + + "github.com/tech/sendico/pkg/discovery" + "github.com/tech/sendico/pkg/merrors" + moneyv1 "github.com/tech/sendico/pkg/proto/common/money/v1" + connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/structpb" +) + +const ledgerConnectorID = "ledger" + +type Config struct { + Endpoint string + Timeout time.Duration + Insecure bool +} + +type Account struct { + AccountID string + Currency string + OrganizationRef string +} + +type Balance struct { + AccountID string + Amount string + Currency string +} + +type PostRequest struct { + AccountID string + OrganizationRef string + Amount string + Currency string + Reference string + IdempotencyKey string +} + +type OperationResult struct { + Reference string +} + +type Client interface { + GetAccount(ctx context.Context, accountID string) (*Account, error) + GetBalance(ctx context.Context, accountID string) (*Balance, error) + ExternalCredit(ctx context.Context, req PostRequest) (*OperationResult, error) + ExternalDebit(ctx context.Context, req PostRequest) (*OperationResult, error) + Close() error +} + +type grpcConnectorClient interface { + GetAccount(ctx context.Context, in *connectorv1.GetAccountRequest, opts ...grpc.CallOption) (*connectorv1.GetAccountResponse, error) + GetBalance(ctx context.Context, in *connectorv1.GetBalanceRequest, opts ...grpc.CallOption) (*connectorv1.GetBalanceResponse, error) + SubmitOperation(ctx context.Context, in *connectorv1.SubmitOperationRequest, opts ...grpc.CallOption) (*connectorv1.SubmitOperationResponse, error) +} + +type connectorClient struct { + cfg Config + conn *grpc.ClientConn + client grpcConnectorClient +} + +func New(cfg Config) (Client, error) { + cfg.Endpoint = strings.TrimSpace(cfg.Endpoint) + if cfg.Endpoint == "" { + return nil, merrors.InvalidArgument("ledger endpoint is required", "ledger.endpoint") + } + if normalized, insecure := normalizeEndpoint(cfg.Endpoint); normalized != "" { + cfg.Endpoint = normalized + if insecure { + cfg.Insecure = true + } + } + if cfg.Timeout <= 0 { + cfg.Timeout = 5 * time.Second + } + dialOpts := []grpc.DialOption{} + if cfg.Insecure { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } else { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) + } + conn, err := grpc.NewClient(cfg.Endpoint, dialOpts...) + if err != nil { + return nil, merrors.InternalWrap(err, fmt.Sprintf("ledger: dial %s", cfg.Endpoint)) + } + return &connectorClient{ + cfg: cfg, + conn: conn, + client: connectorv1.NewConnectorServiceClient(conn), + }, nil +} + +func (c *connectorClient) Close() error { + if c == nil || c.conn == nil { + return nil + } + return c.conn.Close() +} + +func (c *connectorClient) GetAccount(ctx context.Context, accountID string) (*Account, error) { + accountID = strings.TrimSpace(accountID) + if accountID == "" { + return nil, merrors.InvalidArgument("ledger account_id is required", "account_id") + } + ctx, cancel := c.callContext(ctx) + defer cancel() + + resp, err := c.client.GetAccount(ctx, &connectorv1.GetAccountRequest{ + AccountRef: &connectorv1.AccountRef{ + ConnectorId: ledgerConnectorID, + AccountId: accountID, + }, + }) + if err != nil { + return nil, err + } + account := resp.GetAccount() + if account == nil { + return nil, merrors.NoData("ledger account not found") + } + organizationRef := strings.TrimSpace(account.GetOwnerRef()) + if organizationRef == "" && account.GetProviderDetails() != nil { + if value, ok := account.GetProviderDetails().AsMap()["organization_ref"]; ok { + organizationRef = strings.TrimSpace(fmt.Sprint(value)) + } + } + return &Account{ + AccountID: accountID, + Currency: strings.ToUpper(strings.TrimSpace(account.GetAsset())), + OrganizationRef: organizationRef, + }, nil +} + +func (c *connectorClient) GetBalance(ctx context.Context, accountID string) (*Balance, error) { + accountID = strings.TrimSpace(accountID) + if accountID == "" { + return nil, merrors.InvalidArgument("ledger account_id is required", "account_id") + } + ctx, cancel := c.callContext(ctx) + defer cancel() + + resp, err := c.client.GetBalance(ctx, &connectorv1.GetBalanceRequest{ + AccountRef: &connectorv1.AccountRef{ + ConnectorId: ledgerConnectorID, + AccountId: accountID, + }, + }) + if err != nil { + return nil, err + } + balance := resp.GetBalance() + if balance == nil || balance.GetAvailable() == nil { + return nil, merrors.Internal("ledger balance is unavailable") + } + return &Balance{ + AccountID: accountID, + Amount: strings.TrimSpace(balance.GetAvailable().GetAmount()), + Currency: strings.ToUpper(strings.TrimSpace(balance.GetAvailable().GetCurrency())), + }, nil +} + +func (c *connectorClient) ExternalCredit(ctx context.Context, req PostRequest) (*OperationResult, error) { + return c.submitExternalOperation(ctx, connectorv1.OperationType_CREDIT, discovery.OperationExternalCredit, req) +} + +func (c *connectorClient) ExternalDebit(ctx context.Context, req PostRequest) (*OperationResult, error) { + return c.submitExternalOperation(ctx, connectorv1.OperationType_DEBIT, discovery.OperationExternalDebit, req) +} + +func (c *connectorClient) submitExternalOperation(ctx context.Context, opType connectorv1.OperationType, operation string, req PostRequest) (*OperationResult, error) { + req.AccountID = strings.TrimSpace(req.AccountID) + req.OrganizationRef = strings.TrimSpace(req.OrganizationRef) + req.Amount = strings.TrimSpace(req.Amount) + req.Currency = strings.ToUpper(strings.TrimSpace(req.Currency)) + req.Reference = strings.TrimSpace(req.Reference) + req.IdempotencyKey = strings.TrimSpace(req.IdempotencyKey) + + if req.AccountID == "" { + return nil, merrors.InvalidArgument("ledger account_id is required", "account_id") + } + if req.OrganizationRef == "" { + return nil, merrors.InvalidArgument("ledger organization_ref is required", "organization_ref") + } + if req.Amount == "" || req.Currency == "" { + return nil, merrors.InvalidArgument("ledger amount is required", "amount") + } + if req.IdempotencyKey == "" { + return nil, merrors.InvalidArgument("ledger idempotency_key is required", "idempotency_key") + } + + params := map[string]any{ + "organization_ref": req.OrganizationRef, + "operation": operation, + "description": "tgsettle treasury operation", + "metadata": map[string]any{ + "reference": req.Reference, + }, + } + operationReq := &connectorv1.Operation{ + Type: opType, + IdempotencyKey: req.IdempotencyKey, + Money: &moneyv1.Money{ + Amount: req.Amount, + Currency: req.Currency, + }, + Params: structFromMap(params), + } + account := &connectorv1.AccountRef{ConnectorId: ledgerConnectorID, AccountId: req.AccountID} + switch opType { + case connectorv1.OperationType_CREDIT: + operationReq.To = &connectorv1.OperationParty{Ref: &connectorv1.OperationParty_Account{Account: account}} + case connectorv1.OperationType_DEBIT: + operationReq.From = &connectorv1.OperationParty{Ref: &connectorv1.OperationParty_Account{Account: account}} + } + + ctx, cancel := c.callContext(ctx) + defer cancel() + + resp, err := c.client.SubmitOperation(ctx, &connectorv1.SubmitOperationRequest{Operation: operationReq}) + if err != nil { + return nil, err + } + if resp.GetReceipt() == nil { + return nil, merrors.Internal("ledger receipt is unavailable") + } + if receiptErr := resp.GetReceipt().GetError(); receiptErr != nil { + message := strings.TrimSpace(receiptErr.GetMessage()) + if message == "" { + message = "ledger operation failed" + } + return nil, merrors.InvalidArgument(message) + } + reference := strings.TrimSpace(resp.GetReceipt().GetOperationId()) + if reference == "" { + reference = req.Reference + } + return &OperationResult{Reference: reference}, nil +} + +func (c *connectorClient) callContext(ctx context.Context) (context.Context, context.CancelFunc) { + if ctx == nil { + ctx = context.Background() + } + return context.WithTimeout(ctx, c.cfg.Timeout) +} + +func structFromMap(values map[string]any) *structpb.Struct { + if len(values) == 0 { + return nil + } + result, err := structpb.NewStruct(values) + if err != nil { + return nil + } + return result +} + +func normalizeEndpoint(raw string) (string, bool) { + raw = strings.TrimSpace(raw) + if raw == "" { + return "", false + } + parsed, err := url.Parse(raw) + if err != nil || parsed.Scheme == "" || parsed.Host == "" { + return raw, false + } + switch strings.ToLower(strings.TrimSpace(parsed.Scheme)) { + case "http", "grpc": + return parsed.Host, true + case "https", "grpcs": + return parsed.Host, false + default: + return raw, false + } +} diff --git a/api/gateway/tgsettle/internal/service/treasury/ledger/discovery_client.go b/api/gateway/tgsettle/internal/service/treasury/ledger/discovery_client.go new file mode 100644 index 00000000..1bee6a1d --- /dev/null +++ b/api/gateway/tgsettle/internal/service/treasury/ledger/discovery_client.go @@ -0,0 +1,235 @@ +package ledger + +import ( + "context" + "fmt" + "net" + "net/url" + "sort" + "strings" + "sync" + "time" + + "github.com/tech/sendico/pkg/discovery" + "github.com/tech/sendico/pkg/merrors" + "github.com/tech/sendico/pkg/mlogger" + "github.com/tech/sendico/pkg/mservice" + "go.uber.org/zap" +) + +type DiscoveryConfig struct { + Logger mlogger.Logger + Registry *discovery.Registry + Timeout time.Duration +} + +type discoveryEndpoint struct { + address string + insecure bool + raw string +} + +func (e discoveryEndpoint) key() string { + return fmt.Sprintf("%s|%t", e.address, e.insecure) +} + +type discoveryClient struct { + logger mlogger.Logger + registry *discovery.Registry + timeout time.Duration + + mu sync.Mutex + client Client + endpointKey string +} + +func NewDiscoveryClient(cfg DiscoveryConfig) (Client, error) { + if cfg.Registry == nil { + return nil, merrors.InvalidArgument("treasury ledger discovery registry is required", "registry") + } + if cfg.Timeout <= 0 { + cfg.Timeout = 5 * time.Second + } + logger := cfg.Logger + if logger != nil { + logger = logger.Named("treasury_ledger_discovery") + } + return &discoveryClient{ + logger: logger, + registry: cfg.Registry, + timeout: cfg.Timeout, + }, nil +} + +func (c *discoveryClient) Close() error { + if c == nil { + return nil + } + c.mu.Lock() + defer c.mu.Unlock() + if c.client != nil { + err := c.client.Close() + c.client = nil + c.endpointKey = "" + return err + } + return nil +} + +func (c *discoveryClient) GetAccount(ctx context.Context, accountID string) (*Account, error) { + client, err := c.resolveClient(ctx) + if err != nil { + return nil, err + } + return client.GetAccount(ctx, accountID) +} + +func (c *discoveryClient) GetBalance(ctx context.Context, accountID string) (*Balance, error) { + client, err := c.resolveClient(ctx) + if err != nil { + return nil, err + } + return client.GetBalance(ctx, accountID) +} + +func (c *discoveryClient) ExternalCredit(ctx context.Context, req PostRequest) (*OperationResult, error) { + client, err := c.resolveClient(ctx) + if err != nil { + return nil, err + } + return client.ExternalCredit(ctx, req) +} + +func (c *discoveryClient) ExternalDebit(ctx context.Context, req PostRequest) (*OperationResult, error) { + client, err := c.resolveClient(ctx) + if err != nil { + return nil, err + } + return client.ExternalDebit(ctx, req) +} + +func (c *discoveryClient) resolveClient(_ context.Context) (Client, error) { + if c == nil || c.registry == nil { + return nil, merrors.Internal("treasury ledger discovery is unavailable") + } + endpoint, err := c.resolveEndpoint() + if err != nil { + return nil, err + } + key := endpoint.key() + + c.mu.Lock() + defer c.mu.Unlock() + + if c.client != nil && c.endpointKey == key { + return c.client, nil + } + if c.client != nil { + _ = c.client.Close() + c.client = nil + c.endpointKey = "" + } + next, err := New(Config{ + Endpoint: endpoint.address, + Timeout: c.timeout, + Insecure: endpoint.insecure, + }) + if err != nil { + return nil, err + } + c.client = next + c.endpointKey = key + if c.logger != nil { + c.logger.Info("Discovered ledger endpoint selected", + zap.String("service", string(mservice.Ledger)), + zap.String("invoke_uri", endpoint.raw), + zap.String("address", endpoint.address), + zap.Bool("insecure", endpoint.insecure)) + } + return c.client, nil +} + +func (c *discoveryClient) resolveEndpoint() (discoveryEndpoint, error) { + entries := c.registry.List(time.Now(), true) + type match struct { + entry discovery.RegistryEntry + opMatch bool + } + matches := make([]match, 0, len(entries)) + requiredOps := discovery.LedgerServiceOperations() + for _, entry := range entries { + if !matchesService(entry.Service, mservice.Ledger) { + continue + } + matches = append(matches, match{ + entry: entry, + opMatch: discovery.HasAnyOperation(entry.Operations, requiredOps), + }) + } + if len(matches) == 0 { + return discoveryEndpoint{}, merrors.NoData("discovery: ledger service unavailable") + } + sort.Slice(matches, func(i, j int) bool { + if matches[i].opMatch != matches[j].opMatch { + return matches[i].opMatch + } + if matches[i].entry.RoutingPriority != matches[j].entry.RoutingPriority { + return matches[i].entry.RoutingPriority > matches[j].entry.RoutingPriority + } + if matches[i].entry.ID != matches[j].entry.ID { + return matches[i].entry.ID < matches[j].entry.ID + } + return matches[i].entry.InstanceID < matches[j].entry.InstanceID + }) + return parseDiscoveryEndpoint(matches[0].entry.InvokeURI) +} + +func matchesService(service string, candidate mservice.Type) bool { + service = strings.TrimSpace(service) + if service == "" || strings.TrimSpace(string(candidate)) == "" { + return false + } + return strings.EqualFold(service, strings.TrimSpace(string(candidate))) +} + +func parseDiscoveryEndpoint(raw string) (discoveryEndpoint, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri is required") + } + + if !strings.Contains(raw, "://") { + if _, _, splitErr := net.SplitHostPort(raw); splitErr != nil { + return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port") + } + return discoveryEndpoint{address: raw, insecure: true, raw: raw}, nil + } + + parsed, err := url.Parse(raw) + if err != nil || parsed.Scheme == "" { + if err != nil { + return discoveryEndpoint{}, err + } + return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port") + } + + scheme := strings.ToLower(strings.TrimSpace(parsed.Scheme)) + switch scheme { + case "grpc": + address := strings.TrimSpace(parsed.Host) + if _, _, splitErr := net.SplitHostPort(address); splitErr != nil { + return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port") + } + return discoveryEndpoint{address: address, insecure: true, raw: raw}, nil + case "grpcs": + address := strings.TrimSpace(parsed.Host) + if _, _, splitErr := net.SplitHostPort(address); splitErr != nil { + return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port") + } + return discoveryEndpoint{address: address, insecure: false, raw: raw}, nil + case "dns", "passthrough": + return discoveryEndpoint{address: raw, insecure: true, raw: raw}, nil + default: + return discoveryEndpoint{}, merrors.InvalidArgument("discovery: unsupported invoke uri scheme") + } +} diff --git a/api/gateway/tgsettle/internal/service/treasury/module.go b/api/gateway/tgsettle/internal/service/treasury/module.go new file mode 100644 index 00000000..5b1f89af --- /dev/null +++ b/api/gateway/tgsettle/internal/service/treasury/module.go @@ -0,0 +1,148 @@ +package treasury + +import ( + "context" + "strings" + "time" + + "github.com/tech/sendico/gateway/tgsettle/internal/service/treasury/bot" + "github.com/tech/sendico/gateway/tgsettle/internal/service/treasury/ledger" + "github.com/tech/sendico/gateway/tgsettle/storage" + storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model" + "github.com/tech/sendico/pkg/merrors" + "github.com/tech/sendico/pkg/mlogger" + "github.com/tech/sendico/pkg/model" +) + +type Module struct { + logger mlogger.Logger + + service *Service + router *bot.Router + scheduler *Scheduler + ledger ledger.Client +} + +func NewModule( + logger mlogger.Logger, + repo storage.TreasuryRequestsStore, + ledgerClient ledger.Client, + cfg Config, + send bot.SendTextFunc, +) (*Module, error) { + if logger != nil { + logger = logger.Named("treasury") + } + service, err := NewService( + logger, + repo, + ledgerClient, + cfg.ExecutionDelay, + cfg.MaxAmountPerOperation, + cfg.MaxDailyAmount, + ) + if err != nil { + return nil, err + } + + users := map[string]string{} + for _, binding := range cfg.Users { + userID := strings.TrimSpace(binding.TelegramUserID) + accountID := strings.TrimSpace(binding.LedgerAccount) + if userID == "" || accountID == "" { + continue + } + users[userID] = accountID + } + + module := &Module{ + logger: logger, + service: service, + ledger: ledgerClient, + } + module.scheduler = NewScheduler(logger, service, NotifyFunc(send), cfg.PollInterval) + module.router = bot.NewRouter(logger, &botServiceAdapter{svc: service}, send, module.scheduler, cfg.AllowedChats, users) + return module, nil +} + +func (m *Module) Enabled() bool { + return m != nil && m.router != nil && m.router.Enabled() && m.scheduler != nil +} + +func (m *Module) Start() { + if m == nil || m.scheduler == nil { + return + } + m.scheduler.Start() +} + +func (m *Module) Shutdown() { + if m == nil { + return + } + if m.scheduler != nil { + m.scheduler.Shutdown() + } + if m.ledger != nil { + _ = m.ledger.Close() + } +} + +func (m *Module) HandleUpdate(ctx context.Context, update *model.TelegramWebhookUpdate) bool { + if m == nil || m.router == nil { + return false + } + return m.router.HandleUpdate(ctx, update) +} + +type botServiceAdapter struct { + svc *Service +} + +func (a *botServiceAdapter) ExecutionDelay() (delay time.Duration) { + if a == nil || a.svc == nil { + return 0 + } + return a.svc.ExecutionDelay() +} + +func (a *botServiceAdapter) MaxPerOperationLimit() string { + if a == nil || a.svc == nil { + return "" + } + return a.svc.MaxPerOperationLimit() +} + +func (a *botServiceAdapter) GetActiveRequestForAccount(ctx context.Context, ledgerAccountID string) (*storagemodel.TreasuryRequest, error) { + if a == nil || a.svc == nil { + return nil, merrors.Internal("treasury service unavailable") + } + return a.svc.GetActiveRequestForAccount(ctx, ledgerAccountID) +} + +func (a *botServiceAdapter) CreateRequest(ctx context.Context, input bot.CreateRequestInput) (*storagemodel.TreasuryRequest, error) { + if a == nil || a.svc == nil { + return nil, merrors.Internal("treasury service unavailable") + } + return a.svc.CreateRequest(ctx, CreateRequestInput{ + OperationType: input.OperationType, + TelegramUserID: input.TelegramUserID, + LedgerAccountID: input.LedgerAccountID, + ChatID: input.ChatID, + Amount: input.Amount, + }) +} + +func (a *botServiceAdapter) ConfirmRequest(ctx context.Context, requestID string, telegramUserID string) (*storagemodel.TreasuryRequest, error) { + if a == nil || a.svc == nil { + return nil, merrors.Internal("treasury service unavailable") + } + return a.svc.ConfirmRequest(ctx, requestID, telegramUserID) +} + +func (a *botServiceAdapter) CancelRequest(ctx context.Context, requestID string, telegramUserID string) (*storagemodel.TreasuryRequest, error) { + if a == nil || a.svc == nil { + return nil, merrors.Internal("treasury service unavailable") + } + return a.svc.CancelRequest(ctx, requestID, telegramUserID) +} diff --git a/api/gateway/tgsettle/internal/service/treasury/scheduler.go b/api/gateway/tgsettle/internal/service/treasury/scheduler.go new file mode 100644 index 00000000..08fb0a88 --- /dev/null +++ b/api/gateway/tgsettle/internal/service/treasury/scheduler.go @@ -0,0 +1,261 @@ +package treasury + +import ( + "context" + "strings" + "sync" + "time" + + storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model" + "github.com/tech/sendico/pkg/mlogger" + "go.uber.org/zap" +) + +type NotifyFunc func(ctx context.Context, chatID string, text string) error + +type Scheduler struct { + logger mlogger.Logger + service *Service + notify NotifyFunc + safetySweepInterval time.Duration + + cancel context.CancelFunc + wg sync.WaitGroup + + timersMu sync.Mutex + timers map[string]*time.Timer +} + +func NewScheduler(logger mlogger.Logger, service *Service, notify NotifyFunc, safetySweepInterval time.Duration) *Scheduler { + if logger != nil { + logger = logger.Named("treasury_scheduler") + } + if safetySweepInterval <= 0 { + safetySweepInterval = 30 * time.Second + } + return &Scheduler{ + logger: logger, + service: service, + notify: notify, + safetySweepInterval: safetySweepInterval, + timers: map[string]*time.Timer{}, + } +} + +func (s *Scheduler) Start() { + if s == nil || s.service == nil || s.cancel != nil { + return + } + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + + // Rebuild in-memory timers from DB on startup. + s.hydrateTimers(ctx) + // Safety pass for overdue items at startup. + s.sweep(ctx) + + s.wg.Add(1) + go func() { + defer s.wg.Done() + ticker := time.NewTicker(s.safetySweepInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.sweep(ctx) + } + } + }() +} + +func (s *Scheduler) Shutdown() { + if s == nil || s.cancel == nil { + return + } + s.cancel() + s.wg.Wait() + s.timersMu.Lock() + for requestID, timer := range s.timers { + if timer != nil { + timer.Stop() + } + delete(s.timers, requestID) + } + s.timersMu.Unlock() +} + +func (s *Scheduler) TrackScheduled(record *storagemodel.TreasuryRequest) { + if s == nil || s.service == nil || record == nil { + return + } + if strings.TrimSpace(record.RequestID) == "" { + return + } + if record.Status != storagemodel.TreasuryRequestStatusScheduled { + return + } + requestID := strings.TrimSpace(record.RequestID) + when := record.ScheduledAt + if when.IsZero() { + when = time.Now() + } + delay := time.Until(when) + if delay <= 0 { + s.Untrack(requestID) + go s.executeAndNotifyByID(context.Background(), requestID) + return + } + + s.timersMu.Lock() + if existing := s.timers[requestID]; existing != nil { + existing.Stop() + } + s.timers[requestID] = time.AfterFunc(delay, func() { + s.Untrack(requestID) + s.executeAndNotifyByID(context.Background(), requestID) + }) + s.timersMu.Unlock() +} + +func (s *Scheduler) Untrack(requestID string) { + if s == nil { + return + } + requestID = strings.TrimSpace(requestID) + if requestID == "" { + return + } + s.timersMu.Lock() + if timer := s.timers[requestID]; timer != nil { + timer.Stop() + } + delete(s.timers, requestID) + s.timersMu.Unlock() +} + +func (s *Scheduler) hydrateTimers(ctx context.Context) { + if s == nil || s.service == nil { + return + } + scheduled, err := s.service.ScheduledRequests(ctx, 1000) + if err != nil { + s.logger.Warn("Failed to hydrate scheduled treasury requests", zap.Error(err)) + return + } + for _, record := range scheduled { + s.TrackScheduled(record) + } +} + +func (s *Scheduler) sweep(ctx context.Context) { + if s == nil || s.service == nil { + return + } + now := time.Now() + + confirmed, err := s.service.DueRequests(ctx, []storagemodel.TreasuryRequestStatus{ + storagemodel.TreasuryRequestStatusConfirmed, + }, now, 100) + if err != nil { + s.logger.Warn("Failed to list confirmed treasury requests", zap.Error(err)) + return + } + for _, request := range confirmed { + s.executeAndNotifyByID(ctx, strings.TrimSpace(request.RequestID)) + } + + scheduled, err := s.service.DueRequests(ctx, []storagemodel.TreasuryRequestStatus{ + storagemodel.TreasuryRequestStatusScheduled, + }, now, 100) + if err != nil { + s.logger.Warn("Failed to list scheduled treasury requests", zap.Error(err)) + return + } + for _, request := range scheduled { + s.Untrack(strings.TrimSpace(request.RequestID)) + s.executeAndNotifyByID(ctx, strings.TrimSpace(request.RequestID)) + } +} + +func (s *Scheduler) executeAndNotifyByID(ctx context.Context, requestID string) { + if s == nil || s.service == nil { + return + } + requestID = strings.TrimSpace(requestID) + if requestID == "" { + return + } + + runCtx := ctx + if runCtx == nil { + runCtx = context.Background() + } + withTimeout, cancel := context.WithTimeout(runCtx, 30*time.Second) + defer cancel() + + result, err := s.service.ExecuteRequest(withTimeout, requestID) + if err != nil { + s.logger.Warn("Failed to execute treasury request", zap.Error(err), zap.String("request_id", requestID)) + return + } + if result == nil || result.Request == nil || s.notify == nil { + return + } + + text := executionMessage(result) + if strings.TrimSpace(text) == "" { + return + } + if err := s.notify(ctx, strings.TrimSpace(result.Request.ChatID), text); err != nil { + s.logger.Warn("Failed to notify treasury execution result", zap.Error(err), zap.String("request_id", strings.TrimSpace(result.Request.RequestID))) + } +} + +func executionMessage(result *ExecutionResult) string { + if result == nil || result.Request == nil { + return "" + } + request := result.Request + switch request.Status { + case storagemodel.TreasuryRequestStatusExecuted: + op := "Funding" + sign := "+" + if request.OperationType == storagemodel.TreasuryOperationWithdraw { + op = "Withdrawal" + sign = "-" + } + balanceAmount := "unavailable" + balanceCurrency := strings.TrimSpace(request.Currency) + if result.NewBalance != nil { + if strings.TrimSpace(result.NewBalance.Amount) != "" { + balanceAmount = strings.TrimSpace(result.NewBalance.Amount) + } + if strings.TrimSpace(result.NewBalance.Currency) != "" { + balanceCurrency = strings.TrimSpace(result.NewBalance.Currency) + } + } + return op + " completed.\n\n" + + "Account: " + strings.TrimSpace(request.LedgerAccountID) + "\n" + + "Amount: " + sign + strings.TrimSpace(request.Amount) + " " + strings.TrimSpace(request.Currency) + "\n" + + "New balance: " + balanceAmount + " " + balanceCurrency + "\n\n" + + "Reference: " + strings.TrimSpace(request.RequestID) + case storagemodel.TreasuryRequestStatusFailed: + reason := strings.TrimSpace(request.ErrorMessage) + if reason == "" && result.ExecutionError != nil { + reason = strings.TrimSpace(result.ExecutionError.Error()) + } + if reason == "" { + reason = "Unknown error." + } + return "Execution failed.\n\n" + + "Account: " + strings.TrimSpace(request.LedgerAccountID) + "\n" + + "Amount: " + strings.TrimSpace(request.Amount) + " " + strings.TrimSpace(request.Currency) + "\n" + + "Status: FAILED\n\n" + + "Reason:\n" + reason + "\n\n" + + "Request ID: " + strings.TrimSpace(request.RequestID) + default: + return "" + } +} diff --git a/api/gateway/tgsettle/internal/service/treasury/service.go b/api/gateway/tgsettle/internal/service/treasury/service.go new file mode 100644 index 00000000..bcc1013b --- /dev/null +++ b/api/gateway/tgsettle/internal/service/treasury/service.go @@ -0,0 +1,411 @@ +package treasury + +import ( + "context" + "errors" + "fmt" + "math/big" + "strings" + "time" + + "github.com/tech/sendico/gateway/tgsettle/internal/service/treasury/ledger" + "github.com/tech/sendico/gateway/tgsettle/storage" + storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model" + "github.com/tech/sendico/pkg/merrors" + "github.com/tech/sendico/pkg/mlogger" + "go.mongodb.org/mongo-driver/v2/bson" + "go.uber.org/zap" +) + +var ErrActiveTreasuryRequest = errors.New("active treasury request exists") + +type CreateRequestInput struct { + OperationType storagemodel.TreasuryOperationType + TelegramUserID string + LedgerAccountID string + ChatID string + Amount string +} + +type ExecutionResult struct { + Request *storagemodel.TreasuryRequest + NewBalance *ledger.Balance + ExecutionError error +} + +type Service struct { + logger mlogger.Logger + repo storage.TreasuryRequestsStore + ledger ledger.Client + + validator *Validator + executionDelay time.Duration +} + +func NewService( + logger mlogger.Logger, + repo storage.TreasuryRequestsStore, + ledgerClient ledger.Client, + executionDelay time.Duration, + maxPerOperation string, + maxDaily string, +) (*Service, error) { + if logger == nil { + return nil, merrors.InvalidArgument("logger is required", "logger") + } + if repo == nil { + return nil, merrors.InvalidArgument("treasury repository is required", "repo") + } + if ledgerClient == nil { + return nil, merrors.InvalidArgument("ledger client is required", "ledger_client") + } + if executionDelay <= 0 { + executionDelay = 30 * time.Second + } + validator, err := NewValidator(repo, maxPerOperation, maxDaily) + if err != nil { + return nil, err + } + return &Service{ + logger: logger.Named("treasury_service"), + repo: repo, + ledger: ledgerClient, + validator: validator, + executionDelay: executionDelay, + }, nil +} + +func (s *Service) ExecutionDelay() time.Duration { + if s == nil { + return 0 + } + return s.executionDelay +} + +func (s *Service) MaxPerOperationLimit() string { + if s == nil || s.validator == nil { + return "" + } + return s.validator.MaxPerOperation() +} + +func (s *Service) GetActiveRequestForAccount(ctx context.Context, ledgerAccountID string) (*storagemodel.TreasuryRequest, error) { + if s == nil || s.repo == nil { + return nil, merrors.Internal("treasury service unavailable") + } + return s.repo.FindActiveByLedgerAccountID(ctx, ledgerAccountID) +} + +func (s *Service) GetRequest(ctx context.Context, requestID string) (*storagemodel.TreasuryRequest, error) { + if s == nil || s.repo == nil { + return nil, merrors.Internal("treasury service unavailable") + } + return s.repo.FindByRequestID(ctx, requestID) +} + +func (s *Service) CreateRequest(ctx context.Context, input CreateRequestInput) (*storagemodel.TreasuryRequest, error) { + if s == nil || s.repo == nil || s.ledger == nil || s.validator == nil { + return nil, merrors.Internal("treasury service unavailable") + } + input.TelegramUserID = strings.TrimSpace(input.TelegramUserID) + input.LedgerAccountID = strings.TrimSpace(input.LedgerAccountID) + input.ChatID = strings.TrimSpace(input.ChatID) + input.Amount = strings.TrimSpace(input.Amount) + + switch input.OperationType { + case storagemodel.TreasuryOperationFund, storagemodel.TreasuryOperationWithdraw: + default: + return nil, merrors.InvalidArgument("treasury operation is invalid", "operation_type") + } + if input.TelegramUserID == "" { + return nil, merrors.InvalidArgument("telegram_user_id is required", "telegram_user_id") + } + if input.LedgerAccountID == "" { + return nil, merrors.InvalidArgument("ledger_account_id is required", "ledger_account_id") + } + if input.ChatID == "" { + return nil, merrors.InvalidArgument("chat_id is required", "chat_id") + } + + active, err := s.repo.FindActiveByLedgerAccountID(ctx, input.LedgerAccountID) + if err != nil { + return nil, err + } + if active != nil { + return active, ErrActiveTreasuryRequest + } + + amountRat, normalizedAmount, err := s.validator.ValidateAmount(input.Amount) + if err != nil { + return nil, err + } + if err := s.validator.ValidateDailyLimit(ctx, input.LedgerAccountID, amountRat, time.Now()); err != nil { + return nil, err + } + + account, err := s.ledger.GetAccount(ctx, input.LedgerAccountID) + if err != nil { + return nil, err + } + if account == nil || strings.TrimSpace(account.Currency) == "" { + return nil, merrors.Internal("ledger account currency is unavailable") + } + if strings.TrimSpace(account.OrganizationRef) == "" { + return nil, merrors.Internal("ledger account organization is unavailable") + } + + requestID := newRequestID() + record := &storagemodel.TreasuryRequest{ + RequestID: requestID, + OperationType: input.OperationType, + TelegramUserID: input.TelegramUserID, + LedgerAccountID: input.LedgerAccountID, + OrganizationRef: account.OrganizationRef, + ChatID: input.ChatID, + Amount: normalizedAmount, + Currency: strings.ToUpper(strings.TrimSpace(account.Currency)), + Status: storagemodel.TreasuryRequestStatusCreated, + IdempotencyKey: fmt.Sprintf("tgsettle:%s", requestID), + Active: true, + } + if err := s.repo.Create(ctx, record); err != nil { + if errors.Is(err, storage.ErrDuplicate) { + active, fetchErr := s.repo.FindActiveByLedgerAccountID(ctx, input.LedgerAccountID) + if fetchErr != nil { + return nil, fetchErr + } + if active != nil { + return active, ErrActiveTreasuryRequest + } + return nil, err + } + return nil, err + } + + s.logRequest(record, "created", nil) + return record, nil +} + +func (s *Service) ConfirmRequest(ctx context.Context, requestID string, telegramUserID string) (*storagemodel.TreasuryRequest, error) { + requestID = strings.TrimSpace(requestID) + telegramUserID = strings.TrimSpace(telegramUserID) + if requestID == "" { + return nil, merrors.InvalidArgument("request_id is required", "request_id") + } + record, err := s.repo.FindByRequestID(ctx, requestID) + if err != nil { + return nil, err + } + if record == nil { + return nil, merrors.NoData("treasury request not found") + } + if telegramUserID != "" && record.TelegramUserID != telegramUserID { + return nil, merrors.Unauthorized("treasury request ownership mismatch") + } + + switch record.Status { + case storagemodel.TreasuryRequestStatusScheduled: + return record, nil + case storagemodel.TreasuryRequestStatusCreated, storagemodel.TreasuryRequestStatusConfirmed: + now := time.Now() + record.ConfirmedAt = now + record.ScheduledAt = now.Add(s.executionDelay) + record.Status = storagemodel.TreasuryRequestStatusScheduled + record.Active = true + record.ErrorMessage = "" + default: + return nil, merrors.InvalidArgument("treasury request cannot be confirmed in current status", "status") + } + if err := s.repo.Update(ctx, record); err != nil { + return nil, err + } + s.logRequest(record, "scheduled", nil) + return record, nil +} + +func (s *Service) CancelRequest(ctx context.Context, requestID string, telegramUserID string) (*storagemodel.TreasuryRequest, error) { + requestID = strings.TrimSpace(requestID) + telegramUserID = strings.TrimSpace(telegramUserID) + if requestID == "" { + return nil, merrors.InvalidArgument("request_id is required", "request_id") + } + record, err := s.repo.FindByRequestID(ctx, requestID) + if err != nil { + return nil, err + } + if record == nil { + return nil, merrors.NoData("treasury request not found") + } + if telegramUserID != "" && record.TelegramUserID != telegramUserID { + return nil, merrors.Unauthorized("treasury request ownership mismatch") + } + + switch record.Status { + case storagemodel.TreasuryRequestStatusCancelled: + return record, nil + case storagemodel.TreasuryRequestStatusCreated, storagemodel.TreasuryRequestStatusConfirmed, storagemodel.TreasuryRequestStatusScheduled: + record.Status = storagemodel.TreasuryRequestStatusCancelled + record.CancelledAt = time.Now() + record.Active = false + default: + return nil, merrors.InvalidArgument("treasury request cannot be cancelled in current status", "status") + } + + if err := s.repo.Update(ctx, record); err != nil { + return nil, err + } + s.logRequest(record, "cancelled", nil) + return record, nil +} + +func (s *Service) ExecuteRequest(ctx context.Context, requestID string) (*ExecutionResult, error) { + requestID = strings.TrimSpace(requestID) + if requestID == "" { + return nil, merrors.InvalidArgument("request_id is required", "request_id") + } + record, err := s.repo.FindByRequestID(ctx, requestID) + if err != nil { + return nil, err + } + if record == nil { + return nil, nil + } + + switch record.Status { + case storagemodel.TreasuryRequestStatusExecuted, + storagemodel.TreasuryRequestStatusCancelled, + storagemodel.TreasuryRequestStatusFailed: + return nil, nil + case storagemodel.TreasuryRequestStatusScheduled: + claimed, err := s.repo.ClaimScheduled(ctx, requestID) + if err != nil { + return nil, err + } + if !claimed { + return nil, nil + } + record, err = s.repo.FindByRequestID(ctx, requestID) + if err != nil { + return nil, err + } + if record == nil { + return nil, nil + } + } + + if record.Status != storagemodel.TreasuryRequestStatusConfirmed { + return nil, nil + } + return s.executeClaimed(ctx, record) +} + +func (s *Service) executeClaimed(ctx context.Context, record *storagemodel.TreasuryRequest) (*ExecutionResult, error) { + if record == nil { + return nil, merrors.InvalidArgument("treasury request is required", "request") + } + postReq := ledger.PostRequest{ + AccountID: record.LedgerAccountID, + OrganizationRef: record.OrganizationRef, + Amount: record.Amount, + Currency: record.Currency, + Reference: record.RequestID, + IdempotencyKey: record.IdempotencyKey, + } + + var ( + opResult *ledger.OperationResult + err error + ) + switch record.OperationType { + case storagemodel.TreasuryOperationFund: + opResult, err = s.ledger.ExternalCredit(ctx, postReq) + case storagemodel.TreasuryOperationWithdraw: + opResult, err = s.ledger.ExternalDebit(ctx, postReq) + default: + err = merrors.InvalidArgument("treasury operation is invalid", "operation_type") + } + now := time.Now() + if err != nil { + record.Status = storagemodel.TreasuryRequestStatusFailed + record.Active = false + record.ExecutedAt = now + record.ErrorMessage = strings.TrimSpace(err.Error()) + if saveErr := s.repo.Update(ctx, record); saveErr != nil { + return nil, saveErr + } + s.logRequest(record, "failed", err) + return &ExecutionResult{ + Request: record, + ExecutionError: err, + }, nil + } + + if opResult != nil { + record.LedgerReference = strings.TrimSpace(opResult.Reference) + } + record.Status = storagemodel.TreasuryRequestStatusExecuted + record.Active = false + record.ExecutedAt = now + record.ErrorMessage = "" + + balance, balanceErr := s.ledger.GetBalance(ctx, record.LedgerAccountID) + if balanceErr != nil { + record.ErrorMessage = strings.TrimSpace(balanceErr.Error()) + } + + if saveErr := s.repo.Update(ctx, record); saveErr != nil { + return nil, saveErr + } + s.logRequest(record, "executed", nil) + return &ExecutionResult{ + Request: record, + NewBalance: balance, + ExecutionError: balanceErr, + }, nil +} + +func (s *Service) DueRequests(ctx context.Context, statuses []storagemodel.TreasuryRequestStatus, now time.Time, limit int64) ([]*storagemodel.TreasuryRequest, error) { + if s == nil || s.repo == nil { + return nil, merrors.Internal("treasury service unavailable") + } + return s.repo.FindDueByStatus(ctx, statuses, now, limit) +} + +func (s *Service) ScheduledRequests(ctx context.Context, limit int64) ([]*storagemodel.TreasuryRequest, error) { + if s == nil || s.repo == nil { + return nil, merrors.Internal("treasury service unavailable") + } + return s.repo.FindDueByStatus( + ctx, + []storagemodel.TreasuryRequestStatus{storagemodel.TreasuryRequestStatusScheduled}, + time.Now().Add(10*365*24*time.Hour), + limit, + ) +} + +func (s *Service) ParseAmount(value string) (*big.Rat, error) { + return parseAmountRat(value) +} + +func (s *Service) logRequest(record *storagemodel.TreasuryRequest, status string, err error) { + if s == nil || s.logger == nil || record == nil { + return + } + fields := []zap.Field{ + zap.String("request_id", strings.TrimSpace(record.RequestID)), + zap.String("telegram_user_id", strings.TrimSpace(record.TelegramUserID)), + zap.String("ledger_account_id", strings.TrimSpace(record.LedgerAccountID)), + zap.String("operation_type", strings.TrimSpace(string(record.OperationType))), + zap.String("amount", strings.TrimSpace(record.Amount)), + zap.String("currency", strings.TrimSpace(record.Currency)), + zap.String("status", status), + } + if err != nil { + fields = append(fields, zap.Error(err)) + } + s.logger.Info("treasury_request", fields...) +} + +func newRequestID() string { + return "TGSETTLE-" + strings.ToUpper(bson.NewObjectID().Hex()[:8]) +} diff --git a/api/gateway/tgsettle/internal/service/treasury/validator.go b/api/gateway/tgsettle/internal/service/treasury/validator.go new file mode 100644 index 00000000..eec5e4d8 --- /dev/null +++ b/api/gateway/tgsettle/internal/service/treasury/validator.go @@ -0,0 +1,181 @@ +package treasury + +import ( + "context" + "math/big" + "regexp" + "strings" + "time" + + "github.com/tech/sendico/gateway/tgsettle/storage" + storagemodel "github.com/tech/sendico/gateway/tgsettle/storage/model" + "github.com/tech/sendico/pkg/merrors" +) + +var treasuryAmountPattern = regexp.MustCompile(`^[0-9]+(\.[0-9]+)?$`) + +type LimitKind string + +const ( + LimitKindPerOperation LimitKind = "per_operation" + LimitKindDaily LimitKind = "daily" +) + +type LimitError struct { + Kind LimitKind + Max string +} + +func (e *LimitError) Error() string { + if e == nil { + return "limit exceeded" + } + switch e.Kind { + case LimitKindPerOperation: + return "max amount per operation exceeded" + case LimitKindDaily: + return "max daily amount exceeded" + default: + return "limit exceeded" + } +} + +func (e *LimitError) LimitKind() string { + if e == nil { + return "" + } + return string(e.Kind) +} + +func (e *LimitError) LimitMax() string { + if e == nil { + return "" + } + return e.Max +} + +type Validator struct { + repo storage.TreasuryRequestsStore + + maxPerOperation *big.Rat + maxDaily *big.Rat + + maxPerOperationRaw string + maxDailyRaw string +} + +func NewValidator(repo storage.TreasuryRequestsStore, maxPerOperation string, maxDaily string) (*Validator, error) { + validator := &Validator{ + repo: repo, + maxPerOperationRaw: strings.TrimSpace(maxPerOperation), + maxDailyRaw: strings.TrimSpace(maxDaily), + } + if validator.maxPerOperationRaw != "" { + value, err := parseAmountRat(validator.maxPerOperationRaw) + if err != nil { + return nil, merrors.InvalidArgument("treasury max_amount_per_operation is invalid", "treasury.limits.max_amount_per_operation") + } + validator.maxPerOperation = value + } + if validator.maxDailyRaw != "" { + value, err := parseAmountRat(validator.maxDailyRaw) + if err != nil { + return nil, merrors.InvalidArgument("treasury max_daily_amount is invalid", "treasury.limits.max_daily_amount") + } + validator.maxDaily = value + } + return validator, nil +} + +func (v *Validator) MaxPerOperation() string { + if v == nil { + return "" + } + return v.maxPerOperationRaw +} + +func (v *Validator) MaxDaily() string { + if v == nil { + return "" + } + return v.maxDailyRaw +} + +func (v *Validator) ValidateAmount(amount string) (*big.Rat, string, error) { + amount = strings.TrimSpace(amount) + value, err := parseAmountRat(amount) + if err != nil { + return nil, "", err + } + if v != nil && v.maxPerOperation != nil && value.Cmp(v.maxPerOperation) > 0 { + return nil, "", &LimitError{ + Kind: LimitKindPerOperation, + Max: v.maxPerOperationRaw, + } + } + return value, amount, nil +} + +func (v *Validator) ValidateDailyLimit(ctx context.Context, ledgerAccountID string, amount *big.Rat, now time.Time) error { + if v == nil || v.maxDaily == nil || v.repo == nil { + return nil + } + if amount == nil { + return merrors.InvalidArgument("amount is required", "amount") + } + dayStart := time.Date(now.UTC().Year(), now.UTC().Month(), now.UTC().Day(), 0, 0, 0, 0, time.UTC) + dayEnd := dayStart.Add(24 * time.Hour) + + records, err := v.repo.ListByAccountAndStatuses( + ctx, + ledgerAccountID, + []storagemodel.TreasuryRequestStatus{ + storagemodel.TreasuryRequestStatusCreated, + storagemodel.TreasuryRequestStatusConfirmed, + storagemodel.TreasuryRequestStatusScheduled, + storagemodel.TreasuryRequestStatusExecuted, + }, + dayStart, + dayEnd, + ) + if err != nil { + return err + } + total := new(big.Rat) + for _, record := range records { + if record == nil { + continue + } + next, err := parseAmountRat(record.Amount) + if err != nil { + return merrors.Internal("treasury request amount is invalid") + } + total.Add(total, next) + } + total.Add(total, amount) + if total.Cmp(v.maxDaily) > 0 { + return &LimitError{ + Kind: LimitKindDaily, + Max: v.maxDailyRaw, + } + } + return nil +} + +func parseAmountRat(value string) (*big.Rat, error) { + value = strings.TrimSpace(value) + if value == "" { + return nil, merrors.InvalidArgument("amount is required", "amount") + } + if !treasuryAmountPattern.MatchString(value) { + return nil, merrors.InvalidArgument("amount format is invalid", "amount") + } + amount := new(big.Rat) + if _, ok := amount.SetString(value); !ok { + return nil, merrors.InvalidArgument("amount format is invalid", "amount") + } + if amount.Sign() <= 0 { + return nil, merrors.InvalidArgument("amount must be positive", "amount") + } + return amount, nil +} diff --git a/api/gateway/tgsettle/storage/model/storable.go b/api/gateway/tgsettle/storage/model/storable.go index 00f9d451..14b46044 100644 --- a/api/gateway/tgsettle/storage/model/storable.go +++ b/api/gateway/tgsettle/storage/model/storable.go @@ -4,6 +4,7 @@ const ( paymentsCollection = "payments" telegramConfirmationsCollection = "telegram_confirmations" pendingConfirmationsCollection = "pending_confirmations" + treasuryRequestsCollection = "treasury_requests" ) func (*PaymentRecord) Collection() string { @@ -17,3 +18,7 @@ func (*TelegramConfirmation) Collection() string { func (*PendingConfirmation) Collection() string { return pendingConfirmationsCollection } + +func (*TreasuryRequest) Collection() string { + return treasuryRequestsCollection +} diff --git a/api/gateway/tgsettle/storage/model/treasury.go b/api/gateway/tgsettle/storage/model/treasury.go new file mode 100644 index 00000000..e32876c0 --- /dev/null +++ b/api/gateway/tgsettle/storage/model/treasury.go @@ -0,0 +1,50 @@ +package model + +import ( + "time" + + "github.com/tech/sendico/pkg/db/storable" +) + +type TreasuryOperationType string + +const ( + TreasuryOperationFund TreasuryOperationType = "fund" + TreasuryOperationWithdraw TreasuryOperationType = "withdraw" +) + +type TreasuryRequestStatus string + +const ( + TreasuryRequestStatusCreated TreasuryRequestStatus = "created" + TreasuryRequestStatusConfirmed TreasuryRequestStatus = "confirmed" + TreasuryRequestStatusScheduled TreasuryRequestStatus = "scheduled" + TreasuryRequestStatusExecuted TreasuryRequestStatus = "executed" + TreasuryRequestStatusCancelled TreasuryRequestStatus = "cancelled" + TreasuryRequestStatusFailed TreasuryRequestStatus = "failed" +) + +type TreasuryRequest struct { + storable.Base `bson:",inline" json:",inline"` + + RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"` + OperationType TreasuryOperationType `bson:"operationType,omitempty" json:"operation_type,omitempty"` + TelegramUserID string `bson:"telegramUserId,omitempty" json:"telegram_user_id,omitempty"` + LedgerAccountID string `bson:"ledgerAccountId,omitempty" json:"ledger_account_id,omitempty"` + OrganizationRef string `bson:"organizationRef,omitempty" json:"organization_ref,omitempty"` + ChatID string `bson:"chatId,omitempty" json:"chat_id,omitempty"` + Amount string `bson:"amount,omitempty" json:"amount,omitempty"` + Currency string `bson:"currency,omitempty" json:"currency,omitempty"` + Status TreasuryRequestStatus `bson:"status,omitempty" json:"status,omitempty"` + + ConfirmedAt time.Time `bson:"confirmedAt,omitempty" json:"confirmed_at,omitempty"` + ScheduledAt time.Time `bson:"scheduledAt,omitempty" json:"scheduled_at,omitempty"` + ExecutedAt time.Time `bson:"executedAt,omitempty" json:"executed_at,omitempty"` + CancelledAt time.Time `bson:"cancelledAt,omitempty" json:"cancelled_at,omitempty"` + + IdempotencyKey string `bson:"idempotencyKey,omitempty" json:"idempotency_key,omitempty"` + LedgerReference string `bson:"ledgerReference,omitempty" json:"ledger_reference,omitempty"` + ErrorMessage string `bson:"errorMessage,omitempty" json:"error_message,omitempty"` + + Active bool `bson:"active,omitempty" json:"active,omitempty"` +} diff --git a/api/gateway/tgsettle/storage/mongo/repository.go b/api/gateway/tgsettle/storage/mongo/repository.go index 9abb8bab..146411d4 100644 --- a/api/gateway/tgsettle/storage/mongo/repository.go +++ b/api/gateway/tgsettle/storage/mongo/repository.go @@ -24,6 +24,7 @@ type Repository struct { payments storage.PaymentsStore tg storage.TelegramConfirmationsStore pending storage.PendingConfirmationsStore + treasury storage.TreasuryRequestsStore outbox gatewayoutbox.Store } @@ -74,6 +75,11 @@ func New(logger mlogger.Logger, conn *db.MongoConnection) (*Repository, error) { result.logger.Error("Failed to initialise pending confirmations store", zap.Error(err), zap.String("store", "pending_confirmations")) return nil, err } + treasuryStore, err := store.NewTreasuryRequests(result.logger, result.db) + if err != nil { + result.logger.Error("Failed to initialise treasury requests store", zap.Error(err), zap.String("store", "treasury_requests")) + return nil, err + } outboxStore, err := gatewayoutbox.NewMongoStore(result.logger, result.db) if err != nil { result.logger.Error("Failed to initialise outbox store", zap.Error(err), zap.String("store", "outbox")) @@ -82,6 +88,7 @@ func New(logger mlogger.Logger, conn *db.MongoConnection) (*Repository, error) { result.payments = paymentsStore result.tg = tgStore result.pending = pendingStore + result.treasury = treasuryStore result.outbox = outboxStore result.logger.Info("Payment gateway MongoDB storage initialised") return result, nil @@ -99,6 +106,10 @@ func (r *Repository) PendingConfirmations() storage.PendingConfirmationsStore { return r.pending } +func (r *Repository) TreasuryRequests() storage.TreasuryRequestsStore { + return r.treasury +} + func (r *Repository) Outbox() gatewayoutbox.Store { return r.outbox } diff --git a/api/gateway/tgsettle/storage/mongo/store/treasury_requests.go b/api/gateway/tgsettle/storage/mongo/store/treasury_requests.go new file mode 100644 index 00000000..9c27748a --- /dev/null +++ b/api/gateway/tgsettle/storage/mongo/store/treasury_requests.go @@ -0,0 +1,311 @@ +package store + +import ( + "context" + "errors" + "strings" + "time" + + "github.com/tech/sendico/gateway/tgsettle/storage" + "github.com/tech/sendico/gateway/tgsettle/storage/model" + "github.com/tech/sendico/pkg/db/repository" + "github.com/tech/sendico/pkg/db/repository/builder" + ri "github.com/tech/sendico/pkg/db/repository/index" + "github.com/tech/sendico/pkg/merrors" + "github.com/tech/sendico/pkg/mlogger" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.uber.org/zap" +) + +const ( + treasuryRequestsCollection = "treasury_requests" + + fieldTreasuryRequestID = "requestId" + fieldTreasuryLedgerAccount = "ledgerAccountId" + fieldTreasuryIdempotencyKey = "idempotencyKey" + fieldTreasuryStatus = "status" + fieldTreasuryScheduledAt = "scheduledAt" + fieldTreasuryCreatedAt = "createdAt" + fieldTreasuryActive = "active" +) + +type TreasuryRequests struct { + logger mlogger.Logger + repo repository.Repository +} + +func NewTreasuryRequests(logger mlogger.Logger, db *mongo.Database) (*TreasuryRequests, error) { + if db == nil { + return nil, merrors.InvalidArgument("mongo database is nil") + } + if logger == nil { + logger = zap.NewNop() + } + logger = logger.Named("treasury_requests").With(zap.String("collection", treasuryRequestsCollection)) + + repo := repository.CreateMongoRepository(db, treasuryRequestsCollection) + if err := repo.CreateIndex(&ri.Definition{ + Keys: []ri.Key{{Field: fieldTreasuryRequestID, Sort: ri.Asc}}, + Unique: true, + }); err != nil { + logger.Error("Failed to create treasury requests request_id index", zap.Error(err), zap.String("index_field", fieldTreasuryRequestID)) + return nil, err + } + if err := repo.CreateIndex(&ri.Definition{ + Keys: []ri.Key{{Field: fieldTreasuryIdempotencyKey, Sort: ri.Asc}}, + Unique: true, + }); err != nil { + logger.Error("Failed to create treasury requests idempotency index", zap.Error(err), zap.String("index_field", fieldTreasuryIdempotencyKey)) + return nil, err + } + if err := repo.CreateIndex(&ri.Definition{ + Keys: []ri.Key{ + {Field: fieldTreasuryLedgerAccount, Sort: ri.Asc}, + {Field: fieldTreasuryActive, Sort: ri.Asc}, + }, + Unique: true, + PartialFilter: repository.Filter(fieldTreasuryActive, true), + }); err != nil { + logger.Error("Failed to create treasury requests active-account index", zap.Error(err)) + return nil, err + } + if err := repo.CreateIndex(&ri.Definition{ + Keys: []ri.Key{ + {Field: fieldTreasuryStatus, Sort: ri.Asc}, + {Field: fieldTreasuryScheduledAt, Sort: ri.Asc}, + }, + }); err != nil { + logger.Error("Failed to create treasury requests execution index", zap.Error(err)) + return nil, err + } + if err := repo.CreateIndex(&ri.Definition{ + Keys: []ri.Key{ + {Field: fieldTreasuryLedgerAccount, Sort: ri.Asc}, + {Field: fieldTreasuryCreatedAt, Sort: ri.Asc}, + }, + }); err != nil { + logger.Error("Failed to create treasury requests daily-amount index", zap.Error(err)) + return nil, err + } + + t := &TreasuryRequests{ + logger: logger, + repo: repo, + } + t.logger.Debug("Treasury requests store initialised") + return t, nil +} + +func (t *TreasuryRequests) Create(ctx context.Context, record *model.TreasuryRequest) error { + if record == nil { + return merrors.InvalidArgument("treasury request is nil", "record") + } + record.RequestID = strings.TrimSpace(record.RequestID) + record.TelegramUserID = strings.TrimSpace(record.TelegramUserID) + record.LedgerAccountID = strings.TrimSpace(record.LedgerAccountID) + record.OrganizationRef = strings.TrimSpace(record.OrganizationRef) + record.ChatID = strings.TrimSpace(record.ChatID) + record.Amount = strings.TrimSpace(record.Amount) + record.Currency = strings.ToUpper(strings.TrimSpace(record.Currency)) + record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey) + record.LedgerReference = strings.TrimSpace(record.LedgerReference) + record.ErrorMessage = strings.TrimSpace(record.ErrorMessage) + + if record.RequestID == "" { + return merrors.InvalidArgument("request_id is required", "request_id") + } + if record.TelegramUserID == "" { + return merrors.InvalidArgument("telegram_user_id is required", "telegram_user_id") + } + if record.LedgerAccountID == "" { + return merrors.InvalidArgument("ledger_account_id is required", "ledger_account_id") + } + if record.Amount == "" { + return merrors.InvalidArgument("amount is required", "amount") + } + if record.Currency == "" { + return merrors.InvalidArgument("currency is required", "currency") + } + if record.IdempotencyKey == "" { + return merrors.InvalidArgument("idempotency_key is required", "idempotency_key") + } + if record.Status == "" { + return merrors.InvalidArgument("status is required", "status") + } + + now := time.Now() + if record.CreatedAt.IsZero() { + record.CreatedAt = now + } + record.UpdatedAt = now + record.ID = bson.NilObjectID + + err := t.repo.Insert(ctx, record, repository.Filter(fieldTreasuryRequestID, record.RequestID)) + if errors.Is(err, merrors.ErrDataConflict) { + return storage.ErrDuplicate + } + if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + t.logger.Warn("Failed to create treasury request", zap.Error(err), zap.String("request_id", record.RequestID)) + } + return err +} + +func (t *TreasuryRequests) FindByRequestID(ctx context.Context, requestID string) (*model.TreasuryRequest, error) { + requestID = strings.TrimSpace(requestID) + if requestID == "" { + return nil, merrors.InvalidArgument("request_id is required", "request_id") + } + var result model.TreasuryRequest + err := t.repo.FindOneByFilter(ctx, repository.Filter(fieldTreasuryRequestID, requestID), &result) + if errors.Is(err, merrors.ErrNoData) { + return nil, nil + } + if err != nil { + return nil, err + } + return &result, nil +} + +func (t *TreasuryRequests) FindActiveByLedgerAccountID(ctx context.Context, ledgerAccountID string) (*model.TreasuryRequest, error) { + ledgerAccountID = strings.TrimSpace(ledgerAccountID) + if ledgerAccountID == "" { + return nil, merrors.InvalidArgument("ledger_account_id is required", "ledger_account_id") + } + var result model.TreasuryRequest + query := repository.Query(). + Filter(repository.Field(fieldTreasuryLedgerAccount), ledgerAccountID). + Filter(repository.Field(fieldTreasuryActive), true) + err := t.repo.FindOneByFilter(ctx, query, &result) + if errors.Is(err, merrors.ErrNoData) { + return nil, nil + } + if err != nil { + return nil, err + } + return &result, nil +} + +func (t *TreasuryRequests) FindDueByStatus(ctx context.Context, statuses []model.TreasuryRequestStatus, now time.Time, limit int64) ([]*model.TreasuryRequest, error) { + if len(statuses) == 0 { + return nil, nil + } + if limit <= 0 { + limit = 100 + } + statusValues := make([]any, 0, len(statuses)) + for _, status := range statuses { + next := strings.TrimSpace(string(status)) + if next == "" { + continue + } + statusValues = append(statusValues, next) + } + if len(statusValues) == 0 { + return nil, nil + } + query := repository.Query(). + In(repository.Field(fieldTreasuryStatus), statusValues...). + Comparison(repository.Field(fieldTreasuryScheduledAt), builder.Lte, now). + Sort(repository.Field(fieldTreasuryScheduledAt), true). + Limit(&limit) + + result := make([]*model.TreasuryRequest, 0) + err := t.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error { + next := &model.TreasuryRequest{} + if err := cur.Decode(next); err != nil { + return err + } + result = append(result, next) + return nil + }) + if err != nil && !errors.Is(err, merrors.ErrNoData) { + return nil, err + } + return result, nil +} + +func (t *TreasuryRequests) ClaimScheduled(ctx context.Context, requestID string) (bool, error) { + requestID = strings.TrimSpace(requestID) + if requestID == "" { + return false, merrors.InvalidArgument("request_id is required", "request_id") + } + patch := repository.Patch(). + Set(repository.Field(fieldTreasuryStatus), string(model.TreasuryRequestStatusConfirmed)). + Set(repository.Field("updatedAt"), time.Now()) + updated, err := t.repo.PatchMany(ctx, repository.Filter(fieldTreasuryRequestID, requestID).And( + repository.Filter(fieldTreasuryStatus, string(model.TreasuryRequestStatusScheduled)), + ), patch) + if err != nil { + return false, err + } + return updated > 0, nil +} + +func (t *TreasuryRequests) Update(ctx context.Context, record *model.TreasuryRequest) error { + if record == nil { + return merrors.InvalidArgument("treasury request is nil", "record") + } + record.RequestID = strings.TrimSpace(record.RequestID) + if record.RequestID == "" { + return merrors.InvalidArgument("request_id is required", "request_id") + } + existing, err := t.FindByRequestID(ctx, record.RequestID) + if err != nil { + return err + } + if existing == nil { + return merrors.NoData("treasury request not found") + } + record.ID = existing.ID + if record.CreatedAt.IsZero() { + record.CreatedAt = existing.CreatedAt + } + record.UpdatedAt = time.Now() + if err := t.repo.Update(ctx, record); err != nil { + if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { + t.logger.Warn("Failed to update treasury request", zap.Error(err), zap.String("request_id", record.RequestID)) + } + return err + } + return nil +} + +func (t *TreasuryRequests) ListByAccountAndStatuses(ctx context.Context, ledgerAccountID string, statuses []model.TreasuryRequestStatus, dayStart, dayEnd time.Time) ([]*model.TreasuryRequest, error) { + ledgerAccountID = strings.TrimSpace(ledgerAccountID) + if ledgerAccountID == "" { + return nil, merrors.InvalidArgument("ledger_account_id is required", "ledger_account_id") + } + statusValues := make([]any, 0, len(statuses)) + for _, status := range statuses { + next := strings.TrimSpace(string(status)) + if next == "" { + continue + } + statusValues = append(statusValues, next) + } + if len(statusValues) == 0 { + return nil, nil + } + query := repository.Query(). + Filter(repository.Field(fieldTreasuryLedgerAccount), ledgerAccountID). + In(repository.Field(fieldTreasuryStatus), statusValues...). + Comparison(repository.Field(fieldTreasuryCreatedAt), builder.Gte, dayStart). + Comparison(repository.Field(fieldTreasuryCreatedAt), builder.Lt, dayEnd) + + result := make([]*model.TreasuryRequest, 0) + err := t.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error { + next := &model.TreasuryRequest{} + if err := cur.Decode(next); err != nil { + return err + } + result = append(result, next) + return nil + }) + if err != nil && !errors.Is(err, merrors.ErrNoData) { + return nil, err + } + return result, nil +} + +var _ storage.TreasuryRequestsStore = (*TreasuryRequests)(nil) diff --git a/api/gateway/tgsettle/storage/storage.go b/api/gateway/tgsettle/storage/storage.go index 72eb42b1..26fbce9e 100644 --- a/api/gateway/tgsettle/storage/storage.go +++ b/api/gateway/tgsettle/storage/storage.go @@ -14,6 +14,7 @@ type Repository interface { Payments() PaymentsStore TelegramConfirmations() TelegramConfirmationsStore PendingConfirmations() PendingConfirmationsStore + TreasuryRequests() TreasuryRequestsStore } type PaymentsStore interface { @@ -35,3 +36,13 @@ type PendingConfirmationsStore interface { DeleteByRequestID(ctx context.Context, requestID string) error ListExpired(ctx context.Context, now time.Time, limit int64) ([]*model.PendingConfirmation, error) } + +type TreasuryRequestsStore interface { + Create(ctx context.Context, record *model.TreasuryRequest) error + FindByRequestID(ctx context.Context, requestID string) (*model.TreasuryRequest, error) + FindActiveByLedgerAccountID(ctx context.Context, ledgerAccountID string) (*model.TreasuryRequest, error) + FindDueByStatus(ctx context.Context, statuses []model.TreasuryRequestStatus, now time.Time, limit int64) ([]*model.TreasuryRequest, error) + ClaimScheduled(ctx context.Context, requestID string) (bool, error) + Update(ctx context.Context, record *model.TreasuryRequest) error + ListByAccountAndStatuses(ctx context.Context, ledgerAccountID string, statuses []model.TreasuryRequestStatus, dayStart, dayEnd time.Time) ([]*model.TreasuryRequest, error) +} diff --git a/api/ledger/storage/mongo/store/accounts.go b/api/ledger/storage/mongo/store/accounts.go index c1221343..ecc67985 100644 --- a/api/ledger/storage/mongo/store/accounts.go +++ b/api/ledger/storage/mongo/store/accounts.go @@ -3,11 +3,11 @@ package store import ( "context" "errors" + "fmt" "strings" "github.com/tech/sendico/ledger/storage" "github.com/tech/sendico/pkg/db/repository" - "github.com/tech/sendico/pkg/db/repository/builder" ri "github.com/tech/sendico/pkg/db/repository/index" "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mlogger" @@ -26,10 +26,27 @@ type accountsStore struct { } const ( - orgCurrencyRoleNonOperatingIndex = "org_currency_role_non_operating_unique" + orgCurrencyRoleNonOperatingPrefix = "org_currency_role_non_operating_unique" orgCurrencyRoleSystemOperatingName = "org_currency_role_system_operating_unique" ) +var nonOperatingUniqueRoles = []account_role.AccountRole{ + account_role.AccountRoleHold, + account_role.AccountRoleTransit, + account_role.AccountRoleSettlement, + account_role.AccountRoleClearing, + account_role.AccountRolePending, + account_role.AccountRoleReserve, + account_role.AccountRoleLiquidity, + account_role.AccountRoleFee, + account_role.AccountRoleChargeback, + account_role.AccountRoleAdjustment, +} + +func nonOperatingRoleIndexName(role account_role.AccountRole) string { + return fmt.Sprintf("%s_%s", orgCurrencyRoleNonOperatingPrefix, role) +} + func NewAccounts(logger mlogger.Logger, db *mongo.Database) (storage.AccountsStore, error) { repo := repository.CreateMongoRepository(db, mservice.LedgerAccounts) @@ -48,21 +65,25 @@ func NewAccounts(logger mlogger.Logger, db *mongo.Database) (storage.AccountsSto } // Keep role uniqueness for non-operating organization accounts. - roleIndex := &ri.Definition{ - Keys: []ri.Key{ - {Field: "organizationRef", Sort: ri.Asc}, - {Field: "currency", Sort: ri.Asc}, - {Field: "role", Sort: ri.Asc}, - }, - Unique: true, - Name: orgCurrencyRoleNonOperatingIndex, - PartialFilter: repository.Query(). - Filter(repository.Field("scope"), pkm.LedgerAccountScopeOrganization). - Comparison(repository.Field("role"), builder.Ne, account_role.AccountRoleOperating), - } - if err := repo.CreateIndex(roleIndex); err != nil { - logger.Error("Failed to ensure accounts role index", zap.Error(err)) - return nil, err + // Some Mongo-compatible backends reject partial filters that use negation ($ne/$not). + // Build one equality-based partial index per non-operating role for compatibility. + for _, role := range nonOperatingUniqueRoles { + roleIndex := &ri.Definition{ + Keys: []ri.Key{ + {Field: "organizationRef", Sort: ri.Asc}, + {Field: "currency", Sort: ri.Asc}, + {Field: "role", Sort: ri.Asc}, + }, + Unique: true, + Name: nonOperatingRoleIndexName(role), + PartialFilter: repository.Query(). + Filter(repository.Field("scope"), pkm.LedgerAccountScopeOrganization). + Filter(repository.Field("role"), role), + } + if err := repo.CreateIndex(roleIndex); err != nil { + logger.Error("Failed to ensure accounts role index", zap.String("role", string(role)), zap.Error(err)) + return nil, err + } } // Ensure only one system-tagged operating role per organization/currency. -- 2.49.1