linting
This commit is contained in:
@@ -14,8 +14,8 @@ var (
|
||||
BuildDate string
|
||||
)
|
||||
|
||||
func Create() version.Printer {
|
||||
vi := version.Info{
|
||||
func Create() version.Printer { //nolint:ireturn // factory returns interface by design
|
||||
info := version.Info{
|
||||
Program: "Sendico Discovery Service",
|
||||
Revision: Revision,
|
||||
Branch: Branch,
|
||||
@@ -23,5 +23,6 @@ func Create() version.Printer {
|
||||
BuildDate: BuildDate,
|
||||
Version: Version,
|
||||
}
|
||||
return vf.Create(&vi)
|
||||
|
||||
return vf.Create(&info)
|
||||
}
|
||||
|
||||
@@ -10,7 +10,10 @@ import (
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
const defaultMetricsAddress = ":9405"
|
||||
const (
|
||||
defaultMetricsAddress = ":9405"
|
||||
defaultShutdownTimeoutSeconds = 15
|
||||
)
|
||||
|
||||
type config struct {
|
||||
Runtime *grpcapp.RuntimeConfig `yaml:"runtime"`
|
||||
@@ -24,24 +27,28 @@ type metricsConfig struct {
|
||||
}
|
||||
|
||||
type registryConfig struct {
|
||||
KVTTLSeconds *int `yaml:"kv_ttl_seconds"`
|
||||
KVTTLSeconds *int `yaml:"kv_ttl_seconds"` //nolint:tagliatelle // matches config file format
|
||||
}
|
||||
|
||||
func (i *Imp) loadConfig() (*config, error) {
|
||||
data, err := os.ReadFile(i.file)
|
||||
if err != nil {
|
||||
i.logger.Error("Could not read configuration file", zap.String("config_file", i.file), zap.Error(err))
|
||||
return nil, err
|
||||
|
||||
return nil, err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
cfg := &config{}
|
||||
if err := yaml.Unmarshal(data, cfg); err != nil {
|
||||
|
||||
err = yaml.Unmarshal(data, cfg)
|
||||
if err != nil {
|
||||
i.logger.Error("Failed to parse configuration", zap.Error(err))
|
||||
return nil, err
|
||||
|
||||
return nil, err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
if cfg.Runtime == nil {
|
||||
cfg.Runtime = &grpcapp.RuntimeConfig{ShutdownTimeoutSeconds: 15}
|
||||
cfg.Runtime = &grpcapp.RuntimeConfig{ShutdownTimeoutSeconds: defaultShutdownTimeoutSeconds}
|
||||
}
|
||||
|
||||
if cfg.Metrics != nil && strings.TrimSpace(cfg.Metrics.Address) == "" {
|
||||
|
||||
@@ -14,30 +14,37 @@ import (
|
||||
|
||||
func (i *Imp) startDiscovery(cfg *config) error {
|
||||
if cfg == nil || cfg.Messaging == nil || cfg.Messaging.Driver == "" {
|
||||
//nolint:wrapcheck
|
||||
return merrors.InvalidArgument("discovery service: messaging configuration is required", "messaging")
|
||||
}
|
||||
|
||||
broker, err := msg.CreateMessagingBroker(i.logger.Named("discovery_bus"), cfg.Messaging)
|
||||
if err != nil {
|
||||
return err
|
||||
return err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
i.logger.Info("Discovery messaging broker ready", zap.String("messaging_driver", string(cfg.Messaging.Driver)))
|
||||
producer := msgproducer.NewProducer(i.logger.Named("discovery_producer"), broker)
|
||||
|
||||
registry := discovery.NewRegistry()
|
||||
|
||||
var registryOpts []discovery.RegistryOption
|
||||
|
||||
if cfg.Registry != nil && cfg.Registry.KVTTLSeconds != nil {
|
||||
ttlSeconds := *cfg.Registry.KVTTLSeconds
|
||||
if ttlSeconds < 0 {
|
||||
i.logger.Warn("Discovery registry TTL is negative, disabling TTL", zap.Int("ttl_seconds", ttlSeconds))
|
||||
ttlSeconds = 0
|
||||
}
|
||||
|
||||
registryOpts = append(registryOpts, discovery.WithRegistryKVTTL(time.Duration(ttlSeconds)*time.Second))
|
||||
}
|
||||
svc, err := discovery.NewRegistryService(i.logger, broker, producer, registry, string(mservice.Discovery), registryOpts...)
|
||||
|
||||
svc, err := discovery.NewRegistryService(i.logger, broker, producer, registry, mservice.Discovery, registryOpts...)
|
||||
if err != nil {
|
||||
return err
|
||||
return err //nolint:wrapcheck
|
||||
}
|
||||
|
||||
svc.Start()
|
||||
i.registrySvc = svc
|
||||
|
||||
@@ -47,10 +54,11 @@ func (i *Imp) startDiscovery(cfg *config) error {
|
||||
Operations: []string{"discovery.lookup"},
|
||||
Version: appversion.Create().Short(),
|
||||
}
|
||||
i.announcer = discovery.NewAnnouncer(i.logger, producer, string(mservice.Discovery), announce)
|
||||
i.announcer = discovery.NewAnnouncer(i.logger, producer, mservice.Discovery, announce)
|
||||
i.announcer.Start()
|
||||
|
||||
i.logger.Info("Discovery registry service started", zap.String("messaging_driver", string(cfg.Messaging.Driver)))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -58,10 +66,12 @@ func (i *Imp) stopDiscovery() {
|
||||
if i == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if i.announcer != nil {
|
||||
i.announcer.Stop()
|
||||
i.announcer = nil
|
||||
}
|
||||
|
||||
if i.registrySvc != nil {
|
||||
i.registrySvc.Stop()
|
||||
i.registrySvc = nil
|
||||
|
||||
@@ -15,22 +15,30 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const readHeaderTimeout = 5 * time.Second
|
||||
|
||||
func (i *Imp) startMetrics(cfg *metricsConfig) {
|
||||
if i == nil {
|
||||
return
|
||||
}
|
||||
|
||||
address := ""
|
||||
if cfg != nil {
|
||||
address = strings.TrimSpace(cfg.Address)
|
||||
}
|
||||
|
||||
if address == "" {
|
||||
i.logger.Info("Metrics endpoint disabled")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
listener, err := net.Listen("tcp", address)
|
||||
lc := net.ListenConfig{}
|
||||
|
||||
listener, err := lc.Listen(context.Background(), "tcp", address)
|
||||
if err != nil {
|
||||
i.logger.Error("Failed to bind metrics listener", zap.String("address", address), zap.Error(err))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -38,7 +46,9 @@ func (i *Imp) startMetrics(cfg *metricsConfig) {
|
||||
router.Handle("/metrics", promhttp.Handler())
|
||||
|
||||
var healthRouter routers.Health
|
||||
if hr, err := routers.NewHealthRouter(i.logger.Named("metrics"), router, ""); err != nil {
|
||||
|
||||
hr, err := routers.NewHealthRouter(i.logger.Named("metrics"), router, "")
|
||||
if err != nil {
|
||||
i.logger.Warn("Failed to initialise health router", zap.Error(err))
|
||||
} else {
|
||||
hr.SetStatus(health.SSStarting)
|
||||
@@ -49,13 +59,16 @@ func (i *Imp) startMetrics(cfg *metricsConfig) {
|
||||
i.metricsSrv = &http.Server{
|
||||
Addr: address,
|
||||
Handler: router,
|
||||
ReadHeaderTimeout: 5 * time.Second,
|
||||
ReadHeaderTimeout: readHeaderTimeout,
|
||||
}
|
||||
|
||||
go func() {
|
||||
i.logger.Info("Prometheus endpoint listening", zap.String("address", address))
|
||||
if err := i.metricsSrv.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
i.logger.Error("Prometheus endpoint stopped unexpectedly", zap.Error(err))
|
||||
|
||||
serveErr := i.metricsSrv.Serve(listener)
|
||||
if serveErr != nil && !errors.Is(serveErr, http.ErrServerClosed) {
|
||||
i.logger.Error("Prometheus endpoint stopped unexpectedly", zap.Error(serveErr))
|
||||
|
||||
if healthRouter != nil {
|
||||
healthRouter.SetStatus(health.SSTerminating)
|
||||
}
|
||||
@@ -69,14 +82,18 @@ func (i *Imp) shutdownMetrics(ctx context.Context) {
|
||||
i.metricsHealth.Finish()
|
||||
i.metricsHealth = nil
|
||||
}
|
||||
|
||||
if i.metricsSrv == nil {
|
||||
return
|
||||
}
|
||||
if err := i.metricsSrv.Shutdown(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
|
||||
err := i.metricsSrv.Shutdown(ctx)
|
||||
if err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
i.logger.Warn("Failed to stop metrics server", zap.Error(err))
|
||||
} else {
|
||||
i.logger.Info("Metrics server stopped")
|
||||
}
|
||||
|
||||
i.metricsSrv = nil
|
||||
}
|
||||
|
||||
@@ -84,5 +101,6 @@ func (i *Imp) setMetricsStatus(status health.ServiceStatus) {
|
||||
if i == nil || i.metricsHealth == nil {
|
||||
return
|
||||
}
|
||||
|
||||
i.metricsHealth.SetStatus(status)
|
||||
}
|
||||
|
||||
@@ -10,6 +10,8 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const defaultShutdownTimeout = 15 * time.Second
|
||||
|
||||
func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) {
|
||||
return &Imp{
|
||||
logger: logger.Named("server"),
|
||||
@@ -28,29 +30,37 @@ func (i *Imp) Start() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
i.config = cfg
|
||||
|
||||
messagingDriver := "none"
|
||||
if cfg.Messaging != nil {
|
||||
messagingDriver = string(cfg.Messaging.Driver)
|
||||
}
|
||||
|
||||
metricsAddress := ""
|
||||
if cfg.Metrics != nil {
|
||||
metricsAddress = strings.TrimSpace(cfg.Metrics.Address)
|
||||
}
|
||||
|
||||
if metricsAddress == "" {
|
||||
metricsAddress = "disabled"
|
||||
}
|
||||
i.logger.Info("Discovery config loaded", zap.String("messaging_driver", messagingDriver), zap.String("metrics_address", metricsAddress))
|
||||
|
||||
i.logger.Info("Discovery config loaded",
|
||||
zap.String("messaging_driver", messagingDriver),
|
||||
zap.String("metrics_address", metricsAddress))
|
||||
|
||||
i.startMetrics(cfg.Metrics)
|
||||
|
||||
if err := i.startDiscovery(cfg); err != nil {
|
||||
err = i.startDiscovery(cfg)
|
||||
if err != nil {
|
||||
i.stopDiscovery()
|
||||
i.setMetricsStatus(health.SSTerminating)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), i.shutdownTimeout())
|
||||
i.shutdownMetrics(ctx)
|
||||
cancel()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -59,6 +69,7 @@ func (i *Imp) Start() error {
|
||||
|
||||
<-i.stopCh
|
||||
i.logger.Info("Discovery service stop signal received")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -72,6 +83,7 @@ func (i *Imp) Shutdown() {
|
||||
if i.doneCh != nil {
|
||||
<-i.doneCh
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
i.shutdownMetrics(ctx)
|
||||
cancel()
|
||||
@@ -83,6 +95,7 @@ func (i *Imp) initStopChannels() {
|
||||
if i.stopCh == nil {
|
||||
i.stopCh = make(chan struct{})
|
||||
}
|
||||
|
||||
if i.doneCh == nil {
|
||||
i.doneCh = make(chan struct{})
|
||||
}
|
||||
@@ -108,5 +121,6 @@ func (i *Imp) shutdownTimeout() time.Duration {
|
||||
if i.config != nil && i.config.Runtime != nil {
|
||||
return i.config.Runtime.ShutdownTimeout()
|
||||
}
|
||||
return 15 * time.Second
|
||||
|
||||
return defaultShutdownTimeout
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"github.com/tech/sendico/pkg/server"
|
||||
)
|
||||
|
||||
//nolint:ireturn // factory returns interface by design
|
||||
func Create(logger mlogger.Logger, file string, debug bool) (server.Application, error) {
|
||||
return serverimp.Create(logger, file, debug)
|
||||
return serverimp.Create(logger, file, debug) //nolint:wrapcheck
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user