Chimera Settle service
This commit is contained in:
132
api/gateway/chsettle/storage/mongo/repository.go
Normal file
132
api/gateway/chsettle/storage/mongo/repository.go
Normal file
@@ -0,0 +1,132 @@
|
||||
package mongo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/tech/sendico/gateway/chsettle/storage"
|
||||
"github.com/tech/sendico/gateway/chsettle/storage/mongo/store"
|
||||
gatewayoutbox "github.com/tech/sendico/gateway/common/outbox"
|
||||
"github.com/tech/sendico/pkg/db"
|
||||
"github.com/tech/sendico/pkg/db/transaction"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type Repository struct {
|
||||
logger mlogger.Logger
|
||||
conn *db.MongoConnection
|
||||
db *mongo.Database
|
||||
txFactory transaction.Factory
|
||||
|
||||
payments storage.PaymentsStore
|
||||
tg storage.TelegramConfirmationsStore
|
||||
pending storage.PendingConfirmationsStore
|
||||
treasury storage.TreasuryRequestsStore
|
||||
users storage.TreasuryTelegramUsersStore
|
||||
outbox gatewayoutbox.Store
|
||||
}
|
||||
|
||||
func New(logger mlogger.Logger, conn *db.MongoConnection) (*Repository, error) {
|
||||
if logger == nil {
|
||||
logger = zap.NewNop()
|
||||
}
|
||||
if conn == nil {
|
||||
return nil, merrors.InvalidArgument("mongo connection is nil")
|
||||
}
|
||||
client := conn.Client()
|
||||
if client == nil {
|
||||
return nil, merrors.Internal("mongo client is not initialised")
|
||||
}
|
||||
db := conn.Database()
|
||||
if db == nil {
|
||||
return nil, merrors.Internal("mongo database is not initialised")
|
||||
}
|
||||
dbName := db.Name()
|
||||
logger = logger.Named("storage").Named("mongo")
|
||||
if dbName != "" {
|
||||
logger = logger.With(zap.String("database", dbName))
|
||||
}
|
||||
result := &Repository{
|
||||
logger: logger,
|
||||
conn: conn,
|
||||
db: db,
|
||||
txFactory: newMongoTransactionFactory(client),
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := result.conn.Ping(ctx); err != nil {
|
||||
result.logger.Error("Mongo ping failed during repository initialisation", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
paymentsStore, err := store.NewPayments(result.logger, result.db)
|
||||
if err != nil {
|
||||
result.logger.Error("Failed to initialise payments store", zap.Error(err), zap.String("store", "payments"))
|
||||
return nil, err
|
||||
}
|
||||
tgStore, err := store.NewTelegramConfirmations(result.logger, result.db)
|
||||
if err != nil {
|
||||
result.logger.Error("Failed to initialise telegram confirmations store", zap.Error(err), zap.String("store", "telegram_confirmations"))
|
||||
return nil, err
|
||||
}
|
||||
pendingStore, err := store.NewPendingConfirmations(result.logger, result.db)
|
||||
if err != nil {
|
||||
result.logger.Error("Failed to initialise pending confirmations store", zap.Error(err), zap.String("store", "pending_confirmations"))
|
||||
return nil, err
|
||||
}
|
||||
treasuryStore, err := store.NewTreasuryRequests(result.logger, result.db)
|
||||
if err != nil {
|
||||
result.logger.Error("Failed to initialise treasury requests store", zap.Error(err), zap.String("store", "treasury_requests"))
|
||||
return nil, err
|
||||
}
|
||||
treasuryUsersStore, err := store.NewTreasuryTelegramUsers(result.logger, result.db)
|
||||
if err != nil {
|
||||
result.logger.Error("Failed to initialise treasury telegram users store", zap.Error(err), zap.String("store", "treasury_telegram_users"))
|
||||
return nil, err
|
||||
}
|
||||
outboxStore, err := gatewayoutbox.NewMongoStore(result.logger, result.db)
|
||||
if err != nil {
|
||||
result.logger.Error("Failed to initialise outbox store", zap.Error(err), zap.String("store", "outbox"))
|
||||
return nil, err
|
||||
}
|
||||
result.payments = paymentsStore
|
||||
result.tg = tgStore
|
||||
result.pending = pendingStore
|
||||
result.treasury = treasuryStore
|
||||
result.users = treasuryUsersStore
|
||||
result.outbox = outboxStore
|
||||
result.logger.Info("Payment gateway MongoDB storage initialised")
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (r *Repository) Payments() storage.PaymentsStore {
|
||||
return r.payments
|
||||
}
|
||||
|
||||
func (r *Repository) TelegramConfirmations() storage.TelegramConfirmationsStore {
|
||||
return r.tg
|
||||
}
|
||||
|
||||
func (r *Repository) PendingConfirmations() storage.PendingConfirmationsStore {
|
||||
return r.pending
|
||||
}
|
||||
|
||||
func (r *Repository) TreasuryRequests() storage.TreasuryRequestsStore {
|
||||
return r.treasury
|
||||
}
|
||||
|
||||
func (r *Repository) TreasuryTelegramUsers() storage.TreasuryTelegramUsersStore {
|
||||
return r.users
|
||||
}
|
||||
|
||||
func (r *Repository) Outbox() gatewayoutbox.Store {
|
||||
return r.outbox
|
||||
}
|
||||
|
||||
func (r *Repository) TransactionFactory() transaction.Factory {
|
||||
return r.txFactory
|
||||
}
|
||||
|
||||
var _ storage.Repository = (*Repository)(nil)
|
||||
159
api/gateway/chsettle/storage/mongo/store/payments.go
Normal file
159
api/gateway/chsettle/storage/mongo/store/payments.go
Normal file
@@ -0,0 +1,159 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
|
||||
"github.com/tech/sendico/gateway/chsettle/storage"
|
||||
"github.com/tech/sendico/gateway/chsettle/storage/model"
|
||||
"github.com/tech/sendico/pkg/db/repository"
|
||||
ri "github.com/tech/sendico/pkg/db/repository/index"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
paymentsCollection = "payments"
|
||||
fieldIdempotencyKey = "idempotencyKey"
|
||||
fieldOperationRef = "operationRef"
|
||||
)
|
||||
|
||||
type Payments struct {
|
||||
logger mlogger.Logger
|
||||
repo repository.Repository
|
||||
}
|
||||
|
||||
func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) {
|
||||
if db == nil {
|
||||
return nil, merrors.InvalidArgument("mongo database is nil")
|
||||
}
|
||||
if logger == nil {
|
||||
logger = zap.NewNop()
|
||||
}
|
||||
logger = logger.Named("payments").With(zap.String("collection", paymentsCollection))
|
||||
|
||||
repo := repository.CreateMongoRepository(db, paymentsCollection)
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{{Field: fieldIdempotencyKey, Sort: ri.Asc}},
|
||||
Unique: true,
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create payments idempotency index", zap.Error(err), zap.String("index_field", fieldIdempotencyKey))
|
||||
return nil, err
|
||||
}
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{{Field: fieldOperationRef, Sort: ri.Asc}},
|
||||
Unique: true,
|
||||
Sparse: true,
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create payments operation index", zap.Error(err), zap.String("index_field", fieldOperationRef))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := &Payments{
|
||||
logger: logger,
|
||||
repo: repo,
|
||||
}
|
||||
p.logger.Debug("Payments store initialised")
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *Payments) FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentRecord, error) {
|
||||
key = strings.TrimSpace(key)
|
||||
if key == "" {
|
||||
return nil, merrors.InvalidArgument("idempotency key is required", "idempotency_key")
|
||||
}
|
||||
var result model.PaymentRecord
|
||||
err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldIdempotencyKey, key), &result)
|
||||
if errors.Is(err, merrors.ErrNoData) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
p.logger.Warn("Payment record lookup failed", zap.String("idempotency_key", key), zap.Error(err))
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (p *Payments) FindByOperationRef(ctx context.Context, key string) (*model.PaymentRecord, error) {
|
||||
key = strings.TrimSpace(key)
|
||||
if key == "" {
|
||||
return nil, merrors.InvalidArgument("operation reference is required", "operation_ref")
|
||||
}
|
||||
var result model.PaymentRecord
|
||||
err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldOperationRef, key), &result)
|
||||
if errors.Is(err, merrors.ErrNoData) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
p.logger.Warn("Payment record lookup by operation ref failed", zap.String("operation_ref", key), zap.Error(err))
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) error {
|
||||
if record == nil {
|
||||
return merrors.InvalidArgument("payment record is nil", "record")
|
||||
}
|
||||
record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey)
|
||||
record.QuoteRef = strings.TrimSpace(record.QuoteRef)
|
||||
record.OutgoingLeg = strings.TrimSpace(record.OutgoingLeg)
|
||||
record.TargetChatID = strings.TrimSpace(record.TargetChatID)
|
||||
record.IntentRef = strings.TrimSpace(record.IntentRef)
|
||||
record.OperationRef = strings.TrimSpace(record.OperationRef)
|
||||
if record.IntentRef == "" {
|
||||
return merrors.InvalidArgument("intention reference is required", "intent_ref")
|
||||
}
|
||||
if record.IdempotencyKey == "" {
|
||||
return merrors.InvalidArgument("idempotency key is required", "idempotency_key")
|
||||
}
|
||||
if record.IntentRef == "" {
|
||||
return merrors.InvalidArgument("intention reference key is required", "intent_ref")
|
||||
}
|
||||
|
||||
existing, err := p.FindByIdempotencyKey(ctx, record.IdempotencyKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if existing != nil {
|
||||
record.ID = existing.ID
|
||||
if record.CreatedAt.IsZero() {
|
||||
record.CreatedAt = existing.CreatedAt
|
||||
}
|
||||
}
|
||||
|
||||
err = p.repo.Upsert(ctx, record)
|
||||
if mongo.IsDuplicateKeyError(err) {
|
||||
// Concurrent insert by idempotency key: resolve existing ID and retry replace-by-ID.
|
||||
existing, lookupErr := p.FindByIdempotencyKey(ctx, record.IdempotencyKey)
|
||||
if lookupErr != nil {
|
||||
err = lookupErr
|
||||
} else if existing != nil {
|
||||
record.ID = existing.ID
|
||||
if record.CreatedAt.IsZero() {
|
||||
record.CreatedAt = existing.CreatedAt
|
||||
}
|
||||
err = p.repo.Upsert(ctx, record)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
p.logger.Warn("Failed to upsert payment record",
|
||||
zap.String("idempotency_key", record.IdempotencyKey),
|
||||
zap.String("intent_ref", record.IntentRef),
|
||||
zap.String("quote_ref", record.QuoteRef),
|
||||
zap.Error(err))
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var _ storage.PaymentsStore = (*Payments)(nil)
|
||||
245
api/gateway/chsettle/storage/mongo/store/payments_test.go
Normal file
245
api/gateway/chsettle/storage/mongo/store/payments_test.go
Normal file
@@ -0,0 +1,245 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/tech/sendico/gateway/chsettle/storage/model"
|
||||
"github.com/tech/sendico/pkg/db/repository"
|
||||
"github.com/tech/sendico/pkg/db/storable"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"go.mongodb.org/mongo-driver/v2/bson"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type fakePaymentsRepo struct {
|
||||
repository.Repository
|
||||
|
||||
records map[string]*model.PaymentRecord
|
||||
findErrByCall map[int]error
|
||||
duplicateWhenZeroID bool
|
||||
findCalls int
|
||||
upsertCalls int
|
||||
upsertIDs []bson.ObjectID
|
||||
upsertIdempotencyKey []string
|
||||
}
|
||||
|
||||
func (f *fakePaymentsRepo) FindOneByFilter(_ context.Context, query repository.FilterQuery, result storable.Storable) error {
|
||||
f.findCalls++
|
||||
if err, ok := f.findErrByCall[f.findCalls]; ok {
|
||||
return err
|
||||
}
|
||||
|
||||
rec, ok := result.(*model.PaymentRecord)
|
||||
if !ok {
|
||||
return merrors.InvalidDataType("expected *model.PaymentRecord")
|
||||
}
|
||||
|
||||
doc := query.BuildQuery()
|
||||
if key := stringField(doc, fieldIdempotencyKey); key != "" {
|
||||
stored, ok := f.records[key]
|
||||
if !ok {
|
||||
return merrors.NoData("payment not found by filter")
|
||||
}
|
||||
*rec = *stored
|
||||
return nil
|
||||
}
|
||||
if operationRef := stringField(doc, fieldOperationRef); operationRef != "" {
|
||||
for _, stored := range f.records {
|
||||
if strings.TrimSpace(stored.OperationRef) == operationRef {
|
||||
*rec = *stored
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return merrors.NoData("payment not found by operation ref")
|
||||
}
|
||||
|
||||
return merrors.NoData("payment not found")
|
||||
}
|
||||
|
||||
func (f *fakePaymentsRepo) Upsert(_ context.Context, obj storable.Storable) error {
|
||||
f.upsertCalls++
|
||||
|
||||
rec, ok := obj.(*model.PaymentRecord)
|
||||
if !ok {
|
||||
return merrors.InvalidDataType("expected *model.PaymentRecord")
|
||||
}
|
||||
f.upsertIDs = append(f.upsertIDs, rec.ID)
|
||||
f.upsertIdempotencyKey = append(f.upsertIdempotencyKey, rec.IdempotencyKey)
|
||||
|
||||
if f.duplicateWhenZeroID && rec.ID.IsZero() {
|
||||
if _, exists := f.records[rec.IdempotencyKey]; exists {
|
||||
return mongo.WriteException{
|
||||
WriteErrors: mongo.WriteErrors{
|
||||
{
|
||||
Code: 11000,
|
||||
Message: "E11000 duplicate key error collection: chsettle_gateway.payments",
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
copyRec := *rec
|
||||
if copyRec.ID.IsZero() {
|
||||
copyRec.ID = bson.NewObjectID()
|
||||
}
|
||||
if copyRec.CreatedAt.IsZero() {
|
||||
copyRec.CreatedAt = time.Now().UTC()
|
||||
}
|
||||
copyRec.UpdatedAt = time.Now().UTC()
|
||||
if f.records == nil {
|
||||
f.records = map[string]*model.PaymentRecord{}
|
||||
}
|
||||
f.records[copyRec.IdempotencyKey] = ©Rec
|
||||
*rec = copyRec
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestPaymentsUpsert_ReusesExistingIDFromIdempotencyLookup(t *testing.T) {
|
||||
key := "idem-existing"
|
||||
existingID := bson.NewObjectID()
|
||||
existingCreatedAt := time.Date(2026, 3, 6, 10, 0, 0, 0, time.UTC)
|
||||
|
||||
repo := &fakePaymentsRepo{
|
||||
records: map[string]*model.PaymentRecord{
|
||||
key: {
|
||||
Base: storable.Base{
|
||||
ID: existingID,
|
||||
CreatedAt: existingCreatedAt,
|
||||
UpdatedAt: existingCreatedAt,
|
||||
},
|
||||
IdempotencyKey: key,
|
||||
IntentRef: "pi-old",
|
||||
},
|
||||
},
|
||||
duplicateWhenZeroID: true,
|
||||
}
|
||||
store := &Payments{logger: zap.NewNop(), repo: repo}
|
||||
|
||||
record := &model.PaymentRecord{
|
||||
IdempotencyKey: key,
|
||||
IntentRef: "pi-new",
|
||||
QuoteRef: "quote-new",
|
||||
}
|
||||
|
||||
if err := store.Upsert(context.Background(), record); err != nil {
|
||||
t.Fatalf("upsert failed: %v", err)
|
||||
}
|
||||
|
||||
if repo.upsertCalls != 1 {
|
||||
t.Fatalf("expected one upsert call, got %d", repo.upsertCalls)
|
||||
}
|
||||
if len(repo.upsertIDs) != 1 || repo.upsertIDs[0] != existingID {
|
||||
t.Fatalf("expected upsert to reuse existing id %s, got %+v", existingID.Hex(), repo.upsertIDs)
|
||||
}
|
||||
if record.ID != existingID {
|
||||
t.Fatalf("record ID mismatch: got %s want %s", record.ID.Hex(), existingID.Hex())
|
||||
}
|
||||
}
|
||||
|
||||
func TestPaymentsUpsert_RetriesAfterDuplicateKeyRace(t *testing.T) {
|
||||
key := "idem-race"
|
||||
existingID := bson.NewObjectID()
|
||||
|
||||
repo := &fakePaymentsRepo{
|
||||
records: map[string]*model.PaymentRecord{
|
||||
key: {
|
||||
Base: storable.Base{
|
||||
ID: existingID,
|
||||
CreatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC),
|
||||
UpdatedAt: time.Date(2026, 3, 6, 10, 1, 0, 0, time.UTC),
|
||||
},
|
||||
IdempotencyKey: key,
|
||||
IntentRef: "pi-existing",
|
||||
},
|
||||
},
|
||||
findErrByCall: map[int]error{
|
||||
1: merrors.NoData("payment not found by filter"),
|
||||
},
|
||||
duplicateWhenZeroID: true,
|
||||
}
|
||||
store := &Payments{logger: zap.NewNop(), repo: repo}
|
||||
|
||||
record := &model.PaymentRecord{
|
||||
IdempotencyKey: key,
|
||||
IntentRef: "pi-new",
|
||||
QuoteRef: "quote-new",
|
||||
}
|
||||
|
||||
if err := store.Upsert(context.Background(), record); err != nil {
|
||||
t.Fatalf("upsert failed: %v", err)
|
||||
}
|
||||
|
||||
if repo.upsertCalls != 2 {
|
||||
t.Fatalf("expected two upsert calls, got %d", repo.upsertCalls)
|
||||
}
|
||||
if len(repo.upsertIDs) != 2 {
|
||||
t.Fatalf("expected two upsert IDs, got %d", len(repo.upsertIDs))
|
||||
}
|
||||
if !repo.upsertIDs[0].IsZero() {
|
||||
t.Fatalf("expected first upsert to use zero id due stale read, got %s", repo.upsertIDs[0].Hex())
|
||||
}
|
||||
if repo.upsertIDs[1] != existingID {
|
||||
t.Fatalf("expected retry to use existing id %s, got %s", existingID.Hex(), repo.upsertIDs[1].Hex())
|
||||
}
|
||||
}
|
||||
|
||||
func TestPaymentsUpsert_PropagatesNoSuchTransactionAfterDuplicate(t *testing.T) {
|
||||
key := "idem-nosuchtx"
|
||||
|
||||
repo := &fakePaymentsRepo{
|
||||
records: map[string]*model.PaymentRecord{
|
||||
key: {
|
||||
Base: storable.Base{
|
||||
ID: bson.NewObjectID(),
|
||||
CreatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC),
|
||||
UpdatedAt: time.Date(2026, 3, 6, 10, 2, 0, 0, time.UTC),
|
||||
},
|
||||
IdempotencyKey: key,
|
||||
IntentRef: "pi-existing",
|
||||
},
|
||||
},
|
||||
findErrByCall: map[int]error{
|
||||
1: merrors.NoData("payment not found by filter"),
|
||||
2: mongo.CommandError{
|
||||
Code: 251,
|
||||
Name: "NoSuchTransaction",
|
||||
Message: "Transaction with { txnNumber: 2 } has been aborted.",
|
||||
},
|
||||
},
|
||||
duplicateWhenZeroID: true,
|
||||
}
|
||||
store := &Payments{logger: zap.NewNop(), repo: repo}
|
||||
|
||||
record := &model.PaymentRecord{
|
||||
IdempotencyKey: key,
|
||||
IntentRef: "pi-new",
|
||||
QuoteRef: "quote-new",
|
||||
}
|
||||
|
||||
err := store.Upsert(context.Background(), record)
|
||||
if err == nil {
|
||||
t.Fatal("expected error, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "NoSuchTransaction") {
|
||||
t.Fatalf("expected NoSuchTransaction error, got %v", err)
|
||||
}
|
||||
if repo.upsertCalls != 1 {
|
||||
t.Fatalf("expected one upsert attempt before lookup failure, got %d", repo.upsertCalls)
|
||||
}
|
||||
}
|
||||
|
||||
func stringField(doc bson.D, key string) string {
|
||||
for _, entry := range doc {
|
||||
if entry.Key != key {
|
||||
continue
|
||||
}
|
||||
res, _ := entry.Value.(string)
|
||||
return strings.TrimSpace(res)
|
||||
}
|
||||
return ""
|
||||
}
|
||||
@@ -0,0 +1,205 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/tech/sendico/gateway/chsettle/storage"
|
||||
"github.com/tech/sendico/gateway/chsettle/storage/model"
|
||||
"github.com/tech/sendico/pkg/db/repository"
|
||||
"github.com/tech/sendico/pkg/db/repository/builder"
|
||||
ri "github.com/tech/sendico/pkg/db/repository/index"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
mutil "github.com/tech/sendico/pkg/mutil/db"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
pendingConfirmationsCollection = "pending_confirmations"
|
||||
fieldPendingRequestID = "requestId"
|
||||
fieldPendingMessageID = "messageId"
|
||||
fieldPendingExpiresAt = "expiresAt"
|
||||
)
|
||||
|
||||
type PendingConfirmations struct {
|
||||
logger mlogger.Logger
|
||||
repo repository.Repository
|
||||
}
|
||||
|
||||
func NewPendingConfirmations(logger mlogger.Logger, db *mongo.Database) (*PendingConfirmations, error) {
|
||||
if db == nil {
|
||||
return nil, merrors.InvalidArgument("mongo database is nil")
|
||||
}
|
||||
if logger == nil {
|
||||
logger = zap.NewNop()
|
||||
}
|
||||
logger = logger.Named("pending_confirmations").With(zap.String("collection", pendingConfirmationsCollection))
|
||||
|
||||
repo := repository.CreateMongoRepository(db, pendingConfirmationsCollection)
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{{Field: fieldPendingRequestID, Sort: ri.Asc}},
|
||||
Unique: true,
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create pending confirmations request_id index", zap.Error(err), zap.String("index_field", fieldPendingRequestID))
|
||||
return nil, err
|
||||
}
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{{Field: fieldPendingMessageID, Sort: ri.Asc}},
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create pending confirmations message_id index", zap.Error(err), zap.String("index_field", fieldPendingMessageID))
|
||||
return nil, err
|
||||
}
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{{Field: fieldPendingExpiresAt, Sort: ri.Asc}},
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create pending confirmations expires_at index", zap.Error(err), zap.String("index_field", fieldPendingExpiresAt))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p := &PendingConfirmations{
|
||||
logger: logger,
|
||||
repo: repo,
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *PendingConfirmations) Upsert(ctx context.Context, record *model.PendingConfirmation) error {
|
||||
if record == nil {
|
||||
return merrors.InvalidArgument("pending confirmation is nil", "record")
|
||||
}
|
||||
record.RequestID = strings.TrimSpace(record.RequestID)
|
||||
record.MessageID = strings.TrimSpace(record.MessageID)
|
||||
record.TargetChatID = strings.TrimSpace(record.TargetChatID)
|
||||
record.SourceService = strings.TrimSpace(record.SourceService)
|
||||
record.Rail = strings.TrimSpace(record.Rail)
|
||||
if record.RequestID == "" {
|
||||
return merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
if record.TargetChatID == "" {
|
||||
return merrors.InvalidArgument("target_chat_id is required", "target_chat_id")
|
||||
}
|
||||
if record.ExpiresAt.IsZero() {
|
||||
return merrors.InvalidArgument("expires_at is required", "expires_at")
|
||||
}
|
||||
|
||||
filter := repository.Filter(fieldPendingRequestID, record.RequestID)
|
||||
err := p.repo.Insert(ctx, record, filter)
|
||||
if errors.Is(err, merrors.ErrDataConflict) {
|
||||
patch := repository.Patch().
|
||||
Set(repository.Field(fieldPendingMessageID), record.MessageID).
|
||||
Set(repository.Field("targetChatId"), record.TargetChatID).
|
||||
Set(repository.Field("acceptedUserIds"), record.AcceptedUserIDs).
|
||||
Set(repository.Field("requestedMoney"), record.RequestedMoney).
|
||||
Set(repository.Field("sourceService"), record.SourceService).
|
||||
Set(repository.Field("rail"), record.Rail).
|
||||
Set(repository.Field("clarified"), record.Clarified).
|
||||
Set(repository.Field(fieldPendingExpiresAt), record.ExpiresAt)
|
||||
_, err = p.repo.PatchMany(ctx, filter, patch)
|
||||
}
|
||||
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
p.logger.Warn("Failed to upsert pending confirmation", zap.Error(err), zap.String("request_id", record.RequestID))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *PendingConfirmations) FindByRequestID(ctx context.Context, requestID string) (*model.PendingConfirmation, error) {
|
||||
requestID = strings.TrimSpace(requestID)
|
||||
if requestID == "" {
|
||||
return nil, merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
var result model.PendingConfirmation
|
||||
err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldPendingRequestID, requestID), &result)
|
||||
if errors.Is(err, merrors.ErrNoData) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (p *PendingConfirmations) FindByMessageID(ctx context.Context, messageID string) (*model.PendingConfirmation, error) {
|
||||
messageID = strings.TrimSpace(messageID)
|
||||
if messageID == "" {
|
||||
return nil, merrors.InvalidArgument("message_id is required", "message_id")
|
||||
}
|
||||
var result model.PendingConfirmation
|
||||
err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldPendingMessageID, messageID), &result)
|
||||
if errors.Is(err, merrors.ErrNoData) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (p *PendingConfirmations) MarkClarified(ctx context.Context, requestID string) error {
|
||||
requestID = strings.TrimSpace(requestID)
|
||||
if requestID == "" {
|
||||
return merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
patch := repository.Patch().
|
||||
Set(repository.Field("clarified"), true)
|
||||
_, err := p.repo.PatchMany(ctx, repository.Filter(fieldPendingRequestID, requestID), patch)
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *PendingConfirmations) AttachMessage(ctx context.Context, requestID string, messageID string) error {
|
||||
requestID = strings.TrimSpace(requestID)
|
||||
messageID = strings.TrimSpace(messageID)
|
||||
if requestID == "" {
|
||||
return merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
if messageID == "" {
|
||||
return merrors.InvalidArgument("message_id is required", "message_id")
|
||||
}
|
||||
|
||||
filter := repository.Filter(fieldPendingRequestID, requestID).And(
|
||||
repository.Query().Or(
|
||||
repository.Exists(repository.Field(fieldPendingMessageID), false),
|
||||
repository.Filter(fieldPendingMessageID, ""),
|
||||
repository.Filter(fieldPendingMessageID, messageID),
|
||||
),
|
||||
)
|
||||
patch := repository.Patch().
|
||||
Set(repository.Field(fieldPendingMessageID), messageID)
|
||||
updated, err := p.repo.PatchMany(ctx, filter, patch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if updated == 0 {
|
||||
return merrors.NoData("pending confirmation not found")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PendingConfirmations) DeleteByRequestID(ctx context.Context, requestID string) error {
|
||||
requestID = strings.TrimSpace(requestID)
|
||||
if requestID == "" {
|
||||
return merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
return p.repo.DeleteMany(ctx, repository.Filter(fieldPendingRequestID, requestID))
|
||||
}
|
||||
|
||||
func (p *PendingConfirmations) ListExpired(ctx context.Context, now time.Time, limit int64) ([]model.PendingConfirmation, error) {
|
||||
if limit <= 0 {
|
||||
limit = 100
|
||||
}
|
||||
query := repository.Query().
|
||||
Comparison(repository.Field(fieldPendingExpiresAt), builder.Lte, now).
|
||||
Sort(repository.Field(fieldPendingExpiresAt), true).
|
||||
Limit(&limit)
|
||||
|
||||
items, err := mutil.GetObjects[model.PendingConfirmation](ctx, p.logger, query, nil, p.repo)
|
||||
if err != nil && !errors.Is(err, merrors.ErrNoData) {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
var _ storage.PendingConfirmationsStore = (*PendingConfirmations)(nil)
|
||||
@@ -0,0 +1,91 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/tech/sendico/gateway/chsettle/storage"
|
||||
"github.com/tech/sendico/gateway/chsettle/storage/model"
|
||||
"github.com/tech/sendico/pkg/db/repository"
|
||||
ri "github.com/tech/sendico/pkg/db/repository/index"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
telegramCollection = "telegram_confirmations"
|
||||
fieldRequestID = "requestId"
|
||||
)
|
||||
|
||||
type TelegramConfirmations struct {
|
||||
logger mlogger.Logger
|
||||
repo repository.Repository
|
||||
}
|
||||
|
||||
func NewTelegramConfirmations(logger mlogger.Logger, db *mongo.Database) (*TelegramConfirmations, error) {
|
||||
if db == nil {
|
||||
return nil, merrors.InvalidArgument("mongo database is nil")
|
||||
}
|
||||
if logger == nil {
|
||||
logger = zap.NewNop()
|
||||
}
|
||||
logger = logger.Named("telegram_confirmations").With(zap.String("collection", telegramCollection))
|
||||
|
||||
repo := repository.CreateMongoRepository(db, telegramCollection)
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{{Field: fieldRequestID, Sort: ri.Asc}},
|
||||
Unique: true,
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create telegram confirmations request_id index", zap.Error(err), zap.String("index_field", fieldRequestID))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t := &TelegramConfirmations{
|
||||
logger: logger,
|
||||
repo: repo,
|
||||
}
|
||||
t.logger.Debug("Telegram confirmations store initialised")
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (t *TelegramConfirmations) Upsert(ctx context.Context, record *model.TelegramConfirmation) error {
|
||||
if record == nil {
|
||||
return merrors.InvalidArgument("telegram confirmation is nil", "record")
|
||||
}
|
||||
record.RequestID = strings.TrimSpace(record.RequestID)
|
||||
record.PaymentIntentID = strings.TrimSpace(record.PaymentIntentID)
|
||||
record.QuoteRef = strings.TrimSpace(record.QuoteRef)
|
||||
if record.RequestID == "" {
|
||||
return merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
if record.ReceivedAt.IsZero() {
|
||||
record.ReceivedAt = time.Now()
|
||||
}
|
||||
filter := repository.Filter(fieldRequestID, record.RequestID)
|
||||
err := t.repo.Insert(ctx, record, filter)
|
||||
if errors.Is(err, merrors.ErrDataConflict) {
|
||||
patch := repository.Patch().
|
||||
Set(repository.Field("paymentIntentId"), record.PaymentIntentID).
|
||||
Set(repository.Field("quoteRef"), record.QuoteRef).
|
||||
Set(repository.Field("rawReply"), record.RawReply).
|
||||
Set(repository.Field("receivedAt"), record.ReceivedAt)
|
||||
_, err = t.repo.PatchMany(ctx, filter, patch)
|
||||
}
|
||||
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
fields := []zap.Field{zap.String("request_id", record.RequestID)}
|
||||
if record.PaymentIntentID != "" {
|
||||
fields = append(fields, zap.String("payment_intent_id", record.PaymentIntentID))
|
||||
}
|
||||
if record.QuoteRef != "" {
|
||||
fields = append(fields, zap.String("quote_ref", record.QuoteRef))
|
||||
}
|
||||
t.logger.Warn("Failed to upsert telegram confirmation", append(fields, zap.Error(err))...)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
var _ storage.TelegramConfirmationsStore = (*TelegramConfirmations)(nil)
|
||||
402
api/gateway/chsettle/storage/mongo/store/treasury_requests.go
Normal file
402
api/gateway/chsettle/storage/mongo/store/treasury_requests.go
Normal file
@@ -0,0 +1,402 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/tech/sendico/gateway/chsettle/storage"
|
||||
"github.com/tech/sendico/gateway/chsettle/storage/model"
|
||||
"github.com/tech/sendico/pkg/db/repository"
|
||||
"github.com/tech/sendico/pkg/db/repository/builder"
|
||||
ri "github.com/tech/sendico/pkg/db/repository/index"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
mutil "github.com/tech/sendico/pkg/mutil/db"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
treasuryRequestsCollection = "treasury_requests"
|
||||
|
||||
fieldTreasuryRequestID = "requestId"
|
||||
fieldTreasuryLedgerAccount = "ledgerAccountId"
|
||||
fieldTreasuryIdempotencyKey = "idempotencyKey"
|
||||
fieldTreasuryStatus = "status"
|
||||
fieldTreasuryScheduledAt = "scheduledAt"
|
||||
fieldTreasuryCreatedAt = "createdAt"
|
||||
fieldTreasuryActive = "active"
|
||||
)
|
||||
|
||||
type TreasuryRequests struct {
|
||||
logger mlogger.Logger
|
||||
repo repository.Repository
|
||||
}
|
||||
|
||||
func NewTreasuryRequests(logger mlogger.Logger, db *mongo.Database) (*TreasuryRequests, error) {
|
||||
if db == nil {
|
||||
return nil, merrors.InvalidArgument("mongo database is nil")
|
||||
}
|
||||
if logger == nil {
|
||||
logger = zap.NewNop()
|
||||
}
|
||||
logger = logger.Named("treasury_requests").With(zap.String("collection", treasuryRequestsCollection))
|
||||
|
||||
repo := repository.CreateMongoRepository(db, treasuryRequestsCollection)
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{{Field: fieldTreasuryRequestID, Sort: ri.Asc}},
|
||||
Unique: true,
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create treasury requests request_id index", zap.Error(err), zap.String("index_field", fieldTreasuryRequestID))
|
||||
return nil, err
|
||||
}
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{{Field: fieldTreasuryIdempotencyKey, Sort: ri.Asc}},
|
||||
Unique: true,
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create treasury requests idempotency index", zap.Error(err), zap.String("index_field", fieldTreasuryIdempotencyKey))
|
||||
return nil, err
|
||||
}
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{
|
||||
{Field: fieldTreasuryLedgerAccount, Sort: ri.Asc},
|
||||
{Field: fieldTreasuryActive, Sort: ri.Asc},
|
||||
},
|
||||
Unique: true,
|
||||
PartialFilter: repository.Filter(fieldTreasuryActive, true),
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create treasury requests active-account index", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{
|
||||
{Field: fieldTreasuryStatus, Sort: ri.Asc},
|
||||
{Field: fieldTreasuryScheduledAt, Sort: ri.Asc},
|
||||
},
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create treasury requests execution index", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{
|
||||
{Field: fieldTreasuryLedgerAccount, Sort: ri.Asc},
|
||||
{Field: fieldTreasuryCreatedAt, Sort: ri.Asc},
|
||||
},
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create treasury requests daily-amount index", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t := &TreasuryRequests{
|
||||
logger: logger,
|
||||
repo: repo,
|
||||
}
|
||||
t.logger.Debug("Treasury requests store initialised")
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (t *TreasuryRequests) Create(ctx context.Context, record *model.TreasuryRequest) error {
|
||||
if record == nil {
|
||||
return merrors.InvalidArgument("treasury request is nil", "record")
|
||||
}
|
||||
record.RequestID = strings.TrimSpace(record.RequestID)
|
||||
record.TelegramUserID = strings.TrimSpace(record.TelegramUserID)
|
||||
record.LedgerAccountID = strings.TrimSpace(record.LedgerAccountID)
|
||||
record.LedgerAccountCode = strings.TrimSpace(record.LedgerAccountCode)
|
||||
record.OrganizationRef = strings.TrimSpace(record.OrganizationRef)
|
||||
record.ChatID = strings.TrimSpace(record.ChatID)
|
||||
record.Amount = strings.TrimSpace(record.Amount)
|
||||
record.Currency = strings.ToUpper(strings.TrimSpace(record.Currency))
|
||||
record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey)
|
||||
record.LedgerReference = strings.TrimSpace(record.LedgerReference)
|
||||
record.ErrorMessage = strings.TrimSpace(record.ErrorMessage)
|
||||
|
||||
if record.RequestID == "" {
|
||||
return merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
if record.TelegramUserID == "" {
|
||||
return merrors.InvalidArgument("telegram_user_id is required", "telegram_user_id")
|
||||
}
|
||||
if record.LedgerAccountID == "" {
|
||||
return merrors.InvalidArgument("ledger_account_id is required", "ledger_account_id")
|
||||
}
|
||||
if record.Amount == "" {
|
||||
return merrors.InvalidArgument("amount is required", "amount")
|
||||
}
|
||||
if record.Currency == "" {
|
||||
return merrors.InvalidArgument("currency is required", "currency")
|
||||
}
|
||||
if record.IdempotencyKey == "" {
|
||||
return merrors.InvalidArgument("idempotency_key is required", "idempotency_key")
|
||||
}
|
||||
if record.Status == "" {
|
||||
return merrors.InvalidArgument("status is required", "status")
|
||||
}
|
||||
|
||||
err := t.repo.Insert(ctx, record, repository.Filter(fieldTreasuryRequestID, record.RequestID))
|
||||
if errors.Is(err, merrors.ErrDataConflict) {
|
||||
return storage.ErrDuplicate
|
||||
}
|
||||
if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
t.logger.Warn("Failed to create treasury request", zap.Error(err), zap.String("request_id", record.RequestID))
|
||||
return err
|
||||
}
|
||||
t.logger.Info("Treasury request created",
|
||||
zap.String("request_id", record.RequestID),
|
||||
zap.String("telegram_user_id", record.TelegramUserID),
|
||||
zap.String("chat_id", record.ChatID),
|
||||
zap.String("ledger_account_id", record.LedgerAccountID),
|
||||
zap.String("ledger_account_code", record.LedgerAccountCode),
|
||||
zap.String("operation_type", strings.TrimSpace(string(record.OperationType))),
|
||||
zap.String("status", strings.TrimSpace(string(record.Status))),
|
||||
zap.String("amount", record.Amount),
|
||||
zap.String("currency", record.Currency))
|
||||
return err
|
||||
}
|
||||
|
||||
func (t *TreasuryRequests) FindByRequestID(ctx context.Context, requestID string) (*model.TreasuryRequest, error) {
|
||||
requestID = strings.TrimSpace(requestID)
|
||||
if requestID == "" {
|
||||
return nil, merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
var result model.TreasuryRequest
|
||||
err := t.repo.FindOneByFilter(ctx, repository.Filter(fieldTreasuryRequestID, requestID), &result)
|
||||
if errors.Is(err, merrors.ErrNoData) {
|
||||
t.logger.Debug("Treasury request not found", zap.String("request_id", requestID))
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
t.logger.Warn("Failed to load treasury request", zap.Error(err), zap.String("request_id", requestID))
|
||||
return nil, err
|
||||
}
|
||||
t.logger.Debug("Treasury request loaded",
|
||||
zap.String("request_id", requestID),
|
||||
zap.String("status", strings.TrimSpace(string(result.Status))),
|
||||
zap.String("ledger_account_id", strings.TrimSpace(result.LedgerAccountID)))
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (t *TreasuryRequests) FindActiveByLedgerAccountID(ctx context.Context, ledgerAccountID string) (*model.TreasuryRequest, error) {
|
||||
ledgerAccountID = strings.TrimSpace(ledgerAccountID)
|
||||
if ledgerAccountID == "" {
|
||||
return nil, merrors.InvalidArgument("ledger_account_id is required", "ledger_account_id")
|
||||
}
|
||||
var result model.TreasuryRequest
|
||||
query := repository.Query().
|
||||
Filter(repository.Field(fieldTreasuryLedgerAccount), ledgerAccountID).
|
||||
Filter(repository.Field(fieldTreasuryActive), true)
|
||||
err := t.repo.FindOneByFilter(ctx, query, &result)
|
||||
if errors.Is(err, merrors.ErrNoData) {
|
||||
t.logger.Debug("Active treasury request not found", zap.String("ledger_account_id", ledgerAccountID))
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
t.logger.Warn("Failed to load active treasury request", zap.Error(err), zap.String("ledger_account_id", ledgerAccountID))
|
||||
return nil, err
|
||||
}
|
||||
t.logger.Debug("Active treasury request loaded",
|
||||
zap.String("request_id", strings.TrimSpace(result.RequestID)),
|
||||
zap.String("ledger_account_id", ledgerAccountID),
|
||||
zap.String("status", strings.TrimSpace(string(result.Status))))
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (t *TreasuryRequests) FindDueByStatus(ctx context.Context, statuses []model.TreasuryRequestStatus, now time.Time, limit int64) ([]model.TreasuryRequest, error) {
|
||||
if len(statuses) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
if limit <= 0 {
|
||||
limit = 100
|
||||
}
|
||||
statusValues := make([]any, 0, len(statuses))
|
||||
for _, status := range statuses {
|
||||
next := strings.TrimSpace(string(status))
|
||||
if next == "" {
|
||||
continue
|
||||
}
|
||||
statusValues = append(statusValues, next)
|
||||
}
|
||||
if len(statusValues) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
query := repository.Query().
|
||||
In(repository.Field(fieldTreasuryStatus), statusValues...).
|
||||
Comparison(repository.Field(fieldTreasuryScheduledAt), builder.Lte, now).
|
||||
Sort(repository.Field(fieldTreasuryScheduledAt), true).
|
||||
Limit(&limit)
|
||||
|
||||
result, err := mutil.GetObjects[model.TreasuryRequest](ctx, t.logger, query, nil, t.repo)
|
||||
if err != nil && !errors.Is(err, merrors.ErrNoData) {
|
||||
t.logger.Warn("Failed to list due treasury requests",
|
||||
zap.Error(err),
|
||||
zap.Any("statuses", statusValues),
|
||||
zap.Time("scheduled_before", now),
|
||||
zap.Int64("limit", limit))
|
||||
return nil, err
|
||||
}
|
||||
t.logger.Debug("Due treasury requests loaded",
|
||||
zap.Any("statuses", statusValues),
|
||||
zap.Time("scheduled_before", now),
|
||||
zap.Int64("limit", limit),
|
||||
zap.Int("count", len(result)))
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (t *TreasuryRequests) ClaimScheduled(ctx context.Context, requestID string) (bool, error) {
|
||||
requestID = strings.TrimSpace(requestID)
|
||||
if requestID == "" {
|
||||
return false, merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
patch := repository.Patch().
|
||||
Set(repository.Field(fieldTreasuryStatus), string(model.TreasuryRequestStatusConfirmed))
|
||||
updated, err := t.repo.PatchMany(ctx, repository.Filter(fieldTreasuryRequestID, requestID).And(
|
||||
repository.Filter(fieldTreasuryStatus, string(model.TreasuryRequestStatusScheduled)),
|
||||
), patch)
|
||||
if err != nil {
|
||||
t.logger.Warn("Failed to claim scheduled treasury request", zap.Error(err), zap.String("request_id", requestID))
|
||||
return false, err
|
||||
}
|
||||
if updated > 0 {
|
||||
t.logger.Info("Scheduled treasury request claimed", zap.String("request_id", requestID))
|
||||
} else {
|
||||
t.logger.Debug("Scheduled treasury request claim skipped", zap.String("request_id", requestID))
|
||||
}
|
||||
return updated > 0, nil
|
||||
}
|
||||
|
||||
func (t *TreasuryRequests) Update(ctx context.Context, record *model.TreasuryRequest) error {
|
||||
if record == nil {
|
||||
return merrors.InvalidArgument("treasury request is nil", "record")
|
||||
}
|
||||
record.RequestID = strings.TrimSpace(record.RequestID)
|
||||
record.TelegramUserID = strings.TrimSpace(record.TelegramUserID)
|
||||
record.LedgerAccountID = strings.TrimSpace(record.LedgerAccountID)
|
||||
record.LedgerAccountCode = strings.TrimSpace(record.LedgerAccountCode)
|
||||
record.OrganizationRef = strings.TrimSpace(record.OrganizationRef)
|
||||
record.ChatID = strings.TrimSpace(record.ChatID)
|
||||
record.Amount = strings.TrimSpace(record.Amount)
|
||||
record.Currency = strings.ToUpper(strings.TrimSpace(record.Currency))
|
||||
record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey)
|
||||
record.LedgerReference = strings.TrimSpace(record.LedgerReference)
|
||||
record.ErrorMessage = strings.TrimSpace(record.ErrorMessage)
|
||||
if record.RequestID == "" {
|
||||
return merrors.InvalidArgument("request_id is required", "request_id")
|
||||
}
|
||||
existing, err := t.FindByRequestID(ctx, record.RequestID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if existing == nil {
|
||||
return merrors.NoData("treasury request not found")
|
||||
}
|
||||
|
||||
patch := repository.Patch().
|
||||
Set(repository.Field("operationType"), record.OperationType).
|
||||
Set(repository.Field("telegramUserId"), record.TelegramUserID).
|
||||
Set(repository.Field("ledgerAccountId"), record.LedgerAccountID).
|
||||
Set(repository.Field("organizationRef"), record.OrganizationRef).
|
||||
Set(repository.Field("chatId"), record.ChatID).
|
||||
Set(repository.Field("amount"), record.Amount).
|
||||
Set(repository.Field("currency"), record.Currency).
|
||||
Set(repository.Field(fieldTreasuryStatus), record.Status).
|
||||
Set(repository.Field(fieldTreasuryIdempotencyKey), record.IdempotencyKey).
|
||||
Set(repository.Field(fieldTreasuryActive), record.Active)
|
||||
if record.LedgerAccountCode != "" {
|
||||
patch = patch.Set(repository.Field("ledgerAccountCode"), record.LedgerAccountCode)
|
||||
} else {
|
||||
patch = patch.Unset(repository.Field("ledgerAccountCode"))
|
||||
}
|
||||
if !record.ConfirmedAt.IsZero() {
|
||||
patch = patch.Set(repository.Field("confirmedAt"), record.ConfirmedAt)
|
||||
} else {
|
||||
patch = patch.Unset(repository.Field("confirmedAt"))
|
||||
}
|
||||
if !record.ScheduledAt.IsZero() {
|
||||
patch = patch.Set(repository.Field("scheduledAt"), record.ScheduledAt)
|
||||
} else {
|
||||
patch = patch.Unset(repository.Field("scheduledAt"))
|
||||
}
|
||||
if !record.ExecutedAt.IsZero() {
|
||||
patch = patch.Set(repository.Field("executedAt"), record.ExecutedAt)
|
||||
} else {
|
||||
patch = patch.Unset(repository.Field("executedAt"))
|
||||
}
|
||||
if !record.CancelledAt.IsZero() {
|
||||
patch = patch.Set(repository.Field("cancelledAt"), record.CancelledAt)
|
||||
} else {
|
||||
patch = patch.Unset(repository.Field("cancelledAt"))
|
||||
}
|
||||
if record.LedgerReference != "" {
|
||||
patch = patch.Set(repository.Field("ledgerReference"), record.LedgerReference)
|
||||
} else {
|
||||
patch = patch.Unset(repository.Field("ledgerReference"))
|
||||
}
|
||||
if record.ErrorMessage != "" {
|
||||
patch = patch.Set(repository.Field("errorMessage"), record.ErrorMessage)
|
||||
} else {
|
||||
patch = patch.Unset(repository.Field("errorMessage"))
|
||||
}
|
||||
if _, err := t.repo.PatchMany(ctx, repository.Filter(fieldTreasuryRequestID, record.RequestID), patch); err != nil {
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
t.logger.Warn("Failed to update treasury request", zap.Error(err), zap.String("request_id", record.RequestID))
|
||||
}
|
||||
return err
|
||||
}
|
||||
t.logger.Info("Treasury request updated",
|
||||
zap.String("request_id", record.RequestID),
|
||||
zap.String("telegram_user_id", strings.TrimSpace(record.TelegramUserID)),
|
||||
zap.String("chat_id", strings.TrimSpace(record.ChatID)),
|
||||
zap.String("ledger_account_id", strings.TrimSpace(record.LedgerAccountID)),
|
||||
zap.String("ledger_account_code", strings.TrimSpace(record.LedgerAccountCode)),
|
||||
zap.String("operation_type", strings.TrimSpace(string(record.OperationType))),
|
||||
zap.String("status", strings.TrimSpace(string(record.Status))),
|
||||
zap.String("amount", strings.TrimSpace(record.Amount)),
|
||||
zap.String("currency", strings.TrimSpace(record.Currency)),
|
||||
zap.String("error_message", strings.TrimSpace(record.ErrorMessage)))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TreasuryRequests) ListByAccountAndStatuses(ctx context.Context, ledgerAccountID string, statuses []model.TreasuryRequestStatus, dayStart, dayEnd time.Time) ([]model.TreasuryRequest, error) {
|
||||
ledgerAccountID = strings.TrimSpace(ledgerAccountID)
|
||||
if ledgerAccountID == "" {
|
||||
return nil, merrors.InvalidArgument("ledger_account_id is required", "ledger_account_id")
|
||||
}
|
||||
statusValues := make([]any, 0, len(statuses))
|
||||
for _, status := range statuses {
|
||||
next := strings.TrimSpace(string(status))
|
||||
if next == "" {
|
||||
continue
|
||||
}
|
||||
statusValues = append(statusValues, next)
|
||||
}
|
||||
if len(statusValues) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
query := repository.Query().
|
||||
Filter(repository.Field(fieldTreasuryLedgerAccount), ledgerAccountID).
|
||||
In(repository.Field(fieldTreasuryStatus), statusValues...).
|
||||
Comparison(repository.Field(fieldTreasuryCreatedAt), builder.Gte, dayStart).
|
||||
Comparison(repository.Field(fieldTreasuryCreatedAt), builder.Lt, dayEnd)
|
||||
|
||||
result, err := mutil.GetObjects[model.TreasuryRequest](ctx, t.logger, query, nil, t.repo)
|
||||
if err != nil && !errors.Is(err, merrors.ErrNoData) {
|
||||
t.logger.Warn("Failed to list treasury requests by account and statuses",
|
||||
zap.Error(err),
|
||||
zap.String("ledger_account_id", ledgerAccountID),
|
||||
zap.Any("statuses", statusValues),
|
||||
zap.Time("day_start", dayStart),
|
||||
zap.Time("day_end", dayEnd))
|
||||
return nil, err
|
||||
}
|
||||
t.logger.Debug("Treasury requests loaded by account and statuses",
|
||||
zap.String("ledger_account_id", ledgerAccountID),
|
||||
zap.Any("statuses", statusValues),
|
||||
zap.Time("day_start", dayStart),
|
||||
zap.Time("day_end", dayEnd),
|
||||
zap.Int("count", len(result)))
|
||||
return result, nil
|
||||
}
|
||||
|
||||
var _ storage.TreasuryRequestsStore = (*TreasuryRequests)(nil)
|
||||
@@ -0,0 +1,87 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
|
||||
"github.com/tech/sendico/gateway/chsettle/storage"
|
||||
"github.com/tech/sendico/gateway/chsettle/storage/model"
|
||||
"github.com/tech/sendico/pkg/db/repository"
|
||||
ri "github.com/tech/sendico/pkg/db/repository/index"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
treasuryTelegramUsersCollection = "treasury_telegram_users"
|
||||
fieldTreasuryTelegramUserID = "telegramUserId"
|
||||
)
|
||||
|
||||
type TreasuryTelegramUsers struct {
|
||||
logger mlogger.Logger
|
||||
repo repository.Repository
|
||||
}
|
||||
|
||||
func NewTreasuryTelegramUsers(logger mlogger.Logger, db *mongo.Database) (*TreasuryTelegramUsers, error) {
|
||||
if db == nil {
|
||||
return nil, merrors.InvalidArgument("mongo database is nil")
|
||||
}
|
||||
if logger == nil {
|
||||
logger = zap.NewNop()
|
||||
}
|
||||
logger = logger.Named("treasury_telegram_users").With(zap.String("collection", treasuryTelegramUsersCollection))
|
||||
|
||||
repo := repository.CreateMongoRepository(db, treasuryTelegramUsersCollection)
|
||||
if err := repo.CreateIndex(&ri.Definition{
|
||||
Keys: []ri.Key{{Field: fieldTreasuryTelegramUserID, Sort: ri.Asc}},
|
||||
Unique: true,
|
||||
}); err != nil {
|
||||
logger.Error("Failed to create treasury telegram users user_id index", zap.Error(err), zap.String("index_field", fieldTreasuryTelegramUserID))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &TreasuryTelegramUsers{
|
||||
logger: logger,
|
||||
repo: repo,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (t *TreasuryTelegramUsers) FindByTelegramUserID(ctx context.Context, telegramUserID string) (*model.TreasuryTelegramUser, error) {
|
||||
telegramUserID = strings.TrimSpace(telegramUserID)
|
||||
if telegramUserID == "" {
|
||||
return nil, merrors.InvalidArgument("telegram_user_id is required", "telegram_user_id")
|
||||
}
|
||||
var result model.TreasuryTelegramUser
|
||||
err := t.repo.FindOneByFilter(ctx, repository.Filter(fieldTreasuryTelegramUserID, telegramUserID), &result)
|
||||
if errors.Is(err, merrors.ErrNoData) {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
t.logger.Warn("Failed to load treasury telegram user", zap.Error(err), zap.String("telegram_user_id", telegramUserID))
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
result.TelegramUserID = strings.TrimSpace(result.TelegramUserID)
|
||||
result.LedgerAccountID = strings.TrimSpace(result.LedgerAccountID)
|
||||
if len(result.AllowedChatIDs) > 0 {
|
||||
normalized := make([]string, 0, len(result.AllowedChatIDs))
|
||||
for _, next := range result.AllowedChatIDs {
|
||||
next = strings.TrimSpace(next)
|
||||
if next == "" {
|
||||
continue
|
||||
}
|
||||
normalized = append(normalized, next)
|
||||
}
|
||||
result.AllowedChatIDs = normalized
|
||||
}
|
||||
if result.TelegramUserID == "" || result.LedgerAccountID == "" {
|
||||
return nil, nil
|
||||
}
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
var _ storage.TreasuryTelegramUsersStore = (*TreasuryTelegramUsers)(nil)
|
||||
38
api/gateway/chsettle/storage/mongo/transaction.go
Normal file
38
api/gateway/chsettle/storage/mongo/transaction.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package mongo
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tech/sendico/pkg/db/transaction"
|
||||
"go.mongodb.org/mongo-driver/v2/mongo"
|
||||
)
|
||||
|
||||
type mongoTransactionFactory struct {
|
||||
client *mongo.Client
|
||||
}
|
||||
|
||||
func (f *mongoTransactionFactory) CreateTransaction() transaction.Transaction {
|
||||
return &mongoTransaction{client: f.client}
|
||||
}
|
||||
|
||||
type mongoTransaction struct {
|
||||
client *mongo.Client
|
||||
}
|
||||
|
||||
func (t *mongoTransaction) Execute(ctx context.Context, cb transaction.Callback) (any, error) {
|
||||
session, err := t.client.StartSession()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer session.EndSession(ctx)
|
||||
|
||||
run := func(sessCtx context.Context) (any, error) {
|
||||
return cb(sessCtx)
|
||||
}
|
||||
|
||||
return session.WithTransaction(ctx, run)
|
||||
}
|
||||
|
||||
func newMongoTransactionFactory(client *mongo.Client) transaction.Factory {
|
||||
return &mongoTransactionFactory{client: client}
|
||||
}
|
||||
Reference in New Issue
Block a user