package store import ( "context" "errors" "strings" "github.com/tech/sendico/payments/storage" "github.com/tech/sendico/payments/storage/model" "github.com/tech/sendico/pkg/db/repository" "github.com/tech/sendico/pkg/db/repository/builder" ri "github.com/tech/sendico/pkg/db/repository/index" "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mlogger" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.uber.org/zap" ) const ( defaultPaymentPageSize int64 = 50 maxPaymentPageSize int64 = 200 ) type Payments struct { logger mlogger.Logger repo repository.Repository } // NewPayments constructs a Mongo-backed payments store. func NewPayments(logger mlogger.Logger, repo repository.Repository) (*Payments, error) { if repo == nil { return nil, merrors.InvalidArgument("paymentsStore: repository is nil") } indexes := []*ri.Definition{ { Keys: []ri.Key{{Field: "paymentRef", Sort: ri.Asc}}, Unique: true, }, { Keys: []ri.Key{{Field: "idempotencyKey", Sort: ri.Asc}, {Field: "organizationRef", Sort: ri.Asc}}, Unique: true, }, { Keys: []ri.Key{{Field: "state", Sort: ri.Asc}}, }, { Keys: []ri.Key{{Field: "intent.source.managedWallet.managedWalletRef", Sort: ri.Asc}}, }, { Keys: []ri.Key{{Field: "intent.destination.managedWallet.managedWalletRef", Sort: ri.Asc}}, }, { Keys: []ri.Key{{Field: "execution.chainTransferRef", Sort: ri.Asc}}, }, { Keys: []ri.Key{{Field: "executionPlan.steps.transferRef", Sort: ri.Asc}}, }, } for _, def := range indexes { if err := repo.CreateIndex(def); err != nil { logger.Error("Failed to ensure payments index", zap.Error(err), zap.String("collection", repo.Collection())) return nil, err } } childLogger := logger.Named("payments") childLogger.Debug("Payments store initialised") return &Payments{ logger: childLogger, repo: repo, }, nil } func (p *Payments) Create(ctx context.Context, payment *model.Payment) error { if payment == nil { return merrors.InvalidArgument("paymentsStore: nil payment") } payment.Normalize() if payment.PaymentRef == "" { return merrors.InvalidArgument("paymentsStore: empty paymentRef") } if strings.TrimSpace(payment.IdempotencyKey) == "" { return merrors.InvalidArgument("paymentsStore: empty idempotencyKey") } if payment.OrganizationRef == bson.NilObjectID { return merrors.InvalidArgument("paymentsStore: organization_ref is required") } payment.Update() filter := repository.OrgFilter(payment.OrganizationRef).And( repository.Filter("idempotencyKey", payment.IdempotencyKey), ) if err := p.repo.Insert(ctx, payment, filter); err != nil { if errors.Is(err, merrors.ErrDataConflict) { return storage.ErrDuplicatePayment } return err } p.logger.Debug("Payment created", zap.String("payment_ref", payment.PaymentRef)) return nil } func (p *Payments) Update(ctx context.Context, payment *model.Payment) error { if payment == nil { return merrors.InvalidArgument("paymentsStore: nil payment") } if payment.ID.IsZero() { return merrors.InvalidArgument("paymentsStore: missing payment id") } payment.Normalize() payment.Update() if err := p.repo.Update(ctx, payment); err != nil { if errors.Is(err, merrors.ErrNoData) { return storage.ErrPaymentNotFound } return err } return nil } func (p *Payments) GetByPaymentRef(ctx context.Context, paymentRef string) (*model.Payment, error) { paymentRef = strings.TrimSpace(paymentRef) if paymentRef == "" { return nil, merrors.InvalidArgument("paymentsStore: empty paymentRef") } entity := &model.Payment{} if err := p.repo.FindOneByFilter(ctx, repository.Filter("paymentRef", paymentRef), entity); err != nil { if errors.Is(err, merrors.ErrNoData) { return nil, storage.ErrPaymentNotFound } return nil, err } return entity, nil } func (p *Payments) GetByIdempotencyKey(ctx context.Context, orgRef bson.ObjectID, idempotencyKey string) (*model.Payment, error) { idempotencyKey = strings.TrimSpace(idempotencyKey) if orgRef == bson.NilObjectID { return nil, merrors.InvalidArgument("paymentsStore: organization_ref is required") } if idempotencyKey == "" { return nil, merrors.InvalidArgument("paymentsStore: empty idempotencyKey") } entity := &model.Payment{} query := repository.OrgFilter(orgRef).And(repository.Filter("idempotencyKey", idempotencyKey)) if err := p.repo.FindOneByFilter(ctx, query, entity); err != nil { if errors.Is(err, merrors.ErrNoData) { return nil, storage.ErrPaymentNotFound } return nil, err } return entity, nil } func (p *Payments) GetByChainTransferRef(ctx context.Context, transferRef string) (*model.Payment, error) { transferRef = strings.TrimSpace(transferRef) if transferRef == "" { return nil, merrors.InvalidArgument("paymentsStore: empty chain transfer reference") } entity := &model.Payment{} query := repository.Query().Or( repository.Filter("execution.chainTransferRef", transferRef), repository.Filter("executionPlan.steps.transferRef", transferRef), ) if err := p.repo.FindOneByFilter(ctx, query, entity); err != nil { if errors.Is(err, merrors.ErrNoData) { return nil, storage.ErrPaymentNotFound } return nil, err } return entity, nil } func (p *Payments) List(ctx context.Context, filter *model.PaymentFilter) (*model.PaymentList, error) { if filter == nil { filter = &model.PaymentFilter{} } query := repository.Query() if len(filter.States) > 0 { states := make([]string, 0, len(filter.States)) for _, state := range filter.States { if trimmed := strings.TrimSpace(string(state)); trimmed != "" { states = append(states, trimmed) } } if len(states) > 0 { query = query.Comparison(repository.Field("state"), builder.In, states) } } if ref := strings.TrimSpace(filter.SourceRef); ref != "" { if endpointFilter := endpointQuery("intent.source", ref); endpointFilter != nil { query = query.And(endpointFilter) } } if orgRef, err := bson.ObjectIDFromHex(strings.TrimSpace(filter.OrganizationRef)); err != nil { p.logger.Warn("Failed to decode organization reference", zap.Error(err), zap.String("provided_org_ref", filter.OrganizationRef)) return nil, err } else { query.And(repository.OrgFilter(orgRef)) } if ref := strings.TrimSpace(filter.DestinationRef); ref != "" { if endpointFilter := endpointQuery("intent.destination", ref); endpointFilter != nil { query = query.And(endpointFilter) } } if cursor := strings.TrimSpace(filter.Cursor); cursor != "" { if oid, err := bson.ObjectIDFromHex(cursor); err == nil { query = query.Comparison(repository.IDField(), builder.Lt, oid) } else { p.logger.Warn("Ignoring invalid payments cursor", zap.String("cursor", cursor), zap.Error(err)) } } limit := sanitizePaymentLimit(filter.Limit) fetchLimit := limit + 1 query = query.Sort(repository.IDField(), false).Limit(&fetchLimit) payments := make([]*model.Payment, 0, fetchLimit) decoder := func(cur *mongo.Cursor) error { item := &model.Payment{} if err := cur.Decode(item); err != nil { p.logger.Warn("Failed to decode item", zap.Error(err)) return err } payments = append(payments, item) return nil } if err := p.repo.FindManyByFilter(ctx, query, decoder); err != nil && !errors.Is(err, merrors.ErrNoData) { return nil, err } nextCursor := "" if int64(len(payments)) == fetchLimit { last := payments[len(payments)-1] nextCursor = last.ID.Hex() payments = payments[:len(payments)-1] } return &model.PaymentList{ Items: payments, NextCursor: nextCursor, }, nil } func endpointQuery(prefix, ref string) builder.Query { trimmed := strings.TrimSpace(ref) if trimmed == "" { return nil } lower := strings.ToLower(trimmed) filters := []builder.Query{ repository.Filter(prefix+".ledger.ledgerAccountRef", trimmed), repository.Filter(prefix+".managedWallet.managedWalletRef", trimmed), repository.Filter(prefix+".externalChain.address", lower), } return repository.Query().Or(filters...) } func sanitizePaymentLimit(requested int32) int64 { if requested <= 0 { return defaultPaymentPageSize } if requested > int32(maxPaymentPageSize) { return maxPaymentPageSize } return int64(requested) }