package store

import (
	"context"
	"errors"

	"github.com/tech/sendico/ledger/storage"
	"github.com/tech/sendico/ledger/storage/model"
	"github.com/tech/sendico/pkg/db/repository"
	ri "github.com/tech/sendico/pkg/db/repository/index"
	"github.com/tech/sendico/pkg/merrors"
	"github.com/tech/sendico/pkg/mlogger"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.uber.org/zap"
)

// journalEntriesStore implements storage.JournalEntriesStore on top of a
// repository bound to the journal entries collection.
type journalEntriesStore struct {
	logger mlogger.Logger
	repo   repository.Repository
}

// NewJournalEntries builds a journal entries store for db and ensures the
// indexes the store relies on exist. It returns the index-creation error
// unchanged if either index cannot be ensured.
func NewJournalEntries(logger mlogger.Logger, db *mongo.Database) (storage.JournalEntriesStore, error) {
	repo := repository.CreateMongoRepository(db, model.JournalEntriesCollection)

	// Unique compound index on organizationRef + idempotencyKey: a duplicate
	// insert for the same pair surfaces as a duplicate-key error in Create.
	uniqueIndex := &ri.Definition{
		Keys: []ri.Key{
			{Field: "organizationRef", Sort: ri.Asc},
			{Field: "idempotencyKey", Sort: ri.Asc},
		},
		Unique: true,
	}
	if err := repo.CreateIndex(uniqueIndex); err != nil {
		logger.Error("failed to ensure journal entries idempotency index", zap.Error(err))
		return nil, err
	}

	// Compound index on organizationRef + createdAt (descending), matching the
	// filter and sort order used by ListByOrganization.
	orgIndex := &ri.Definition{
		Keys: []ri.Key{
			{Field: "organizationRef", Sort: ri.Asc},
			{Field: "createdAt", Sort: ri.Desc},
		},
	}
	if err := repo.CreateIndex(orgIndex); err != nil {
		logger.Error("failed to ensure journal entries organization index", zap.Error(err))
		return nil, err
	}

	childLogger := logger.Named(model.JournalEntriesCollection)
	childLogger.Debug("journal entries store initialised", zap.String("collection", model.JournalEntriesCollection))

	return &journalEntriesStore{
		logger: childLogger,
		repo:   repo,
	}, nil
}

// Create inserts entry into the collection.
//
// It returns an invalid-argument error when entry is nil and
// storage.ErrDuplicateIdempotency when the insert fails with a duplicate-key
// error. Any other insert error is returned unchanged.
func (j *journalEntriesStore) Create(ctx context.Context, entry *model.JournalEntry) error {
	if entry == nil {
		j.logger.Warn("attempt to create nil journal entry")
		return merrors.InvalidArgument("journalEntriesStore: nil journal entry")
	}

	if err := j.repo.Insert(ctx, entry, nil); err != nil {
		if mongo.IsDuplicateKeyError(err) {
			j.logger.Warn("duplicate idempotency key", zap.String("idempotencyKey", entry.IdempotencyKey))
			return storage.ErrDuplicateIdempotency
		}
		j.logger.Warn("failed to create journal entry", zap.Error(err))
		return err
	}

	j.logger.Debug("journal entry created",
		zap.String("idempotencyKey", entry.IdempotencyKey),
		zap.String("entryType", string(entry.EntryType)))
	return nil
}

// Get loads the journal entry identified by entryRef.
//
// It returns an invalid-argument error when entryRef is the zero ObjectID and
// storage.ErrJournalEntryNotFound when the repository reports merrors.ErrNoData.
// Any other repository error is returned unchanged.
func (j *journalEntriesStore) Get(ctx context.Context, entryRef primitive.ObjectID) (*model.JournalEntry, error) {
	if entryRef.IsZero() {
		j.logger.Warn("attempt to get journal entry with zero ID")
		return nil, merrors.InvalidArgument("journalEntriesStore: zero entry ID")
	}

	result := &model.JournalEntry{}
	if err := j.repo.Get(ctx, entryRef, result); err != nil {
		if errors.Is(err, merrors.ErrNoData) {
			j.logger.Debug("journal entry not found", zap.String("entryRef", entryRef.Hex()))
			return nil, storage.ErrJournalEntryNotFound
		}
		j.logger.Warn("failed to get journal entry", zap.Error(err), zap.String("entryRef", entryRef.Hex()))
		return nil, err
	}

	j.logger.Debug("journal entry loaded",
		zap.String("entryRef", entryRef.Hex()),
		zap.String("idempotencyKey", result.IdempotencyKey))
	return result, nil
}

// GetByIdempotencyKey loads the journal entry matching both orgRef and
// idempotencyKey.
//
// It returns an invalid-argument error when orgRef is the zero ObjectID or
// idempotencyKey is empty, and storage.ErrJournalEntryNotFound when the
// repository reports merrors.ErrNoData. Any other repository error is returned
// unchanged.
func (j *journalEntriesStore) GetByIdempotencyKey(ctx context.Context, orgRef primitive.ObjectID, idempotencyKey string) (*model.JournalEntry, error) {
	if orgRef.IsZero() {
		j.logger.Warn("attempt to get journal entry with zero organization ID")
		return nil, merrors.InvalidArgument("journalEntriesStore: zero organization ID")
	}
	if idempotencyKey == "" {
		j.logger.Warn("attempt to get journal entry with empty idempotency key")
		return nil, merrors.InvalidArgument("journalEntriesStore: empty idempotency key")
	}

	query := repository.Query().
		Filter(repository.Field("organizationRef"), orgRef).
		Filter(repository.Field("idempotencyKey"), idempotencyKey)

	result := &model.JournalEntry{}
	if err := j.repo.FindOneByFilter(ctx, query, result); err != nil {
		if errors.Is(err, merrors.ErrNoData) {
			j.logger.Debug("journal entry not found by idempotency key", zap.String("idempotencyKey", idempotencyKey))
			return nil, storage.ErrJournalEntryNotFound
		}
		j.logger.Warn("failed to get journal entry by idempotency key",
			zap.Error(err), zap.String("idempotencyKey", idempotencyKey))
		return nil, err
	}

	j.logger.Debug("journal entry loaded by idempotency key", zap.String("idempotencyKey", idempotencyKey))
	return result, nil
}

// ListByOrganization returns the journal entries of orgRef sorted by createdAt
// in descending order, skipping offset entries and applying limit.
//
// It returns an invalid-argument error when orgRef is the zero ObjectID or when
// limit or offset is negative. Errors from the repository query or from
// decoding a document are returned unchanged. On success the returned slice is
// non-nil, possibly empty.
//
// NOTE(review): a limit of 0 is forwarded unchanged; MongoDB itself treats a
// zero limit as "no limit" — confirm how the repository query builder handles it.
func (j *journalEntriesStore) ListByOrganization(ctx context.Context, orgRef primitive.ObjectID, limit int, offset int) ([]*model.JournalEntry, error) {
	if orgRef.IsZero() {
		j.logger.Warn("attempt to list journal entries with zero organization ID")
		return nil, merrors.InvalidArgument("journalEntriesStore: zero organization ID")
	}
	// Reject negative pagination up front instead of letting it reach the
	// database: a negative skip is refused by the server, and a negative limit
	// silently changes cursor semantics.
	if limit < 0 || offset < 0 {
		j.logger.Warn("attempt to list journal entries with negative pagination",
			zap.Int("limit", limit), zap.Int("offset", offset))
		return nil, merrors.InvalidArgument("journalEntriesStore: negative limit or offset")
	}

	limit64 := int64(limit)
	offset64 := int64(offset)
	query := repository.Query().
		Filter(repository.Field("organizationRef"), orgRef).
		Limit(&limit64).
		Offset(&offset64).
		Sort(repository.Field("createdAt"), false) // false = descending

	entries := make([]*model.JournalEntry, 0)
	err := j.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error {
		doc := &model.JournalEntry{}
		if err := cur.Decode(doc); err != nil {
			return err
		}
		entries = append(entries, doc)
		return nil
	})
	if err != nil {
		j.logger.Warn("failed to list journal entries", zap.Error(err))
		return nil, err
	}

	j.logger.Debug("listed journal entries", zap.Int("count", len(entries)))
	return entries, nil
}