fixed excessive logging non-nil checks)
This commit is contained in:
@@ -15,3 +15,6 @@ messaging:
|
|||||||
broker_name: Discovery Service
|
broker_name: Discovery Service
|
||||||
max_reconnects: 10
|
max_reconnects: 10
|
||||||
reconnect_wait: 5
|
reconnect_wait: 5
|
||||||
|
|
||||||
|
registry:
|
||||||
|
kv_ttl_seconds: 3600
|
||||||
|
|||||||
@@ -16,12 +16,17 @@ type config struct {
|
|||||||
Runtime *grpcapp.RuntimeConfig `yaml:"runtime"`
|
Runtime *grpcapp.RuntimeConfig `yaml:"runtime"`
|
||||||
Messaging *msg.Config `yaml:"messaging"`
|
Messaging *msg.Config `yaml:"messaging"`
|
||||||
Metrics *metricsConfig `yaml:"metrics"`
|
Metrics *metricsConfig `yaml:"metrics"`
|
||||||
|
Registry *registryConfig `yaml:"registry"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type metricsConfig struct {
|
type metricsConfig struct {
|
||||||
Address string `yaml:"address"`
|
Address string `yaml:"address"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type registryConfig struct {
|
||||||
|
KVTTLSeconds *int `yaml:"kv_ttl_seconds"`
|
||||||
|
}
|
||||||
|
|
||||||
func (i *Imp) loadConfig() (*config, error) {
|
func (i *Imp) loadConfig() (*config, error) {
|
||||||
data, err := os.ReadFile(i.file)
|
data, err := os.ReadFile(i.file)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
package serverimp
|
package serverimp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/tech/sendico/discovery/internal/appversion"
|
"github.com/tech/sendico/discovery/internal/appversion"
|
||||||
"github.com/tech/sendico/pkg/discovery"
|
"github.com/tech/sendico/pkg/discovery"
|
||||||
"github.com/tech/sendico/pkg/merrors"
|
"github.com/tech/sendico/pkg/merrors"
|
||||||
@@ -23,7 +25,16 @@ func (i *Imp) startDiscovery(cfg *config) error {
|
|||||||
producer := msgproducer.NewProducer(i.logger.Named("discovery_producer"), broker)
|
producer := msgproducer.NewProducer(i.logger.Named("discovery_producer"), broker)
|
||||||
|
|
||||||
registry := discovery.NewRegistry()
|
registry := discovery.NewRegistry()
|
||||||
svc, err := discovery.NewRegistryService(i.logger, broker, producer, registry, string(mservice.Discovery))
|
var registryOpts []discovery.RegistryOption
|
||||||
|
if cfg.Registry != nil && cfg.Registry.KVTTLSeconds != nil {
|
||||||
|
ttlSeconds := *cfg.Registry.KVTTLSeconds
|
||||||
|
if ttlSeconds < 0 {
|
||||||
|
i.logger.Warn("Discovery registry TTL is negative, disabling TTL", zap.Int("ttl_seconds", ttlSeconds))
|
||||||
|
ttlSeconds = 0
|
||||||
|
}
|
||||||
|
registryOpts = append(registryOpts, discovery.WithRegistryKVTTL(time.Duration(ttlSeconds)*time.Second))
|
||||||
|
}
|
||||||
|
svc, err := discovery.NewRegistryService(i.logger, broker, producer, registry, string(mservice.Discovery), registryOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,9 +25,10 @@ type Announcer struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewAnnouncer(logger mlogger.Logger, producer msg.Producer, sender string, announce Announcement) *Announcer {
|
func NewAnnouncer(logger mlogger.Logger, producer msg.Producer, sender string, announce Announcement) *Announcer {
|
||||||
if logger != nil {
|
if logger == nil {
|
||||||
logger = logger.Named("discovery")
|
logger = zap.NewNop()
|
||||||
}
|
}
|
||||||
|
logger = logger.Named("discovery")
|
||||||
announce = normalizeAnnouncement(announce)
|
announce = normalizeAnnouncement(announce)
|
||||||
if announce.Service == "" {
|
if announce.Service == "" {
|
||||||
announce.Service = strings.TrimSpace(sender)
|
announce.Service = strings.TrimSpace(sender)
|
||||||
@@ -132,14 +133,14 @@ func (a *Announcer) sendHeartbeat() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *Announcer) logInfo(message string, fields ...zap.Field) {
|
func (a *Announcer) logInfo(message string, fields ...zap.Field) {
|
||||||
if a.logger == nil {
|
if a == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
a.logger.Info(message, fields...)
|
a.logger.Info(message, fields...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *Announcer) logWarn(message string, fields ...zap.Field) {
|
func (a *Announcer) logWarn(message string, fields ...zap.Field) {
|
||||||
if a.logger == nil {
|
if a == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
a.logger.Warn(message, fields...)
|
a.logger.Warn(message, fields...)
|
||||||
|
|||||||
@@ -31,9 +31,10 @@ func NewClient(logger mlogger.Logger, msgBroker mb.Broker, producer msg.Producer
|
|||||||
if msgBroker == nil {
|
if msgBroker == nil {
|
||||||
return nil, errors.New("discovery client: broker is nil")
|
return nil, errors.New("discovery client: broker is nil")
|
||||||
}
|
}
|
||||||
if logger != nil {
|
if logger == nil {
|
||||||
logger = logger.Named("discovery_client")
|
logger = zap.NewNop()
|
||||||
}
|
}
|
||||||
|
logger = logger.Named("discovery_client")
|
||||||
if producer == nil {
|
if producer == nil {
|
||||||
producer = msgproducer.NewProducer(logger, msgBroker)
|
producer = msgproducer.NewProducer(logger, msgBroker)
|
||||||
}
|
}
|
||||||
@@ -56,7 +57,7 @@ func NewClient(logger mlogger.Logger, msgBroker mb.Broker, producer msg.Producer
|
|||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
if err := consumer.ConsumeMessages(client.handleLookupResponse); err != nil && client.logger != nil {
|
if err := consumer.ConsumeMessages(client.handleLookupResponse); err != nil {
|
||||||
client.logger.Warn("Discovery lookup consumer stopped", zap.String("event", LookupResponseEvent().ToString()), zap.Error(err))
|
client.logger.Warn("Discovery lookup consumer stopped", zap.String("event", LookupResponseEvent().ToString()), zap.Error(err))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@@ -131,7 +132,7 @@ func (c *Client) handleLookupResponse(_ context.Context, env me.Envelope) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) logWarn(message string, fields ...zap.Field) {
|
func (c *Client) logWarn(message string, fields ...zap.Field) {
|
||||||
if c == nil || c.logger == nil {
|
if c == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.logger.Warn(message, fields...)
|
c.logger.Warn(message, fields...)
|
||||||
|
|||||||
@@ -3,8 +3,6 @@ package discovery
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"os"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -13,11 +11,34 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const DefaultKVBucket = "discovery_registry"
|
||||||
DefaultKVBucket = "discovery_registry"
|
|
||||||
kvTTLSecondsEnv = "DISCOVERY_KV_TTL_SECONDS"
|
type kvStoreOptions struct {
|
||||||
defaultKVTTLSeconds = 3600
|
ttl time.Duration
|
||||||
)
|
ttlSet bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type KVStoreOption func(*kvStoreOptions)
|
||||||
|
|
||||||
|
func WithKVTTL(ttl time.Duration) KVStoreOption {
|
||||||
|
return func(opts *kvStoreOptions) {
|
||||||
|
if opts == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
opts.ttl = ttl
|
||||||
|
opts.ttlSet = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newKVStoreOptions(opts ...KVStoreOption) kvStoreOptions {
|
||||||
|
var options kvStoreOptions
|
||||||
|
for _, opt := range opts {
|
||||||
|
if opt != nil {
|
||||||
|
opt(&options)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return options
|
||||||
|
}
|
||||||
|
|
||||||
type KVStore struct {
|
type KVStore struct {
|
||||||
logger mlogger.Logger
|
logger mlogger.Logger
|
||||||
@@ -25,18 +46,20 @@ type KVStore struct {
|
|||||||
bucket string
|
bucket string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewKVStore(logger mlogger.Logger, js nats.JetStreamContext, bucket string) (*KVStore, error) {
|
func NewKVStore(logger mlogger.Logger, js nats.JetStreamContext, bucket string, opts ...KVStoreOption) (*KVStore, error) {
|
||||||
if js == nil {
|
if js == nil {
|
||||||
return nil, errors.New("discovery kv: jetstream is nil")
|
return nil, errors.New("discovery kv: jetstream is nil")
|
||||||
}
|
}
|
||||||
if logger != nil {
|
if logger == nil {
|
||||||
logger = logger.Named("discovery_kv")
|
logger = zap.NewNop()
|
||||||
}
|
}
|
||||||
|
logger = logger.Named("discovery_kv")
|
||||||
bucket = strings.TrimSpace(bucket)
|
bucket = strings.TrimSpace(bucket)
|
||||||
if bucket == "" {
|
if bucket == "" {
|
||||||
bucket = DefaultKVBucket
|
bucket = DefaultKVBucket
|
||||||
}
|
}
|
||||||
ttl := kvTTL(logger)
|
options := newKVStoreOptions(opts...)
|
||||||
|
ttl := options.ttl
|
||||||
kv, err := js.KeyValue(bucket)
|
kv, err := js.KeyValue(bucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, nats.ErrBucketNotFound) {
|
if errors.Is(err, nats.ErrBucketNotFound) {
|
||||||
@@ -46,9 +69,9 @@ func NewKVStore(logger mlogger.Logger, js nats.JetStreamContext, bucket string)
|
|||||||
History: 1,
|
History: 1,
|
||||||
TTL: ttl,
|
TTL: ttl,
|
||||||
})
|
})
|
||||||
if err == nil && logger != nil {
|
if err == nil {
|
||||||
fields := []zap.Field{zap.String("bucket", bucket)}
|
fields := []zap.Field{zap.String("bucket", bucket)}
|
||||||
if ttl > 0 {
|
if options.ttlSet {
|
||||||
fields = append(fields, zap.Duration("ttl", ttl))
|
fields = append(fields, zap.Duration("ttl", ttl))
|
||||||
}
|
}
|
||||||
logger.Info("Discovery KV bucket created", fields...)
|
logger.Info("Discovery KV bucket created", fields...)
|
||||||
@@ -57,7 +80,7 @@ func NewKVStore(logger mlogger.Logger, js nats.JetStreamContext, bucket string)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
} else if ttl > 0 {
|
} else if options.ttlSet {
|
||||||
ensureKVTTL(logger, js, kv, bucket, ttl)
|
ensureKVTTL(logger, js, kv, bucket, ttl)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -68,33 +91,13 @@ func NewKVStore(logger mlogger.Logger, js nats.JetStreamContext, bucket string)
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func kvTTL(logger mlogger.Logger) time.Duration {
|
|
||||||
raw := strings.TrimSpace(os.Getenv(kvTTLSecondsEnv))
|
|
||||||
if raw == "" {
|
|
||||||
return time.Duration(defaultKVTTLSeconds) * time.Second
|
|
||||||
}
|
|
||||||
secs, err := strconv.Atoi(raw)
|
|
||||||
if err != nil {
|
|
||||||
if logger != nil {
|
|
||||||
logger.Warn("Invalid discovery KV TTL seconds, using default", zap.String("env", kvTTLSecondsEnv), zap.String("value", raw), zap.Int("default_seconds", defaultKVTTLSeconds))
|
|
||||||
}
|
|
||||||
return time.Duration(defaultKVTTLSeconds) * time.Second
|
|
||||||
}
|
|
||||||
if secs <= 0 {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
return time.Duration(secs) * time.Second
|
|
||||||
}
|
|
||||||
|
|
||||||
func ensureKVTTL(logger mlogger.Logger, js nats.JetStreamContext, kv nats.KeyValue, bucket string, ttl time.Duration) {
|
func ensureKVTTL(logger mlogger.Logger, js nats.JetStreamContext, kv nats.KeyValue, bucket string, ttl time.Duration) {
|
||||||
if kv == nil || js == nil || ttl <= 0 {
|
if kv == nil || js == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
status, err := kv.Status()
|
status, err := kv.Status()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if logger != nil {
|
|
||||||
logger.Warn("Failed to read discovery KV status", zap.String("bucket", bucket), zap.Error(err))
|
logger.Warn("Failed to read discovery KV status", zap.String("bucket", bucket), zap.Error(err))
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if status.TTL() == ttl {
|
if status.TTL() == ttl {
|
||||||
@@ -103,23 +106,17 @@ func ensureKVTTL(logger mlogger.Logger, js nats.JetStreamContext, kv nats.KeyVal
|
|||||||
stream := "KV_" + bucket
|
stream := "KV_" + bucket
|
||||||
info, err := js.StreamInfo(stream)
|
info, err := js.StreamInfo(stream)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if logger != nil {
|
|
||||||
logger.Warn("Failed to read discovery KV stream info", zap.String("bucket", bucket), zap.String("stream", stream), zap.Error(err))
|
logger.Warn("Failed to read discovery KV stream info", zap.String("bucket", bucket), zap.String("stream", stream), zap.Error(err))
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cfg := info.Config
|
cfg := info.Config
|
||||||
cfg.MaxAge = ttl
|
cfg.MaxAge = ttl
|
||||||
if _, err := js.UpdateStream(&cfg); err != nil {
|
if _, err := js.UpdateStream(&cfg); err != nil {
|
||||||
if logger != nil {
|
|
||||||
logger.Warn("Failed to update discovery KV TTL", zap.String("bucket", bucket), zap.Duration("ttl", ttl), zap.Error(err))
|
logger.Warn("Failed to update discovery KV TTL", zap.String("bucket", bucket), zap.Duration("ttl", ttl), zap.Error(err))
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if logger != nil {
|
|
||||||
logger.Info("Discovery KV TTL updated", zap.String("bucket", bucket), zap.Duration("ttl", ttl))
|
logger.Info("Discovery KV TTL updated", zap.String("bucket", bucket), zap.Duration("ttl", ttl))
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
func (s *KVStore) Put(entry RegistryEntry) error {
|
func (s *KVStore) Put(entry RegistryEntry) error {
|
||||||
if s == nil || s.kv == nil {
|
if s == nil || s.kv == nil {
|
||||||
@@ -134,7 +131,7 @@ func (s *KVStore) Put(entry RegistryEntry) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = s.kv.Put(kvKeyFromRegistryKey(key), payload)
|
_, err = s.kv.Put(kvKeyFromRegistryKey(key), payload)
|
||||||
if err != nil && s.logger != nil {
|
if err != nil {
|
||||||
fields := append(entryFields(entry), zap.String("bucket", s.bucket), zap.String("key", key), zap.Error(err))
|
fields := append(entryFields(entry), zap.String("bucket", s.bucket), zap.String("key", key), zap.Error(err))
|
||||||
s.logger.Warn("Failed to persist discovery entry", fields...)
|
s.logger.Warn("Failed to persist discovery entry", fields...)
|
||||||
}
|
}
|
||||||
@@ -149,7 +146,7 @@ func (s *KVStore) Delete(id string) error {
|
|||||||
if key == "" {
|
if key == "" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err := s.kv.Delete(key); err != nil && s.logger != nil {
|
if err := s.kv.Delete(key); err != nil {
|
||||||
s.logger.Warn("Failed to delete discovery entry", zap.String("bucket", s.bucket), zap.String("key", key), zap.Error(err))
|
s.logger.Warn("Failed to delete discovery entry", zap.String("bucket", s.bucket), zap.String("key", key), zap.Error(err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,6 +17,17 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type RegistryOption func(*RegistryService)
|
||||||
|
|
||||||
|
func WithRegistryKVTTL(ttl time.Duration) RegistryOption {
|
||||||
|
return func(s *RegistryService) {
|
||||||
|
if s == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.kvOptions = append(s.kvOptions, WithKVTTL(ttl))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type RegistryService struct {
|
type RegistryService struct {
|
||||||
logger mlogger.Logger
|
logger mlogger.Logger
|
||||||
registry *Registry
|
registry *Registry
|
||||||
@@ -25,6 +36,7 @@ type RegistryService struct {
|
|||||||
consumers []consumerHandler
|
consumers []consumerHandler
|
||||||
kv *KVStore
|
kv *KVStore
|
||||||
kvWatcher nats.KeyWatcher
|
kvWatcher nats.KeyWatcher
|
||||||
|
kvOptions []KVStoreOption
|
||||||
|
|
||||||
startOnce sync.Once
|
startOnce sync.Once
|
||||||
stopOnce sync.Once
|
stopOnce sync.Once
|
||||||
@@ -36,16 +48,17 @@ type consumerHandler struct {
|
|||||||
event string
|
event string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRegistryService(logger mlogger.Logger, msgBroker mb.Broker, producer msg.Producer, registry *Registry, sender string) (*RegistryService, error) {
|
func NewRegistryService(logger mlogger.Logger, msgBroker mb.Broker, producer msg.Producer, registry *Registry, sender string, opts ...RegistryOption) (*RegistryService, error) {
|
||||||
if msgBroker == nil {
|
if msgBroker == nil {
|
||||||
return nil, errors.New("discovery registry: broker is nil")
|
return nil, errors.New("discovery registry: broker is nil")
|
||||||
}
|
}
|
||||||
if registry == nil {
|
if registry == nil {
|
||||||
registry = NewRegistry()
|
registry = NewRegistry()
|
||||||
}
|
}
|
||||||
if logger != nil {
|
if logger == nil {
|
||||||
logger = logger.Named("discovery_registry")
|
logger = zap.NewNop()
|
||||||
}
|
}
|
||||||
|
logger = logger.Named("discovery_registry")
|
||||||
sender = strings.TrimSpace(sender)
|
sender = strings.TrimSpace(sender)
|
||||||
if sender == "" {
|
if sender == "" {
|
||||||
sender = "discovery"
|
sender = "discovery"
|
||||||
@@ -74,6 +87,11 @@ func NewRegistryService(logger mlogger.Logger, msgBroker mb.Broker, producer msg
|
|||||||
producer: producer,
|
producer: producer,
|
||||||
sender: sender,
|
sender: sender,
|
||||||
}
|
}
|
||||||
|
for _, opt := range opts {
|
||||||
|
if opt != nil {
|
||||||
|
opt(svc)
|
||||||
|
}
|
||||||
|
}
|
||||||
svc.consumers = []consumerHandler{
|
svc.consumers = []consumerHandler{
|
||||||
{consumer: serviceConsumer, event: ServiceAnnounceEvent().ToString(), handler: func(ctx context.Context, env me.Envelope) error {
|
{consumer: serviceConsumer, event: ServiceAnnounceEvent().ToString(), handler: func(ctx context.Context, env me.Envelope) error {
|
||||||
return svc.handleAnnounce(ctx, env)
|
return svc.handleAnnounce(ctx, env)
|
||||||
@@ -103,7 +121,7 @@ func (s *RegistryService) Start() {
|
|||||||
for _, ch := range s.consumers {
|
for _, ch := range s.consumers {
|
||||||
ch := ch
|
ch := ch
|
||||||
go func() {
|
go func() {
|
||||||
if err := ch.consumer.ConsumeMessages(ch.handler); err != nil && s.logger != nil {
|
if err := ch.consumer.ConsumeMessages(ch.handler); err != nil {
|
||||||
s.logger.Warn("Discovery consumer stopped with error", zap.String("event", ch.event), zap.Error(err))
|
s.logger.Warn("Discovery consumer stopped with error", zap.String("event", ch.event), zap.Error(err))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@@ -247,7 +265,7 @@ func (s *RegistryService) initKV(msgBroker mb.Broker) {
|
|||||||
s.logWarn("Discovery KV disabled: JetStream not configured")
|
s.logWarn("Discovery KV disabled: JetStream not configured")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
store, err := NewKVStore(s.logger, js, "")
|
store, err := NewKVStore(s.logger, js, "", s.kvOptions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logWarn("Failed to initialise discovery KV store", zap.Error(err))
|
s.logWarn("Failed to initialise discovery KV store", zap.Error(err))
|
||||||
return
|
return
|
||||||
@@ -331,21 +349,21 @@ func (s *RegistryService) persistEntry(entry RegistryEntry) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *RegistryService) logWarn(message string, fields ...zap.Field) {
|
func (s *RegistryService) logWarn(message string, fields ...zap.Field) {
|
||||||
if s.logger == nil {
|
if s == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.logger.Warn(message, fields...)
|
s.logger.Warn(message, fields...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *RegistryService) logDebug(message string, fields ...zap.Field) {
|
func (s *RegistryService) logDebug(message string, fields ...zap.Field) {
|
||||||
if s.logger == nil {
|
if s == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.logger.Debug(message, fields...)
|
s.logger.Debug(message, fields...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *RegistryService) logInfo(message string, fields ...zap.Field) {
|
func (s *RegistryService) logInfo(message string, fields ...zap.Field) {
|
||||||
if s.logger == nil {
|
if s == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.logger.Info(message, fields...)
|
s.logger.Info(message, fields...)
|
||||||
|
|||||||
@@ -28,9 +28,10 @@ func NewRegistryWatcher(logger mlogger.Logger, msgBroker mb.Broker, registry *Re
|
|||||||
if registry == nil {
|
if registry == nil {
|
||||||
registry = NewRegistry()
|
registry = NewRegistry()
|
||||||
}
|
}
|
||||||
if logger != nil {
|
if logger == nil {
|
||||||
logger = logger.Named("discovery_watcher")
|
return nil, errors.New("discovery logger: logger must be provided")
|
||||||
}
|
}
|
||||||
|
logger = logger.Named("discovery_watcher")
|
||||||
provider, ok := msgBroker.(jetStreamProvider)
|
provider, ok := msgBroker.(jetStreamProvider)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("discovery watcher: jetstream not available")
|
return nil, errors.New("discovery watcher: jetstream not available")
|
||||||
@@ -60,9 +61,7 @@ func (w *RegistryWatcher) Start() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
w.watcher = watcher
|
w.watcher = watcher
|
||||||
if w.logger != nil {
|
|
||||||
w.logger.Info("Discovery registry watcher started", zap.String("bucket", w.kv.Bucket()))
|
w.logger.Info("Discovery registry watcher started", zap.String("bucket", w.kv.Bucket()))
|
||||||
}
|
|
||||||
go w.consume(watcher)
|
go w.consume(watcher)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -75,9 +74,7 @@ func (w *RegistryWatcher) Stop() {
|
|||||||
if w.watcher != nil {
|
if w.watcher != nil {
|
||||||
_ = w.watcher.Stop()
|
_ = w.watcher.Stop()
|
||||||
}
|
}
|
||||||
if w.logger != nil {
|
|
||||||
w.logger.Info("Discovery registry watcher stopped")
|
w.logger.Info("Discovery registry watcher stopped")
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -96,7 +93,7 @@ func (w *RegistryWatcher) consume(watcher nats.KeyWatcher) {
|
|||||||
initialCount := 0
|
initialCount := 0
|
||||||
for entry := range watcher.Updates() {
|
for entry := range watcher.Updates() {
|
||||||
if entry == nil {
|
if entry == nil {
|
||||||
if initial && w.logger != nil {
|
if initial {
|
||||||
fields := []zap.Field{zap.Int("entries", initialCount)}
|
fields := []zap.Field{zap.Int("entries", initialCount)}
|
||||||
if w.kv != nil {
|
if w.kv != nil {
|
||||||
fields = append(fields, zap.String("bucket", w.kv.Bucket()))
|
fields = append(fields, zap.String("bucket", w.kv.Bucket()))
|
||||||
@@ -113,7 +110,7 @@ func (w *RegistryWatcher) consume(watcher nats.KeyWatcher) {
|
|||||||
case nats.KeyValueDelete, nats.KeyValuePurge:
|
case nats.KeyValueDelete, nats.KeyValuePurge:
|
||||||
key := registryKeyFromKVKey(entry.Key())
|
key := registryKeyFromKVKey(entry.Key())
|
||||||
if key != "" {
|
if key != "" {
|
||||||
if w.registry.Delete(key) && w.logger != nil {
|
if w.registry.Delete(key) {
|
||||||
w.logger.Info("Discovery registry entry removed", zap.String("key", key))
|
w.logger.Info("Discovery registry entry removed", zap.String("key", key))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -125,13 +122,11 @@ func (w *RegistryWatcher) consume(watcher nats.KeyWatcher) {
|
|||||||
|
|
||||||
var payload RegistryEntry
|
var payload RegistryEntry
|
||||||
if err := json.Unmarshal(entry.Value(), &payload); err != nil {
|
if err := json.Unmarshal(entry.Value(), &payload); err != nil {
|
||||||
if w.logger != nil {
|
|
||||||
w.logger.Warn("Failed to decode discovery KV entry", zap.String("key", entry.Key()), zap.Error(err))
|
w.logger.Warn("Failed to decode discovery KV entry", zap.String("key", entry.Key()), zap.Error(err))
|
||||||
}
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
result := w.registry.UpsertEntry(payload, time.Now())
|
result := w.registry.UpsertEntry(payload, time.Now())
|
||||||
if w.logger != nil && (result.IsNew || result.BecameHealthy) {
|
if result.IsNew || result.BecameHealthy {
|
||||||
fields := append(entryFields(result.Entry), zap.Bool("is_new", result.IsNew), zap.Bool("became_healthy", result.BecameHealthy))
|
fields := append(entryFields(result.Entry), zap.Bool("is_new", result.IsNew), zap.Bool("became_healthy", result.BecameHealthy))
|
||||||
w.logger.Info("Discovery registry entry updated from KV", fields...)
|
w.logger.Info("Discovery registry entry updated from KV", fields...)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user