Linting #509
@@ -39,6 +39,7 @@ func New(logger mlogger.Logger, cfgPath string) (*App, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &App{
|
||||
logger: logger,
|
||||
cfg: cfg,
|
||||
@@ -50,7 +51,9 @@ func (a *App) Run(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
a.logger.Debug("Metrics server initialised")
|
||||
|
||||
defer metricsSrv.Close(context.Background()) //nolint:contextcheck
|
||||
|
||||
conn, err := db.ConnectMongo(a.logger, a.cfg.Database) //nolint:contextcheck
|
||||
@@ -65,6 +68,7 @@ func (a *App) Run(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
a.logger.Debug("Storage repository initialised")
|
||||
|
||||
service, err := ingestor.New(a.logger, a.cfg, repo)
|
||||
@@ -95,7 +99,8 @@ func (a *App) Run(ctx context.Context) error {
|
||||
a.logger.Info("Starting FX ingestor service", zap.String("version", appversion.Create().Info()))
|
||||
metricsSrv.SetStatus(health.SSRunning)
|
||||
|
||||
if err := service.Run(ctx); err != nil {
|
||||
err = service.Run(ctx)
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) { // ignore termination reques error
|
||||
a.logger.Error("Ingestor service exited with error", zap.Error(err))
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
const defaultPollInterval = 30 * time.Second
|
||||
|
||||
type Config struct {
|
||||
PollIntervalSeconds int `yaml:"poll_interval_seconds"`
|
||||
PollIntervalSeconds int `yaml:"poll_interval_seconds"` //nolint:tagliatelle // matches config file format
|
||||
Market MarketConfig `yaml:"market"`
|
||||
Database *db.Config `yaml:"database"`
|
||||
Metrics *MetricsConfig `yaml:"metrics"`
|
||||
@@ -25,7 +25,7 @@ type Config struct {
|
||||
pairsBySource map[mmodel.Driver][]PairConfig
|
||||
}
|
||||
|
||||
//nolint:cyclop
|
||||
//nolint:cyclop,funlen
|
||||
func Load(path string) (*Config, error) {
|
||||
if path == "" {
|
||||
return nil, merrors.InvalidArgument("config: path is empty")
|
||||
@@ -63,6 +63,7 @@ func Load(path string) (*Config, error) {
|
||||
|
||||
normalizedPairs := make(map[string][]PairConfig, len(cfg.Market.Pairs))
|
||||
pairsBySource := make(map[mmodel.Driver][]PairConfig, len(cfg.Market.Pairs))
|
||||
|
||||
var flattened []Pair
|
||||
|
||||
for rawSource, pairList := range cfg.Market.Pairs {
|
||||
@@ -72,7 +73,8 @@ func Load(path string) (*Config, error) {
|
||||
}
|
||||
|
||||
if _, ok := sourceSet[driver]; !ok {
|
||||
return nil, merrors.InvalidArgument("config: pair references unknown source: "+driver.String(), "pairs."+driver.String())
|
||||
return nil, merrors.InvalidArgument( //nolint:lll
|
||||
"config: pair references unknown source: "+driver.String(), "pairs."+driver.String())
|
||||
}
|
||||
|
||||
processed := make([]PairConfig, len(pairList))
|
||||
@@ -82,13 +84,16 @@ func Load(path string) (*Config, error) {
|
||||
pair.Quote = strings.ToUpper(strings.TrimSpace(pair.Quote))
|
||||
|
||||
pair.Symbol = strings.TrimSpace(pair.Symbol)
|
||||
|
||||
if pair.Base == "" || pair.Quote == "" || pair.Symbol == "" {
|
||||
return nil, merrors.InvalidArgument("config: pair entries must define base, quote, and symbol", "pairs."+driver.String())
|
||||
return nil, merrors.InvalidArgument( //nolint:lll
|
||||
"config: pair entries must define base, quote, and symbol", "pairs."+driver.String())
|
||||
}
|
||||
|
||||
if strings.TrimSpace(pair.Provider) == "" {
|
||||
pair.Provider = strings.ToLower(driver.String())
|
||||
}
|
||||
|
||||
processed[idx] = pair
|
||||
flattened = append(flattened, Pair{
|
||||
PairConfig: pair,
|
||||
@@ -111,7 +116,7 @@ func Load(path string) (*Config, error) {
|
||||
if cfg.Metrics != nil && cfg.Metrics.Enabled {
|
||||
cfg.Metrics.Address = strings.TrimSpace(cfg.Metrics.Address)
|
||||
if cfg.Metrics.Address == "" {
|
||||
cfg.Metrics.Address = ":9102" //nolint:mnd
|
||||
cfg.Metrics.Address = ":9102"
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -42,21 +42,26 @@ const (
|
||||
)
|
||||
|
||||
const (
|
||||
defaultDialTimeoutSeconds = 5 * time.Second
|
||||
defaultDialKeepAliveSeconds = 30 * time.Second
|
||||
defaultTLSHandshakeTimeoutSeconds = 5 * time.Second
|
||||
defaultResponseHeaderTimeoutSeconds = 10 * time.Second
|
||||
defaultRequestTimeoutSeconds = 10 * time.Second
|
||||
defaultDialTimeout = 5 * time.Second
|
||||
defaultDialKeepAlive = 30 * time.Second
|
||||
defaultTLSHandshakeTimeout = 5 * time.Second
|
||||
defaultResponseHeaderTimeout = 10 * time.Second
|
||||
defaultRequestTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Connector, error) { //nolint:cyclop,ireturn
|
||||
const (
|
||||
maxSymbolParts = 2
|
||||
isoCodeLen = 3
|
||||
)
|
||||
|
||||
func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Connector, error) { //nolint:cyclop,funlen,nestif,ireturn
|
||||
baseURL := defaultCBRBaseURL
|
||||
provider := strings.ToLower(mmodel.DriverCBR.String())
|
||||
dialTimeout := defaultDialTimeoutSeconds
|
||||
dialKeepAlive := defaultDialKeepAliveSeconds
|
||||
tlsHandshakeTimeout := defaultTLSHandshakeTimeoutSeconds
|
||||
responseHeaderTimeout := defaultResponseHeaderTimeoutSeconds
|
||||
requestTimeout := defaultRequestTimeoutSeconds
|
||||
dialTimeout := defaultDialTimeout
|
||||
dialKeepAlive := defaultDialKeepAlive
|
||||
tlsHandshakeTimeout := defaultTLSHandshakeTimeout
|
||||
responseHeaderTimeout := defaultResponseHeaderTimeout
|
||||
requestTimeout := defaultRequestTimeout
|
||||
directoryPath := defaultDirectoryPath
|
||||
dailyPath := defaultDailyPath
|
||||
dynamicPath := defaultDynamicPath
|
||||
@@ -79,9 +84,11 @@ func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Conne
|
||||
if value, ok := settings["dynamic_path"].(string); ok && strings.TrimSpace(value) != "" {
|
||||
dynamicPath = strings.TrimSpace(value)
|
||||
}
|
||||
|
||||
if value, ok := settings["user_agent"].(string); ok && strings.TrimSpace(value) != "" {
|
||||
userAgent = strings.TrimSpace(value)
|
||||
}
|
||||
|
||||
if value, ok := settings["accept_header"].(string); ok && strings.TrimSpace(value) != "" {
|
||||
acceptHeader = strings.TrimSpace(value)
|
||||
}
|
||||
@@ -132,7 +139,8 @@ func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Conne
|
||||
logger: logger.Named("cbr"),
|
||||
}
|
||||
|
||||
if err := connector.refreshDirectory(); err != nil {
|
||||
err = connector.refreshDirectory()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -165,6 +173,7 @@ func (c *cbrConnector) FetchTicker(ctx context.Context, symbol string) (*mmodel.
|
||||
}
|
||||
|
||||
now := time.Now().UnixMilli()
|
||||
|
||||
return &mmodel.Ticker{
|
||||
Symbol: formatSymbol(isoCode, asOfDate),
|
||||
BidPrice: price,
|
||||
@@ -213,7 +222,9 @@ func (c *cbrConnector) refreshDirectory() error {
|
||||
decoder.CharsetReader = charset.NewReaderLabel
|
||||
|
||||
var directory valuteDirectory
|
||||
if err := decoder.Decode(&directory); err != nil {
|
||||
|
||||
err = decoder.Decode(&directory)
|
||||
if err != nil {
|
||||
c.logger.Warn("CBR directory decode failed", zap.Error(err), zap.String("endpoint", endpoint))
|
||||
return merrors.InternalWrap(err, "cbr: decode directory")
|
||||
}
|
||||
@@ -226,6 +237,7 @@ func (c *cbrConnector) refreshDirectory() error {
|
||||
|
||||
c.byISO = mapping.byISO
|
||||
c.byID = mapping.byID
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -265,7 +277,9 @@ func (c *cbrConnector) fetchDailyRate(ctx context.Context, valute valuteInfo) (s
|
||||
decoder.CharsetReader = charset.NewReaderLabel
|
||||
|
||||
var payload dailyRates
|
||||
if err := decoder.Decode(&payload); err != nil {
|
||||
|
||||
err = decoder.Decode(&payload)
|
||||
if err != nil {
|
||||
c.logger.Warn("CBR daily decode failed", zap.Error(err),
|
||||
zap.String("currency", valute.ISOCharCode), zap.String("endpoint", endpoint),
|
||||
)
|
||||
@@ -284,13 +298,16 @@ func (c *cbrConnector) fetchDailyRate(ctx context.Context, valute valuteInfo) (s
|
||||
return computePrice(entry.Value, entry.Nominal)
|
||||
}
|
||||
|
||||
func (c *cbrConnector) fetchHistoricalRate(ctx context.Context, valute valuteInfo, date time.Time) (string, error) { //nolint:funlen
|
||||
func (c *cbrConnector) fetchHistoricalRate( //nolint:funlen
|
||||
ctx context.Context, valute valuteInfo, date time.Time,
|
||||
) (string, error) {
|
||||
query := map[string]string{
|
||||
"date_req1": date.Format("02/01/2006"),
|
||||
"date_req2": date.Format("02/01/2006"),
|
||||
"VAL_NM_RQ": valute.ID,
|
||||
}
|
||||
dateStr := date.Format("2006-01-02")
|
||||
|
||||
endpoint, err := c.buildURL(c.dynamicPath, query)
|
||||
if err != nil {
|
||||
return "", err
|
||||
@@ -365,6 +382,7 @@ func (c *cbrConnector) buildURL(path string, query map[string]string) (string, e
|
||||
if err != nil {
|
||||
return "", merrors.InternalWrap(err, "cbr: parse base url")
|
||||
}
|
||||
|
||||
base.Path = strings.TrimRight(base.Path, "/") + path
|
||||
|
||||
q := base.Query()
|
||||
@@ -402,13 +420,13 @@ type valuteMapping struct {
|
||||
byID map[string]valuteInfo
|
||||
}
|
||||
|
||||
func buildValuteMapping(logger *zap.Logger, items []valuteItem) (*valuteMapping, error) { //nolint:gocognit,nestif
|
||||
func buildValuteMapping(logger *zap.Logger, items []valuteItem) (*valuteMapping, error) { //nolint:cyclop,gocognit,nestif
|
||||
byISO := make(map[string]valuteInfo, len(items))
|
||||
byID := make(map[string]valuteInfo, len(items))
|
||||
byNum := make(map[string]string, len(items))
|
||||
|
||||
for _, item := range items {
|
||||
id := strings.TrimSpace(item.ID)
|
||||
valuteID := strings.TrimSpace(item.ID)
|
||||
isoChar := strings.ToUpper(strings.TrimSpace(item.ISOChar))
|
||||
isoNum := strings.TrimSpace(item.ISONum)
|
||||
name := strings.TrimSpace(item.Name)
|
||||
@@ -419,17 +437,18 @@ func buildValuteMapping(logger *zap.Logger, items []valuteItem) (*valuteMapping,
|
||||
return nil, merrors.InvalidDataType("cbr: parse directory nominal: " + err.Error())
|
||||
}
|
||||
|
||||
if id == "" || isoChar == "" {
|
||||
if valuteID == "" || isoChar == "" {
|
||||
logger.Info("Skipping invalid currency entry",
|
||||
zap.String("id", id),
|
||||
zap.String("id", valuteID),
|
||||
zap.String("iso_char", isoChar),
|
||||
zap.String("name", name),
|
||||
)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
info := valuteInfo{
|
||||
ID: id,
|
||||
ID: valuteID,
|
||||
ISOCharCode: isoChar,
|
||||
ISONumCode: isoNum,
|
||||
Name: name,
|
||||
@@ -440,11 +459,12 @@ func buildValuteMapping(logger *zap.Logger, items []valuteItem) (*valuteMapping,
|
||||
// Handle duplicate ISO char codes (e.g. DEM with different IDs / nominals).
|
||||
if existing, ok := byISO[isoChar]; ok {
|
||||
// Same ISO + same ID: duplicate entry, just ignore.
|
||||
if existing.ID == id {
|
||||
if existing.ID == valuteID {
|
||||
logger.Debug("Duplicate directory entry for same ISO and ID, ignoring",
|
||||
zap.String("iso_code", isoChar),
|
||||
zap.String("id", id),
|
||||
zap.String("id", valuteID),
|
||||
)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -486,17 +506,18 @@ func buildValuteMapping(logger *zap.Logger, items []valuteItem) (*valuteMapping,
|
||||
|
||||
// Update byID: drop old ID, add new one
|
||||
delete(byID, existing.ID)
|
||||
byID[id] = info
|
||||
byID[valuteID] = info
|
||||
|
||||
// Update ISO mapping
|
||||
byISO[isoChar] = info
|
||||
|
||||
// Update numeric-code index if present
|
||||
if isoNum != "" {
|
||||
if existingID, ok := byNum[isoNum]; ok && existingID != id {
|
||||
if existingID, ok := byNum[isoNum]; ok && existingID != valuteID {
|
||||
return nil, merrors.InvalidDataType("cbr: duplicate ISO numeric code " + isoNum)
|
||||
}
|
||||
byNum[isoNum] = id
|
||||
|
||||
byNum[isoNum] = valuteID
|
||||
}
|
||||
|
||||
continue
|
||||
@@ -504,23 +525,24 @@ func buildValuteMapping(logger *zap.Logger, items []valuteItem) (*valuteMapping,
|
||||
|
||||
// No existing ISO entry, do normal uniqueness checks.
|
||||
|
||||
if existing, ok := byID[id]; ok && existing.ISOCharCode != isoChar {
|
||||
return nil, merrors.InvalidDataType("cbr: duplicate valute id " + id)
|
||||
if existing, ok := byID[valuteID]; ok && existing.ISOCharCode != isoChar {
|
||||
return nil, merrors.InvalidDataType("cbr: duplicate valute id " + valuteID)
|
||||
}
|
||||
|
||||
if isoNum != "" {
|
||||
if existingID, ok := byNum[isoNum]; ok && existingID != id {
|
||||
if existingID, ok := byNum[isoNum]; ok && existingID != valuteID {
|
||||
return nil, merrors.InvalidDataType("cbr: duplicate ISO numeric code " + isoNum)
|
||||
}
|
||||
byNum[isoNum] = id
|
||||
|
||||
byNum[isoNum] = valuteID
|
||||
}
|
||||
|
||||
logger.Info("Installing currency code",
|
||||
zap.String("iso_code", isoChar), zap.String("id", id), zap.Int64("nominal", nominal),
|
||||
zap.String("iso_code", isoChar), zap.String("id", valuteID), zap.Int64("nominal", nominal),
|
||||
)
|
||||
|
||||
byISO[isoChar] = info
|
||||
byID[id] = info
|
||||
byID[valuteID] = info
|
||||
}
|
||||
|
||||
if len(byISO) == 0 {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package cbr
|
||||
package cbr //nolint:testpackage
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -14,6 +14,8 @@ import (
|
||||
)
|
||||
|
||||
func TestFetchTickerDaily(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
transport := &stubRoundTripper{
|
||||
responses: map[string]stubResponse{
|
||||
"/scripts/XML_valFull.asp": {body: valuteDirectoryXML},
|
||||
@@ -47,10 +49,16 @@ func TestFetchTickerDaily(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFetchTickerValidatesDailyEntry(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
transport := &stubRoundTripper{
|
||||
responses: map[string]stubResponse{
|
||||
"/scripts/XML_valFull.asp": {body: valuteDirectoryXML},
|
||||
"/scripts/XML_daily.asp": {body: strings.ReplaceAll(dailyRatesXML, "<CharCode>USD</CharCode>", "<CharCode>XXX</CharCode>")},
|
||||
"/scripts/XML_daily.asp": {
|
||||
body: strings.ReplaceAll(
|
||||
dailyRatesXML, "<CharCode>USD</CharCode>", "<CharCode>XXX</CharCode>",
|
||||
),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -68,6 +76,8 @@ func TestFetchTickerValidatesDailyEntry(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFetchTickerHistorical(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
transport := &stubRoundTripper{
|
||||
responses: map[string]stubResponse{
|
||||
"/scripts/XML_valFull.asp": {body: valuteDirectoryXML},
|
||||
@@ -75,13 +85,13 @@ func TestFetchTickerHistorical(t *testing.T) {
|
||||
body: dynamicRatesXML,
|
||||
check: func(r *http.Request) error {
|
||||
if got := r.URL.Query().Get("VAL_NM_RQ"); got != "R01235" {
|
||||
return fmt.Errorf("unexpected valute id: %s", got)
|
||||
return fmt.Errorf("unexpected valute id: %s", got) //nolint:err113
|
||||
}
|
||||
if got := r.URL.Query().Get("date_req1"); got != "05/01/2023" {
|
||||
return fmt.Errorf("unexpected date_req1: %s", got)
|
||||
return fmt.Errorf("unexpected date_req1: %s", got) //nolint:err113
|
||||
}
|
||||
if got := r.URL.Query().Get("date_req2"); got != "05/01/2023" {
|
||||
return fmt.Errorf("unexpected date_req2: %s", got)
|
||||
return fmt.Errorf("unexpected date_req2: %s", got) //nolint:err113
|
||||
}
|
||||
return nil
|
||||
},
|
||||
@@ -111,6 +121,8 @@ func TestFetchTickerHistorical(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFetchTickerUnknownCurrency(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
transport := &stubRoundTripper{
|
||||
responses: map[string]stubResponse{
|
||||
"/scripts/XML_valFull.asp": {body: valuteDirectoryXML},
|
||||
@@ -136,6 +148,8 @@ func TestFetchTickerUnknownCurrency(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFetchTickerRespectsCustomPaths(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
transport := &stubRoundTripper{
|
||||
responses: map[string]stubResponse{
|
||||
"/dir.xml": {body: valuteDirectoryXML},
|
||||
@@ -200,11 +214,11 @@ type stubRoundTripper struct {
|
||||
|
||||
func (s *stubRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
if s.responses == nil {
|
||||
return nil, fmt.Errorf("no responses configured")
|
||||
return nil, fmt.Errorf("no responses configured") //nolint:err113
|
||||
}
|
||||
res, ok := s.responses[req.URL.Path]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected request path: %s", req.URL.Path)
|
||||
return nil, fmt.Errorf("unexpected request path: %s", req.URL.Path) //nolint:err113
|
||||
}
|
||||
if res.check != nil {
|
||||
if err := res.check(req); err != nil {
|
||||
|
||||
@@ -41,20 +41,20 @@ func newHTTPClient(logger *zap.Logger, client *http.Client, opts httpClientOptio
|
||||
if strings.TrimSpace(referer) == "" {
|
||||
referer = defaultCBRBaseURL
|
||||
}
|
||||
l := logger.Named("http_client")
|
||||
httpLogger := logger.Named("http_client")
|
||||
|
||||
headers := make(http.Header, 3)
|
||||
headers.Set("User-Agent", userAgent)
|
||||
headers.Set("Accept", accept)
|
||||
headers.Set("Referer", referer)
|
||||
|
||||
l.Info("HTTP client initialized", zap.String("user_agent", userAgent),
|
||||
httpLogger.Info("HTTP client initialized", zap.String("user_agent", userAgent),
|
||||
zap.String("accept", accept), zap.String("referrer", referer))
|
||||
|
||||
return &httpClient{
|
||||
client: client,
|
||||
headers: headers,
|
||||
logger: l,
|
||||
logger: httpLogger,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,6 +68,7 @@ func (h *httpClient) Do(req *http.Request) (*http.Response, error) {
|
||||
if enriched.Header.Get(key) != "" {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, value := range values {
|
||||
enriched.Header.Add(key, value)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user