improved tgsettle messages + storage fixes

This commit is contained in:
Stephan D
2026-03-05 11:54:07 +01:00
parent 801f349aa8
commit 5e59fea7e5
16 changed files with 537 additions and 172 deletions

View File

@@ -4,7 +4,6 @@ import (
"context"
"errors"
"strings"
"time"
"github.com/tech/sendico/gateway/tgsettle/storage"
"github.com/tech/sendico/gateway/tgsettle/storage/model"
@@ -12,7 +11,6 @@ import (
ri "github.com/tech/sendico/pkg/db/repository/index"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.uber.org/zap"
)
@@ -120,31 +118,26 @@ func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) erro
if record.IntentRef == "" {
return merrors.InvalidArgument("intention reference key is required", "intent_ref")
}
now := time.Now()
if record.CreatedAt.IsZero() {
record.CreatedAt = now
}
record.UpdatedAt = now
record.ID = bson.NilObjectID
filter := repository.Filter(fieldIdempotencyKey, record.IdempotencyKey)
existing := &model.PaymentRecord{}
err := p.repo.FindOneByFilter(ctx, filter, existing)
switch {
case err == nil:
record.ID = existing.ID
err = p.repo.Update(ctx, record)
case errors.Is(err, merrors.ErrNoData):
record.ID = bson.NilObjectID
err = p.repo.Insert(ctx, record, filter)
if errors.Is(err, merrors.ErrDataConflict) {
if findErr := p.repo.FindOneByFilter(ctx, filter, existing); findErr != nil {
err = findErr
break
}
record.ID = existing.ID
err = p.repo.Update(ctx, record)
}
err := p.repo.Insert(ctx, record, filter)
if errors.Is(err, merrors.ErrDataConflict) {
patch := repository.Patch().
Set(repository.Field(fieldOperationRef), record.OperationRef).
Set(repository.Field("paymentIntentId"), record.PaymentIntentID).
Set(repository.Field("quoteRef"), record.QuoteRef).
Set(repository.Field("intentRef"), record.IntentRef).
Set(repository.Field("paymentRef"), record.PaymentRef).
Set(repository.Field("outgoingLeg"), record.OutgoingLeg).
Set(repository.Field("targetChatId"), record.TargetChatID).
Set(repository.Field("requestedMoney"), record.RequestedMoney).
Set(repository.Field("executedMoney"), record.ExecutedMoney).
Set(repository.Field("status"), record.Status).
Set(repository.Field("failureReason"), record.FailureReason).
Set(repository.Field("executedAt"), record.ExecutedAt).
Set(repository.Field("expiresAt"), record.ExpiresAt).
Set(repository.Field("expiredAt"), record.ExpiredAt)
_, err = p.repo.PatchMany(ctx, filter, patch)
}
if err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {

View File

@@ -13,7 +13,7 @@ import (
ri "github.com/tech/sendico/pkg/db/repository/index"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"go.mongodb.org/mongo-driver/v2/bson"
mutil "github.com/tech/sendico/pkg/mutil/db"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.uber.org/zap"
)
@@ -86,34 +86,19 @@ func (p *PendingConfirmations) Upsert(ctx context.Context, record *model.Pending
return merrors.InvalidArgument("expires_at is required", "expires_at")
}
now := time.Now()
createdAt := record.CreatedAt
if createdAt.IsZero() {
createdAt = now
}
record.UpdatedAt = now
record.CreatedAt = createdAt
filter := repository.Filter(fieldPendingRequestID, record.RequestID)
existing := &model.PendingConfirmation{}
err := p.repo.FindOneByFilter(ctx, filter, existing)
switch {
case err == nil:
record.ID = existing.ID
record.CreatedAt = existing.CreatedAt
err = p.repo.Update(ctx, record)
case errors.Is(err, merrors.ErrNoData):
record.ID = bson.NilObjectID
err = p.repo.Insert(ctx, record, filter)
if errors.Is(err, merrors.ErrDataConflict) {
if findErr := p.repo.FindOneByFilter(ctx, filter, existing); findErr != nil {
err = findErr
break
}
record.ID = existing.ID
record.CreatedAt = existing.CreatedAt
err = p.repo.Update(ctx, record)
}
err := p.repo.Insert(ctx, record, filter)
if errors.Is(err, merrors.ErrDataConflict) {
patch := repository.Patch().
Set(repository.Field(fieldPendingMessageID), record.MessageID).
Set(repository.Field("targetChatId"), record.TargetChatID).
Set(repository.Field("acceptedUserIds"), record.AcceptedUserIDs).
Set(repository.Field("requestedMoney"), record.RequestedMoney).
Set(repository.Field("sourceService"), record.SourceService).
Set(repository.Field("rail"), record.Rail).
Set(repository.Field("clarified"), record.Clarified).
Set(repository.Field(fieldPendingExpiresAt), record.ExpiresAt)
_, err = p.repo.PatchMany(ctx, filter, patch)
}
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
p.logger.Warn("Failed to upsert pending confirmation", zap.Error(err), zap.String("request_id", record.RequestID))
@@ -201,7 +186,7 @@ func (p *PendingConfirmations) DeleteByRequestID(ctx context.Context, requestID
return p.repo.DeleteMany(ctx, repository.Filter(fieldPendingRequestID, requestID))
}
func (p *PendingConfirmations) ListExpired(ctx context.Context, now time.Time, limit int64) ([]*model.PendingConfirmation, error) {
func (p *PendingConfirmations) ListExpired(ctx context.Context, now time.Time, limit int64) ([]model.PendingConfirmation, error) {
if limit <= 0 {
limit = 100
}
@@ -210,19 +195,11 @@ func (p *PendingConfirmations) ListExpired(ctx context.Context, now time.Time, l
Sort(repository.Field(fieldPendingExpiresAt), true).
Limit(&limit)
result := make([]*model.PendingConfirmation, 0)
err := p.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error {
next := &model.PendingConfirmation{}
if err := cur.Decode(next); err != nil {
return err
}
result = append(result, next)
return nil
})
items, err := mutil.GetObjects[model.PendingConfirmation](ctx, p.logger, query, nil, p.repo)
if err != nil && !errors.Is(err, merrors.ErrNoData) {
return nil, err
}
return result, nil
return items, nil
}
var _ storage.PendingConfirmationsStore = (*PendingConfirmations)(nil)

View File

@@ -12,7 +12,6 @@ import (
ri "github.com/tech/sendico/pkg/db/repository/index"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.uber.org/zap"
)
@@ -67,24 +66,14 @@ func (t *TelegramConfirmations) Upsert(ctx context.Context, record *model.Telegr
record.ReceivedAt = time.Now()
}
filter := repository.Filter(fieldRequestID, record.RequestID)
existing := &model.TelegramConfirmation{}
err := t.repo.FindOneByFilter(ctx, filter, existing)
switch {
case err == nil:
record.ID = existing.ID
err = t.repo.Update(ctx, record)
case errors.Is(err, merrors.ErrNoData):
record.ID = bson.NilObjectID
err = t.repo.Insert(ctx, record, filter)
if errors.Is(err, merrors.ErrDataConflict) {
if findErr := t.repo.FindOneByFilter(ctx, filter, existing); findErr != nil {
err = findErr
break
}
record.ID = existing.ID
err = t.repo.Update(ctx, record)
}
err := t.repo.Insert(ctx, record, filter)
if errors.Is(err, merrors.ErrDataConflict) {
patch := repository.Patch().
Set(repository.Field("paymentIntentId"), record.PaymentIntentID).
Set(repository.Field("quoteRef"), record.QuoteRef).
Set(repository.Field("rawReply"), record.RawReply).
Set(repository.Field("receivedAt"), record.ReceivedAt)
_, err = t.repo.PatchMany(ctx, filter, patch)
}
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
fields := []zap.Field{zap.String("request_id", record.RequestID)}

View File

@@ -13,7 +13,7 @@ import (
ri "github.com/tech/sendico/pkg/db/repository/index"
"github.com/tech/sendico/pkg/merrors"
"github.com/tech/sendico/pkg/mlogger"
"go.mongodb.org/mongo-driver/v2/bson"
mutil "github.com/tech/sendico/pkg/mutil/db"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.uber.org/zap"
)
@@ -104,6 +104,7 @@ func (t *TreasuryRequests) Create(ctx context.Context, record *model.TreasuryReq
record.RequestID = strings.TrimSpace(record.RequestID)
record.TelegramUserID = strings.TrimSpace(record.TelegramUserID)
record.LedgerAccountID = strings.TrimSpace(record.LedgerAccountID)
record.LedgerAccountCode = strings.TrimSpace(record.LedgerAccountCode)
record.OrganizationRef = strings.TrimSpace(record.OrganizationRef)
record.ChatID = strings.TrimSpace(record.ChatID)
record.Amount = strings.TrimSpace(record.Amount)
@@ -134,20 +135,24 @@ func (t *TreasuryRequests) Create(ctx context.Context, record *model.TreasuryReq
return merrors.InvalidArgument("status is required", "status")
}
now := time.Now()
if record.CreatedAt.IsZero() {
record.CreatedAt = now
}
record.UpdatedAt = now
record.ID = bson.NilObjectID
err := t.repo.Insert(ctx, record, repository.Filter(fieldTreasuryRequestID, record.RequestID))
if errors.Is(err, merrors.ErrDataConflict) {
return storage.ErrDuplicate
}
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
t.logger.Warn("Failed to create treasury request", zap.Error(err), zap.String("request_id", record.RequestID))
return err
}
t.logger.Info("Treasury request created",
zap.String("request_id", record.RequestID),
zap.String("telegram_user_id", record.TelegramUserID),
zap.String("chat_id", record.ChatID),
zap.String("ledger_account_id", record.LedgerAccountID),
zap.String("ledger_account_code", record.LedgerAccountCode),
zap.String("operation_type", strings.TrimSpace(string(record.OperationType))),
zap.String("status", strings.TrimSpace(string(record.Status))),
zap.String("amount", record.Amount),
zap.String("currency", record.Currency))
return err
}
@@ -159,11 +164,17 @@ func (t *TreasuryRequests) FindByRequestID(ctx context.Context, requestID string
var result model.TreasuryRequest
err := t.repo.FindOneByFilter(ctx, repository.Filter(fieldTreasuryRequestID, requestID), &result)
if errors.Is(err, merrors.ErrNoData) {
t.logger.Debug("Treasury request not found", zap.String("request_id", requestID))
return nil, nil
}
if err != nil {
t.logger.Warn("Failed to load treasury request", zap.Error(err), zap.String("request_id", requestID))
return nil, err
}
t.logger.Debug("Treasury request loaded",
zap.String("request_id", requestID),
zap.String("status", strings.TrimSpace(string(result.Status))),
zap.String("ledger_account_id", strings.TrimSpace(result.LedgerAccountID)))
return &result, nil
}
@@ -178,15 +189,21 @@ func (t *TreasuryRequests) FindActiveByLedgerAccountID(ctx context.Context, ledg
Filter(repository.Field(fieldTreasuryActive), true)
err := t.repo.FindOneByFilter(ctx, query, &result)
if errors.Is(err, merrors.ErrNoData) {
t.logger.Debug("Active treasury request not found", zap.String("ledger_account_id", ledgerAccountID))
return nil, nil
}
if err != nil {
t.logger.Warn("Failed to load active treasury request", zap.Error(err), zap.String("ledger_account_id", ledgerAccountID))
return nil, err
}
t.logger.Debug("Active treasury request loaded",
zap.String("request_id", strings.TrimSpace(result.RequestID)),
zap.String("ledger_account_id", ledgerAccountID),
zap.String("status", strings.TrimSpace(string(result.Status))))
return &result, nil
}
func (t *TreasuryRequests) FindDueByStatus(ctx context.Context, statuses []model.TreasuryRequestStatus, now time.Time, limit int64) ([]*model.TreasuryRequest, error) {
func (t *TreasuryRequests) FindDueByStatus(ctx context.Context, statuses []model.TreasuryRequestStatus, now time.Time, limit int64) ([]model.TreasuryRequest, error) {
if len(statuses) == 0 {
return nil, nil
}
@@ -210,18 +227,20 @@ func (t *TreasuryRequests) FindDueByStatus(ctx context.Context, statuses []model
Sort(repository.Field(fieldTreasuryScheduledAt), true).
Limit(&limit)
result := make([]*model.TreasuryRequest, 0)
err := t.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error {
next := &model.TreasuryRequest{}
if err := cur.Decode(next); err != nil {
return err
}
result = append(result, next)
return nil
})
result, err := mutil.GetObjects[model.TreasuryRequest](ctx, t.logger, query, nil, t.repo)
if err != nil && !errors.Is(err, merrors.ErrNoData) {
t.logger.Warn("Failed to list due treasury requests",
zap.Error(err),
zap.Any("statuses", statusValues),
zap.Time("scheduled_before", now),
zap.Int64("limit", limit))
return nil, err
}
t.logger.Debug("Due treasury requests loaded",
zap.Any("statuses", statusValues),
zap.Time("scheduled_before", now),
zap.Int64("limit", limit),
zap.Int("count", len(result)))
return result, nil
}
@@ -231,14 +250,19 @@ func (t *TreasuryRequests) ClaimScheduled(ctx context.Context, requestID string)
return false, merrors.InvalidArgument("request_id is required", "request_id")
}
patch := repository.Patch().
Set(repository.Field(fieldTreasuryStatus), string(model.TreasuryRequestStatusConfirmed)).
Set(repository.Field("updatedAt"), time.Now())
Set(repository.Field(fieldTreasuryStatus), string(model.TreasuryRequestStatusConfirmed))
updated, err := t.repo.PatchMany(ctx, repository.Filter(fieldTreasuryRequestID, requestID).And(
repository.Filter(fieldTreasuryStatus, string(model.TreasuryRequestStatusScheduled)),
), patch)
if err != nil {
t.logger.Warn("Failed to claim scheduled treasury request", zap.Error(err), zap.String("request_id", requestID))
return false, err
}
if updated > 0 {
t.logger.Info("Scheduled treasury request claimed", zap.String("request_id", requestID))
} else {
t.logger.Debug("Scheduled treasury request claim skipped", zap.String("request_id", requestID))
}
return updated > 0, nil
}
@@ -247,6 +271,16 @@ func (t *TreasuryRequests) Update(ctx context.Context, record *model.TreasuryReq
return merrors.InvalidArgument("treasury request is nil", "record")
}
record.RequestID = strings.TrimSpace(record.RequestID)
record.TelegramUserID = strings.TrimSpace(record.TelegramUserID)
record.LedgerAccountID = strings.TrimSpace(record.LedgerAccountID)
record.LedgerAccountCode = strings.TrimSpace(record.LedgerAccountCode)
record.OrganizationRef = strings.TrimSpace(record.OrganizationRef)
record.ChatID = strings.TrimSpace(record.ChatID)
record.Amount = strings.TrimSpace(record.Amount)
record.Currency = strings.ToUpper(strings.TrimSpace(record.Currency))
record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey)
record.LedgerReference = strings.TrimSpace(record.LedgerReference)
record.ErrorMessage = strings.TrimSpace(record.ErrorMessage)
if record.RequestID == "" {
return merrors.InvalidArgument("request_id is required", "request_id")
}
@@ -257,21 +291,46 @@ func (t *TreasuryRequests) Update(ctx context.Context, record *model.TreasuryReq
if existing == nil {
return merrors.NoData("treasury request not found")
}
record.ID = existing.ID
if record.CreatedAt.IsZero() {
record.CreatedAt = existing.CreatedAt
}
record.UpdatedAt = time.Now()
if err := t.repo.Update(ctx, record); err != nil {
patch := repository.Patch().
Set(repository.Field("operationType"), record.OperationType).
Set(repository.Field("telegramUserId"), record.TelegramUserID).
Set(repository.Field("ledgerAccountId"), record.LedgerAccountID).
Set(repository.Field("ledgerAccountCode"), record.LedgerAccountCode).
Set(repository.Field("organizationRef"), record.OrganizationRef).
Set(repository.Field("chatId"), record.ChatID).
Set(repository.Field("amount"), record.Amount).
Set(repository.Field("currency"), record.Currency).
Set(repository.Field(fieldTreasuryStatus), record.Status).
Set(repository.Field("confirmedAt"), record.ConfirmedAt).
Set(repository.Field("scheduledAt"), record.ScheduledAt).
Set(repository.Field("executedAt"), record.ExecutedAt).
Set(repository.Field("cancelledAt"), record.CancelledAt).
Set(repository.Field(fieldTreasuryIdempotencyKey), record.IdempotencyKey).
Set(repository.Field("ledgerReference"), record.LedgerReference).
Set(repository.Field("errorMessage"), record.ErrorMessage).
Set(repository.Field(fieldTreasuryActive), record.Active)
if _, err := t.repo.PatchMany(ctx, repository.Filter(fieldTreasuryRequestID, record.RequestID), patch); err != nil {
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
t.logger.Warn("Failed to update treasury request", zap.Error(err), zap.String("request_id", record.RequestID))
}
return err
}
t.logger.Info("Treasury request updated",
zap.String("request_id", record.RequestID),
zap.String("telegram_user_id", strings.TrimSpace(record.TelegramUserID)),
zap.String("chat_id", strings.TrimSpace(record.ChatID)),
zap.String("ledger_account_id", strings.TrimSpace(record.LedgerAccountID)),
zap.String("ledger_account_code", strings.TrimSpace(record.LedgerAccountCode)),
zap.String("operation_type", strings.TrimSpace(string(record.OperationType))),
zap.String("status", strings.TrimSpace(string(record.Status))),
zap.String("amount", strings.TrimSpace(record.Amount)),
zap.String("currency", strings.TrimSpace(record.Currency)),
zap.String("error_message", strings.TrimSpace(record.ErrorMessage)))
return nil
}
func (t *TreasuryRequests) ListByAccountAndStatuses(ctx context.Context, ledgerAccountID string, statuses []model.TreasuryRequestStatus, dayStart, dayEnd time.Time) ([]*model.TreasuryRequest, error) {
func (t *TreasuryRequests) ListByAccountAndStatuses(ctx context.Context, ledgerAccountID string, statuses []model.TreasuryRequestStatus, dayStart, dayEnd time.Time) ([]model.TreasuryRequest, error) {
ledgerAccountID = strings.TrimSpace(ledgerAccountID)
if ledgerAccountID == "" {
return nil, merrors.InvalidArgument("ledger_account_id is required", "ledger_account_id")
@@ -293,18 +352,22 @@ func (t *TreasuryRequests) ListByAccountAndStatuses(ctx context.Context, ledgerA
Comparison(repository.Field(fieldTreasuryCreatedAt), builder.Gte, dayStart).
Comparison(repository.Field(fieldTreasuryCreatedAt), builder.Lt, dayEnd)
result := make([]*model.TreasuryRequest, 0)
err := t.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error {
next := &model.TreasuryRequest{}
if err := cur.Decode(next); err != nil {
return err
}
result = append(result, next)
return nil
})
result, err := mutil.GetObjects[model.TreasuryRequest](ctx, t.logger, query, nil, t.repo)
if err != nil && !errors.Is(err, merrors.ErrNoData) {
t.logger.Warn("Failed to list treasury requests by account and statuses",
zap.Error(err),
zap.String("ledger_account_id", ledgerAccountID),
zap.Any("statuses", statusValues),
zap.Time("day_start", dayStart),
zap.Time("day_end", dayEnd))
return nil, err
}
t.logger.Debug("Treasury requests loaded by account and statuses",
zap.String("ledger_account_id", ledgerAccountID),
zap.Any("statuses", statusValues),
zap.Time("day_start", dayStart),
zap.Time("day_end", dayEnd),
zap.Int("count", len(result)))
return result, nil
}