package store

import (
	"context"
	"errors"
	"time"

	"github.com/tech/sendico/fx/storage"
	"github.com/tech/sendico/fx/storage/model"
	"github.com/tech/sendico/pkg/db/repository"
	ri "github.com/tech/sendico/pkg/db/repository/index"
	"github.com/tech/sendico/pkg/merrors"
	"github.com/tech/sendico/pkg/mlogger"
	"go.mongodb.org/mongo-driver/mongo"
	"go.uber.org/zap"
)

// ratesStore is the storage.RatesStore implementation returned by NewRates.
// It pairs a repository bound to model.RatesCollection with a logger named
// after that collection.
type ratesStore struct {
	logger mlogger.Logger
	repo   repository.Repository
}

// NewRates creates the repository for model.RatesCollection, ensures the
// indexes the store relies on, and returns the resulting rates store.
//
// Indexes are created in this order:
//  1. compound lookup index on pair.base, pair.quote, provider (ascending)
//     and asOfUnixMs (descending);
//  2. unique index on rateRef;
//  3. TTL index on asOf named "rates_as_of_ttl" with a TTL of 24 hours,
//     expressed in seconds.
//
// The first CreateIndex failure is logged and returned; no store is returned
// in that case.
func NewRates(logger mlogger.Logger, db *mongo.Database) (storage.RatesStore, error) {
	repo := repository.CreateMongoRepository(db, model.RatesCollection)

	// 24 hours, in seconds; referenced by pointer from the TTL definition.
	var ttlSeconds int32 = 24 * 60 * 60

	definitions := []*ri.Definition{
		{
			Keys: []ri.Key{
				{Field: "pair.base", Sort: ri.Asc},
				{Field: "pair.quote", Sort: ri.Asc},
				{Field: "provider", Sort: ri.Asc},
				{Field: "asOfUnixMs", Sort: ri.Desc},
			},
		},
		{
			Keys: []ri.Key{
				{Field: "rateRef", Sort: ri.Asc},
			},
			Unique: true,
		},
		{
			Keys: []ri.Key{
				{Field: "asOf", Sort: ri.Asc},
			},
			TTL:  &ttlSeconds,
			Name: "rates_as_of_ttl",
		},
	}

	for i := range definitions {
		err := repo.CreateIndex(definitions[i])
		if err == nil {
			continue
		}
		logger.Error("Failed to ensure rates index", zap.Error(err))
		return nil, err
	}

	logger.Debug("Rates store initialised", zap.String("collection", model.RatesCollection))

	store := &ratesStore{
		logger: logger.Named(model.RatesCollection),
		repo:   repo,
	}
	return store, nil
}

// UpsertSnapshot stores snapshot keyed by its RateRef.
//
// A nil snapshot or an empty RateRef is rejected with merrors.InvalidArgument.
// When AsOf is unset and AsOfUnixMs is positive, AsOf is filled in on the
// passed snapshot as the UTC time for that millisecond timestamp.
//
// The store then looks up an existing document by rateRef:
//   - lookup reports merrors.ErrNoData: the snapshot is inserted;
//   - lookup fails otherwise: the error is logged and returned as is;
//   - lookup succeeds: the existing ID (when non-nil) is copied onto the
//     snapshot and the snapshot is written via Update.
func (r *ratesStore) UpsertSnapshot(ctx context.Context, snapshot *model.RateSnapshot) error {
	switch {
	case snapshot == nil:
		r.logger.Warn("Attempt to upsert nil snapshot")
		return merrors.InvalidArgument("ratesStore: nil snapshot")
	case snapshot.RateRef == "":
		r.logger.Warn("Attempt to upsert snapshot with empty rate_ref")
		return merrors.InvalidArgument("ratesStore: empty rateRef")
	}

	// Derive the time-typed AsOf from the millisecond timestamp when only the
	// latter was supplied.
	if snapshot.AsOf == nil && snapshot.AsOfUnixMs > 0 {
		derived := time.UnixMilli(snapshot.AsOfUnixMs).UTC()
		snapshot.AsOf = &derived
	}

	current := new(model.RateSnapshot)
	byRef := repository.Filter("rateRef", snapshot.RateRef)
	lookupErr := r.repo.FindOneByFilter(ctx, byRef, current)

	if lookupErr == nil {
		// Reuse the stored document's identity so Update targets it.
		if id := current.GetID(); id != nil {
			snapshot.SetID(*id)
		}
		r.logger.Debug("Updating rate snapshot", zap.String("rate_ref", snapshot.RateRef))
		return r.repo.Update(ctx, snapshot)
	}

	if errors.Is(lookupErr, merrors.ErrNoData) {
		r.logger.Debug("Inserting new rate snapshot", zap.String("rate_ref", snapshot.RateRef))
		return r.repo.Insert(ctx, snapshot, byRef)
	}

	r.logger.Warn("Failed to query rate snapshot", zap.Error(lookupErr), zap.String("rate_ref", snapshot.RateRef))
	return lookupErr
}

// LatestSnapshot returns the snapshot for pair selected by sorting on
// asOfUnixMs (sort flag false) and limiting the query to one document.
//
// The query always filters on pair.base and pair.quote; the provider filter is
// added only when provider is non-empty. Repository errors are returned
// unchanged, and merrors.ErrNoData is returned when no document was decoded.
func (r *ratesStore) LatestSnapshot(ctx context.Context, pair model.CurrencyPair, provider string) (*model.RateSnapshot, error) {
	q := repository.Query().
		Filter(repository.Field("pair").Dot("base"), pair.Base).
		Filter(repository.Field("pair").Dot("quote"), pair.Quote)
	if provider != "" {
		q = q.Filter(repository.Field("provider"), provider)
	}

	one := int64(1)
	q = q.Sort(repository.Field("asOfUnixMs"), false).Limit(&one)

	var latest *model.RateSnapshot
	// capture decodes the cursor's current document and remembers it.
	capture := func(cur *mongo.Cursor) error {
		decoded := new(model.RateSnapshot)
		if decodeErr := cur.Decode(decoded); decodeErr != nil {
			return decodeErr
		}
		latest = decoded
		return nil
	}

	if err := r.repo.FindManyByFilter(ctx, q, capture); err != nil {
		return nil, err
	}
	if latest == nil {
		return nil, merrors.ErrNoData
	}
	return latest, nil
}