package oracle import ( "context" "errors" "strings" "time" "github.com/tech/sendico/fx/storage" "github.com/tech/sendico/fx/storage/model" "github.com/tech/sendico/pkg/api/routers" "github.com/tech/sendico/pkg/api/routers/gsresponse" "github.com/tech/sendico/pkg/merrors" pmessaging "github.com/tech/sendico/pkg/messaging" "github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mservice" fxv1 "github.com/tech/sendico/pkg/proto/common/fx/v1" oraclev1 "github.com/tech/sendico/pkg/proto/oracle/v1" "go.uber.org/zap" "google.golang.org/grpc" ) type serviceError string func (e serviceError) Error() string { return string(e) } var ( errSideRequired = serviceError("oracle: side is required") errAmountsMutuallyExclusive = serviceError("oracle: exactly one amount must be provided") errAmountRequired = serviceError("oracle: amount is required") errQuoteRefRequired = serviceError("oracle: quote_ref is required") errEmptyRequest = serviceError("oracle: request payload is empty") errLedgerTxnRefRequired = serviceError("oracle: ledger_txn_ref is required") ) type Service struct { logger mlogger.Logger storage storage.Repository producer pmessaging.Producer oraclev1.UnimplementedOracleServer } func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer) *Service { initMetrics() return &Service{ logger: logger.Named("oracle"), storage: repo, producer: prod, } } func (s *Service) Register(router routers.GRPC) error { return router.Register(func(reg grpc.ServiceRegistrar) { oraclev1.RegisterOracleServer(reg, s) }) } func (s *Service) GetQuote(ctx context.Context, req *oraclev1.GetQuoteRequest) (*oraclev1.GetQuoteResponse, error) { start := time.Now() responder := s.getQuoteResponder(ctx, req) resp, err := responder(ctx) observeRPC(start, "GetQuote", err) return resp, err } func (s *Service) ValidateQuote(ctx context.Context, req *oraclev1.ValidateQuoteRequest) (*oraclev1.ValidateQuoteResponse, error) { start := time.Now() responder := s.validateQuoteResponder(ctx, req) resp, err := responder(ctx) observeRPC(start, "ValidateQuote", err) return resp, err } func (s *Service) ConsumeQuote(ctx context.Context, req *oraclev1.ConsumeQuoteRequest) (*oraclev1.ConsumeQuoteResponse, error) { start := time.Now() responder := s.consumeQuoteResponder(ctx, req) resp, err := responder(ctx) observeRPC(start, "ConsumeQuote", err) return resp, err } func (s *Service) LatestRate(ctx context.Context, req *oraclev1.LatestRateRequest) (*oraclev1.LatestRateResponse, error) { start := time.Now() responder := s.latestRateResponder(ctx, req) resp, err := responder(ctx) observeRPC(start, "LatestRate", err) return resp, err } func (s *Service) ListPairs(ctx context.Context, req *oraclev1.ListPairsRequest) (*oraclev1.ListPairsResponse, error) { start := time.Now() responder := s.listPairsResponder(ctx, req) resp, err := responder(ctx) observeRPC(start, "ListPairs", err) return resp, err } func (s *Service) getQuoteResponder(ctx context.Context, req *oraclev1.GetQuoteRequest) gsresponse.Responder[oraclev1.GetQuoteResponse] { if req == nil { req = &oraclev1.GetQuoteRequest{} } logger := s.logger.With(quoteRequestFields(req)...) logger.Debug("Handling GetQuote") if req.GetSide() == fxv1.Side_SIDE_UNSPECIFIED { logger.Warn("GetQuote invalid: side missing") return gsresponse.InvalidArgument[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, errSideRequired) } if req.GetBaseAmount() != nil && req.GetQuoteAmount() != nil { logger.Warn("GetQuote invalid: both base_amount and quote_amount provided") return gsresponse.InvalidArgument[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, errAmountsMutuallyExclusive) } if req.GetBaseAmount() == nil && req.GetQuoteAmount() == nil { logger.Warn("GetQuote invalid: amount missing") return gsresponse.InvalidArgument[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, errAmountRequired) } if err := s.pingStorage(ctx); err != nil { logger.Warn("Storage unavailable during GetQuote", zap.Error(err)) return gsresponse.Unavailable[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, err) } pairMsg := req.GetPair() if pairMsg == nil || strings.TrimSpace(pairMsg.GetBase()) == "" || strings.TrimSpace(pairMsg.GetQuote()) == "" { logger.Warn("GetQuote invalid: pair missing") return gsresponse.InvalidArgument[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, errEmptyRequest) } pairKey := model.CurrencyPair{Base: strings.ToUpper(pairMsg.GetBase()), Quote: strings.ToUpper(pairMsg.GetQuote())} pair, err := s.storage.Pairs().Get(ctx, pairKey) if err != nil { switch { case errors.Is(err, merrors.ErrNoData): logger.Warn("pair not supported", zap.String("pair", pairKey.Base+"/"+pairKey.Quote)) return gsresponse.InvalidArgument[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, merrors.InvalidArgument("pair_not_supported")) default: logger.Warn("GetQuote failed to load pair", zap.Error(err)) return gsresponse.Internal[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, err) } } provider := req.GetPreferredProvider() if provider == "" { provider = pair.DefaultProvider } if provider == "" && len(pair.Providers) > 0 { provider = pair.Providers[0] } rate, err := s.getLatestRate(ctx, pair, provider) if err != nil { switch { case errors.Is(err, merrors.ErrNoData): logger.Warn("rate not found", zap.String("pair", pairKey.Base+"/"+pairKey.Quote), zap.String("provider", provider)) return gsresponse.FailedPrecondition[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, "rate_not_found", err) default: logger.Warn("GetQuote failed to load rate", zap.Error(err), zap.String("pair", pairKey.Base+"/"+pairKey.Quote), zap.String("provider", provider)) return gsresponse.Internal[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, err) } } now := time.Now() if maxAge := req.GetMaxAgeMs(); maxAge > 0 { age := now.UnixMilli() - rate.AsOfUnixMs if age > int64(maxAge) { logger.Warn("Rate snapshot stale", zap.Int64("age_ms", age), zap.Int32("max_age_ms", req.GetMaxAgeMs()), zap.String("pair", pairKey.Base+"/"+pairKey.Quote), zap.String("provider", provider)) return gsresponse.FailedPrecondition[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, "stale_rate", merrors.InvalidArgument("rate older than allowed window")) } } comp, err := newQuoteComputation(pair, rate, req.GetSide(), provider) if err != nil { logger.Warn("GetQuote invalid input", zap.Error(err)) return gsresponse.InvalidArgument[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, err) } if req.GetBaseAmount() != nil { if err := comp.withBaseInput(req.GetBaseAmount()); err != nil { logger.Warn("GetQuote invalid base input", zap.Error(err)) return gsresponse.InvalidArgument[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, err) } } else if req.GetQuoteAmount() != nil { if err := comp.withQuoteInput(req.GetQuoteAmount()); err != nil { logger.Warn("GetQuote invalid quote input", zap.Error(err)) return gsresponse.InvalidArgument[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, err) } } if err := comp.compute(); err != nil { logger.Warn("GetQuote computation failed", zap.Error(err)) return gsresponse.Internal[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, err) } expiresAt := int64(0) if req.GetFirm() { expiry, err := computeExpiry(now, req.GetTtlMs()) if err != nil { return gsresponse.InvalidArgument[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, err) } expiresAt = expiry } quoteModel, err := comp.buildModelQuote(req.GetFirm(), expiresAt, req) if err != nil { return gsresponse.Internal[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, err) } if req.GetFirm() { if err := s.storage.Quotes().Issue(ctx, quoteModel); err != nil { switch { case errors.Is(err, merrors.ErrDataConflict): logger.Warn("GetQuote conflict issuing firm quote", zap.Error(err), zap.String("quote_ref", quoteModel.QuoteRef)) return gsresponse.Conflict[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, err) default: logger.Warn("GetQuote failed to issue firm quote", zap.Error(err), zap.String("quote_ref", quoteModel.QuoteRef)) return gsresponse.Internal[oraclev1.GetQuoteResponse](s.logger, mservice.FXOracle, err) } } logger.Info("Firm quote stored", zap.String("quote_ref", quoteModel.QuoteRef), zap.String("pair", pairKey.Base+"/"+pairKey.Quote), zap.String("provider", quoteModel.Provider), zap.Int64("expires_at_ms", quoteModel.ExpiresAtUnixMs)) } resp := &oraclev1.GetQuoteResponse{ Meta: buildResponseMeta(req.GetMeta()), Quote: quoteModelToProto(quoteModel), } return gsresponse.Success(resp) } func (s *Service) validateQuoteResponder(ctx context.Context, req *oraclev1.ValidateQuoteRequest) gsresponse.Responder[oraclev1.ValidateQuoteResponse] { if req == nil { req = &oraclev1.ValidateQuoteRequest{} } logger := s.logger.With(requestMetaFields(req.GetMeta())...) if ref := strings.TrimSpace(req.GetQuoteRef()); ref != "" { logger = logger.With(zap.String("quote_ref", ref)) } logger.Debug("Handling ValidateQuote") if req.GetQuoteRef() == "" { logger.Warn("ValidateQuote invalid: quote_ref missing") return gsresponse.InvalidArgument[oraclev1.ValidateQuoteResponse](s.logger, mservice.FXOracle, errQuoteRefRequired) } if err := s.pingStorage(ctx); err != nil { logger.Warn("Storage unavailable during ValidateQuote", zap.Error(err)) return gsresponse.Unavailable[oraclev1.ValidateQuoteResponse](s.logger, mservice.FXOracle, err) } quote, err := s.storage.Quotes().GetByRef(ctx, req.GetQuoteRef()) if err != nil { switch { case errors.Is(err, merrors.ErrNoData): logger.Warn("ValidateQuote: quote not found", zap.String("quote_ref", req.GetQuoteRef())) resp := &oraclev1.ValidateQuoteResponse{ Meta: buildResponseMeta(req.GetMeta()), Quote: nil, Valid: false, Reason: "not_found", } return gsresponse.Success(resp) default: logger.Warn("ValidateQuote failed", zap.Error(err)) return gsresponse.Internal[oraclev1.ValidateQuoteResponse](s.logger, mservice.FXOracle, err) } } now := time.Now() valid := true reason := "" if quote.IsExpired(now) { valid = false reason = "expired" } else if quote.Status == model.QuoteStatusConsumed { valid = false reason = "consumed" } resp := &oraclev1.ValidateQuoteResponse{ Meta: buildResponseMeta(req.GetMeta()), Quote: quoteModelToProto(quote), Valid: valid, Reason: reason, } if !valid { logger.Info("ValidateQuote invalid", zap.String("reason", reason), zap.Bool("firm", quote.Firm)) } else { logger.Debug("ValidateQuote valid", zap.Bool("firm", quote.Firm)) } return gsresponse.Success(resp) } func (s *Service) consumeQuoteResponder(ctx context.Context, req *oraclev1.ConsumeQuoteRequest) gsresponse.Responder[oraclev1.ConsumeQuoteResponse] { if req == nil { req = &oraclev1.ConsumeQuoteRequest{} } logger := s.logger.With(requestMetaFields(req.GetMeta())...) if ref := strings.TrimSpace(req.GetQuoteRef()); ref != "" { logger = logger.With(zap.String("quote_ref", ref)) } if ledger := strings.TrimSpace(req.GetLedgerTxnRef()); ledger != "" { logger = logger.With(zap.String("ledger_txn_ref", ledger)) } logger.Debug("Handling ConsumeQuote") if req.GetQuoteRef() == "" { logger.Warn("ConsumeQuote invalid: quote_ref missing") return gsresponse.InvalidArgument[oraclev1.ConsumeQuoteResponse](s.logger, mservice.FXOracle, errQuoteRefRequired) } if req.GetLedgerTxnRef() == "" { logger.Warn("ConsumeQuote invalid: ledger_txn_ref missing") return gsresponse.InvalidArgument[oraclev1.ConsumeQuoteResponse](s.logger, mservice.FXOracle, errLedgerTxnRefRequired) } if err := s.pingStorage(ctx); err != nil { logger.Warn("Storage unavailable during ConsumeQuote", zap.Error(err)) return gsresponse.Unavailable[oraclev1.ConsumeQuoteResponse](s.logger, mservice.FXOracle, err) } _, err := s.storage.Quotes().Consume(ctx, req.GetQuoteRef(), req.GetLedgerTxnRef(), time.Now()) if err != nil { switch { case errors.Is(err, storage.ErrQuoteExpired): logger.Warn("ConsumeQuote failed: expired") return gsresponse.FailedPrecondition[oraclev1.ConsumeQuoteResponse](s.logger, mservice.FXOracle, "expired", err) case errors.Is(err, storage.ErrQuoteConsumed): logger.Warn("ConsumeQuote failed: already consumed") return gsresponse.FailedPrecondition[oraclev1.ConsumeQuoteResponse](s.logger, mservice.FXOracle, "consumed", err) case errors.Is(err, storage.ErrQuoteNotFirm): logger.Warn("ConsumeQuote failed: quote not firm") return gsresponse.FailedPrecondition[oraclev1.ConsumeQuoteResponse](s.logger, mservice.FXOracle, "not_firm", err) case errors.Is(err, merrors.ErrNoData): logger.Warn("ConsumeQuote failed: quote not found") return gsresponse.NotFound[oraclev1.ConsumeQuoteResponse](s.logger, mservice.FXOracle, err) default: logger.Warn("ConsumeQuote failed", zap.Error(err)) return gsresponse.Internal[oraclev1.ConsumeQuoteResponse](s.logger, mservice.FXOracle, err) } } resp := &oraclev1.ConsumeQuoteResponse{ Meta: buildResponseMeta(req.GetMeta()), Consumed: true, Reason: "consumed", } logger.Info("Quote consumed") return gsresponse.Success(resp) } func (s *Service) latestRateResponder(ctx context.Context, req *oraclev1.LatestRateRequest) gsresponse.Responder[oraclev1.LatestRateResponse] { if req == nil { req = &oraclev1.LatestRateRequest{} } logger := s.logger.With(requestMetaFields(req.GetMeta())...) if pair := req.GetPair(); pair != nil { logger = logger.With(zap.String("pair_base", strings.TrimSpace(pair.GetBase())), zap.String("pair_quote", strings.TrimSpace(pair.GetQuote()))) } if provider := strings.TrimSpace(req.GetProvider()); provider != "" { logger = logger.With(zap.String("provider", provider)) } logger.Debug("Handling LatestRate") if err := s.pingStorage(ctx); err != nil { logger.Warn("Storage unavailable during LatestRate", zap.Error(err)) return gsresponse.Unavailable[oraclev1.LatestRateResponse](s.logger, mservice.FXOracle, err) } pairMsg := req.GetPair() if pairMsg == nil || strings.TrimSpace(pairMsg.GetBase()) == "" || strings.TrimSpace(pairMsg.GetQuote()) == "" { logger.Warn("LatestRate invalid: pair missing") return gsresponse.InvalidArgument[oraclev1.LatestRateResponse](s.logger, mservice.FXOracle, errEmptyRequest) } pair := model.CurrencyPair{Base: strings.ToUpper(pairMsg.GetBase()), Quote: strings.ToUpper(pairMsg.GetQuote())} pairMeta, err := s.storage.Pairs().Get(ctx, pair) if err != nil { switch { case errors.Is(err, merrors.ErrNoData): logger.Warn("LatestRate pair not found") return gsresponse.NotFound[oraclev1.LatestRateResponse](s.logger, mservice.FXOracle, err) default: logger.Warn("LatestRate failed to load pair", zap.Error(err)) return gsresponse.Internal[oraclev1.LatestRateResponse](s.logger, mservice.FXOracle, err) } } provider := req.GetProvider() if provider == "" { provider = pairMeta.DefaultProvider } if provider == "" && len(pairMeta.Providers) > 0 { provider = pairMeta.Providers[0] } rate, err := s.getLatestRate(ctx, pairMeta, provider) if err != nil { switch { case errors.Is(err, merrors.ErrNoData): logger.Warn("LatestRate not found", zap.String("provider", provider)) return gsresponse.NotFound[oraclev1.LatestRateResponse](s.logger, mservice.FXOracle, err) default: logger.Warn("LatestRate failed", zap.Error(err)) return gsresponse.Internal[oraclev1.LatestRateResponse](s.logger, mservice.FXOracle, err) } } resp := &oraclev1.LatestRateResponse{ Meta: buildResponseMeta(req.GetMeta()), Rate: rateModelToProto(rate), } logger.Debug("LatestRate succeeded", zap.String("provider", provider), zap.Int64("asof_unix_ms", rate.AsOfUnixMs)) return gsresponse.Success(resp) } func (s *Service) listPairsResponder(ctx context.Context, req *oraclev1.ListPairsRequest) gsresponse.Responder[oraclev1.ListPairsResponse] { if req == nil { req = &oraclev1.ListPairsRequest{} } logger := s.logger.With(requestMetaFields(req.GetMeta())...) logger.Debug("Handling ListPairs") if err := s.pingStorage(ctx); err != nil { logger.Warn("Storage unavailable during ListPairs", zap.Error(err)) return gsresponse.Unavailable[oraclev1.ListPairsResponse](s.logger, mservice.FXOracle, err) } pairs, err := s.storage.Pairs().ListEnabled(ctx) if err != nil { logger.Warn("ListPairs failed", zap.Error(err)) return gsresponse.Internal[oraclev1.ListPairsResponse](s.logger, mservice.FXOracle, err) } result := make([]*oraclev1.PairMeta, 0, len(pairs)) for _, pair := range pairs { result = append(result, pairModelToProto(pair)) } resp := &oraclev1.ListPairsResponse{ Meta: buildResponseMeta(req.GetMeta()), Pairs: result, } logger.Debug("ListPairs returning metadata", zap.Int("pairs", len(resp.GetPairs()))) return gsresponse.Success(resp) } func (s *Service) pingStorage(ctx context.Context) error { if s.storage == nil { return nil } return s.storage.Ping(ctx) } func (s *Service) getLatestRate(ctx context.Context, pair *model.Pair, provider string) (*model.RateSnapshot, error) { rate, err := s.storage.Rates().LatestSnapshot(ctx, pair.Pair, provider) if err == nil { return rate, nil } if !errors.Is(err, merrors.ErrNoData) { return nil, err } crossRate, crossErr := s.computeCrossRate(ctx, pair, provider) if crossErr != nil { if errors.Is(crossErr, merrors.ErrNoData) { return nil, err } return nil, crossErr } s.logger.Debug("Derived cross rate", zap.String("pair", pair.Pair.Base+"/"+pair.Pair.Quote), zap.String("provider", provider)) return crossRate, nil } var _ oraclev1.OracleServer = (*Service)(nil)