From 5e59fea7e53d135664571c3cd9339422e5ac8fa4 Mon Sep 17 00:00:00 2001 From: Stephan D Date: Thu, 5 Mar 2026 11:54:07 +0100 Subject: [PATCH] improved tgsettle messages + storage fixes --- .../service/gateway/confirmation_flow.go | 5 +- .../internal/service/gateway/service_test.go | 7 +- .../internal/service/treasury/bot/commands.go | 28 ++++ .../internal/service/treasury/bot/router.go | 157 +++++++++++++++++- .../service/treasury/bot/router_test.go | 47 +++++- .../service/treasury/ledger/client.go | 29 +++- .../internal/service/treasury/module.go | 18 ++ .../internal/service/treasury/scheduler.go | 58 ++++++- .../internal/service/treasury/service.go | 72 ++++++-- .../internal/service/treasury/validator.go | 3 - .../tgsettle/storage/model/treasury.go | 19 ++- .../tgsettle/storage/mongo/store/payments.go | 43 ++--- .../mongo/store/pending_confirmations.go | 55 ++---- .../mongo/store/telegram_confirmations.go | 27 +-- .../storage/mongo/store/treasury_requests.go | 135 +++++++++++---- api/gateway/tgsettle/storage/storage.go | 6 +- 16 files changed, 537 insertions(+), 172 deletions(-) diff --git a/api/gateway/tgsettle/internal/service/gateway/confirmation_flow.go b/api/gateway/tgsettle/internal/service/gateway/confirmation_flow.go index ba232be0..21587340 100644 --- a/api/gateway/tgsettle/internal/service/gateway/confirmation_flow.go +++ b/api/gateway/tgsettle/internal/service/gateway/confirmation_flow.go @@ -55,8 +55,9 @@ func (s *Service) sweepExpiredConfirmations(ctx context.Context) { s.logger.Warn("Failed to list expired pending confirmations", zap.Error(err)) return } - for _, pending := range expired { - if pending == nil || strings.TrimSpace(pending.RequestID) == "" { + for i := range expired { + pending := &expired[i] + if strings.TrimSpace(pending.RequestID) == "" { continue } result := &model.ConfirmationResult{ diff --git a/api/gateway/tgsettle/internal/service/gateway/service_test.go b/api/gateway/tgsettle/internal/service/gateway/service_test.go index b40f7833..1ae84c4a 100644 --- a/api/gateway/tgsettle/internal/service/gateway/service_test.go +++ b/api/gateway/tgsettle/internal/service/gateway/service_test.go @@ -162,19 +162,18 @@ func (f *fakePendingStore) DeleteByRequestID(_ context.Context, requestID string return nil } -func (f *fakePendingStore) ListExpired(_ context.Context, now time.Time, limit int64) ([]*storagemodel.PendingConfirmation, error) { +func (f *fakePendingStore) ListExpired(_ context.Context, now time.Time, limit int64) ([]storagemodel.PendingConfirmation, error) { f.mu.Lock() defer f.mu.Unlock() if limit <= 0 { limit = 100 } - result := make([]*storagemodel.PendingConfirmation, 0) + result := make([]storagemodel.PendingConfirmation, 0) for _, record := range f.records { if record == nil || record.ExpiresAt.IsZero() || record.ExpiresAt.After(now) { continue } - cp := *record - result = append(result, &cp) + result = append(result, *record) if int64(len(result)) >= limit { break } diff --git a/api/gateway/tgsettle/internal/service/treasury/bot/commands.go b/api/gateway/tgsettle/internal/service/treasury/bot/commands.go index ad39e3e0..bc8ea944 100644 --- a/api/gateway/tgsettle/internal/service/treasury/bot/commands.go +++ b/api/gateway/tgsettle/internal/service/treasury/bot/commands.go @@ -6,6 +6,7 @@ type Command string const ( CommandStart Command = "start" + CommandHelp Command = "help" CommandFund Command = "fund" CommandWithdraw Command = "withdraw" CommandConfirm Command = "confirm" @@ -14,6 +15,7 @@ const ( var supportedCommands = []Command{ CommandStart, + CommandHelp, CommandFund, CommandWithdraw, CommandConfirm, @@ -56,3 +58,29 @@ func supportedCommandsMessage() string { func confirmationCommandsMessage() string { return "Confirm operation?\n\n" + CommandConfirm.Slash() + "\n" + CommandCancel.Slash() } + +func helpMessage(accountCode string, currency string) string { + accountCode = strings.TrimSpace(accountCode) + currency = strings.ToUpper(strings.TrimSpace(currency)) + if accountCode == "" { + accountCode = "N/A" + } + if currency == "" { + currency = "N/A" + } + + lines := []string{ + "Treasury bot help", + "", + "Attached account: " + accountCode + " (" + currency + ")", + "", + "How to use:", + "1) Start funding with " + CommandFund.Slash() + " or withdrawal with " + CommandWithdraw.Slash(), + "2) Enter amount as decimal, dot separator, no currency (example: 1250.75)", + "3) Confirm with " + CommandConfirm.Slash() + " or abort with " + CommandCancel.Slash(), + "", + "After confirmation there is a cooldown window. You can cancel during it with " + CommandCancel.Slash() + ".", + "You will receive a follow-up message with execution success or failure.", + } + return strings.Join(lines, "\n") +} diff --git a/api/gateway/tgsettle/internal/service/treasury/bot/router.go b/api/gateway/tgsettle/internal/service/treasury/bot/router.go index 752bd159..c6adf427 100644 --- a/api/gateway/tgsettle/internal/service/treasury/bot/router.go +++ b/api/gateway/tgsettle/internal/service/treasury/bot/router.go @@ -17,7 +17,7 @@ import ( const unauthorizedMessage = "Sorry, your Telegram account is not authorized to perform treasury operations." const unauthorizedChatMessage = "Sorry, this Telegram chat is not authorized to perform treasury operations." -var welcomeMessage = "Welcome to tgsettle treasury bot.\n\nUse " + CommandFund.Slash() + " to credit your account and " + CommandWithdraw.Slash() + " to debit it.\nAfter entering an amount, use " + CommandConfirm.Slash() + " or " + CommandCancel.Slash() + "." +const amountInputHint = "Enter amount as a decimal number using a dot separator and without currency.\nExample: 1250.75" type SendTextFunc func(ctx context.Context, chatID string, text string) error @@ -26,6 +26,12 @@ type ScheduleTracker interface { Untrack(requestID string) } +type AccountProfile struct { + AccountID string + AccountCode string + Currency string +} + type CreateRequestInput struct { OperationType storagemodel.TreasuryOperationType TelegramUserID string @@ -39,6 +45,7 @@ type TreasuryService interface { MaxPerOperationLimit() string GetActiveRequestForAccount(ctx context.Context, ledgerAccountID string) (*storagemodel.TreasuryRequest, error) + GetAccountProfile(ctx context.Context, ledgerAccountID string) (*AccountProfile, error) CreateRequest(ctx context.Context, input CreateRequestInput) (*storagemodel.TreasuryRequest, error) ConfirmRequest(ctx context.Context, requestID string, telegramUserID string) (*storagemodel.TreasuryRequest, error) CancelRequest(ctx context.Context, requestID string, telegramUserID string) (*storagemodel.TreasuryRequest, error) @@ -119,6 +126,18 @@ func (r *Router) HandleUpdate(ctx context.Context, update *model.TelegramWebhook if chatID == "" || userID == "" { return false } + command := parseCommand(text) + if r.logger != nil { + r.logger.Debug("Telegram treasury update received", + zap.Int64("update_id", update.UpdateID), + zap.String("chat_id", chatID), + zap.String("telegram_user_id", userID), + zap.String("command", strings.TrimSpace(string(command))), + zap.String("message_text", text), + zap.String("reply_to_message_id", strings.TrimSpace(message.ReplyToMessageID)), + ) + } + if !r.allowAnyChat { if _, ok := r.allowedChats[chatID]; !ok { r.logUnauthorized(update) @@ -134,21 +153,49 @@ func (r *Router) HandleUpdate(ctx context.Context, update *model.TelegramWebhook return true } - command := parseCommand(text) switch command { case CommandStart: - _ = r.sendText(ctx, chatID, welcomeMessage) + profile := r.resolveAccountProfile(ctx, accountID) + _ = r.sendText(ctx, chatID, welcomeMessage(profile)) + return true + case CommandHelp: + profile := r.resolveAccountProfile(ctx, accountID) + _ = r.sendText(ctx, chatID, helpMessage(displayAccountCode(profile, accountID), profile.Currency)) return true case CommandFund: + if r.logger != nil { + r.logger.Info("Treasury funding dialog requested", + zap.String("chat_id", chatID), + zap.String("telegram_user_id", userID), + zap.String("ledger_account_id", accountID)) + } r.startAmountDialog(ctx, userID, accountID, chatID, storagemodel.TreasuryOperationFund) return true case CommandWithdraw: + if r.logger != nil { + r.logger.Info("Treasury withdrawal dialog requested", + zap.String("chat_id", chatID), + zap.String("telegram_user_id", userID), + zap.String("ledger_account_id", accountID)) + } r.startAmountDialog(ctx, userID, accountID, chatID, storagemodel.TreasuryOperationWithdraw) return true case CommandConfirm: + if r.logger != nil { + r.logger.Info("Treasury confirmation requested", + zap.String("chat_id", chatID), + zap.String("telegram_user_id", userID), + zap.String("ledger_account_id", accountID)) + } r.confirm(ctx, userID, accountID, chatID) return true case CommandCancel: + if r.logger != nil { + r.logger.Info("Treasury cancellation requested", + zap.String("chat_id", chatID), + zap.String("telegram_user_id", userID), + zap.String("ledger_account_id", accountID)) + } r.cancel(ctx, userID, accountID, chatID) return true } @@ -182,7 +229,10 @@ func (r *Router) HandleUpdate(ctx context.Context, update *model.TelegramWebhook func (r *Router) startAmountDialog(ctx context.Context, userID, accountID, chatID string, operation storagemodel.TreasuryOperationType) { active, err := r.service.GetActiveRequestForAccount(ctx, accountID) if err != nil { - r.logger.Warn("Failed to check active treasury request", zap.Error(err), zap.String("telegram_user_id", userID), zap.String("ledger_account_id", accountID)) + if r.logger != nil { + r.logger.Warn("Failed to check active treasury request", zap.Error(err), zap.String("telegram_user_id", userID), zap.String("ledger_account_id", accountID)) + } + _ = r.sendText(ctx, chatID, "Unable to check pending treasury operations right now. Please try again.") return } if active != nil { @@ -199,7 +249,8 @@ func (r *Router) startAmountDialog(ctx context.Context, userID, accountID, chatI OperationType: operation, LedgerAccountID: accountID, }) - _ = r.sendText(ctx, chatID, "Enter amount:") + profile := r.resolveAccountProfile(ctx, accountID) + _ = r.sendText(ctx, chatID, amountPromptMessage(operation, profile, accountID)) } func (r *Router) captureAmount(ctx context.Context, userID, accountID, chatID string, operation storagemodel.TreasuryOperationType, amount string) { @@ -231,7 +282,7 @@ func (r *Router) captureAmount(ctx context.Context, userID, accountID, chatID st } } if errors.Is(err, merrors.ErrInvalidArg) { - _ = r.sendText(ctx, chatID, "Invalid amount.\n\nEnter another amount or "+CommandCancel.Slash()) + _ = r.sendText(ctx, chatID, "Invalid amount.\n\n"+amountInputHint+"\n\nEnter another amount or "+CommandCancel.Slash()) return } _ = r.sendText(ctx, chatID, "Failed to create treasury request.\n\nEnter another amount or "+CommandCancel.Slash()) @@ -276,7 +327,7 @@ func (r *Router) confirm(ctx context.Context, userID string, accountID string, c if delay < 0 { delay = 0 } - _ = r.sendText(ctx, chatID, "Operation confirmed.\n\nExecution scheduled in "+formatSeconds(delay)+".\n\nRequest ID: "+strings.TrimSpace(record.RequestID)) + _ = r.sendText(ctx, chatID, "Operation confirmed.\n\nExecution scheduled in "+formatSeconds(delay)+".\nYou can cancel during this cooldown with "+CommandCancel.Slash()+".\n\nYou will receive a follow-up message with execution success or failure.\n\nRequest ID: "+strings.TrimSpace(record.RequestID)) } func (r *Router) cancel(ctx context.Context, userID string, accountID string, chatID string) { @@ -315,7 +366,16 @@ func (r *Router) sendText(ctx context.Context, chatID string, text string) error if chatID == "" || text == "" { return nil } - return r.send(ctx, chatID, text) + if err := r.send(ctx, chatID, text); err != nil { + if r.logger != nil { + r.logger.Warn("Failed to send treasury bot response", + zap.Error(err), + zap.String("chat_id", chatID), + zap.String("message_text", text)) + } + return err + } + return nil } func (r *Router) logUnauthorized(update *model.TelegramWebhookUpdate) { @@ -337,6 +397,7 @@ func pendingRequestMessage(record *storagemodel.TreasuryRequest) string { return "You already have a pending treasury operation.\n\n" + CommandCancel.Slash() } return "You already have a pending treasury operation.\n\n" + + "Account: " + requestAccountDisplay(record) + "\n" + "Request ID: " + strings.TrimSpace(record.RequestID) + "\n" + "Status: " + strings.TrimSpace(string(record.Status)) + "\n" + "Amount: " + strings.TrimSpace(record.Amount) + " " + strings.TrimSpace(record.Currency) + "\n\n" + @@ -352,11 +413,89 @@ func confirmationPrompt(record *storagemodel.TreasuryRequest) string { title = "Withdrawal request created." } return title + "\n\n" + - "Account: " + strings.TrimSpace(record.LedgerAccountID) + "\n" + + "Account: " + requestAccountDisplay(record) + "\n" + "Amount: " + strings.TrimSpace(record.Amount) + " " + strings.TrimSpace(record.Currency) + "\n\n" + confirmationCommandsMessage() } +func welcomeMessage(profile *AccountProfile) string { + accountCode := displayAccountCode(profile, "") + currency := "" + if profile != nil { + currency = strings.ToUpper(strings.TrimSpace(profile.Currency)) + } + if accountCode == "" { + accountCode = "N/A" + } + if currency == "" { + currency = "N/A" + } + return "Welcome to Sendico treasury bot.\n\nAttached account: " + accountCode + " (" + currency + ").\nUse " + CommandFund.Slash() + " to credit your account and " + CommandWithdraw.Slash() + " to debit it.\nAfter entering an amount, use " + CommandConfirm.Slash() + " or " + CommandCancel.Slash() + ".\nUse " + CommandHelp.Slash() + " for detailed usage." +} + +func amountPromptMessage(operation storagemodel.TreasuryOperationType, profile *AccountProfile, fallbackAccountID string) string { + action := "fund" + if operation == storagemodel.TreasuryOperationWithdraw { + action = "withdraw" + } + accountCode := displayAccountCode(profile, fallbackAccountID) + currency := "" + if profile != nil { + currency = strings.ToUpper(strings.TrimSpace(profile.Currency)) + } + if accountCode == "" { + accountCode = "N/A" + } + if currency == "" { + currency = "N/A" + } + return "Preparing to " + action + " account " + accountCode + " (" + currency + ").\n\n" + amountInputHint +} + +func requestAccountDisplay(record *storagemodel.TreasuryRequest) string { + if record == nil { + return "" + } + if code := strings.TrimSpace(record.LedgerAccountCode); code != "" { + return code + } + return strings.TrimSpace(record.LedgerAccountID) +} + +func displayAccountCode(profile *AccountProfile, fallbackAccountID string) string { + if profile != nil { + if code := strings.TrimSpace(profile.AccountCode); code != "" { + return code + } + if id := strings.TrimSpace(profile.AccountID); id != "" { + return id + } + } + return strings.TrimSpace(fallbackAccountID) +} + +func (r *Router) resolveAccountProfile(ctx context.Context, ledgerAccountID string) *AccountProfile { + if r == nil || r.service == nil { + return &AccountProfile{AccountID: strings.TrimSpace(ledgerAccountID)} + } + profile, err := r.service.GetAccountProfile(ctx, ledgerAccountID) + if err != nil { + if r.logger != nil { + r.logger.Warn("Failed to resolve treasury account profile", + zap.Error(err), + zap.String("ledger_account_id", strings.TrimSpace(ledgerAccountID))) + } + return &AccountProfile{AccountID: strings.TrimSpace(ledgerAccountID)} + } + if profile == nil { + return &AccountProfile{AccountID: strings.TrimSpace(ledgerAccountID)} + } + if strings.TrimSpace(profile.AccountID) == "" { + profile.AccountID = strings.TrimSpace(ledgerAccountID) + } + return profile +} + func formatSeconds(value int64) string { if value == 1 { return "1 second" diff --git a/api/gateway/tgsettle/internal/service/treasury/bot/router_test.go b/api/gateway/tgsettle/internal/service/treasury/bot/router_test.go index d6e4d98e..d685f525 100644 --- a/api/gateway/tgsettle/internal/service/treasury/bot/router_test.go +++ b/api/gateway/tgsettle/internal/service/treasury/bot/router_test.go @@ -24,6 +24,14 @@ func (fakeService) GetActiveRequestForAccount(context.Context, string) (*storage return nil, nil } +func (fakeService) GetAccountProfile(_ context.Context, ledgerAccountID string) (*AccountProfile, error) { + return &AccountProfile{ + AccountID: ledgerAccountID, + AccountCode: ledgerAccountID, + Currency: "USD", + }, nil +} + func (fakeService) CreateRequest(context.Context, CreateRequestInput) (*storagemodel.TreasuryRequest, error) { return nil, nil } @@ -124,7 +132,11 @@ func TestRouterEmptyAllowedChats_AllowsAnyChatForAuthorizedUser(t *testing.T) { if len(sent) != 1 { t.Fatalf("expected one message, got %d", len(sent)) } - if sent[0] != "Enter amount:" { + if sent[0] != amountPromptMessage( + storagemodel.TreasuryOperationFund, + &AccountProfile{AccountID: "acct-1", AccountCode: "acct-1", Currency: "USD"}, + "acct-1", + ) { t.Fatalf("unexpected message: %q", sent[0]) } } @@ -186,7 +198,38 @@ func TestRouterStartAuthorizedShowsWelcome(t *testing.T) { if len(sent) != 1 { t.Fatalf("expected one message, got %d", len(sent)) } - if sent[0] != welcomeMessage { + if sent[0] != welcomeMessage(&AccountProfile{AccountID: "acct-1", AccountCode: "acct-1", Currency: "USD"}) { + t.Fatalf("unexpected message: %q", sent[0]) + } +} + +func TestRouterHelpAuthorizedShowsHelp(t *testing.T) { + var sent []string + router := NewRouter( + mloggerfactory.NewLogger(false), + fakeService{}, + func(_ context.Context, _ string, text string) error { + sent = append(sent, text) + return nil + }, + nil, + nil, + map[string]string{"123": "acct-1"}, + ) + handled := router.HandleUpdate(context.Background(), &model.TelegramWebhookUpdate{ + Message: &model.TelegramMessage{ + ChatID: "777", + FromUserID: "123", + Text: "/help", + }, + }) + if !handled { + t.Fatalf("expected update to be handled") + } + if len(sent) != 1 { + t.Fatalf("expected one message, got %d", len(sent)) + } + if sent[0] != helpMessage("acct-1", "USD") { t.Fatalf("unexpected message: %q", sent[0]) } } diff --git a/api/gateway/tgsettle/internal/service/treasury/ledger/client.go b/api/gateway/tgsettle/internal/service/treasury/ledger/client.go index 9a256143..3fa70002 100644 --- a/api/gateway/tgsettle/internal/service/treasury/ledger/client.go +++ b/api/gateway/tgsettle/internal/service/treasury/ledger/client.go @@ -28,6 +28,7 @@ type Config struct { type Account struct { AccountID string + AccountCode string Currency string OrganizationRef string } @@ -130,14 +131,20 @@ func (c *connectorClient) GetAccount(ctx context.Context, accountID string) (*Ac if account == nil { return nil, merrors.NoData("ledger account not found") } + accountCode := strings.TrimSpace(account.GetLabel()) organizationRef := strings.TrimSpace(account.GetOwnerRef()) if organizationRef == "" && account.GetProviderDetails() != nil { - if value, ok := account.GetProviderDetails().AsMap()["organization_ref"]; ok { - organizationRef = strings.TrimSpace(fmt.Sprint(value)) + details := account.GetProviderDetails().AsMap() + if organizationRef == "" { + organizationRef = firstDetailValue(details, "organization_ref", "organizationRef", "org_ref") + } + if accountCode == "" { + accountCode = firstDetailValue(details, "account_code", "accountCode", "code", "ledger_account_code") } } return &Account{ AccountID: accountID, + AccountCode: accountCode, Currency: strings.ToUpper(strings.TrimSpace(account.GetAsset())), OrganizationRef: organizationRef, }, nil @@ -285,3 +292,21 @@ func normalizeEndpoint(raw string) (string, bool) { return raw, false } } + +func firstDetailValue(values map[string]any, keys ...string) string { + if len(values) == 0 || len(keys) == 0 { + return "" + } + for _, key := range keys { + key = strings.TrimSpace(key) + if key == "" { + continue + } + if value, ok := values[key]; ok { + if text := strings.TrimSpace(fmt.Sprint(value)); text != "" { + return text + } + } + } + return "" +} diff --git a/api/gateway/tgsettle/internal/service/treasury/module.go b/api/gateway/tgsettle/internal/service/treasury/module.go index 5b1f89af..7fa5669a 100644 --- a/api/gateway/tgsettle/internal/service/treasury/module.go +++ b/api/gateway/tgsettle/internal/service/treasury/module.go @@ -120,6 +120,24 @@ func (a *botServiceAdapter) GetActiveRequestForAccount(ctx context.Context, ledg return a.svc.GetActiveRequestForAccount(ctx, ledgerAccountID) } +func (a *botServiceAdapter) GetAccountProfile(ctx context.Context, ledgerAccountID string) (*bot.AccountProfile, error) { + if a == nil || a.svc == nil { + return nil, merrors.Internal("treasury service unavailable") + } + profile, err := a.svc.GetAccountProfile(ctx, ledgerAccountID) + if err != nil { + return nil, err + } + if profile == nil { + return nil, nil + } + return &bot.AccountProfile{ + AccountID: strings.TrimSpace(profile.AccountID), + AccountCode: strings.TrimSpace(profile.AccountCode), + Currency: strings.TrimSpace(profile.Currency), + }, nil +} + func (a *botServiceAdapter) CreateRequest(ctx context.Context, input bot.CreateRequestInput) (*storagemodel.TreasuryRequest, error) { if a == nil || a.svc == nil { return nil, merrors.Internal("treasury service unavailable") diff --git a/api/gateway/tgsettle/internal/service/treasury/scheduler.go b/api/gateway/tgsettle/internal/service/treasury/scheduler.go index 08fb0a88..48f28d5b 100644 --- a/api/gateway/tgsettle/internal/service/treasury/scheduler.go +++ b/api/gateway/tgsettle/internal/service/treasury/scheduler.go @@ -145,7 +145,7 @@ func (s *Scheduler) hydrateTimers(ctx context.Context) { return } for _, record := range scheduled { - s.TrackScheduled(record) + s.TrackScheduled(&record) } } @@ -200,17 +200,53 @@ func (s *Scheduler) executeAndNotifyByID(ctx context.Context, requestID string) s.logger.Warn("Failed to execute treasury request", zap.Error(err), zap.String("request_id", requestID)) return } - if result == nil || result.Request == nil || s.notify == nil { + if result == nil || result.Request == nil { + s.logger.Debug("Treasury execution produced no result", zap.String("request_id", requestID)) + return + } + if s.notify == nil { + s.logger.Warn("Treasury execution notifier is unavailable", zap.String("request_id", requestID)) return } text := executionMessage(result) if strings.TrimSpace(text) == "" { + s.logger.Debug("Treasury execution result has no notification text", + zap.String("request_id", strings.TrimSpace(result.Request.RequestID)), + zap.String("status", strings.TrimSpace(string(result.Request.Status)))) return } - if err := s.notify(ctx, strings.TrimSpace(result.Request.ChatID), text); err != nil { - s.logger.Warn("Failed to notify treasury execution result", zap.Error(err), zap.String("request_id", strings.TrimSpace(result.Request.RequestID))) + chatID := strings.TrimSpace(result.Request.ChatID) + if chatID == "" { + s.logger.Warn("Treasury execution notification skipped: empty chat_id", + zap.String("request_id", strings.TrimSpace(result.Request.RequestID))) + return } + + s.logger.Info("Sending treasury execution notification", + zap.String("request_id", strings.TrimSpace(result.Request.RequestID)), + zap.String("chat_id", chatID), + zap.String("status", strings.TrimSpace(string(result.Request.Status)))) + + notifyCtx := context.Background() + if ctx != nil { + notifyCtx = ctx + } + notifyCtx, notifyCancel := context.WithTimeout(notifyCtx, 15*time.Second) + defer notifyCancel() + + if err := s.notify(notifyCtx, chatID, text); err != nil { + s.logger.Warn("Failed to notify treasury execution result", + zap.Error(err), + zap.String("request_id", strings.TrimSpace(result.Request.RequestID)), + zap.String("chat_id", chatID), + zap.String("status", strings.TrimSpace(string(result.Request.Status)))) + return + } + s.logger.Info("Treasury execution notification sent", + zap.String("request_id", strings.TrimSpace(result.Request.RequestID)), + zap.String("chat_id", chatID), + zap.String("status", strings.TrimSpace(string(result.Request.Status)))) } func executionMessage(result *ExecutionResult) string { @@ -237,7 +273,7 @@ func executionMessage(result *ExecutionResult) string { } } return op + " completed.\n\n" + - "Account: " + strings.TrimSpace(request.LedgerAccountID) + "\n" + + "Account: " + requestAccountCode(request) + "\n" + "Amount: " + sign + strings.TrimSpace(request.Amount) + " " + strings.TrimSpace(request.Currency) + "\n" + "New balance: " + balanceAmount + " " + balanceCurrency + "\n\n" + "Reference: " + strings.TrimSpace(request.RequestID) @@ -250,7 +286,7 @@ func executionMessage(result *ExecutionResult) string { reason = "Unknown error." } return "Execution failed.\n\n" + - "Account: " + strings.TrimSpace(request.LedgerAccountID) + "\n" + + "Account: " + requestAccountCode(request) + "\n" + "Amount: " + strings.TrimSpace(request.Amount) + " " + strings.TrimSpace(request.Currency) + "\n" + "Status: FAILED\n\n" + "Reason:\n" + reason + "\n\n" + @@ -259,3 +295,13 @@ func executionMessage(result *ExecutionResult) string { return "" } } + +func requestAccountCode(request *storagemodel.TreasuryRequest) string { + if request == nil { + return "" + } + if code := strings.TrimSpace(request.LedgerAccountCode); code != "" { + return code + } + return strings.TrimSpace(request.LedgerAccountID) +} diff --git a/api/gateway/tgsettle/internal/service/treasury/service.go b/api/gateway/tgsettle/internal/service/treasury/service.go index bcc1013b..88c0915a 100644 --- a/api/gateway/tgsettle/internal/service/treasury/service.go +++ b/api/gateway/tgsettle/internal/service/treasury/service.go @@ -27,6 +27,12 @@ type CreateRequestInput struct { Amount string } +type AccountProfile struct { + AccountID string + AccountCode string + Currency string +} + type ExecutionResult struct { Request *storagemodel.TreasuryRequest NewBalance *ledger.Balance @@ -103,6 +109,29 @@ func (s *Service) GetRequest(ctx context.Context, requestID string) (*storagemod return s.repo.FindByRequestID(ctx, requestID) } +func (s *Service) GetAccountProfile(ctx context.Context, ledgerAccountID string) (*AccountProfile, error) { + if s == nil || s.ledger == nil { + return nil, merrors.Internal("treasury service unavailable") + } + ledgerAccountID = strings.TrimSpace(ledgerAccountID) + if ledgerAccountID == "" { + return nil, merrors.InvalidArgument("ledger_account_id is required", "ledger_account_id") + } + + account, err := s.ledger.GetAccount(ctx, ledgerAccountID) + if err != nil { + return nil, err + } + if account == nil { + return nil, merrors.NoData("ledger account not found") + } + return &AccountProfile{ + AccountID: ledgerAccountID, + AccountCode: resolveAccountCode(account, ledgerAccountID), + Currency: strings.ToUpper(strings.TrimSpace(account.Currency)), + }, nil +} + func (s *Service) CreateRequest(ctx context.Context, input CreateRequestInput) (*storagemodel.TreasuryRequest, error) { if s == nil || s.repo == nil || s.ledger == nil || s.validator == nil { return nil, merrors.Internal("treasury service unavailable") @@ -156,17 +185,18 @@ func (s *Service) CreateRequest(ctx context.Context, input CreateRequestInput) ( requestID := newRequestID() record := &storagemodel.TreasuryRequest{ - RequestID: requestID, - OperationType: input.OperationType, - TelegramUserID: input.TelegramUserID, - LedgerAccountID: input.LedgerAccountID, - OrganizationRef: account.OrganizationRef, - ChatID: input.ChatID, - Amount: normalizedAmount, - Currency: strings.ToUpper(strings.TrimSpace(account.Currency)), - Status: storagemodel.TreasuryRequestStatusCreated, - IdempotencyKey: fmt.Sprintf("tgsettle:%s", requestID), - Active: true, + RequestID: requestID, + OperationType: input.OperationType, + TelegramUserID: input.TelegramUserID, + LedgerAccountID: input.LedgerAccountID, + LedgerAccountCode: resolveAccountCode(account, input.LedgerAccountID), + OrganizationRef: account.OrganizationRef, + ChatID: input.ChatID, + Amount: normalizedAmount, + Currency: strings.ToUpper(strings.TrimSpace(account.Currency)), + Status: storagemodel.TreasuryRequestStatusCreated, + IdempotencyKey: fmt.Sprintf("tgsettle:%s", requestID), + Active: true, } if err := s.repo.Create(ctx, record); err != nil { if errors.Is(err, storage.ErrDuplicate) { @@ -364,14 +394,14 @@ func (s *Service) executeClaimed(ctx context.Context, record *storagemodel.Treas }, nil } -func (s *Service) DueRequests(ctx context.Context, statuses []storagemodel.TreasuryRequestStatus, now time.Time, limit int64) ([]*storagemodel.TreasuryRequest, error) { +func (s *Service) DueRequests(ctx context.Context, statuses []storagemodel.TreasuryRequestStatus, now time.Time, limit int64) ([]storagemodel.TreasuryRequest, error) { if s == nil || s.repo == nil { return nil, merrors.Internal("treasury service unavailable") } return s.repo.FindDueByStatus(ctx, statuses, now, limit) } -func (s *Service) ScheduledRequests(ctx context.Context, limit int64) ([]*storagemodel.TreasuryRequest, error) { +func (s *Service) ScheduledRequests(ctx context.Context, limit int64) ([]storagemodel.TreasuryRequest, error) { if s == nil || s.repo == nil { return nil, merrors.Internal("treasury service unavailable") } @@ -395,10 +425,14 @@ func (s *Service) logRequest(record *storagemodel.TreasuryRequest, status string zap.String("request_id", strings.TrimSpace(record.RequestID)), zap.String("telegram_user_id", strings.TrimSpace(record.TelegramUserID)), zap.String("ledger_account_id", strings.TrimSpace(record.LedgerAccountID)), + zap.String("ledger_account_code", strings.TrimSpace(record.LedgerAccountCode)), + zap.String("chat_id", strings.TrimSpace(record.ChatID)), zap.String("operation_type", strings.TrimSpace(string(record.OperationType))), zap.String("amount", strings.TrimSpace(record.Amount)), zap.String("currency", strings.TrimSpace(record.Currency)), zap.String("status", status), + zap.String("ledger_reference", strings.TrimSpace(record.LedgerReference)), + zap.String("error_message", strings.TrimSpace(record.ErrorMessage)), } if err != nil { fields = append(fields, zap.Error(err)) @@ -409,3 +443,15 @@ func (s *Service) logRequest(record *storagemodel.TreasuryRequest, status string func newRequestID() string { return "TGSETTLE-" + strings.ToUpper(bson.NewObjectID().Hex()[:8]) } + +func resolveAccountCode(account *ledger.Account, fallbackAccountID string) string { + if account != nil { + if code := strings.TrimSpace(account.AccountCode); code != "" { + return code + } + if code := strings.TrimSpace(account.AccountID); code != "" { + return code + } + } + return strings.TrimSpace(fallbackAccountID) +} diff --git a/api/gateway/tgsettle/internal/service/treasury/validator.go b/api/gateway/tgsettle/internal/service/treasury/validator.go index eec5e4d8..53ccea4b 100644 --- a/api/gateway/tgsettle/internal/service/treasury/validator.go +++ b/api/gateway/tgsettle/internal/service/treasury/validator.go @@ -143,9 +143,6 @@ func (v *Validator) ValidateDailyLimit(ctx context.Context, ledgerAccountID stri } total := new(big.Rat) for _, record := range records { - if record == nil { - continue - } next, err := parseAmountRat(record.Amount) if err != nil { return merrors.Internal("treasury request amount is invalid") diff --git a/api/gateway/tgsettle/storage/model/treasury.go b/api/gateway/tgsettle/storage/model/treasury.go index e32876c0..37740cb9 100644 --- a/api/gateway/tgsettle/storage/model/treasury.go +++ b/api/gateway/tgsettle/storage/model/treasury.go @@ -27,15 +27,16 @@ const ( type TreasuryRequest struct { storable.Base `bson:",inline" json:",inline"` - RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"` - OperationType TreasuryOperationType `bson:"operationType,omitempty" json:"operation_type,omitempty"` - TelegramUserID string `bson:"telegramUserId,omitempty" json:"telegram_user_id,omitempty"` - LedgerAccountID string `bson:"ledgerAccountId,omitempty" json:"ledger_account_id,omitempty"` - OrganizationRef string `bson:"organizationRef,omitempty" json:"organization_ref,omitempty"` - ChatID string `bson:"chatId,omitempty" json:"chat_id,omitempty"` - Amount string `bson:"amount,omitempty" json:"amount,omitempty"` - Currency string `bson:"currency,omitempty" json:"currency,omitempty"` - Status TreasuryRequestStatus `bson:"status,omitempty" json:"status,omitempty"` + RequestID string `bson:"requestId,omitempty" json:"request_id,omitempty"` + OperationType TreasuryOperationType `bson:"operationType,omitempty" json:"operation_type,omitempty"` + TelegramUserID string `bson:"telegramUserId,omitempty" json:"telegram_user_id,omitempty"` + LedgerAccountID string `bson:"ledgerAccountId,omitempty" json:"ledger_account_id,omitempty"` + LedgerAccountCode string `bson:"ledgerAccountCode,omitempty" json:"ledger_account_code,omitempty"` + OrganizationRef string `bson:"organizationRef,omitempty" json:"organization_ref,omitempty"` + ChatID string `bson:"chatId,omitempty" json:"chat_id,omitempty"` + Amount string `bson:"amount,omitempty" json:"amount,omitempty"` + Currency string `bson:"currency,omitempty" json:"currency,omitempty"` + Status TreasuryRequestStatus `bson:"status,omitempty" json:"status,omitempty"` ConfirmedAt time.Time `bson:"confirmedAt,omitempty" json:"confirmed_at,omitempty"` ScheduledAt time.Time `bson:"scheduledAt,omitempty" json:"scheduled_at,omitempty"` diff --git a/api/gateway/tgsettle/storage/mongo/store/payments.go b/api/gateway/tgsettle/storage/mongo/store/payments.go index 7bc4f000..8b3c0fbe 100644 --- a/api/gateway/tgsettle/storage/mongo/store/payments.go +++ b/api/gateway/tgsettle/storage/mongo/store/payments.go @@ -4,7 +4,6 @@ import ( "context" "errors" "strings" - "time" "github.com/tech/sendico/gateway/tgsettle/storage" "github.com/tech/sendico/gateway/tgsettle/storage/model" @@ -12,7 +11,6 @@ import ( ri "github.com/tech/sendico/pkg/db/repository/index" "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mlogger" - "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.uber.org/zap" ) @@ -120,31 +118,26 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro if record.IntentRef == "" { return merrors.InvalidArgument("intention reference key is required", "intent_ref") } - now := time.Now() - if record.CreatedAt.IsZero() { - record.CreatedAt = now - } - record.UpdatedAt = now - record.ID = bson.NilObjectID filter := repository.Filter(fieldIdempotencyKey, record.IdempotencyKey) - existing := &model.PaymentRecord{} - err := p.repo.FindOneByFilter(ctx, filter, existing) - switch { - case err == nil: - record.ID = existing.ID - err = p.repo.Update(ctx, record) - case errors.Is(err, merrors.ErrNoData): - record.ID = bson.NilObjectID - err = p.repo.Insert(ctx, record, filter) - if errors.Is(err, merrors.ErrDataConflict) { - if findErr := p.repo.FindOneByFilter(ctx, filter, existing); findErr != nil { - err = findErr - break - } - record.ID = existing.ID - err = p.repo.Update(ctx, record) - } + err := p.repo.Insert(ctx, record, filter) + if errors.Is(err, merrors.ErrDataConflict) { + patch := repository.Patch(). + Set(repository.Field(fieldOperationRef), record.OperationRef). + Set(repository.Field("paymentIntentId"), record.PaymentIntentID). + Set(repository.Field("quoteRef"), record.QuoteRef). + Set(repository.Field("intentRef"), record.IntentRef). + Set(repository.Field("paymentRef"), record.PaymentRef). + Set(repository.Field("outgoingLeg"), record.OutgoingLeg). + Set(repository.Field("targetChatId"), record.TargetChatID). + Set(repository.Field("requestedMoney"), record.RequestedMoney). + Set(repository.Field("executedMoney"), record.ExecutedMoney). + Set(repository.Field("status"), record.Status). + Set(repository.Field("failureReason"), record.FailureReason). + Set(repository.Field("executedAt"), record.ExecutedAt). + Set(repository.Field("expiresAt"), record.ExpiresAt). + Set(repository.Field("expiredAt"), record.ExpiredAt) + _, err = p.repo.PatchMany(ctx, filter, patch) } if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { diff --git a/api/gateway/tgsettle/storage/mongo/store/pending_confirmations.go b/api/gateway/tgsettle/storage/mongo/store/pending_confirmations.go index 3ab345a4..b663fef3 100644 --- a/api/gateway/tgsettle/storage/mongo/store/pending_confirmations.go +++ b/api/gateway/tgsettle/storage/mongo/store/pending_confirmations.go @@ -13,7 +13,7 @@ import ( ri "github.com/tech/sendico/pkg/db/repository/index" "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mlogger" - "go.mongodb.org/mongo-driver/v2/bson" + mutil "github.com/tech/sendico/pkg/mutil/db" "go.mongodb.org/mongo-driver/v2/mongo" "go.uber.org/zap" ) @@ -86,34 +86,19 @@ func (p *PendingConfirmations) Upsert(ctx context.Context, record *model.Pending return merrors.InvalidArgument("expires_at is required", "expires_at") } - now := time.Now() - createdAt := record.CreatedAt - if createdAt.IsZero() { - createdAt = now - } - record.UpdatedAt = now - record.CreatedAt = createdAt filter := repository.Filter(fieldPendingRequestID, record.RequestID) - existing := &model.PendingConfirmation{} - - err := p.repo.FindOneByFilter(ctx, filter, existing) - switch { - case err == nil: - record.ID = existing.ID - record.CreatedAt = existing.CreatedAt - err = p.repo.Update(ctx, record) - case errors.Is(err, merrors.ErrNoData): - record.ID = bson.NilObjectID - err = p.repo.Insert(ctx, record, filter) - if errors.Is(err, merrors.ErrDataConflict) { - if findErr := p.repo.FindOneByFilter(ctx, filter, existing); findErr != nil { - err = findErr - break - } - record.ID = existing.ID - record.CreatedAt = existing.CreatedAt - err = p.repo.Update(ctx, record) - } + err := p.repo.Insert(ctx, record, filter) + if errors.Is(err, merrors.ErrDataConflict) { + patch := repository.Patch(). + Set(repository.Field(fieldPendingMessageID), record.MessageID). + Set(repository.Field("targetChatId"), record.TargetChatID). + Set(repository.Field("acceptedUserIds"), record.AcceptedUserIDs). + Set(repository.Field("requestedMoney"), record.RequestedMoney). + Set(repository.Field("sourceService"), record.SourceService). + Set(repository.Field("rail"), record.Rail). + Set(repository.Field("clarified"), record.Clarified). + Set(repository.Field(fieldPendingExpiresAt), record.ExpiresAt) + _, err = p.repo.PatchMany(ctx, filter, patch) } if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { p.logger.Warn("Failed to upsert pending confirmation", zap.Error(err), zap.String("request_id", record.RequestID)) @@ -201,7 +186,7 @@ func (p *PendingConfirmations) DeleteByRequestID(ctx context.Context, requestID return p.repo.DeleteMany(ctx, repository.Filter(fieldPendingRequestID, requestID)) } -func (p *PendingConfirmations) ListExpired(ctx context.Context, now time.Time, limit int64) ([]*model.PendingConfirmation, error) { +func (p *PendingConfirmations) ListExpired(ctx context.Context, now time.Time, limit int64) ([]model.PendingConfirmation, error) { if limit <= 0 { limit = 100 } @@ -210,19 +195,11 @@ func (p *PendingConfirmations) ListExpired(ctx context.Context, now time.Time, l Sort(repository.Field(fieldPendingExpiresAt), true). Limit(&limit) - result := make([]*model.PendingConfirmation, 0) - err := p.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error { - next := &model.PendingConfirmation{} - if err := cur.Decode(next); err != nil { - return err - } - result = append(result, next) - return nil - }) + items, err := mutil.GetObjects[model.PendingConfirmation](ctx, p.logger, query, nil, p.repo) if err != nil && !errors.Is(err, merrors.ErrNoData) { return nil, err } - return result, nil + return items, nil } var _ storage.PendingConfirmationsStore = (*PendingConfirmations)(nil) diff --git a/api/gateway/tgsettle/storage/mongo/store/telegram_confirmations.go b/api/gateway/tgsettle/storage/mongo/store/telegram_confirmations.go index d3f0746b..bd3fe04d 100644 --- a/api/gateway/tgsettle/storage/mongo/store/telegram_confirmations.go +++ b/api/gateway/tgsettle/storage/mongo/store/telegram_confirmations.go @@ -12,7 +12,6 @@ import ( ri "github.com/tech/sendico/pkg/db/repository/index" "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mlogger" - "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.uber.org/zap" ) @@ -67,24 +66,14 @@ func (t *TelegramConfirmations) Upsert(ctx context.Context, record *model.Telegr record.ReceivedAt = time.Now() } filter := repository.Filter(fieldRequestID, record.RequestID) - existing := &model.TelegramConfirmation{} - - err := t.repo.FindOneByFilter(ctx, filter, existing) - switch { - case err == nil: - record.ID = existing.ID - err = t.repo.Update(ctx, record) - case errors.Is(err, merrors.ErrNoData): - record.ID = bson.NilObjectID - err = t.repo.Insert(ctx, record, filter) - if errors.Is(err, merrors.ErrDataConflict) { - if findErr := t.repo.FindOneByFilter(ctx, filter, existing); findErr != nil { - err = findErr - break - } - record.ID = existing.ID - err = t.repo.Update(ctx, record) - } + err := t.repo.Insert(ctx, record, filter) + if errors.Is(err, merrors.ErrDataConflict) { + patch := repository.Patch(). + Set(repository.Field("paymentIntentId"), record.PaymentIntentID). + Set(repository.Field("quoteRef"), record.QuoteRef). + Set(repository.Field("rawReply"), record.RawReply). + Set(repository.Field("receivedAt"), record.ReceivedAt) + _, err = t.repo.PatchMany(ctx, filter, patch) } if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { fields := []zap.Field{zap.String("request_id", record.RequestID)} diff --git a/api/gateway/tgsettle/storage/mongo/store/treasury_requests.go b/api/gateway/tgsettle/storage/mongo/store/treasury_requests.go index 9c27748a..8a2fb112 100644 --- a/api/gateway/tgsettle/storage/mongo/store/treasury_requests.go +++ b/api/gateway/tgsettle/storage/mongo/store/treasury_requests.go @@ -13,7 +13,7 @@ import ( ri "github.com/tech/sendico/pkg/db/repository/index" "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mlogger" - "go.mongodb.org/mongo-driver/v2/bson" + mutil "github.com/tech/sendico/pkg/mutil/db" "go.mongodb.org/mongo-driver/v2/mongo" "go.uber.org/zap" ) @@ -104,6 +104,7 @@ func (t *TreasuryRequests) Create(ctx context.Context, record *model.TreasuryReq record.RequestID = strings.TrimSpace(record.RequestID) record.TelegramUserID = strings.TrimSpace(record.TelegramUserID) record.LedgerAccountID = strings.TrimSpace(record.LedgerAccountID) + record.LedgerAccountCode = strings.TrimSpace(record.LedgerAccountCode) record.OrganizationRef = strings.TrimSpace(record.OrganizationRef) record.ChatID = strings.TrimSpace(record.ChatID) record.Amount = strings.TrimSpace(record.Amount) @@ -134,20 +135,24 @@ func (t *TreasuryRequests) Create(ctx context.Context, record *model.TreasuryReq return merrors.InvalidArgument("status is required", "status") } - now := time.Now() - if record.CreatedAt.IsZero() { - record.CreatedAt = now - } - record.UpdatedAt = now - record.ID = bson.NilObjectID - err := t.repo.Insert(ctx, record, repository.Filter(fieldTreasuryRequestID, record.RequestID)) if errors.Is(err, merrors.ErrDataConflict) { return storage.ErrDuplicate } if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { t.logger.Warn("Failed to create treasury request", zap.Error(err), zap.String("request_id", record.RequestID)) + return err } + t.logger.Info("Treasury request created", + zap.String("request_id", record.RequestID), + zap.String("telegram_user_id", record.TelegramUserID), + zap.String("chat_id", record.ChatID), + zap.String("ledger_account_id", record.LedgerAccountID), + zap.String("ledger_account_code", record.LedgerAccountCode), + zap.String("operation_type", strings.TrimSpace(string(record.OperationType))), + zap.String("status", strings.TrimSpace(string(record.Status))), + zap.String("amount", record.Amount), + zap.String("currency", record.Currency)) return err } @@ -159,11 +164,17 @@ func (t *TreasuryRequests) FindByRequestID(ctx context.Context, requestID string var result model.TreasuryRequest err := t.repo.FindOneByFilter(ctx, repository.Filter(fieldTreasuryRequestID, requestID), &result) if errors.Is(err, merrors.ErrNoData) { + t.logger.Debug("Treasury request not found", zap.String("request_id", requestID)) return nil, nil } if err != nil { + t.logger.Warn("Failed to load treasury request", zap.Error(err), zap.String("request_id", requestID)) return nil, err } + t.logger.Debug("Treasury request loaded", + zap.String("request_id", requestID), + zap.String("status", strings.TrimSpace(string(result.Status))), + zap.String("ledger_account_id", strings.TrimSpace(result.LedgerAccountID))) return &result, nil } @@ -178,15 +189,21 @@ func (t *TreasuryRequests) FindActiveByLedgerAccountID(ctx context.Context, ledg Filter(repository.Field(fieldTreasuryActive), true) err := t.repo.FindOneByFilter(ctx, query, &result) if errors.Is(err, merrors.ErrNoData) { + t.logger.Debug("Active treasury request not found", zap.String("ledger_account_id", ledgerAccountID)) return nil, nil } if err != nil { + t.logger.Warn("Failed to load active treasury request", zap.Error(err), zap.String("ledger_account_id", ledgerAccountID)) return nil, err } + t.logger.Debug("Active treasury request loaded", + zap.String("request_id", strings.TrimSpace(result.RequestID)), + zap.String("ledger_account_id", ledgerAccountID), + zap.String("status", strings.TrimSpace(string(result.Status)))) return &result, nil } -func (t *TreasuryRequests) FindDueByStatus(ctx context.Context, statuses []model.TreasuryRequestStatus, now time.Time, limit int64) ([]*model.TreasuryRequest, error) { +func (t *TreasuryRequests) FindDueByStatus(ctx context.Context, statuses []model.TreasuryRequestStatus, now time.Time, limit int64) ([]model.TreasuryRequest, error) { if len(statuses) == 0 { return nil, nil } @@ -210,18 +227,20 @@ func (t *TreasuryRequests) FindDueByStatus(ctx context.Context, statuses []model Sort(repository.Field(fieldTreasuryScheduledAt), true). Limit(&limit) - result := make([]*model.TreasuryRequest, 0) - err := t.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error { - next := &model.TreasuryRequest{} - if err := cur.Decode(next); err != nil { - return err - } - result = append(result, next) - return nil - }) + result, err := mutil.GetObjects[model.TreasuryRequest](ctx, t.logger, query, nil, t.repo) if err != nil && !errors.Is(err, merrors.ErrNoData) { + t.logger.Warn("Failed to list due treasury requests", + zap.Error(err), + zap.Any("statuses", statusValues), + zap.Time("scheduled_before", now), + zap.Int64("limit", limit)) return nil, err } + t.logger.Debug("Due treasury requests loaded", + zap.Any("statuses", statusValues), + zap.Time("scheduled_before", now), + zap.Int64("limit", limit), + zap.Int("count", len(result))) return result, nil } @@ -231,14 +250,19 @@ func (t *TreasuryRequests) ClaimScheduled(ctx context.Context, requestID string) return false, merrors.InvalidArgument("request_id is required", "request_id") } patch := repository.Patch(). - Set(repository.Field(fieldTreasuryStatus), string(model.TreasuryRequestStatusConfirmed)). - Set(repository.Field("updatedAt"), time.Now()) + Set(repository.Field(fieldTreasuryStatus), string(model.TreasuryRequestStatusConfirmed)) updated, err := t.repo.PatchMany(ctx, repository.Filter(fieldTreasuryRequestID, requestID).And( repository.Filter(fieldTreasuryStatus, string(model.TreasuryRequestStatusScheduled)), ), patch) if err != nil { + t.logger.Warn("Failed to claim scheduled treasury request", zap.Error(err), zap.String("request_id", requestID)) return false, err } + if updated > 0 { + t.logger.Info("Scheduled treasury request claimed", zap.String("request_id", requestID)) + } else { + t.logger.Debug("Scheduled treasury request claim skipped", zap.String("request_id", requestID)) + } return updated > 0, nil } @@ -247,6 +271,16 @@ func (t *TreasuryRequests) Update(ctx context.Context, record *model.TreasuryReq return merrors.InvalidArgument("treasury request is nil", "record") } record.RequestID = strings.TrimSpace(record.RequestID) + record.TelegramUserID = strings.TrimSpace(record.TelegramUserID) + record.LedgerAccountID = strings.TrimSpace(record.LedgerAccountID) + record.LedgerAccountCode = strings.TrimSpace(record.LedgerAccountCode) + record.OrganizationRef = strings.TrimSpace(record.OrganizationRef) + record.ChatID = strings.TrimSpace(record.ChatID) + record.Amount = strings.TrimSpace(record.Amount) + record.Currency = strings.ToUpper(strings.TrimSpace(record.Currency)) + record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey) + record.LedgerReference = strings.TrimSpace(record.LedgerReference) + record.ErrorMessage = strings.TrimSpace(record.ErrorMessage) if record.RequestID == "" { return merrors.InvalidArgument("request_id is required", "request_id") } @@ -257,21 +291,46 @@ func (t *TreasuryRequests) Update(ctx context.Context, record *model.TreasuryReq if existing == nil { return merrors.NoData("treasury request not found") } - record.ID = existing.ID - if record.CreatedAt.IsZero() { - record.CreatedAt = existing.CreatedAt - } - record.UpdatedAt = time.Now() - if err := t.repo.Update(ctx, record); err != nil { + + patch := repository.Patch(). + Set(repository.Field("operationType"), record.OperationType). + Set(repository.Field("telegramUserId"), record.TelegramUserID). + Set(repository.Field("ledgerAccountId"), record.LedgerAccountID). + Set(repository.Field("ledgerAccountCode"), record.LedgerAccountCode). + Set(repository.Field("organizationRef"), record.OrganizationRef). + Set(repository.Field("chatId"), record.ChatID). + Set(repository.Field("amount"), record.Amount). + Set(repository.Field("currency"), record.Currency). + Set(repository.Field(fieldTreasuryStatus), record.Status). + Set(repository.Field("confirmedAt"), record.ConfirmedAt). + Set(repository.Field("scheduledAt"), record.ScheduledAt). + Set(repository.Field("executedAt"), record.ExecutedAt). + Set(repository.Field("cancelledAt"), record.CancelledAt). + Set(repository.Field(fieldTreasuryIdempotencyKey), record.IdempotencyKey). + Set(repository.Field("ledgerReference"), record.LedgerReference). + Set(repository.Field("errorMessage"), record.ErrorMessage). + Set(repository.Field(fieldTreasuryActive), record.Active) + if _, err := t.repo.PatchMany(ctx, repository.Filter(fieldTreasuryRequestID, record.RequestID), patch); err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { t.logger.Warn("Failed to update treasury request", zap.Error(err), zap.String("request_id", record.RequestID)) } return err } + t.logger.Info("Treasury request updated", + zap.String("request_id", record.RequestID), + zap.String("telegram_user_id", strings.TrimSpace(record.TelegramUserID)), + zap.String("chat_id", strings.TrimSpace(record.ChatID)), + zap.String("ledger_account_id", strings.TrimSpace(record.LedgerAccountID)), + zap.String("ledger_account_code", strings.TrimSpace(record.LedgerAccountCode)), + zap.String("operation_type", strings.TrimSpace(string(record.OperationType))), + zap.String("status", strings.TrimSpace(string(record.Status))), + zap.String("amount", strings.TrimSpace(record.Amount)), + zap.String("currency", strings.TrimSpace(record.Currency)), + zap.String("error_message", strings.TrimSpace(record.ErrorMessage))) return nil } -func (t *TreasuryRequests) ListByAccountAndStatuses(ctx context.Context, ledgerAccountID string, statuses []model.TreasuryRequestStatus, dayStart, dayEnd time.Time) ([]*model.TreasuryRequest, error) { +func (t *TreasuryRequests) ListByAccountAndStatuses(ctx context.Context, ledgerAccountID string, statuses []model.TreasuryRequestStatus, dayStart, dayEnd time.Time) ([]model.TreasuryRequest, error) { ledgerAccountID = strings.TrimSpace(ledgerAccountID) if ledgerAccountID == "" { return nil, merrors.InvalidArgument("ledger_account_id is required", "ledger_account_id") @@ -293,18 +352,22 @@ func (t *TreasuryRequests) ListByAccountAndStatuses(ctx context.Context, ledgerA Comparison(repository.Field(fieldTreasuryCreatedAt), builder.Gte, dayStart). Comparison(repository.Field(fieldTreasuryCreatedAt), builder.Lt, dayEnd) - result := make([]*model.TreasuryRequest, 0) - err := t.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error { - next := &model.TreasuryRequest{} - if err := cur.Decode(next); err != nil { - return err - } - result = append(result, next) - return nil - }) + result, err := mutil.GetObjects[model.TreasuryRequest](ctx, t.logger, query, nil, t.repo) if err != nil && !errors.Is(err, merrors.ErrNoData) { + t.logger.Warn("Failed to list treasury requests by account and statuses", + zap.Error(err), + zap.String("ledger_account_id", ledgerAccountID), + zap.Any("statuses", statusValues), + zap.Time("day_start", dayStart), + zap.Time("day_end", dayEnd)) return nil, err } + t.logger.Debug("Treasury requests loaded by account and statuses", + zap.String("ledger_account_id", ledgerAccountID), + zap.Any("statuses", statusValues), + zap.Time("day_start", dayStart), + zap.Time("day_end", dayEnd), + zap.Int("count", len(result))) return result, nil } diff --git a/api/gateway/tgsettle/storage/storage.go b/api/gateway/tgsettle/storage/storage.go index 26fbce9e..1cd2138d 100644 --- a/api/gateway/tgsettle/storage/storage.go +++ b/api/gateway/tgsettle/storage/storage.go @@ -34,15 +34,15 @@ type PendingConfirmationsStore interface { MarkClarified(ctx context.Context, requestID string) error AttachMessage(ctx context.Context, requestID string, messageID string) error DeleteByRequestID(ctx context.Context, requestID string) error - ListExpired(ctx context.Context, now time.Time, limit int64) ([]*model.PendingConfirmation, error) + ListExpired(ctx context.Context, now time.Time, limit int64) ([]model.PendingConfirmation, error) } type TreasuryRequestsStore interface { Create(ctx context.Context, record *model.TreasuryRequest) error FindByRequestID(ctx context.Context, requestID string) (*model.TreasuryRequest, error) FindActiveByLedgerAccountID(ctx context.Context, ledgerAccountID string) (*model.TreasuryRequest, error) - FindDueByStatus(ctx context.Context, statuses []model.TreasuryRequestStatus, now time.Time, limit int64) ([]*model.TreasuryRequest, error) + FindDueByStatus(ctx context.Context, statuses []model.TreasuryRequestStatus, now time.Time, limit int64) ([]model.TreasuryRequest, error) ClaimScheduled(ctx context.Context, requestID string) (bool, error) Update(ctx context.Context, record *model.TreasuryRequest) error - ListByAccountAndStatuses(ctx context.Context, ledgerAccountID string, statuses []model.TreasuryRequestStatus, dayStart, dayEnd time.Time) ([]*model.TreasuryRequest, error) + ListByAccountAndStatuses(ctx context.Context, ledgerAccountID string, statuses []model.TreasuryRequestStatus, dayStart, dayEnd time.Time) ([]model.TreasuryRequest, error) }