Fixed http client
This commit is contained in:
@@ -11,6 +11,8 @@ market:
|
||||
- driver: CBR
|
||||
settings:
|
||||
base_url: "https://www.cbr.ru"
|
||||
user_agent: "Mozilla/5.0 (compatible; SendicoFX/1.0; +https://app.sendico.io)"
|
||||
accept_header: "application/xml,text/xml;q=0.9,*/*;q=0.8"
|
||||
pairs:
|
||||
BINANCE:
|
||||
- base: "USDT"
|
||||
|
||||
@@ -23,7 +23,7 @@ import (
|
||||
type cbrConnector struct {
|
||||
id mmodel.Driver
|
||||
provider string
|
||||
client *http.Client
|
||||
http *httpClient
|
||||
base string
|
||||
dailyPath string
|
||||
directoryPath string
|
||||
@@ -60,6 +60,8 @@ func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Conne
|
||||
directoryPath := defaultDirectoryPath
|
||||
dailyPath := defaultDailyPath
|
||||
dynamicPath := defaultDynamicPath
|
||||
userAgent := defaultUserAgent
|
||||
acceptHeader := defaultAccept
|
||||
|
||||
if settings != nil {
|
||||
if value, ok := settings["base_url"].(string); ok && strings.TrimSpace(value) != "" {
|
||||
@@ -77,6 +79,12 @@ func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Conne
|
||||
if value, ok := settings["dynamic_path"].(string); ok && strings.TrimSpace(value) != "" {
|
||||
dynamicPath = strings.TrimSpace(value)
|
||||
}
|
||||
if value, ok := settings["user_agent"].(string); ok && strings.TrimSpace(value) != "" {
|
||||
userAgent = strings.TrimSpace(value)
|
||||
}
|
||||
if value, ok := settings["accept_header"].(string); ok && strings.TrimSpace(value) != "" {
|
||||
acceptHeader = strings.TrimSpace(value)
|
||||
}
|
||||
dialTimeout = common.DurationSetting(settings, "dial_timeout_seconds", dialTimeout)
|
||||
dialKeepAlive = common.DurationSetting(settings, "dial_keep_alive_seconds", dialKeepAlive)
|
||||
tlsHandshakeTimeout = common.DurationSetting(settings, "tls_handshake_timeout_seconds", tlsHandshakeTimeout)
|
||||
@@ -99,13 +107,24 @@ func NewConnector(logger mlogger.Logger, settings model.SettingsT) (mmodel.Conne
|
||||
transport = customTransport
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Timeout: requestTimeout,
|
||||
Transport: transport,
|
||||
}
|
||||
referer := parsed.String()
|
||||
|
||||
connector := &cbrConnector{
|
||||
id: mmodel.DriverCBR,
|
||||
provider: provider,
|
||||
client: &http.Client{
|
||||
Timeout: requestTimeout,
|
||||
Transport: transport,
|
||||
},
|
||||
http: newHTTPClient(
|
||||
logger,
|
||||
client,
|
||||
httpClientOptions{
|
||||
userAgent: userAgent,
|
||||
accept: acceptHeader,
|
||||
referer: referer,
|
||||
},
|
||||
),
|
||||
base: strings.TrimRight(parsed.String(), "/"),
|
||||
dailyPath: dailyPath,
|
||||
directoryPath: directoryPath,
|
||||
@@ -161,20 +180,32 @@ func (c *cbrConnector) refreshDirectory() error {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, endpoint, nil)
|
||||
req, err := c.http.NewRequest(context.Background(), http.MethodGet, endpoint)
|
||||
if err != nil {
|
||||
return merrors.InternalWrap(err, "cbr: build directory request")
|
||||
}
|
||||
|
||||
resp, err := c.client.Do(req)
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
c.logger.Warn("CBR directory request failed", zap.Error(err), zap.String("endpoint", endpoint))
|
||||
c.logger.Warn(
|
||||
"CBR directory request failed",
|
||||
zap.Error(err),
|
||||
zap.String("endpoint", endpoint),
|
||||
zap.String("referer", c.http.headerValue("Referer")),
|
||||
zap.String("user_agent", c.http.headerValue("User-Agent")),
|
||||
)
|
||||
return merrors.InternalWrap(err, "cbr: directory request failed")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
c.logger.Warn("CBR directory returned non-OK status", zap.Int("status", resp.StatusCode), zap.String("endpoint", endpoint))
|
||||
c.logger.Warn(
|
||||
"CBR directory returned non-OK status",
|
||||
zap.Int("status", resp.StatusCode),
|
||||
zap.String("endpoint", endpoint),
|
||||
zap.String("referer", c.http.headerValue("Referer")),
|
||||
zap.String("user_agent", c.http.headerValue("User-Agent")),
|
||||
)
|
||||
return merrors.Internal("cbr: unexpected status " + strconv.Itoa(resp.StatusCode))
|
||||
}
|
||||
|
||||
@@ -183,7 +214,11 @@ func (c *cbrConnector) refreshDirectory() error {
|
||||
|
||||
var directory valuteDirectory
|
||||
if err := decoder.Decode(&directory); err != nil {
|
||||
c.logger.Warn("CBR directory decode failed", zap.Error(err), zap.String("endpoint", endpoint))
|
||||
c.logger.Warn(
|
||||
"CBR directory decode failed",
|
||||
zap.Error(err),
|
||||
zap.String("endpoint", endpoint),
|
||||
)
|
||||
return merrors.InternalWrap(err, "cbr: decode directory")
|
||||
}
|
||||
|
||||
@@ -203,20 +238,32 @@ func (c *cbrConnector) fetchDailyRate(ctx context.Context, valute valuteInfo) (s
|
||||
return "", err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
|
||||
req, err := c.http.NewRequest(ctx, http.MethodGet, endpoint)
|
||||
if err != nil {
|
||||
return "", merrors.InternalWrap(err, "cbr: build daily request")
|
||||
}
|
||||
|
||||
resp, err := c.client.Do(req)
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
c.logger.Warn("CBR daily request failed", zap.String("currency", valute.ISOCharCode), zap.Error(err))
|
||||
c.logger.Warn(
|
||||
"CBR daily request failed",
|
||||
zap.String("currency", valute.ISOCharCode),
|
||||
zap.String("endpoint", endpoint),
|
||||
zap.String("referer", c.http.headerValue("Referer")),
|
||||
zap.String("user_agent", c.http.headerValue("User-Agent")),
|
||||
zap.Error(err),
|
||||
)
|
||||
return "", merrors.InternalWrap(err, "cbr: daily request failed")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
c.logger.Warn("CBR daily returned non-OK status", zap.Int("status", resp.StatusCode))
|
||||
c.logger.Warn(
|
||||
"CBR daily returned non-OK status",
|
||||
zap.Int("status", resp.StatusCode),
|
||||
zap.String("currency", valute.ISOCharCode),
|
||||
zap.String("endpoint", endpoint),
|
||||
)
|
||||
return "", merrors.Internal("cbr: unexpected status " + strconv.Itoa(resp.StatusCode))
|
||||
}
|
||||
|
||||
@@ -225,7 +272,12 @@ func (c *cbrConnector) fetchDailyRate(ctx context.Context, valute valuteInfo) (s
|
||||
|
||||
var payload dailyRates
|
||||
if err := decoder.Decode(&payload); err != nil {
|
||||
c.logger.Warn("CBR daily decode failed", zap.Error(err))
|
||||
c.logger.Warn(
|
||||
"CBR daily decode failed",
|
||||
zap.String("currency", valute.ISOCharCode),
|
||||
zap.String("endpoint", endpoint),
|
||||
zap.Error(err),
|
||||
)
|
||||
return "", merrors.InternalWrap(err, "cbr: decode daily response")
|
||||
}
|
||||
|
||||
@@ -247,25 +299,40 @@ func (c *cbrConnector) fetchHistoricalRate(ctx context.Context, valute valuteInf
|
||||
"date_req2": date.Format("02/01/2006"),
|
||||
"VAL_NM_RQ": valute.ID,
|
||||
}
|
||||
dateStr := date.Format("2006-01-02")
|
||||
endpoint, err := c.buildURL(c.dynamicPath, query)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
|
||||
req, err := c.http.NewRequest(ctx, http.MethodGet, endpoint)
|
||||
if err != nil {
|
||||
return "", merrors.InternalWrap(err, "cbr: build historical request")
|
||||
}
|
||||
|
||||
resp, err := c.client.Do(req)
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
c.logger.Warn("CBR historical request failed", zap.String("currency", valute.ISOCharCode), zap.Error(err))
|
||||
c.logger.Warn(
|
||||
"CBR historical request failed",
|
||||
zap.String("currency", valute.ISOCharCode),
|
||||
zap.String("date", dateStr),
|
||||
zap.String("endpoint", endpoint),
|
||||
zap.String("referer", c.http.headerValue("Referer")),
|
||||
zap.String("user_agent", c.http.headerValue("User-Agent")),
|
||||
zap.Error(err),
|
||||
)
|
||||
return "", merrors.InternalWrap(err, "cbr: historical request failed")
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
c.logger.Warn("CBR historical returned non-OK status", zap.Int("status", resp.StatusCode))
|
||||
c.logger.Warn(
|
||||
"CBR historical returned non-OK status",
|
||||
zap.Int("status", resp.StatusCode),
|
||||
zap.String("currency", valute.ISOCharCode),
|
||||
zap.String("date", dateStr),
|
||||
zap.String("endpoint", endpoint),
|
||||
)
|
||||
return "", merrors.Internal("cbr: unexpected status " + strconv.Itoa(resp.StatusCode))
|
||||
}
|
||||
|
||||
@@ -274,7 +341,13 @@ func (c *cbrConnector) fetchHistoricalRate(ctx context.Context, valute valuteInf
|
||||
|
||||
var payload dynamicRates
|
||||
if err := decoder.Decode(&payload); err != nil {
|
||||
c.logger.Warn("CBR historical decode failed", zap.Error(err))
|
||||
c.logger.Warn(
|
||||
"CBR historical decode failed",
|
||||
zap.String("currency", valute.ISOCharCode),
|
||||
zap.String("date", dateStr),
|
||||
zap.String("endpoint", endpoint),
|
||||
zap.Error(err),
|
||||
)
|
||||
return "", merrors.InternalWrap(err, "cbr: decode historical response")
|
||||
}
|
||||
|
||||
|
||||
85
api/fx/ingestor/internal/market/cbr/http_client.go
Normal file
85
api/fx/ingestor/internal/market/cbr/http_client.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package cbr
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultUserAgent = "Mozilla/5.0 (compatible; SendicoFX/1.0; +https://app.sendico.io)"
|
||||
defaultAccept = "application/xml,text/xml;q=0.9,*/*;q=0.8"
|
||||
)
|
||||
|
||||
// httpClient wraps http.Client to ensure CBR requests always carry required headers.
|
||||
type httpClient struct {
|
||||
client *http.Client
|
||||
headers http.Header
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
type httpClientOptions struct {
|
||||
userAgent string
|
||||
accept string
|
||||
referer string
|
||||
}
|
||||
|
||||
func newHTTPClient(logger *zap.Logger, client *http.Client, opts httpClientOptions) *httpClient {
|
||||
userAgent := opts.userAgent
|
||||
if strings.TrimSpace(userAgent) == "" {
|
||||
userAgent = defaultUserAgent
|
||||
}
|
||||
|
||||
accept := opts.accept
|
||||
if strings.TrimSpace(accept) == "" {
|
||||
accept = defaultAccept
|
||||
}
|
||||
|
||||
referer := opts.referer
|
||||
if strings.TrimSpace(referer) == "" {
|
||||
referer = defaultCBRBaseURL
|
||||
}
|
||||
l := logger.Named("http_client")
|
||||
|
||||
headers := make(http.Header, 3)
|
||||
headers.Set("User-Agent", userAgent)
|
||||
headers.Set("Accept", accept)
|
||||
headers.Set("Referer", referer)
|
||||
|
||||
l.Info("HTTP client initialized", zap.String("user_agent", userAgent),
|
||||
zap.String("accept", accept), zap.String("referrer", referer))
|
||||
|
||||
return &httpClient{
|
||||
client: client,
|
||||
headers: headers,
|
||||
logger: l,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *httpClient) NewRequest(ctx context.Context, method, endpoint string) (*http.Request, error) {
|
||||
return http.NewRequestWithContext(ctx, method, endpoint, nil)
|
||||
}
|
||||
|
||||
func (h *httpClient) Do(req *http.Request) (*http.Response, error) {
|
||||
enriched := req.Clone(req.Context())
|
||||
for key, values := range h.headers {
|
||||
if enriched.Header.Get(key) != "" {
|
||||
continue
|
||||
}
|
||||
for _, value := range values {
|
||||
enriched.Header.Add(key, value)
|
||||
}
|
||||
}
|
||||
r, err := h.client.Do(enriched)
|
||||
if err != nil {
|
||||
h.logger.Warn("HTTP request failed", zap.Error(err), zap.String("method", req.Method),
|
||||
zap.String("url", req.URL.String()))
|
||||
}
|
||||
return r, err
|
||||
}
|
||||
|
||||
func (h *httpClient) headerValue(name string) string {
|
||||
return h.headers.Get(name)
|
||||
}
|
||||
@@ -22,7 +22,7 @@ require (
|
||||
|
||||
require (
|
||||
github.com/Microsoft/go-winio v0.6.2 // indirect
|
||||
github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251208031133-be43a854e4be // indirect
|
||||
github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251211224604-2e727cd2e6fe // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/bits-and-blooms/bitset v1.24.4 // indirect
|
||||
github.com/bmatcuk/doublestar/v4 v4.9.1 // indirect
|
||||
|
||||
@@ -6,8 +6,8 @@ github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ=
|
||||
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
|
||||
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
|
||||
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
|
||||
github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251208031133-be43a854e4be h1:1LtMLkGIqE5IQZ7Vdh4zv8A6LECInKF86/fTVxKxYLE=
|
||||
github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251208031133-be43a854e4be/go.mod h1:ioLG6R+5bUSO1oeGSDxOV3FADARuMoytZCSX6MEMQkI=
|
||||
github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251211224604-2e727cd2e6fe h1:Z93WiwkZABbBBb0hGVFSF9nofjiYRvdF7PUxB75oeyE=
|
||||
github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251211224604-2e727cd2e6fe/go.mod h1:ioLG6R+5bUSO1oeGSDxOV3FADARuMoytZCSX6MEMQkI=
|
||||
github.com/VictoriaMetrics/fastcache v1.13.0 h1:AW4mheMR5Vd9FkAPUv+NH6Nhw+fmbTMGMsNAoA/+4G0=
|
||||
github.com/VictoriaMetrics/fastcache v1.13.0/go.mod h1:hHXhl4DA2fTL2HTZDJFXWgW0LNjo6B+4aj2Wmng3TjU=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
|
||||
Reference in New Issue
Block a user