Treasury bot + ledger fix
This commit is contained in:
@@ -24,6 +24,7 @@ type Repository struct {
|
||||
payments storage.PaymentsStore
|
||||
tg storage.TelegramConfirmationsStore
|
||||
pending storage.PendingConfirmationsStore
|
||||
treasury storage.TreasuryRequestsStore
|
||||
outbox gatewayoutbox.Store
|
||||
}
|
||||
|
||||
@@ -74,6 +75,11 @@ func New(logger mlogger.Logger, conn *db.MongoConnection) (*Repository, error) {
|
||||
result.logger.Error("Failed to initialise pending confirmations store", zap.Error(err), zap.String("store", "pending_confirmations"))
|
||||
return nil, err
|
||||
}
|
||||
treasuryStore, err := store.NewTreasuryRequests(result.logger, result.db)
|
||||
if err != nil {
|
||||
result.logger.Error("Failed to initialise treasury requests store", zap.Error(err), zap.String("store", "treasury_requests"))
|
||||
return nil, err
|
||||
}
|
||||
outboxStore, err := gatewayoutbox.NewMongoStore(result.logger, result.db)
|
||||
if err != nil {
|
||||
result.logger.Error("Failed to initialise outbox store", zap.Error(err), zap.String("store", "outbox"))
|
||||
@@ -82,6 +88,7 @@ func New(logger mlogger.Logger, conn *db.MongoConnection) (*Repository, error) {
|
||||
result.payments = paymentsStore
|
||||
result.tg = tgStore
|
||||
result.pending = pendingStore
|
||||
result.treasury = treasuryStore
|
||||
result.outbox = outboxStore
|
||||
result.logger.Info("Payment gateway MongoDB storage initialised")
|
||||
return result, nil
|
||||
@@ -99,6 +106,10 @@ func (r *Repository) PendingConfirmations() storage.PendingConfirmationsStore {
|
||||
return r.pending
|
||||
}
|
||||
|
||||
func (r *Repository) TreasuryRequests() storage.TreasuryRequestsStore {
|
||||
return r.treasury
|
||||
}
|
||||
|
||||
func (r *Repository) Outbox() gatewayoutbox.Store {
|
||||
return r.outbox
|
||||
}
|
||||
|
||||
311
api/gateway/tgsettle/storage/mongo/store/treasury_requests.go
Normal file
311
api/gateway/tgsettle/storage/mongo/store/treasury_requests.go
Normal file
@@ -0,0 +1,311 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/tech/sendico/gateway/tgsettle/storage"
|
||||
"github.com/tech/sendico/gateway/tgsettle/storage/model"
|
||||
"github.com/tech/sendico/pkg/db/repository"
|
||||
"github.com/tech/sendico/pkg/db/repository/builder"
|
||||
ri "github.com/tech/sendico/pkg/db/repository/index"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
"go.mongodb.org/mongo-driver/v2/bson"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
treasuryRequestsCollection = "treasury_requests"
|
||||
|
||||
fieldTreasuryRequestID = "requestId"
|
||||
fieldTreasuryLedgerAccount = "ledgerAccountId"
|
||||
fieldTreasuryIdempotencyKey = "idempotencyKey"
|
||||
fieldTreasuryStatus = "status"
|
||||
fieldTreasuryScheduledAt = "scheduledAt"
|
||||
fieldTreasuryCreatedAt = "createdAt"
|
||||
fieldTreasuryActive = "active"
|
||||
)
|
||||
|
||||
type TreasuryRequests struct {
|
||||
logger mlogger.Logger
|
||||
repo repository.Repository
|
||||
}
|
||||
|
||||
func NewTreasuryRequests(logger mlogger.Logger, db *mongo.Database) (*TreasuryRequests, error) {
|
||||
if db == nil {
|
||||
return nil, merrors.InvalidArgument("mongo database is nil")
|
||||
}
|
||||
if logger == nil {
|
||||
logger = zap.NewNop()
|
||||
}
|
||||
logger = logger.Named("treasury_requests").With(zap.String("collection", treasuryRequestsCollection))
|
||||
|
||||
repo := repository.CreateMongoRepository(db, treasuryRequestsCollection)
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{{Field: fieldTreasuryRequestID, Sort: ri.Asc}},
|
||||
Unique: true,
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create treasury requests request_id index", zap.Error(err), zap.String("index_field", fieldTreasuryRequestID))
|
||||
return nil, err
|
||||
}
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{{Field: fieldTreasuryIdempotencyKey, Sort: ri.Asc}},
|
||||
Unique: true,
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create treasury requests idempotency index", zap.Error(err), zap.String("index_field", fieldTreasuryIdempotencyKey))
|
||||
return nil, err
|
||||
}
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{
|
||||
{Field: fieldTreasuryLedgerAccount, Sort: ri.Asc},
|
||||
{Field: fieldTreasuryActive, Sort: ri.Asc},
|
||||
},
|
||||
Unique: true,
|
||||
PartialFilter: repository.Filter(fieldTreasuryActive, true),
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create treasury requests active-account index", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{
|
||||
{Field: fieldTreasuryStatus, Sort: ri.Asc},
|
||||
{Field: fieldTreasuryScheduledAt, Sort: ri.Asc},
|
||||
},
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create treasury requests execution index", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{
|
||||
{Field: fieldTreasuryLedgerAccount, Sort: ri.Asc},
|
||||
{Field: fieldTreasuryCreatedAt, Sort: ri.Asc},
|
||||
},
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create treasury requests daily-amount index", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t := &TreasuryRequests{
|
||||
logger: logger,
|
||||
repo: repo,
|
||||
}
|
||||
t.logger.Debug("Treasury requests store initialised")
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (t *TreasuryRequests) Create(ctx context.Context, record *model.TreasuryRequest) error {
|
||||
if record == nil {
|
||||
return merrors.InvalidArgument("treasury request is nil", "record")
|
||||
}
|
||||
record.RequestID = strings.TrimSpace(record.RequestID)
|
||||
record.TelegramUserID = strings.TrimSpace(record.TelegramUserID)
|
||||
record.LedgerAccountID = strings.TrimSpace(record.LedgerAccountID)
|
||||
record.OrganizationRef = strings.TrimSpace(record.OrganizationRef)
|
||||
record.ChatID = strings.TrimSpace(record.ChatID)
|
||||
record.Amount = strings.TrimSpace(record.Amount)
|
||||
record.Currency = strings.ToUpper(strings.TrimSpace(record.Currency))
|
||||
record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey)
|
||||
record.LedgerReference = strings.TrimSpace(record.LedgerReference)
|
||||
record.ErrorMessage = strings.TrimSpace(record.ErrorMessage)
|
||||
|
||||
if record.RequestID == "" {
|
||||
return merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
if record.TelegramUserID == "" {
|
||||
return merrors.InvalidArgument("telegram_user_id is required", "telegram_user_id")
|
||||
}
|
||||
if record.LedgerAccountID == "" {
|
||||
return merrors.InvalidArgument("ledger_account_id is required", "ledger_account_id")
|
||||
}
|
||||
if record.Amount == "" {
|
||||
return merrors.InvalidArgument("amount is required", "amount")
|
||||
}
|
||||
if record.Currency == "" {
|
||||
return merrors.InvalidArgument("currency is required", "currency")
|
||||
}
|
||||
if record.IdempotencyKey == "" {
|
||||
return merrors.InvalidArgument("idempotency_key is required", "idempotency_key")
|
||||
}
|
||||
if record.Status == "" {
|
||||
return merrors.InvalidArgument("status is required", "status")
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
if record.CreatedAt.IsZero() {
|
||||
record.CreatedAt = now
|
||||
}
|
||||
record.UpdatedAt = now
|
||||
record.ID = bson.NilObjectID
|
||||
|
||||
err := t.repo.Insert(ctx, record, repository.Filter(fieldTreasuryRequestID, record.RequestID))
|
||||
if errors.Is(err, merrors.ErrDataConflict) {
|
||||
return storage.ErrDuplicate
|
||||
}
|
||||
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
t.logger.Warn("Failed to create treasury request", zap.Error(err), zap.String("request_id", record.RequestID))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (t *TreasuryRequests) FindByRequestID(ctx context.Context, requestID string) (*model.TreasuryRequest, error) {
|
||||
requestID = strings.TrimSpace(requestID)
|
||||
if requestID == "" {
|
||||
return nil, merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
var result model.TreasuryRequest
|
||||
err := t.repo.FindOneByFilter(ctx, repository.Filter(fieldTreasuryRequestID, requestID), &result)
|
||||
if errors.Is(err, merrors.ErrNoData) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (t *TreasuryRequests) FindActiveByLedgerAccountID(ctx context.Context, ledgerAccountID string) (*model.TreasuryRequest, error) {
|
||||
ledgerAccountID = strings.TrimSpace(ledgerAccountID)
|
||||
if ledgerAccountID == "" {
|
||||
return nil, merrors.InvalidArgument("ledger_account_id is required", "ledger_account_id")
|
||||
}
|
||||
var result model.TreasuryRequest
|
||||
query := repository.Query().
|
||||
Filter(repository.Field(fieldTreasuryLedgerAccount), ledgerAccountID).
|
||||
Filter(repository.Field(fieldTreasuryActive), true)
|
||||
err := t.repo.FindOneByFilter(ctx, query, &result)
|
||||
if errors.Is(err, merrors.ErrNoData) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (t *TreasuryRequests) FindDueByStatus(ctx context.Context, statuses []model.TreasuryRequestStatus, now time.Time, limit int64) ([]*model.TreasuryRequest, error) {
|
||||
if len(statuses) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
if limit <= 0 {
|
||||
limit = 100
|
||||
}
|
||||
statusValues := make([]any, 0, len(statuses))
|
||||
for _, status := range statuses {
|
||||
next := strings.TrimSpace(string(status))
|
||||
if next == "" {
|
||||
continue
|
||||
}
|
||||
statusValues = append(statusValues, next)
|
||||
}
|
||||
if len(statusValues) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
query := repository.Query().
|
||||
In(repository.Field(fieldTreasuryStatus), statusValues...).
|
||||
Comparison(repository.Field(fieldTreasuryScheduledAt), builder.Lte, now).
|
||||
Sort(repository.Field(fieldTreasuryScheduledAt), true).
|
||||
Limit(&limit)
|
||||
|
||||
result := make([]*model.TreasuryRequest, 0)
|
||||
err := t.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error {
|
||||
next := &model.TreasuryRequest{}
|
||||
if err := cur.Decode(next); err != nil {
|
||||
return err
|
||||
}
|
||||
result = append(result, next)
|
||||
return nil
|
||||
})
|
||||
if err != nil && !errors.Is(err, merrors.ErrNoData) {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (t *TreasuryRequests) ClaimScheduled(ctx context.Context, requestID string) (bool, error) {
|
||||
requestID = strings.TrimSpace(requestID)
|
||||
if requestID == "" {
|
||||
return false, merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
patch := repository.Patch().
|
||||
Set(repository.Field(fieldTreasuryStatus), string(model.TreasuryRequestStatusConfirmed)).
|
||||
Set(repository.Field("updatedAt"), time.Now())
|
||||
updated, err := t.repo.PatchMany(ctx, repository.Filter(fieldTreasuryRequestID, requestID).And(
|
||||
repository.Filter(fieldTreasuryStatus, string(model.TreasuryRequestStatusScheduled)),
|
||||
), patch)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return updated > 0, nil
|
||||
}
|
||||
|
||||
func (t *TreasuryRequests) Update(ctx context.Context, record *model.TreasuryRequest) error {
|
||||
if record == nil {
|
||||
return merrors.InvalidArgument("treasury request is nil", "record")
|
||||
}
|
||||
record.RequestID = strings.TrimSpace(record.RequestID)
|
||||
if record.RequestID == "" {
|
||||
return merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
existing, err := t.FindByRequestID(ctx, record.RequestID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if existing == nil {
|
||||
return merrors.NoData("treasury request not found")
|
||||
}
|
||||
record.ID = existing.ID
|
||||
if record.CreatedAt.IsZero() {
|
||||
record.CreatedAt = existing.CreatedAt
|
||||
}
|
||||
record.UpdatedAt = time.Now()
|
||||
if err := t.repo.Update(ctx, record); err != nil {
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
t.logger.Warn("Failed to update treasury request", zap.Error(err), zap.String("request_id", record.RequestID))
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TreasuryRequests) ListByAccountAndStatuses(ctx context.Context, ledgerAccountID string, statuses []model.TreasuryRequestStatus, dayStart, dayEnd time.Time) ([]*model.TreasuryRequest, error) {
|
||||
ledgerAccountID = strings.TrimSpace(ledgerAccountID)
|
||||
if ledgerAccountID == "" {
|
||||
return nil, merrors.InvalidArgument("ledger_account_id is required", "ledger_account_id")
|
||||
}
|
||||
statusValues := make([]any, 0, len(statuses))
|
||||
for _, status := range statuses {
|
||||
next := strings.TrimSpace(string(status))
|
||||
if next == "" {
|
||||
continue
|
||||
}
|
||||
statusValues = append(statusValues, next)
|
||||
}
|
||||
if len(statusValues) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
query := repository.Query().
|
||||
Filter(repository.Field(fieldTreasuryLedgerAccount), ledgerAccountID).
|
||||
In(repository.Field(fieldTreasuryStatus), statusValues...).
|
||||
Comparison(repository.Field(fieldTreasuryCreatedAt), builder.Gte, dayStart).
|
||||
Comparison(repository.Field(fieldTreasuryCreatedAt), builder.Lt, dayEnd)
|
||||
|
||||
result := make([]*model.TreasuryRequest, 0)
|
||||
err := t.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error {
|
||||
next := &model.TreasuryRequest{}
|
||||
if err := cur.Decode(next); err != nil {
|
||||
return err
|
||||
}
|
||||
result = append(result, next)
|
||||
return nil
|
||||
})
|
||||
if err != nil && !errors.Is(err, merrors.ErrNoData) {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
var _ storage.TreasuryRequestsStore = (*TreasuryRequests)(nil)
|
||||
Reference in New Issue
Block a user