fixed linting config
This commit is contained in:
195
api/fx/ingestor/.golangci.yml
Normal file
195
api/fx/ingestor/.golangci.yml
Normal file
@@ -0,0 +1,195 @@
|
||||
# See the dedicated "version" documentation section.
|
||||
version: "2"
|
||||
linters:
|
||||
# Default set of linters.
|
||||
# The value can be:
|
||||
# - `standard`: https://golangci-lint.run/docs/linters/#enabled-by-default
|
||||
# - `all`: enables all linters by default.
|
||||
# - `none`: disables all linters by default.
|
||||
# - `fast`: enables only linters considered as "fast" (`golangci-lint help linters --json | jq '[ .[] | select(.fast==true) ] | map(.name)'`).
|
||||
# Default: standard
|
||||
default: all
|
||||
# Enable specific linter.
|
||||
enable:
|
||||
- arangolint
|
||||
- asasalint
|
||||
- asciicheck
|
||||
- bidichk
|
||||
- bodyclose
|
||||
- canonicalheader
|
||||
- containedctx
|
||||
- contextcheck
|
||||
- copyloopvar
|
||||
- cyclop
|
||||
- decorder
|
||||
- dogsled
|
||||
- dupl
|
||||
- dupword
|
||||
- durationcheck
|
||||
- embeddedstructfieldcheck
|
||||
- err113
|
||||
- errcheck
|
||||
- errchkjson
|
||||
- errname
|
||||
- errorlint
|
||||
- exhaustive
|
||||
- exptostd
|
||||
- fatcontext
|
||||
- forbidigo
|
||||
- forcetypeassert
|
||||
- funcorder
|
||||
- funlen
|
||||
- ginkgolinter
|
||||
- gocheckcompilerdirectives
|
||||
- gochecknoglobals
|
||||
- gochecknoinits
|
||||
- gochecksumtype
|
||||
- gocognit
|
||||
- goconst
|
||||
- gocritic
|
||||
- gocyclo
|
||||
- godoclint
|
||||
- godot
|
||||
- godox
|
||||
- goheader
|
||||
- gomodguard
|
||||
- goprintffuncname
|
||||
- gosec
|
||||
- gosmopolitan
|
||||
- govet
|
||||
- grouper
|
||||
- iface
|
||||
- importas
|
||||
- inamedparam
|
||||
- ineffassign
|
||||
- interfacebloat
|
||||
- intrange
|
||||
- iotamixing
|
||||
- ireturn
|
||||
- lll
|
||||
- loggercheck
|
||||
- maintidx
|
||||
- makezero
|
||||
- mirror
|
||||
- misspell
|
||||
- mnd
|
||||
- modernize
|
||||
- musttag
|
||||
- nakedret
|
||||
- nestif
|
||||
- nilerr
|
||||
- nilnesserr
|
||||
- nilnil
|
||||
- nlreturn
|
||||
- noctx
|
||||
- noinlineerr
|
||||
- nolintlint
|
||||
- nonamedreturns
|
||||
- nosprintfhostport
|
||||
- paralleltest
|
||||
- perfsprint
|
||||
- prealloc
|
||||
- predeclared
|
||||
- promlinter
|
||||
- protogetter
|
||||
- reassign
|
||||
- recvcheck
|
||||
- revive
|
||||
- rowserrcheck
|
||||
- sloglint
|
||||
- spancheck
|
||||
- sqlclosecheck
|
||||
- staticcheck
|
||||
- tagalign
|
||||
- tagliatelle
|
||||
- testableexamples
|
||||
- testifylint
|
||||
- testpackage
|
||||
- thelper
|
||||
- tparallel
|
||||
- unconvert
|
||||
- unparam
|
||||
- unqueryvet
|
||||
- unused
|
||||
- usestdlibvars
|
||||
- usetesting
|
||||
- varnamelen
|
||||
- wastedassign
|
||||
- whitespace
|
||||
- wsl_v5
|
||||
- zerologlint
|
||||
# Disable specific linters.
|
||||
disable:
|
||||
- depguard
|
||||
- exhaustruct
|
||||
- gochecknoglobals
|
||||
- gomoddirectives
|
||||
- wrapcheck
|
||||
- wsl
|
||||
# All available settings of specific linters.
|
||||
# See the dedicated "linters.settings" documentation section.
|
||||
settings:
|
||||
wsl_v5:
|
||||
allow-first-in-block: true
|
||||
allow-whole-block: false
|
||||
branch-max-lines: 2
|
||||
|
||||
# Defines a set of rules to ignore issues.
|
||||
# It does not skip the analysis, and so does not ignore "typecheck" errors.
|
||||
exclusions:
|
||||
# Mode of the generated files analysis.
|
||||
#
|
||||
# - `strict`: sources are excluded by strictly following the Go generated file convention.
|
||||
# Source files that have lines matching only the following regular expression will be excluded: `^// Code generated .* DO NOT EDIT\.$`
|
||||
# This line must appear before the first non-comment, non-blank text in the file.
|
||||
# https://go.dev/s/generatedcode
|
||||
# - `lax`: sources are excluded if they contain lines like `autogenerated file`, `code generated`, `do not edit`, etc.
|
||||
# - `disable`: disable the generated files exclusion.
|
||||
#
|
||||
# Default: strict
|
||||
generated: lax
|
||||
# Log a warning if an exclusion rule is unused.
|
||||
# Default: false
|
||||
warn-unused: true
|
||||
# Predefined exclusion rules.
|
||||
# Default: []
|
||||
presets:
|
||||
- comments
|
||||
- std-error-handling
|
||||
- common-false-positives
|
||||
- legacy
|
||||
# Excluding configuration per-path, per-linter, per-text and per-source.
|
||||
rules:
|
||||
# Exclude some linters from running on tests files.
|
||||
- path: _test\.go
|
||||
linters:
|
||||
- gocyclo
|
||||
- errcheck
|
||||
- dupl
|
||||
- gosec
|
||||
# Run some linter only for test files by excluding its issues for everything else.
|
||||
- path-except: _test\.go
|
||||
linters:
|
||||
- forbidigo
|
||||
# Exclude known linters from partially hard-vendored code,
|
||||
# which is impossible to exclude via `nolint` comments.
|
||||
# `/` will be replaced by the current OS file path separator to properly work on Windows.
|
||||
- path: internal/hmac/
|
||||
text: "weak cryptographic primitive"
|
||||
linters:
|
||||
- gosec
|
||||
# Exclude some `staticcheck` messages.
|
||||
- linters:
|
||||
- staticcheck
|
||||
text: "SA9003:"
|
||||
# Exclude `lll` issues for long lines with `go:generate`.
|
||||
- linters:
|
||||
- lll
|
||||
source: "^//go:generate "
|
||||
# Which file paths to exclude: they will be analyzed, but issues from them won't be reported.
|
||||
# "/" will be replaced by the current OS file path separator to properly work on Windows.
|
||||
# Default: []
|
||||
paths: []
|
||||
# Which file paths to not exclude.
|
||||
# Default: []
|
||||
paths-except: []
|
||||
@@ -51,16 +51,17 @@ func (a *App) Run(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
a.logger.Debug("Metrics server initialised")
|
||||
defer metricsSrv.Close(context.Background())
|
||||
defer metricsSrv.Close(context.Background()) //nolint:contextcheck
|
||||
|
||||
conn, err := db.ConnectMongo(a.logger, a.cfg.Database)
|
||||
conn, err := db.ConnectMongo(a.logger, a.cfg.Database) //nolint:contextcheck
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Disconnect(context.Background())
|
||||
defer conn.Disconnect(context.Background()) //nolint:errcheck,contextcheck
|
||||
|
||||
a.logger.Debug("MongoDB connection established")
|
||||
|
||||
repo, err := mongostorage.New(a.logger, conn)
|
||||
repo, err := mongostorage.New(a.logger, conn) //nolint:contextcheck
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -72,6 +73,7 @@ func (a *App) Run(ctx context.Context) error {
|
||||
}
|
||||
|
||||
var announcer *discovery.Announcer
|
||||
|
||||
if cfg := a.cfg.Messaging; cfg != nil && cfg.Driver != "" {
|
||||
broker, err := msg.CreateMessagingBroker(a.logger.Named("discovery_bus"), cfg)
|
||||
if err != nil {
|
||||
@@ -84,6 +86,7 @@ func (a *App) Run(ctx context.Context) error {
|
||||
Version: appversion.Create().Short(),
|
||||
}
|
||||
announcer = discovery.NewAnnouncer(a.logger, producer, "fx_ingestor", announce)
|
||||
|
||||
announcer.Start()
|
||||
defer announcer.Stop()
|
||||
}
|
||||
@@ -98,6 +101,8 @@ func (a *App) Run(ctx context.Context) error {
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
a.logger.Info("Ingestor service stopped")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -14,8 +14,9 @@ var (
|
||||
BuildDate string
|
||||
)
|
||||
|
||||
//nolint:ireturn
|
||||
func Create() version.Printer {
|
||||
vi := version.Info{
|
||||
info := version.Info{
|
||||
Program: "Sendico FX Ingestor Service",
|
||||
Revision: Revision,
|
||||
Branch: Branch,
|
||||
@@ -23,5 +24,6 @@ func Create() version.Printer {
|
||||
BuildDate: BuildDate,
|
||||
Version: Version,
|
||||
}
|
||||
return vf.Create(&vi)
|
||||
|
||||
return vf.Create(&info)
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ type Config struct {
|
||||
pairsBySource map[mmodel.Driver][]PairConfig
|
||||
}
|
||||
|
||||
//nolint:cyclop
|
||||
func Load(path string) (*Config, error) {
|
||||
if path == "" {
|
||||
return nil, merrors.InvalidArgument("config: path is empty")
|
||||
@@ -36,19 +37,23 @@ func Load(path string) (*Config, error) {
|
||||
}
|
||||
|
||||
cfg := &Config{}
|
||||
if err := yaml.Unmarshal(data, cfg); err != nil {
|
||||
|
||||
err = yaml.Unmarshal(data, cfg)
|
||||
if err != nil {
|
||||
return nil, merrors.InternalWrap(err, "config: failed to parse yaml")
|
||||
}
|
||||
|
||||
if len(cfg.Market.Sources) == 0 {
|
||||
return nil, merrors.InvalidArgument("config: no market sources configured")
|
||||
}
|
||||
|
||||
sourceSet := make(map[mmodel.Driver]struct{}, len(cfg.Market.Sources))
|
||||
for idx := range cfg.Market.Sources {
|
||||
src := &cfg.Market.Sources[idx]
|
||||
if src.Driver.IsEmpty() {
|
||||
return nil, merrors.InvalidArgument("config: market source driver is empty")
|
||||
}
|
||||
|
||||
sourceSet[src.Driver] = struct{}{}
|
||||
}
|
||||
|
||||
@@ -65,6 +70,7 @@ func Load(path string) (*Config, error) {
|
||||
if driver.IsEmpty() {
|
||||
return nil, merrors.InvalidArgument("config: pair source is empty")
|
||||
}
|
||||
|
||||
if _, ok := sourceSet[driver]; !ok {
|
||||
return nil, merrors.InvalidArgument("config: pair references unknown source: "+driver.String(), "pairs."+driver.String())
|
||||
}
|
||||
@@ -74,10 +80,12 @@ func Load(path string) (*Config, error) {
|
||||
pair := pairList[idx]
|
||||
pair.Base = strings.ToUpper(strings.TrimSpace(pair.Base))
|
||||
pair.Quote = strings.ToUpper(strings.TrimSpace(pair.Quote))
|
||||
|
||||
pair.Symbol = strings.TrimSpace(pair.Symbol)
|
||||
if pair.Base == "" || pair.Quote == "" || pair.Symbol == "" {
|
||||
return nil, merrors.InvalidArgument("config: pair entries must define base, quote, and symbol", "pairs."+driver.String())
|
||||
}
|
||||
|
||||
if strings.TrimSpace(pair.Provider) == "" {
|
||||
pair.Provider = strings.ToLower(driver.String())
|
||||
}
|
||||
@@ -87,6 +95,7 @@ func Load(path string) (*Config, error) {
|
||||
Source: driver,
|
||||
})
|
||||
}
|
||||
|
||||
pairsBySource[driver] = processed
|
||||
normalizedPairs[driver.String()] = processed
|
||||
}
|
||||
@@ -94,6 +103,7 @@ func Load(path string) (*Config, error) {
|
||||
cfg.Market.Pairs = normalizedPairs
|
||||
cfg.pairsBySource = pairsBySource
|
||||
cfg.pairs = flattened
|
||||
|
||||
if cfg.Database == nil {
|
||||
return nil, merrors.InvalidArgument("config: database configuration is required")
|
||||
}
|
||||
@@ -101,7 +111,7 @@ func Load(path string) (*Config, error) {
|
||||
if cfg.Metrics != nil && cfg.Metrics.Enabled {
|
||||
cfg.Metrics.Address = strings.TrimSpace(cfg.Metrics.Address)
|
||||
if cfg.Metrics.Address == "" {
|
||||
cfg.Metrics.Address = ":9102"
|
||||
cfg.Metrics.Address = ":9102" //nolint:mnd
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,9 +122,11 @@ func (c *Config) PollInterval() time.Duration {
|
||||
if c == nil {
|
||||
return defaultPollInterval
|
||||
}
|
||||
|
||||
if c.PollIntervalSeconds <= 0 {
|
||||
return defaultPollInterval
|
||||
}
|
||||
|
||||
return time.Duration(c.PollIntervalSeconds) * time.Second
|
||||
}
|
||||
|
||||
@@ -122,8 +134,10 @@ func (c *Config) Pairs() []Pair {
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
out := make([]Pair, len(c.pairs))
|
||||
copy(out, c.pairs)
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
@@ -131,12 +145,14 @@ func (c *Config) PairsBySource() map[mmodel.Driver][]PairConfig {
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
out := make(map[mmodel.Driver][]PairConfig, len(c.pairsBySource))
|
||||
for driver, pairs := range c.pairsBySource {
|
||||
cp := make([]PairConfig, len(pairs))
|
||||
copy(cp, pairs)
|
||||
out[driver] = cp
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
@@ -144,6 +160,8 @@ func (c *Config) MetricsConfig() *MetricsConfig {
|
||||
if c == nil || c.Metrics == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
cp := *c.Metrics
|
||||
|
||||
return &cp
|
||||
}
|
||||
|
||||
@@ -15,7 +15,8 @@ type PairConfig struct {
|
||||
|
||||
type Pair struct {
|
||||
PairConfig `yaml:",inline"`
|
||||
Source mmodel.Driver `yaml:"-"`
|
||||
|
||||
Source mmodel.Driver `yaml:"-"`
|
||||
}
|
||||
|
||||
type MarketConfig struct {
|
||||
|
||||
@@ -28,9 +28,11 @@ func New(logger mlogger.Logger, cfg *config.Config, repo storage.Repository) (*S
|
||||
if logger == nil {
|
||||
return nil, merrors.InvalidArgument("ingestor: nil logger")
|
||||
}
|
||||
|
||||
if cfg == nil {
|
||||
return nil, merrors.InvalidArgument("ingestor: nil config")
|
||||
}
|
||||
|
||||
if repo == nil {
|
||||
return nil, merrors.InvalidArgument("ingestor: nil repository")
|
||||
}
|
||||
@@ -52,6 +54,7 @@ func New(logger mlogger.Logger, cfg *config.Config, repo storage.Repository) (*S
|
||||
|
||||
func (s *Service) Run(ctx context.Context) error {
|
||||
interval := s.cfg.PollInterval()
|
||||
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
@@ -65,6 +68,7 @@ func (s *Service) Run(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
s.logger.Info("Context cancelled, stopping ingestor")
|
||||
|
||||
return ctx.Err()
|
||||
case <-ticker.C:
|
||||
if err := s.executePoll(ctx); err != nil {
|
||||
@@ -77,27 +81,34 @@ func (s *Service) Run(ctx context.Context) error {
|
||||
func (s *Service) executePoll(ctx context.Context) error {
|
||||
start := time.Now()
|
||||
err := s.pollOnce(ctx)
|
||||
|
||||
if s.metrics != nil {
|
||||
s.metrics.observePoll(time.Since(start), err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Service) pollOnce(ctx context.Context) error {
|
||||
var firstErr error
|
||||
failures := 0
|
||||
|
||||
for _, pair := range s.pairs {
|
||||
start := time.Now()
|
||||
err := s.upsertPair(ctx, pair)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if s.metrics != nil {
|
||||
s.metrics.observePair(pair, elapsed, err)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
|
||||
failures++
|
||||
|
||||
s.logger.Warn("Failed to ingest pair",
|
||||
zap.String("symbol", pair.Symbol),
|
||||
zap.String("source", pair.Source.String()),
|
||||
@@ -110,14 +121,17 @@ func (s *Service) pollOnce(ctx context.Context) error {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
if failures > 0 {
|
||||
s.logger.Warn("Ingestion poll completed with failures", zap.Int("failures", failures), zap.Int("total", len(s.pairs)))
|
||||
} else {
|
||||
s.logger.Debug("Ingestion poll completed", zap.Int("total", len(s.pairs)))
|
||||
}
|
||||
|
||||
return firstErr
|
||||
}
|
||||
|
||||
//nolint:funlen
|
||||
func (s *Service) upsertPair(ctx context.Context, pair config.Pair) error {
|
||||
connector, ok := s.connectors[pair.Source]
|
||||
if !ok {
|
||||
@@ -133,6 +147,7 @@ func (s *Service) upsertPair(ctx context.Context, pair config.Pair) error {
|
||||
if err != nil {
|
||||
return merrors.InvalidArgumentWrap(err, "parse bid price", "bid")
|
||||
}
|
||||
|
||||
ask, err := parseDecimal(ticker.AskPrice)
|
||||
if err != nil {
|
||||
return merrors.InvalidArgumentWrap(err, "parse ask price", "ask")
|
||||
@@ -148,16 +163,18 @@ func (s *Service) upsertPair(ctx context.Context, pair config.Pair) error {
|
||||
}
|
||||
|
||||
mid := new(big.Rat).Add(bid, ask)
|
||||
mid.Quo(mid, big.NewRat(2, 1))
|
||||
mid.Quo(mid, big.NewRat(2, 1)) //nolint:mnd
|
||||
|
||||
spread := big.NewRat(0, 1)
|
||||
if mid.Sign() != 0 {
|
||||
spread.Sub(ask, bid)
|
||||
|
||||
if spread.Sign() < 0 {
|
||||
spread.Neg(spread)
|
||||
}
|
||||
|
||||
spread.Quo(spread, mid)
|
||||
spread.Mul(spread, big.NewRat(10000, 1)) // basis points
|
||||
spread.Mul(spread, big.NewRat(10000, 1)) //nolint:mnd // basis points
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
@@ -201,6 +218,7 @@ func parseDecimal(value string) (*big.Rat, error) {
|
||||
if _, ok := r.SetString(value); !ok {
|
||||
return nil, merrors.InvalidArgument("invalid decimal \""+value+"\"", "value")
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
@@ -208,9 +226,11 @@ func invertPrices(bid, ask *big.Rat) (*big.Rat, *big.Rat) {
|
||||
if bid.Sign() == 0 || ask.Sign() == 0 {
|
||||
return bid, ask
|
||||
}
|
||||
|
||||
one := big.NewRat(1, 1)
|
||||
invBid := new(big.Rat).Quo(one, ask) // invert ask to get bid
|
||||
invAsk := new(big.Rat).Quo(one, bid) // invert bid to get ask
|
||||
|
||||
return invBid, invAsk
|
||||
}
|
||||
|
||||
@@ -218,6 +238,7 @@ func formatDecimal(r *big.Rat) string {
|
||||
if r == nil {
|
||||
return "0"
|
||||
}
|
||||
|
||||
// Format with 8 decimal places, trimming trailing zeros.
|
||||
return r.FloatString(8)
|
||||
}
|
||||
|
||||
@@ -27,30 +27,33 @@ type binanceConnector struct {
|
||||
}
|
||||
|
||||
const defaultBinanceBaseURL = "https://api.binance.com"
|
||||
|
||||
const (
|
||||
defaultDialTimeoutSeconds = 5 * time.Second
|
||||
defaultDialKeepAliveSeconds = 30 * time.Second
|
||||
defaultTLSHandshakeTimeoutSeconds = 5 * time.Second
|
||||
defaultResponseHeaderTimeoutSeconds = 10 * time.Second
|
||||
defaultRequestTimeoutSeconds = 10 * time.Second
|
||||
defaultDialTimeout = 5 * time.Second
|
||||
defaultDialKeepAlive = 30 * time.Second
|
||||
defaultTLSHandshakeTimeout = 5 * time.Second
|
||||
defaultResponseHeaderTimeout = 10 * time.Second
|
||||
defaultRequestTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Connector, error) {
|
||||
func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Connector, error) { //nolint:ireturn
|
||||
baseURL := defaultBinanceBaseURL
|
||||
provider := strings.ToLower(mmodel.DriverBinance.String())
|
||||
dialTimeout := defaultDialTimeoutSeconds
|
||||
dialKeepAlive := defaultDialKeepAliveSeconds
|
||||
tlsHandshakeTimeout := defaultTLSHandshakeTimeoutSeconds
|
||||
responseHeaderTimeout := defaultResponseHeaderTimeoutSeconds
|
||||
requestTimeout := defaultRequestTimeoutSeconds
|
||||
dialTimeout := defaultDialTimeout
|
||||
dialKeepAlive := defaultDialKeepAlive
|
||||
tlsHandshakeTimeout := defaultTLSHandshakeTimeout
|
||||
responseHeaderTimeout := defaultResponseHeaderTimeout
|
||||
requestTimeout := defaultRequestTimeout
|
||||
|
||||
if settings != nil {
|
||||
if value, ok := settings["base_url"].(string); ok && strings.TrimSpace(value) != "" {
|
||||
baseURL = strings.TrimSpace(value)
|
||||
}
|
||||
|
||||
if value, ok := settings["provider"].(string); ok && strings.TrimSpace(value) != "" {
|
||||
provider = strings.TrimSpace(value)
|
||||
}
|
||||
|
||||
dialTimeout = common.DurationSetting(settings, "dial_timeout_seconds", dialTimeout)
|
||||
dialKeepAlive = common.DurationSetting(settings, "dial_keep_alive_seconds", dialKeepAlive)
|
||||
tlsHandshakeTimeout = common.DurationSetting(settings, "tls_handshake_timeout_seconds", tlsHandshakeTimeout)
|
||||
@@ -96,6 +99,7 @@ func (c *binanceConnector) FetchTicker(ctx context.Context, symbol string) (*mmo
|
||||
if err != nil {
|
||||
return nil, merrors.InternalWrap(err, "binance: parse base url")
|
||||
}
|
||||
|
||||
endpoint.Path = "/api/v3/ticker/bookTicker"
|
||||
query := endpoint.Query()
|
||||
query.Set("symbol", strings.ToUpper(strings.TrimSpace(symbol)))
|
||||
@@ -109,12 +113,14 @@ func (c *binanceConnector) FetchTicker(ctx context.Context, symbol string) (*mmo
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
c.logger.Warn("Binance request failed", zap.String("symbol", symbol), zap.Error(err))
|
||||
|
||||
return nil, merrors.InternalWrap(err, "binance: request failed")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
c.logger.Warn("Binance returned non-OK status", zap.String("symbol", symbol), zap.Int("status", resp.StatusCode))
|
||||
|
||||
return nil, merrors.Internal("binance: unexpected status " + strconv.Itoa(resp.StatusCode))
|
||||
}
|
||||
|
||||
@@ -124,9 +130,11 @@ func (c *binanceConnector) FetchTicker(ctx context.Context, symbol string) (*mmo
|
||||
AskPrice string `json:"askPrice"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
|
||||
c.logger.Warn("Binance decode failed", zap.String("symbol", symbol), zap.Error(err))
|
||||
return nil, merrors.InternalWrap(err, "binance: decode response")
|
||||
decodeErr := json.NewDecoder(resp.Body).Decode(&payload)
|
||||
if decodeErr != nil {
|
||||
c.logger.Warn("Binance decode failed", zap.String("symbol", symbol), zap.Error(decodeErr))
|
||||
|
||||
return nil, merrors.InternalWrap(decodeErr, "binance: decode response")
|
||||
}
|
||||
|
||||
return &mmodel.Ticker{
|
||||
|
||||
@@ -49,7 +49,7 @@ const (
|
||||
defaultRequestTimeoutSeconds = 10 * time.Second
|
||||
)
|
||||
|
||||
func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Connector, error) {
|
||||
func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Connector, error) { //nolint:cyclop,ireturn
|
||||
baseURL := defaultCBRBaseURL
|
||||
provider := strings.ToLower(mmodel.DriverCBR.String())
|
||||
dialTimeout := defaultDialTimeoutSeconds
|
||||
@@ -284,7 +284,7 @@ func (c *cbrConnector) fetchDailyRate(ctx context.Context, valute valuteInfo) (s
|
||||
return computePrice(entry.Value, entry.Nominal)
|
||||
}
|
||||
|
||||
func (c *cbrConnector) fetchHistoricalRate(ctx context.Context, valute valuteInfo, date time.Time) (string, error) {
|
||||
func (c *cbrConnector) fetchHistoricalRate(ctx context.Context, valute valuteInfo, date time.Time) (string, error) { //nolint:funlen
|
||||
query := map[string]string{
|
||||
"date_req1": date.Format("02/01/2006"),
|
||||
"date_req2": date.Format("02/01/2006"),
|
||||
@@ -366,6 +366,7 @@ func (c *cbrConnector) buildURL(path string, query map[string]string) (string, e
|
||||
return "", merrors.InternalWrap(err, "cbr: parse base url")
|
||||
}
|
||||
base.Path = strings.TrimRight(base.Path, "/") + path
|
||||
|
||||
q := base.Query()
|
||||
for key, value := range query {
|
||||
q.Set(key, value)
|
||||
@@ -401,7 +402,7 @@ type valuteMapping struct {
|
||||
byID map[string]valuteInfo
|
||||
}
|
||||
|
||||
func buildValuteMapping(logger *zap.Logger, items []valuteItem) (*valuteMapping, error) {
|
||||
func buildValuteMapping(logger *zap.Logger, items []valuteItem) (*valuteMapping, error) { //nolint:gocognit,nestif
|
||||
byISO := make(map[string]valuteInfo, len(items))
|
||||
byID := make(map[string]valuteInfo, len(items))
|
||||
byNum := make(map[string]string, len(items))
|
||||
@@ -453,11 +454,12 @@ func buildValuteMapping(logger *zap.Logger, items []valuteItem) (*valuteMapping,
|
||||
// 2) Otherwise prefer smaller nominal
|
||||
keepExisting := true
|
||||
|
||||
if existing.Nominal != 1 && info.Nominal == 1 {
|
||||
switch {
|
||||
case existing.Nominal != 1 && info.Nominal == 1:
|
||||
keepExisting = false
|
||||
} else if existing.Nominal == 1 && info.Nominal != 1 {
|
||||
case existing.Nominal == 1 && info.Nominal != 1:
|
||||
keepExisting = true
|
||||
} else if info.Nominal < existing.Nominal {
|
||||
case info.Nominal < existing.Nominal:
|
||||
keepExisting = false
|
||||
}
|
||||
|
||||
@@ -513,7 +515,9 @@ func buildValuteMapping(logger *zap.Logger, items []valuteItem) (*valuteMapping,
|
||||
byNum[isoNum] = id
|
||||
}
|
||||
|
||||
logger.Info("Installing currency code", zap.String("iso_code", isoChar), zap.String("id", id), zap.Int64("nominal", nominal))
|
||||
logger.Info("Installing currency code",
|
||||
zap.String("iso_code", isoChar), zap.String("id", id), zap.Int64("nominal", nominal),
|
||||
)
|
||||
|
||||
byISO[isoChar] = info
|
||||
byID[id] = info
|
||||
@@ -546,6 +550,7 @@ func (d *dailyRates) find(id string) *dailyValute {
|
||||
if d == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for idx := range d.Valutes {
|
||||
if strings.EqualFold(strings.TrimSpace(d.Valutes[idx].ID), id) {
|
||||
return &d.Valutes[idx]
|
||||
@@ -569,7 +574,9 @@ func (d *dynamicRates) find(id string, date time.Time) *dynamicRecord {
|
||||
if d == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
target := date.Format("02.01.2006")
|
||||
|
||||
for idx := range d.Records {
|
||||
rec := &d.Records[idx]
|
||||
if !strings.EqualFold(strings.TrimSpace(rec.ID), id) {
|
||||
@@ -663,7 +670,7 @@ func computePrice(value string, nominalStr string) (string, error) {
|
||||
|
||||
den := big.NewRat(nominal, 1)
|
||||
price := new(big.Rat).Quo(r, den)
|
||||
return price.FloatString(8), nil
|
||||
return price.FloatString(8), nil //nolint:mnd
|
||||
}
|
||||
|
||||
func formatSymbol(iso string, asOf *time.Time) string {
|
||||
|
||||
@@ -29,29 +29,36 @@ type coingeckoConnector struct {
|
||||
const defaultCoinGeckoBaseURL = "https://api.coingecko.com/api/v3"
|
||||
|
||||
const (
|
||||
defaultDialTimeoutSeconds = 5 * time.Second
|
||||
defaultDialKeepAliveSeconds = 30 * time.Second
|
||||
defaultTLSHandshakeTimeoutSeconds = 5 * time.Second
|
||||
defaultResponseHeaderTimeoutSeconds = 10 * time.Second
|
||||
defaultRequestTimeoutSeconds = 10 * time.Second
|
||||
defaultDialTimeout = 5 * time.Second
|
||||
defaultDialKeepAlive = 30 * time.Second
|
||||
defaultTLSHandshakeTimeout = 5 * time.Second
|
||||
defaultResponseHeaderTimeout = 10 * time.Second
|
||||
defaultRequestTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Connector, error) {
|
||||
const (
|
||||
expectedSymbolParts = 2
|
||||
tsToMillis = 1000
|
||||
)
|
||||
|
||||
func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Connector, error) { //nolint:ireturn
|
||||
baseURL := defaultCoinGeckoBaseURL
|
||||
provider := strings.ToLower(mmodel.DriverCoinGecko.String())
|
||||
dialTimeout := defaultDialTimeoutSeconds
|
||||
dialKeepAlive := defaultDialKeepAliveSeconds
|
||||
tlsHandshakeTimeout := defaultTLSHandshakeTimeoutSeconds
|
||||
responseHeaderTimeout := defaultResponseHeaderTimeoutSeconds
|
||||
requestTimeout := defaultRequestTimeoutSeconds
|
||||
dialTimeout := defaultDialTimeout
|
||||
dialKeepAlive := defaultDialKeepAlive
|
||||
tlsHandshakeTimeout := defaultTLSHandshakeTimeout
|
||||
responseHeaderTimeout := defaultResponseHeaderTimeout
|
||||
requestTimeout := defaultRequestTimeout
|
||||
|
||||
if settings != nil {
|
||||
if value, ok := settings["base_url"].(string); ok && strings.TrimSpace(value) != "" {
|
||||
baseURL = strings.TrimSpace(value)
|
||||
}
|
||||
|
||||
if value, ok := settings["provider"].(string); ok && strings.TrimSpace(value) != "" {
|
||||
provider = strings.TrimSpace(value)
|
||||
}
|
||||
|
||||
dialTimeout = common.DurationSetting(settings, "dial_timeout_seconds", dialTimeout)
|
||||
dialKeepAlive = common.DurationSetting(settings, "dial_keep_alive_seconds", dialKeepAlive)
|
||||
tlsHandshakeTimeout = common.DurationSetting(settings, "tls_handshake_timeout_seconds", tlsHandshakeTimeout)
|
||||
@@ -88,6 +95,7 @@ func (c *coingeckoConnector) ID() mmodel.Driver {
|
||||
return c.id
|
||||
}
|
||||
|
||||
//nolint:cyclop,funlen
|
||||
func (c *coingeckoConnector) FetchTicker(ctx context.Context, symbol string) (*mmodel.Ticker, error) {
|
||||
coinID, vsCurrency, err := parseSymbol(symbol)
|
||||
if err != nil {
|
||||
@@ -98,6 +106,7 @@ func (c *coingeckoConnector) FetchTicker(ctx context.Context, symbol string) (*m
|
||||
if err != nil {
|
||||
return nil, merrors.InternalWrap(err, "coingecko: parse base url")
|
||||
}
|
||||
|
||||
endpoint.Path = strings.TrimRight(endpoint.Path, "/") + "/simple/price"
|
||||
query := endpoint.Query()
|
||||
query.Set("ids", coinID)
|
||||
@@ -113,44 +122,51 @@ func (c *coingeckoConnector) FetchTicker(ctx context.Context, symbol string) (*m
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
c.logger.Warn("CoinGecko request failed", zap.String("symbol", symbol), zap.Error(err))
|
||||
|
||||
return nil, merrors.InternalWrap(err, "coingecko: request failed")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
c.logger.Warn("CoinGecko returned non-OK status", zap.String("symbol", symbol), zap.Int("status", resp.StatusCode))
|
||||
|
||||
return nil, merrors.Internal("coingecko: unexpected status " + strconv.Itoa(resp.StatusCode))
|
||||
}
|
||||
|
||||
decoder := json.NewDecoder(resp.Body)
|
||||
decoder.UseNumber()
|
||||
|
||||
var payload map[string]map[string]interface{}
|
||||
if err := decoder.Decode(&payload); err != nil {
|
||||
c.logger.Warn("CoinGecko decode failed", zap.String("symbol", symbol), zap.Error(err))
|
||||
return nil, merrors.InternalWrap(err, "coingecko: decode response")
|
||||
var payload map[string]map[string]any
|
||||
|
||||
decodeErr := decoder.Decode(&payload)
|
||||
if decodeErr != nil {
|
||||
c.logger.Warn("CoinGecko decode failed", zap.String("symbol", symbol), zap.Error(decodeErr))
|
||||
|
||||
return nil, merrors.InternalWrap(decodeErr, "coingecko: decode response")
|
||||
}
|
||||
|
||||
coinData, ok := payload[coinID]
|
||||
if !ok {
|
||||
coinData, coinFound := payload[coinID]
|
||||
if !coinFound {
|
||||
return nil, merrors.Internal("coingecko: coin id not found in response")
|
||||
}
|
||||
priceValue, ok := coinData[vsCurrency]
|
||||
if !ok {
|
||||
|
||||
priceValue, priceFound := coinData[vsCurrency]
|
||||
if !priceFound {
|
||||
return nil, merrors.Internal("coingecko: vs currency not found in response")
|
||||
}
|
||||
|
||||
price, ok := toFloat(priceValue)
|
||||
if !ok || price <= 0 {
|
||||
price, priceOk := toFloat(priceValue)
|
||||
if !priceOk || price <= 0 {
|
||||
return nil, merrors.Internal("coingecko: invalid price value in response")
|
||||
}
|
||||
|
||||
priceStr := strconv.FormatFloat(price, 'f', -1, 64)
|
||||
|
||||
timestamp := time.Now().UnixMilli()
|
||||
if tsValue, ok := coinData["last_updated_at"]; ok {
|
||||
if tsFloat, ok := toFloat(tsValue); ok && tsFloat > 0 {
|
||||
tsMillis := int64(tsFloat * 1000)
|
||||
|
||||
if tsValue, tsFound := coinData["last_updated_at"]; tsFound {
|
||||
if tsFloat, tsOk := toFloat(tsValue); tsOk && tsFloat > 0 {
|
||||
tsMillis := int64(tsFloat * tsToMillis)
|
||||
if tsMillis > 0 {
|
||||
timestamp = tsMillis
|
||||
}
|
||||
@@ -179,14 +195,16 @@ func parseSymbol(symbol string) (string, string, error) {
|
||||
case ':', '/', '-', '_':
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
})
|
||||
|
||||
if len(parts) != 2 {
|
||||
if len(parts) != expectedSymbolParts {
|
||||
return "", "", merrors.InvalidArgument("coingecko: symbol must be <coin_id>/<vs_currency>", "symbol")
|
||||
}
|
||||
|
||||
coinID := strings.TrimSpace(parts[0])
|
||||
|
||||
vsCurrency := strings.TrimSpace(parts[1])
|
||||
if coinID == "" || vsCurrency == "" {
|
||||
return "", "", merrors.InvalidArgument("coingecko: symbol contains empty segments", "symbol")
|
||||
@@ -195,28 +213,31 @@ func parseSymbol(symbol string) (string, string, error) {
|
||||
return coinID, vsCurrency, nil
|
||||
}
|
||||
|
||||
func toFloat(value interface{}) (float64, bool) {
|
||||
switch v := value.(type) {
|
||||
func toFloat(value any) (float64, bool) {
|
||||
switch val := value.(type) {
|
||||
case json.Number:
|
||||
f, err := v.Float64()
|
||||
f, err := val.Float64()
|
||||
if err != nil {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
return f, true
|
||||
case float64:
|
||||
return v, true
|
||||
return val, true
|
||||
case float32:
|
||||
return float64(v), true
|
||||
return float64(val), true
|
||||
case int:
|
||||
return float64(v), true
|
||||
return float64(val), true
|
||||
case int64:
|
||||
return float64(v), true
|
||||
return float64(val), true
|
||||
case uint64:
|
||||
return float64(v), true
|
||||
return float64(val), true
|
||||
case string:
|
||||
if parsed, err := strconv.ParseFloat(v, 64); err == nil {
|
||||
parsed, parseErr := strconv.ParseFloat(val, 64)
|
||||
if parseErr == nil {
|
||||
return parsed, true
|
||||
}
|
||||
}
|
||||
|
||||
return 0, false
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package common
|
||||
package common //nolint:revive // package provides shared market connector utilities
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
@@ -8,39 +8,46 @@ import (
|
||||
)
|
||||
|
||||
// DurationSetting reads a positive duration override from settings or returns def when the value is missing or invalid.
|
||||
//
|
||||
//nolint:cyclop
|
||||
func DurationSetting(settings model.SettingsT, key string, def time.Duration) time.Duration {
|
||||
if settings == nil {
|
||||
return def
|
||||
}
|
||||
|
||||
value, ok := settings[key]
|
||||
if !ok {
|
||||
return def
|
||||
}
|
||||
|
||||
switch v := value.(type) {
|
||||
switch val := value.(type) {
|
||||
case time.Duration:
|
||||
if v > 0 {
|
||||
return v
|
||||
if val > 0 {
|
||||
return val
|
||||
}
|
||||
case int:
|
||||
if v > 0 {
|
||||
return time.Duration(v) * time.Second
|
||||
if val > 0 {
|
||||
return time.Duration(val) * time.Second
|
||||
}
|
||||
case int64:
|
||||
if v > 0 {
|
||||
return time.Duration(v) * time.Second
|
||||
if val > 0 {
|
||||
return time.Duration(val) * time.Second
|
||||
}
|
||||
case float64:
|
||||
if v > 0 {
|
||||
return time.Duration(v * float64(time.Second))
|
||||
if val > 0 {
|
||||
return time.Duration(val * float64(time.Second))
|
||||
}
|
||||
case string:
|
||||
if parsed, err := time.ParseDuration(v); err == nil && parsed > 0 {
|
||||
parsed, parseErr := time.ParseDuration(val)
|
||||
if parseErr == nil && parsed > 0 {
|
||||
return parsed
|
||||
}
|
||||
if seconds, err := strconv.ParseFloat(v, 64); err == nil && seconds > 0 {
|
||||
|
||||
seconds, floatErr := strconv.ParseFloat(val, 64)
|
||||
if floatErr == nil && seconds > 0 {
|
||||
return time.Duration(seconds * float64(time.Second))
|
||||
}
|
||||
}
|
||||
|
||||
return def
|
||||
}
|
||||
|
||||
@@ -24,16 +24,19 @@ const (
|
||||
)
|
||||
|
||||
type Server interface {
|
||||
SetStatus(health.ServiceStatus)
|
||||
Close(context.Context)
|
||||
SetStatus(status health.ServiceStatus)
|
||||
Close(ctx context.Context)
|
||||
}
|
||||
|
||||
//nolint:ireturn
|
||||
func NewServer(logger mlogger.Logger, cfg *config.MetricsConfig) (Server, error) {
|
||||
if logger == nil {
|
||||
return nil, merrors.InvalidArgument("metrics: logger is nil")
|
||||
}
|
||||
|
||||
if cfg == nil || !cfg.Enabled {
|
||||
logger.Debug("Metrics disabled; using noop server")
|
||||
|
||||
return noopServer{}, nil
|
||||
}
|
||||
|
||||
@@ -47,7 +50,9 @@ func NewServer(logger mlogger.Logger, cfg *config.MetricsConfig) (Server, error)
|
||||
router.Handle("/metrics", promhttp.Handler())
|
||||
|
||||
var healthRouter routers.Health
|
||||
if hr, err := routers.NewHealthRouter(metricsLogger, router, ""); err != nil {
|
||||
|
||||
hr, err := routers.NewHealthRouter(metricsLogger, router, "")
|
||||
if err != nil {
|
||||
metricsLogger.Warn("Failed to initialise health router", zap.Error(err))
|
||||
} else {
|
||||
hr.SetStatus(health.SSStarting)
|
||||
@@ -60,7 +65,7 @@ func NewServer(logger mlogger.Logger, cfg *config.MetricsConfig) (Server, error)
|
||||
ReadHeaderTimeout: readHeaderTimeout,
|
||||
}
|
||||
|
||||
ms := &httpServerWrapper{
|
||||
wrapper := &httpServerWrapper{
|
||||
logger: metricsLogger,
|
||||
server: httpServer,
|
||||
health: healthRouter,
|
||||
@@ -69,7 +74,9 @@ func NewServer(logger mlogger.Logger, cfg *config.MetricsConfig) (Server, error)
|
||||
|
||||
go func() {
|
||||
metricsLogger.Info("Prometheus endpoint listening", zap.String("address", address))
|
||||
if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
|
||||
err := httpServer.ListenAndServe()
|
||||
if err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
metricsLogger.Error("Prometheus endpoint stopped unexpectedly", zap.Error(err))
|
||||
if healthRouter != nil {
|
||||
healthRouter.SetStatus(health.SSTerminating)
|
||||
@@ -77,7 +84,7 @@ func NewServer(logger mlogger.Logger, cfg *config.MetricsConfig) (Server, error)
|
||||
}
|
||||
}()
|
||||
|
||||
return ms, nil
|
||||
return wrapper, nil
|
||||
}
|
||||
|
||||
type httpServerWrapper struct {
|
||||
@@ -91,6 +98,7 @@ func (s *httpServerWrapper) SetStatus(status health.ServiceStatus) {
|
||||
if s == nil || s.health == nil {
|
||||
return
|
||||
}
|
||||
|
||||
s.logger.Debug("Updating metrics health status", zap.String("status", string(status)))
|
||||
s.health.SetStatus(status)
|
||||
}
|
||||
@@ -110,10 +118,12 @@ func (s *httpServerWrapper) Close(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
//nolint:contextcheck
|
||||
shutdownCtx := ctx
|
||||
if shutdownCtx == nil {
|
||||
shutdownCtx = context.Background()
|
||||
}
|
||||
|
||||
if s.timeout > 0 {
|
||||
var cancel context.CancelFunc
|
||||
shutdownCtx, cancel = context.WithTimeout(shutdownCtx, s.timeout)
|
||||
@@ -129,6 +139,6 @@ func (s *httpServerWrapper) Close(ctx context.Context) {
|
||||
|
||||
type noopServer struct{}
|
||||
|
||||
func (noopServer) SetStatus(health.ServiceStatus) {}
|
||||
func (noopServer) SetStatus(_ health.ServiceStatus) {}
|
||||
|
||||
func (noopServer) Close(context.Context) {}
|
||||
func (noopServer) Close(_ context.Context) {}
|
||||
|
||||
@@ -26,16 +26,18 @@ func main() {
|
||||
flag.Parse()
|
||||
|
||||
logger := lf.NewLogger(*debugFlag).Named("fx_ingestor")
|
||||
logger = logger.With(zap.String("instance_id", discovery.InstanceID()))
|
||||
defer logger.Sync()
|
||||
|
||||
av := appversion.Create()
|
||||
logger = logger.With(zap.String("instance_id", discovery.InstanceID()))
|
||||
defer logger.Sync() //nolint:errcheck
|
||||
|
||||
appVersion := appversion.Create()
|
||||
if *versionFlag {
|
||||
fmt.Fprintln(os.Stdout, av.Print())
|
||||
fmt.Fprintln(os.Stdout, appVersion.Print())
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info(fmt.Sprintf("Starting %s", av.Program()), zap.String("version", av.Info()))
|
||||
logger.Info("Starting "+appVersion.Program(), zap.String("version", appVersion.Info()))
|
||||
|
||||
ctx, cancel := signalctx.WithSignals(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||
defer cancel()
|
||||
@@ -47,8 +49,10 @@ func main() {
|
||||
if err := application.Run(ctx); err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
logger.Info("FX ingestor stopped")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
logger.Error("Ingestor terminated with error", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user