From 4da9e0b522331fc217b4b7f994f5920f8be690b5 Mon Sep 17 00:00:00 2001 From: Stephan D Date: Tue, 6 Jan 2026 16:05:20 +0100 Subject: [PATCH] fixed excessive logging non-nil checks) --- api/discovery/config.yml | 3 + .../internal/server/internal/config.go | 5 + .../internal/server/internal/discovery.go | 13 ++- api/pkg/discovery/announcer.go | 9 +- api/pkg/discovery/client.go | 9 +- api/pkg/discovery/kv.go | 91 +++++++++---------- api/pkg/discovery/service.go | 34 +++++-- api/pkg/discovery/watcher.go | 23 ++--- 8 files changed, 109 insertions(+), 78 deletions(-) diff --git a/api/discovery/config.yml b/api/discovery/config.yml index a022be6..e840d01 100644 --- a/api/discovery/config.yml +++ b/api/discovery/config.yml @@ -15,3 +15,6 @@ messaging: broker_name: Discovery Service max_reconnects: 10 reconnect_wait: 5 + +registry: + kv_ttl_seconds: 3600 diff --git a/api/discovery/internal/server/internal/config.go b/api/discovery/internal/server/internal/config.go index 60eaceb..05619da 100644 --- a/api/discovery/internal/server/internal/config.go +++ b/api/discovery/internal/server/internal/config.go @@ -16,12 +16,17 @@ type config struct { Runtime *grpcapp.RuntimeConfig `yaml:"runtime"` Messaging *msg.Config `yaml:"messaging"` Metrics *metricsConfig `yaml:"metrics"` + Registry *registryConfig `yaml:"registry"` } type metricsConfig struct { Address string `yaml:"address"` } +type registryConfig struct { + KVTTLSeconds *int `yaml:"kv_ttl_seconds"` +} + func (i *Imp) loadConfig() (*config, error) { data, err := os.ReadFile(i.file) if err != nil { diff --git a/api/discovery/internal/server/internal/discovery.go b/api/discovery/internal/server/internal/discovery.go index eee5f76..e463f13 100644 --- a/api/discovery/internal/server/internal/discovery.go +++ b/api/discovery/internal/server/internal/discovery.go @@ -1,6 +1,8 @@ package serverimp import ( + "time" + "github.com/tech/sendico/discovery/internal/appversion" "github.com/tech/sendico/pkg/discovery" "github.com/tech/sendico/pkg/merrors" @@ -23,7 +25,16 @@ func (i *Imp) startDiscovery(cfg *config) error { producer := msgproducer.NewProducer(i.logger.Named("discovery_producer"), broker) registry := discovery.NewRegistry() - svc, err := discovery.NewRegistryService(i.logger, broker, producer, registry, string(mservice.Discovery)) + var registryOpts []discovery.RegistryOption + if cfg.Registry != nil && cfg.Registry.KVTTLSeconds != nil { + ttlSeconds := *cfg.Registry.KVTTLSeconds + if ttlSeconds < 0 { + i.logger.Warn("Discovery registry TTL is negative, disabling TTL", zap.Int("ttl_seconds", ttlSeconds)) + ttlSeconds = 0 + } + registryOpts = append(registryOpts, discovery.WithRegistryKVTTL(time.Duration(ttlSeconds)*time.Second)) + } + svc, err := discovery.NewRegistryService(i.logger, broker, producer, registry, string(mservice.Discovery), registryOpts...) if err != nil { return err } diff --git a/api/pkg/discovery/announcer.go b/api/pkg/discovery/announcer.go index 1ec9e31..08482ac 100644 --- a/api/pkg/discovery/announcer.go +++ b/api/pkg/discovery/announcer.go @@ -25,9 +25,10 @@ type Announcer struct { } func NewAnnouncer(logger mlogger.Logger, producer msg.Producer, sender string, announce Announcement) *Announcer { - if logger != nil { - logger = logger.Named("discovery") + if logger == nil { + logger = zap.NewNop() } + logger = logger.Named("discovery") announce = normalizeAnnouncement(announce) if announce.Service == "" { announce.Service = strings.TrimSpace(sender) @@ -132,14 +133,14 @@ func (a *Announcer) sendHeartbeat() { } func (a *Announcer) logInfo(message string, fields ...zap.Field) { - if a.logger == nil { + if a == nil { return } a.logger.Info(message, fields...) } func (a *Announcer) logWarn(message string, fields ...zap.Field) { - if a.logger == nil { + if a == nil { return } a.logger.Warn(message, fields...) diff --git a/api/pkg/discovery/client.go b/api/pkg/discovery/client.go index b82f56d..d3cb380 100644 --- a/api/pkg/discovery/client.go +++ b/api/pkg/discovery/client.go @@ -31,9 +31,10 @@ func NewClient(logger mlogger.Logger, msgBroker mb.Broker, producer msg.Producer if msgBroker == nil { return nil, errors.New("discovery client: broker is nil") } - if logger != nil { - logger = logger.Named("discovery_client") + if logger == nil { + logger = zap.NewNop() } + logger = logger.Named("discovery_client") if producer == nil { producer = msgproducer.NewProducer(logger, msgBroker) } @@ -56,7 +57,7 @@ func NewClient(logger mlogger.Logger, msgBroker mb.Broker, producer msg.Producer } go func() { - if err := consumer.ConsumeMessages(client.handleLookupResponse); err != nil && client.logger != nil { + if err := consumer.ConsumeMessages(client.handleLookupResponse); err != nil { client.logger.Warn("Discovery lookup consumer stopped", zap.String("event", LookupResponseEvent().ToString()), zap.Error(err)) } }() @@ -131,7 +132,7 @@ func (c *Client) handleLookupResponse(_ context.Context, env me.Envelope) error } func (c *Client) logWarn(message string, fields ...zap.Field) { - if c == nil || c.logger == nil { + if c == nil { return } c.logger.Warn(message, fields...) diff --git a/api/pkg/discovery/kv.go b/api/pkg/discovery/kv.go index d168283..650e17a 100644 --- a/api/pkg/discovery/kv.go +++ b/api/pkg/discovery/kv.go @@ -3,8 +3,6 @@ package discovery import ( "encoding/json" "errors" - "os" - "strconv" "strings" "time" @@ -13,11 +11,34 @@ import ( "go.uber.org/zap" ) -const ( - DefaultKVBucket = "discovery_registry" - kvTTLSecondsEnv = "DISCOVERY_KV_TTL_SECONDS" - defaultKVTTLSeconds = 3600 -) +const DefaultKVBucket = "discovery_registry" + +type kvStoreOptions struct { + ttl time.Duration + ttlSet bool +} + +type KVStoreOption func(*kvStoreOptions) + +func WithKVTTL(ttl time.Duration) KVStoreOption { + return func(opts *kvStoreOptions) { + if opts == nil { + return + } + opts.ttl = ttl + opts.ttlSet = true + } +} + +func newKVStoreOptions(opts ...KVStoreOption) kvStoreOptions { + var options kvStoreOptions + for _, opt := range opts { + if opt != nil { + opt(&options) + } + } + return options +} type KVStore struct { logger mlogger.Logger @@ -25,18 +46,20 @@ type KVStore struct { bucket string } -func NewKVStore(logger mlogger.Logger, js nats.JetStreamContext, bucket string) (*KVStore, error) { +func NewKVStore(logger mlogger.Logger, js nats.JetStreamContext, bucket string, opts ...KVStoreOption) (*KVStore, error) { if js == nil { return nil, errors.New("discovery kv: jetstream is nil") } - if logger != nil { - logger = logger.Named("discovery_kv") + if logger == nil { + logger = zap.NewNop() } + logger = logger.Named("discovery_kv") bucket = strings.TrimSpace(bucket) if bucket == "" { bucket = DefaultKVBucket } - ttl := kvTTL(logger) + options := newKVStoreOptions(opts...) + ttl := options.ttl kv, err := js.KeyValue(bucket) if err != nil { if errors.Is(err, nats.ErrBucketNotFound) { @@ -46,9 +69,9 @@ func NewKVStore(logger mlogger.Logger, js nats.JetStreamContext, bucket string) History: 1, TTL: ttl, }) - if err == nil && logger != nil { + if err == nil { fields := []zap.Field{zap.String("bucket", bucket)} - if ttl > 0 { + if options.ttlSet { fields = append(fields, zap.Duration("ttl", ttl)) } logger.Info("Discovery KV bucket created", fields...) @@ -57,7 +80,7 @@ func NewKVStore(logger mlogger.Logger, js nats.JetStreamContext, bucket string) if err != nil { return nil, err } - } else if ttl > 0 { + } else if options.ttlSet { ensureKVTTL(logger, js, kv, bucket, ttl) } @@ -68,33 +91,13 @@ func NewKVStore(logger mlogger.Logger, js nats.JetStreamContext, bucket string) }, nil } -func kvTTL(logger mlogger.Logger) time.Duration { - raw := strings.TrimSpace(os.Getenv(kvTTLSecondsEnv)) - if raw == "" { - return time.Duration(defaultKVTTLSeconds) * time.Second - } - secs, err := strconv.Atoi(raw) - if err != nil { - if logger != nil { - logger.Warn("Invalid discovery KV TTL seconds, using default", zap.String("env", kvTTLSecondsEnv), zap.String("value", raw), zap.Int("default_seconds", defaultKVTTLSeconds)) - } - return time.Duration(defaultKVTTLSeconds) * time.Second - } - if secs <= 0 { - return 0 - } - return time.Duration(secs) * time.Second -} - func ensureKVTTL(logger mlogger.Logger, js nats.JetStreamContext, kv nats.KeyValue, bucket string, ttl time.Duration) { - if kv == nil || js == nil || ttl <= 0 { + if kv == nil || js == nil { return } status, err := kv.Status() if err != nil { - if logger != nil { - logger.Warn("Failed to read discovery KV status", zap.String("bucket", bucket), zap.Error(err)) - } + logger.Warn("Failed to read discovery KV status", zap.String("bucket", bucket), zap.Error(err)) return } if status.TTL() == ttl { @@ -103,22 +106,16 @@ func ensureKVTTL(logger mlogger.Logger, js nats.JetStreamContext, kv nats.KeyVal stream := "KV_" + bucket info, err := js.StreamInfo(stream) if err != nil { - if logger != nil { - logger.Warn("Failed to read discovery KV stream info", zap.String("bucket", bucket), zap.String("stream", stream), zap.Error(err)) - } + logger.Warn("Failed to read discovery KV stream info", zap.String("bucket", bucket), zap.String("stream", stream), zap.Error(err)) return } cfg := info.Config cfg.MaxAge = ttl if _, err := js.UpdateStream(&cfg); err != nil { - if logger != nil { - logger.Warn("Failed to update discovery KV TTL", zap.String("bucket", bucket), zap.Duration("ttl", ttl), zap.Error(err)) - } + logger.Warn("Failed to update discovery KV TTL", zap.String("bucket", bucket), zap.Duration("ttl", ttl), zap.Error(err)) return } - if logger != nil { - logger.Info("Discovery KV TTL updated", zap.String("bucket", bucket), zap.Duration("ttl", ttl)) - } + logger.Info("Discovery KV TTL updated", zap.String("bucket", bucket), zap.Duration("ttl", ttl)) } func (s *KVStore) Put(entry RegistryEntry) error { @@ -134,7 +131,7 @@ func (s *KVStore) Put(entry RegistryEntry) error { return err } _, err = s.kv.Put(kvKeyFromRegistryKey(key), payload) - if err != nil && s.logger != nil { + if err != nil { fields := append(entryFields(entry), zap.String("bucket", s.bucket), zap.String("key", key), zap.Error(err)) s.logger.Warn("Failed to persist discovery entry", fields...) } @@ -149,7 +146,7 @@ func (s *KVStore) Delete(id string) error { if key == "" { return nil } - if err := s.kv.Delete(key); err != nil && s.logger != nil { + if err := s.kv.Delete(key); err != nil { s.logger.Warn("Failed to delete discovery entry", zap.String("bucket", s.bucket), zap.String("key", key), zap.Error(err)) return err } diff --git a/api/pkg/discovery/service.go b/api/pkg/discovery/service.go index faf1796..99525a5 100644 --- a/api/pkg/discovery/service.go +++ b/api/pkg/discovery/service.go @@ -17,6 +17,17 @@ import ( "go.uber.org/zap" ) +type RegistryOption func(*RegistryService) + +func WithRegistryKVTTL(ttl time.Duration) RegistryOption { + return func(s *RegistryService) { + if s == nil { + return + } + s.kvOptions = append(s.kvOptions, WithKVTTL(ttl)) + } +} + type RegistryService struct { logger mlogger.Logger registry *Registry @@ -25,6 +36,7 @@ type RegistryService struct { consumers []consumerHandler kv *KVStore kvWatcher nats.KeyWatcher + kvOptions []KVStoreOption startOnce sync.Once stopOnce sync.Once @@ -36,16 +48,17 @@ type consumerHandler struct { event string } -func NewRegistryService(logger mlogger.Logger, msgBroker mb.Broker, producer msg.Producer, registry *Registry, sender string) (*RegistryService, error) { +func NewRegistryService(logger mlogger.Logger, msgBroker mb.Broker, producer msg.Producer, registry *Registry, sender string, opts ...RegistryOption) (*RegistryService, error) { if msgBroker == nil { return nil, errors.New("discovery registry: broker is nil") } if registry == nil { registry = NewRegistry() } - if logger != nil { - logger = logger.Named("discovery_registry") + if logger == nil { + logger = zap.NewNop() } + logger = logger.Named("discovery_registry") sender = strings.TrimSpace(sender) if sender == "" { sender = "discovery" @@ -74,6 +87,11 @@ func NewRegistryService(logger mlogger.Logger, msgBroker mb.Broker, producer msg producer: producer, sender: sender, } + for _, opt := range opts { + if opt != nil { + opt(svc) + } + } svc.consumers = []consumerHandler{ {consumer: serviceConsumer, event: ServiceAnnounceEvent().ToString(), handler: func(ctx context.Context, env me.Envelope) error { return svc.handleAnnounce(ctx, env) @@ -103,7 +121,7 @@ func (s *RegistryService) Start() { for _, ch := range s.consumers { ch := ch go func() { - if err := ch.consumer.ConsumeMessages(ch.handler); err != nil && s.logger != nil { + if err := ch.consumer.ConsumeMessages(ch.handler); err != nil { s.logger.Warn("Discovery consumer stopped with error", zap.String("event", ch.event), zap.Error(err)) } }() @@ -247,7 +265,7 @@ func (s *RegistryService) initKV(msgBroker mb.Broker) { s.logWarn("Discovery KV disabled: JetStream not configured") return } - store, err := NewKVStore(s.logger, js, "") + store, err := NewKVStore(s.logger, js, "", s.kvOptions...) if err != nil { s.logWarn("Failed to initialise discovery KV store", zap.Error(err)) return @@ -331,21 +349,21 @@ func (s *RegistryService) persistEntry(entry RegistryEntry) { } func (s *RegistryService) logWarn(message string, fields ...zap.Field) { - if s.logger == nil { + if s == nil { return } s.logger.Warn(message, fields...) } func (s *RegistryService) logDebug(message string, fields ...zap.Field) { - if s.logger == nil { + if s == nil { return } s.logger.Debug(message, fields...) } func (s *RegistryService) logInfo(message string, fields ...zap.Field) { - if s.logger == nil { + if s == nil { return } s.logger.Info(message, fields...) diff --git a/api/pkg/discovery/watcher.go b/api/pkg/discovery/watcher.go index 9a80b54..0b84acf 100644 --- a/api/pkg/discovery/watcher.go +++ b/api/pkg/discovery/watcher.go @@ -28,9 +28,10 @@ func NewRegistryWatcher(logger mlogger.Logger, msgBroker mb.Broker, registry *Re if registry == nil { registry = NewRegistry() } - if logger != nil { - logger = logger.Named("discovery_watcher") + if logger == nil { + return nil, errors.New("discovery logger: logger must be provided") } + logger = logger.Named("discovery_watcher") provider, ok := msgBroker.(jetStreamProvider) if !ok { return nil, errors.New("discovery watcher: jetstream not available") @@ -60,9 +61,7 @@ func (w *RegistryWatcher) Start() error { return err } w.watcher = watcher - if w.logger != nil { - w.logger.Info("Discovery registry watcher started", zap.String("bucket", w.kv.Bucket())) - } + w.logger.Info("Discovery registry watcher started", zap.String("bucket", w.kv.Bucket())) go w.consume(watcher) return nil } @@ -75,9 +74,7 @@ func (w *RegistryWatcher) Stop() { if w.watcher != nil { _ = w.watcher.Stop() } - if w.logger != nil { - w.logger.Info("Discovery registry watcher stopped") - } + w.logger.Info("Discovery registry watcher stopped") }) } @@ -96,7 +93,7 @@ func (w *RegistryWatcher) consume(watcher nats.KeyWatcher) { initialCount := 0 for entry := range watcher.Updates() { if entry == nil { - if initial && w.logger != nil { + if initial { fields := []zap.Field{zap.Int("entries", initialCount)} if w.kv != nil { fields = append(fields, zap.String("bucket", w.kv.Bucket())) @@ -113,7 +110,7 @@ func (w *RegistryWatcher) consume(watcher nats.KeyWatcher) { case nats.KeyValueDelete, nats.KeyValuePurge: key := registryKeyFromKVKey(entry.Key()) if key != "" { - if w.registry.Delete(key) && w.logger != nil { + if w.registry.Delete(key) { w.logger.Info("Discovery registry entry removed", zap.String("key", key)) } } @@ -125,13 +122,11 @@ func (w *RegistryWatcher) consume(watcher nats.KeyWatcher) { var payload RegistryEntry if err := json.Unmarshal(entry.Value(), &payload); err != nil { - if w.logger != nil { - w.logger.Warn("Failed to decode discovery KV entry", zap.String("key", entry.Key()), zap.Error(err)) - } + w.logger.Warn("Failed to decode discovery KV entry", zap.String("key", entry.Key()), zap.Error(err)) continue } result := w.registry.UpsertEntry(payload, time.Now()) - if w.logger != nil && (result.IsNew || result.BecameHealthy) { + if result.IsNew || result.BecameHealthy { fields := append(entryFields(result.Entry), zap.Bool("is_new", result.IsNew), zap.Bool("became_healthy", result.BecameHealthy)) w.logger.Info("Discovery registry entry updated from KV", fields...) } -- 2.49.1