package store

import (
	"context"
	"errors"
	"strings"

	"github.com/tech/sendico/gateway/tgsettle/storage"
	"github.com/tech/sendico/gateway/tgsettle/storage/model"
	"github.com/tech/sendico/pkg/db/repository"
	ri "github.com/tech/sendico/pkg/db/repository/index"
	"github.com/tech/sendico/pkg/merrors"
	"github.com/tech/sendico/pkg/mlogger"
	"go.mongodb.org/mongo-driver/v2/mongo"
	"go.uber.org/zap"
)

const (
	// paymentsCollection is the name of the collection backing the store.
	paymentsCollection = "payments"
	// fieldIdempotencyKey is the document field used for idempotency lookups
	// and covered by a unique index.
	fieldIdempotencyKey = "idempotencyKey"
	// fieldOperationRef is the document field used for operation-reference
	// lookups and covered by a unique, sparse index.
	fieldOperationRef = "operationRef"
)

// Payments is a repository-backed store of payment records.
type Payments struct {
	logger mlogger.Logger
	repo   repository.Repository
}

// NewPayments builds a Payments store on top of db and creates the indexes it
// relies on: a unique ascending index on the idempotency key and a unique,
// sparse ascending index on the operation reference.
//
// It returns an invalid-argument error when db is nil, and the underlying error
// when either index cannot be created. A nil logger is replaced by a no-op one.
func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) {
	if db == nil {
		return nil, merrors.InvalidArgument("mongo database is nil")
	}
	if logger == nil {
		logger = zap.NewNop()
	}
	logger = logger.Named("payments").With(zap.String("collection", paymentsCollection))

	repo := repository.CreateMongoRepository(db, paymentsCollection)

	if err := repo.CreateIndex(&ri.Definition{
		Keys:   []ri.Key{{Field: fieldIdempotencyKey, Sort: ri.Asc}},
		Unique: true,
	}); err != nil {
		logger.Error("Failed to create payments idempotency index", zap.Error(err), zap.String("index_field", fieldIdempotencyKey))
		return nil, err
	}

	if err := repo.CreateIndex(&ri.Definition{
		Keys:   []ri.Key{{Field: fieldOperationRef, Sort: ri.Asc}},
		Unique: true,
		Sparse: true,
	}); err != nil {
		logger.Error("Failed to create payments operation index", zap.Error(err), zap.String("index_field", fieldOperationRef))
		return nil, err
	}

	p := &Payments{
		logger: logger,
		repo:   repo,
	}
	p.logger.Debug("Payments store initialised")
	return p, nil
}

// FindByIdempotencyKey looks up the payment record whose idempotency key equals
// key after surrounding whitespace is trimmed.
//
// It returns (nil, nil) when no record matches, an invalid-argument error when
// the trimmed key is empty, and the repository error otherwise. Lookup failures
// are logged unless they are caused by context cancellation or deadline expiry.
func (p *Payments) FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentRecord, error) {
	key = strings.TrimSpace(key)
	if key == "" {
		return nil, merrors.InvalidArgument("idempotency key is required", "idempotency_key")
	}

	var result model.PaymentRecord
	err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldIdempotencyKey, key), &result)
	if errors.Is(err, merrors.ErrNoData) {
		//nolint:nilnil // Not-found is represented as (nil, nil) by this store contract.
		return nil, nil
	}
	if err != nil {
		if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
			p.logger.Warn("Payment record lookup failed", zap.String("idempotency_key", key), zap.Error(err))
		}
		return nil, err
	}
	return &result, nil
}

// FindByOperationRef looks up the payment record whose operation reference
// equals key after surrounding whitespace is trimmed.
//
// It returns (nil, nil) when no record matches, an invalid-argument error when
// the trimmed key is empty, and the repository error otherwise. Lookup failures
// are logged unless they are caused by context cancellation or deadline expiry.
func (p *Payments) FindByOperationRef(ctx context.Context, key string) (*model.PaymentRecord, error) {
	key = strings.TrimSpace(key)
	if key == "" {
		return nil, merrors.InvalidArgument("operation reference is required", "operation_ref")
	}

	var result model.PaymentRecord
	err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldOperationRef, key), &result)
	if errors.Is(err, merrors.ErrNoData) {
		//nolint:nilnil // Not-found is represented as (nil, nil) by this store contract.
		return nil, nil
	}
	if err != nil {
		if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
			p.logger.Warn("Payment record lookup by operation ref failed", zap.String("operation_ref", key), zap.Error(err))
		}
		return nil, err
	}
	return &result, nil
}

// Upsert stores record, keyed by its idempotency key.
//
// The string reference fields of record are trimmed in place before validation.
// An invalid-argument error is returned when record is nil or when its intent
// reference or idempotency key is empty after trimming.
//
// When a record with the same idempotency key already exists, its ID (and its
// CreatedAt, if record carries a zero CreatedAt) is copied onto record before
// writing. If the write still fails with a duplicate-key error, the existing
// record is looked up once more and the write is retried with its ID; when that
// second lookup finds nothing, the duplicate-key error is returned as is.
//
// Failures are logged unless they are caused by context cancellation or
// deadline expiry.
func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) error {
	if record == nil {
		return merrors.InvalidArgument("payment record is nil", "record")
	}

	record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey)
	record.QuoteRef = strings.TrimSpace(record.QuoteRef)
	record.OutgoingLeg = strings.TrimSpace(record.OutgoingLeg)
	record.TargetChatID = strings.TrimSpace(record.TargetChatID)
	record.IntentRef = strings.TrimSpace(record.IntentRef)
	record.OperationRef = strings.TrimSpace(record.OperationRef)

	if record.IntentRef == "" {
		return merrors.InvalidArgument("intention reference is required", "intent_ref")
	}
	if record.IdempotencyKey == "" {
		return merrors.InvalidArgument("idempotency key is required", "idempotency_key")
	}

	// adopt makes record target the already-stored document and keeps the
	// stored creation time unless the caller supplied one explicitly.
	adopt := func(stored *model.PaymentRecord) {
		record.ID = stored.ID
		if record.CreatedAt.IsZero() {
			record.CreatedAt = stored.CreatedAt
		}
	}

	existing, err := p.FindByIdempotencyKey(ctx, record.IdempotencyKey)
	if err != nil {
		return err
	}
	if existing != nil {
		adopt(existing)
	}

	err = p.repo.Upsert(ctx, record)
	if mongo.IsDuplicateKeyError(err) {
		// Concurrent insert by idempotency key: resolve existing ID and retry replace-by-ID.
		current, lookupErr := p.FindByIdempotencyKey(ctx, record.IdempotencyKey)
		switch {
		case lookupErr != nil:
			err = lookupErr
		case current != nil:
			adopt(current)
			err = p.repo.Upsert(ctx, record)
		}
		// Otherwise nothing was found for this key; keep the duplicate-key
		// error so the caller sees the conflict.
	}
	if err != nil {
		if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
			p.logger.Warn("Failed to upsert payment record",
				zap.String("idempotency_key", record.IdempotencyKey),
				zap.String("intent_ref", record.IntentRef),
				zap.String("quote_ref", record.QuoteRef),
				zap.Error(err))
		}
		return err
	}
	return nil
}

// Compile-time check that Payments satisfies storage.PaymentsStore.
var _ storage.PaymentsStore = (*Payments)(nil)