package store import ( "context" "errors" "strings" "time" "github.com/tech/sendico/gateway/tgsettle/storage" "github.com/tech/sendico/gateway/tgsettle/storage/model" "github.com/tech/sendico/pkg/db/repository" ri "github.com/tech/sendico/pkg/db/repository/index" "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mlogger" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.uber.org/zap" ) const ( paymentsCollection = "payments" fieldIdempotencyKey = "idempotencyKey" fieldOperationRef = "operationRef" ) type Payments struct { logger mlogger.Logger repo repository.Repository } func NewPayments(logger mlogger.Logger, db *mongo.Database) (*Payments, error) { if db == nil { return nil, merrors.InvalidArgument("mongo database is nil") } if logger == nil { logger = zap.NewNop() } logger = logger.Named("payments").With(zap.String("collection", paymentsCollection)) repo := repository.CreateMongoRepository(db, paymentsCollection) if err := repo.CreateIndex(&ri.Definition{ Keys: []ri.Key{{Field: fieldIdempotencyKey, Sort: ri.Asc}}, Unique: true, }); err != nil { logger.Error("Failed to create payments idempotency index", zap.Error(err), zap.String("index_field", fieldIdempotencyKey)) return nil, err } if err := repo.CreateIndex(&ri.Definition{ Keys: []ri.Key{{Field: fieldOperationRef, Sort: ri.Asc}}, Unique: true, Sparse: true, }); err != nil { logger.Error("Failed to create payments operation index", zap.Error(err), zap.String("index_field", fieldOperationRef)) return nil, err } p := &Payments{ logger: logger, repo: repo, } p.logger.Debug("Payments store initialised") return p, nil } func (p *Payments) FindByIdempotencyKey(ctx context.Context, key string) (*model.PaymentRecord, error) { key = strings.TrimSpace(key) if key == "" { return nil, merrors.InvalidArgument("idempotency key is required", "idempotency_key") } var result model.PaymentRecord err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldIdempotencyKey, key), &result) if errors.Is(err, merrors.ErrNoData) { return nil, nil } if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { p.logger.Warn("Payment record lookup failed", zap.String("idempotency_key", key), zap.Error(err)) } return nil, err } return &result, nil } func (p *Payments) FindByOperationRef(ctx context.Context, key string) (*model.PaymentRecord, error) { key = strings.TrimSpace(key) if key == "" { return nil, merrors.InvalidArgument("operation reference is required", "operation_ref") } var result model.PaymentRecord err := p.repo.FindOneByFilter(ctx, repository.Filter(fieldOperationRef, key), &result) if errors.Is(err, merrors.ErrNoData) { return nil, nil } if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { p.logger.Warn("Payment record lookup by operation ref failed", zap.String("operation_ref", key), zap.Error(err)) } return nil, err } return &result, nil } func (p *Payments) Upsert(ctx context.Context, record *model.PaymentRecord) error { if record == nil { return merrors.InvalidArgument("payment record is nil", "record") } record.IdempotencyKey = strings.TrimSpace(record.IdempotencyKey) record.PaymentIntentID = strings.TrimSpace(record.PaymentIntentID) record.QuoteRef = strings.TrimSpace(record.QuoteRef) record.OutgoingLeg = strings.TrimSpace(record.OutgoingLeg) record.TargetChatID = strings.TrimSpace(record.TargetChatID) record.IntentRef = strings.TrimSpace(record.IntentRef) record.OperationRef = strings.TrimSpace(record.OperationRef) if record.PaymentIntentID == "" { return merrors.InvalidArgument("intention reference is required", "payment_intent_ref") } if record.IdempotencyKey == "" { return merrors.InvalidArgument("idempotency key is required", "idempotency_key") } if record.IntentRef == "" { return merrors.InvalidArgument("intention reference key is required", "intent_ref") } now := time.Now() if record.CreatedAt.IsZero() { record.CreatedAt = now } record.UpdatedAt = now record.ID = bson.NilObjectID filter := repository.Filter(fieldIdempotencyKey, record.IdempotencyKey) existing := &model.PaymentRecord{} err := p.repo.FindOneByFilter(ctx, filter, existing) switch { case err == nil: record.ID = existing.ID err = p.repo.Update(ctx, record) case errors.Is(err, merrors.ErrNoData): record.ID = bson.NilObjectID err = p.repo.Insert(ctx, record, filter) if errors.Is(err, merrors.ErrDataConflict) { if findErr := p.repo.FindOneByFilter(ctx, filter, existing); findErr != nil { err = findErr break } record.ID = existing.ID err = p.repo.Update(ctx, record) } } if err != nil { if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { p.logger.Warn("Failed to upsert payment record", zap.String("idempotency_key", record.IdempotencyKey), zap.String("payment_intent_id", record.PaymentIntentID), zap.String("quote_ref", record.QuoteRef), zap.Error(err)) } return err } return nil } var _ storage.PaymentsStore = (*Payments)(nil)