From fc5d44364bae014f6d38bd066c9a4e258f78cdd2 Mon Sep 17 00:00:00 2001
From: Stephan D
Date: Fri, 27 Feb 2026 02:53:55 +0100
Subject: [PATCH] +autoremoval of stale discovery records

---
 api/pkg/discovery/registry.go            |  34 ++++++++
 api/pkg/discovery/registry_stale_test.go | 105 +++++++++++++++++++++++
 api/pkg/discovery/service.go             |  98 ++++++++++++++++++---
 3 files changed, 225 insertions(+), 12 deletions(-)
 create mode 100644 api/pkg/discovery/registry_stale_test.go

Notes (not part of the commit message):
* registry.go adds Registry.DeleteStale and RegistryEntry.isStaleAt. An entry
  is stale when its LastHeartbeat is zero, or when now minus LastHeartbeat
  exceeds Health.TimeoutSec seconds (DefaultHealthTimeoutSec is used when the
  per-entry value is not positive). DeleteStale returns the removed keys.
* service.go makes RegistryService.Start launch a goroutine that calls
  pruneStaleEntries once immediately and then on every tick of
  staleCleanupInterval. Each removed key is logged and, when a KV store is
  set, also deleted from it. Stop closes staleCleanupStop and waits for
  staleCleanupDone. A non-positive interval disables the loop.
* registry_stale_test.go covers removal of only the expired entry and the use
  of the per-entry timeout.

diff --git a/api/pkg/discovery/registry.go b/api/pkg/discovery/registry.go
index 921c7f2e..c94aaa02 100644
--- a/api/pkg/discovery/registry.go
+++ b/api/pkg/discovery/registry.go
@@ -181,6 +181,26 @@ func (r *Registry) Delete(key string) bool {
 	return true
 }
 
+// DeleteStale removes entries whose heartbeat timeout has elapsed at now and returns the removed keys (nil if none).
+func (r *Registry) DeleteStale(now time.Time) []string {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	removed := make([]string, 0)
+	for key, entry := range r.entries {
+		if entry == nil || !entry.isStaleAt(now) { // nil entries are skipped, not removed
+			continue
+		}
+		delete(r.entries, key)
+		r.unindexEntry(key, entry)
+		removed = append(removed, key)
+	}
+	if len(removed) == 0 {
+		return nil
+	}
+	return removed
+}
+
 func (r *Registry) List(now time.Time, onlyHealthy bool) []RegistryEntry {
 	r.mu.Lock()
 	defer r.mu.Unlock()
@@ -789,3 +809,17 @@ func (e *RegistryEntry) isHealthyAt(now time.Time) bool {
 	}
 	return now.Sub(e.LastHeartbeat) <= timeout
 }
+
+func (e *RegistryEntry) isStaleAt(now time.Time) bool {
+	if e == nil {
+		return false
+	}
+	if e.LastHeartbeat.IsZero() { // a zero LastHeartbeat counts as stale
+		return true
+	}
+	timeout := time.Duration(e.Health.TimeoutSec) * time.Second
+	if timeout <= 0 { // non-positive per-entry timeout: use the package default
+		timeout = time.Duration(DefaultHealthTimeoutSec) * time.Second
+	}
+	return now.Sub(e.LastHeartbeat) > timeout
+}
diff --git a/api/pkg/discovery/registry_stale_test.go b/api/pkg/discovery/registry_stale_test.go
new file mode 100644
index 00000000..30b10cf3
--- /dev/null
+++ b/api/pkg/discovery/registry_stale_test.go
@@ -0,0 +1,105 @@
+package discovery
+
+import (
+	"testing"
+	"time"
+)
+
+func TestRegistryDeleteStale_RemovesOnlyExpiredEntries(t *testing.T) {
+	now := time.Date(2026, 2, 27, 12, 0, 0, 0, time.UTC)
+	registry := NewRegistry()
+
+	stale := RegistryEntry{
+		ID:            "stale-gateway",
+		InstanceID:    "stale-instance",
+		Service:       "CARD",
+		Rail:          RailCardPayout,
+		Operations:    []string{OperationSend},
+		Version:       "3.0.0",
+		Status:        "ok",
+		Health:        HealthParams{TimeoutSec: 30},
+		LastHeartbeat: now.Add(-31 * time.Second), // one second past the 30s timeout
+	}
+	fresh := RegistryEntry{
+		ID:            "fresh-gateway",
+		InstanceID:    "fresh-instance",
+		Service:       "CARD",
+		Rail:          RailCardPayout,
+		Operations:    []string{OperationSend},
+		Version:       "3.0.0",
+		Status:        "ok",
+		Health:        HealthParams{TimeoutSec: 30},
+		LastHeartbeat: now.Add(-5 * time.Second),
+	}
+	unhealthyButRecent := RegistryEntry{
+		ID:            "degraded-gateway",
+		InstanceID:    "degraded-instance",
+		Service:       "CARD",
+		Rail:          RailCardPayout,
+		Operations:    []string{OperationSend},
+		Version:       "3.0.0",
+		Status:        "degraded", // this test expects a recent entry to be kept even with a non-"ok" status
+		Health:        HealthParams{TimeoutSec: 30},
+		LastHeartbeat: now.Add(-5 * time.Second),
+	}
+
+	registry.UpsertEntry(stale, now)
+	registry.UpsertEntry(fresh, now)
+	registry.UpsertEntry(unhealthyButRecent, now)
+
+	removed := registry.DeleteStale(now)
+	if len(removed) != 1 {
+		t.Fatalf("unexpected removed count: got=%d want=1", len(removed))
+	}
+
+	wantKey := registryEntryKey(normalizeEntry(stale))
+	if removed[0] != wantKey {
+		t.Fatalf("unexpected removed key: got=%q want=%q", removed[0], wantKey)
+	}
+
+	all := registry.List(now, false)
+	if len(all) != 2 {
+		t.Fatalf("unexpected remaining entries count: got=%d want=2", len(all))
+	}
+}
+
+func TestRegistryDeleteStale_UsesPerEntryTimeout(t *testing.T) {
+	now := time.Date(2026, 2, 27, 12, 0, 0, 0, time.UTC)
+	registry := NewRegistry()
+
+	shortTimeout := RegistryEntry{
+		ID:            "short-timeout",
+		InstanceID:    "short-timeout",
+		Service:       "CARD",
+		Rail:          RailCardPayout,
+		Operations:    []string{OperationSend},
+		Version:       "3.0.0",
+		Status:        "ok",
+		Health:        HealthParams{TimeoutSec: 5},
+		LastHeartbeat: now.Add(-21 * time.Second), // older than this entry's 5s timeout
+	}
+	longTimeout := RegistryEntry{
+		ID:            "long-timeout",
+		InstanceID:    "long-timeout",
+		Service:       "CARD",
+		Rail:          RailCardPayout,
+		Operations:    []string{OperationSend},
+		Version:       "3.0.0",
+		Status:        "ok",
+		Health:        HealthParams{TimeoutSec: 60},
+		LastHeartbeat: now.Add(-6 * time.Second), // within this entry's 60s timeout
+	}
+
+	registry.UpsertEntry(shortTimeout, now)
+	registry.UpsertEntry(longTimeout, now)
+
+	removed := registry.DeleteStale(now)
+	if len(removed) != 1 {
+		t.Fatalf("unexpected removed count: got=%d want=1", len(removed))
+	}
+
+	wantKey := registryEntryKey(normalizeEntry(shortTimeout))
+	if removed[0] != wantKey {
+		t.Fatalf("unexpected removed key: got=%q want=%q", removed[0], wantKey)
+	}
+}
diff --git a/api/pkg/discovery/service.go b/api/pkg/discovery/service.go
index 94715326..7e718f83 100644
--- a/api/pkg/discovery/service.go
+++ b/api/pkg/discovery/service.go
@@ -3,6 +3,7 @@ package discovery
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"strings"
 	"sync"
 	"time"
@@ -17,6 +18,8 @@ import (
 	"go.uber.org/zap"
 )
 
+const DefaultRegistryStaleCleanupInterval = time.Duration(DefaultHealthIntervalSec) * time.Second // default period between stale-entry sweeps
+
 type RegistryOption func(*RegistryService)
 
 func WithRegistryKVTTL(ttl time.Duration) RegistryOption {
@@ -28,15 +31,28 @@ func WithRegistryKVTTL(ttl time.Duration) RegistryOption {
 	}
 }
 
+// WithRegistryStaleCleanupInterval sets the period of stale-entry eviction; a non-positive interval disables the cleanup loop.
+func WithRegistryStaleCleanupInterval(interval time.Duration) RegistryOption {
+	return func(s *RegistryService) {
+		if s == nil {
+			return
+		}
+		s.staleCleanupInterval = interval
+	}
+}
+
 type RegistryService struct {
-	logger    mlogger.Logger
-	registry  *Registry
-	producer  msg.Producer
-	sender    string
-	consumers []consumerHandler
-	kv        *KVStore
-	kvWatcher nats.KeyWatcher
-	kvOptions []KVStoreOption
+	logger               mlogger.Logger
+	registry             *Registry
+	producer             msg.Producer
+	sender               string
+	consumers            []consumerHandler
+	kv                   *KVStore
+	kvWatcher            nats.KeyWatcher
+	kvOptions            []KVStoreOption
+	staleCleanupInterval time.Duration // period of the cleanup ticker; <= 0 disables it
+	staleCleanupStop     chan struct{} // closed by Stop to end the cleanup goroutine
+	staleCleanupDone     chan struct{} // closed by the cleanup goroutine when it exits
 
 	startOnce sync.Once
 	stopOnce  sync.Once
@@ -83,10 +99,11 @@ func NewRegistryService(logger mlogger.Logger, msgBroker mb.Broker, producer msg
 	}
 
 	svc := &RegistryService{
-		logger:   logger,
-		registry: registry,
-		producer: producer,
-		sender:   sender,
+		logger:               logger,
+		registry:             registry,
+		producer:             producer,
+		sender:               sender,
+		staleCleanupInterval: DefaultRegistryStaleCleanupInterval,
 	}
 	for _, opt := range opts {
 		if opt != nil {
@@ -128,6 +145,7 @@ func (s *RegistryService) Start() {
 			}()
 		}
 		s.startKVWatch()
+		s.startStaleCleanup()
 	})
 }
 
@@ -144,6 +162,12 @@ func (s *RegistryService) Stop() {
 		if s.kvWatcher != nil {
 			_ = s.kvWatcher.Stop()
 		}
+		if s.staleCleanupStop != nil {
+			close(s.staleCleanupStop)
+		}
+		if s.staleCleanupDone != nil {
+			<-s.staleCleanupDone // wait for the cleanup goroutine to finish
+		}
 		s.logInfo("Discovery registry service stopped")
 	})
 }
@@ -305,6 +329,56 @@ func (s *RegistryService) startKVWatch() {
 	go s.consumeKVUpdates(watcher)
 }
 
+func (s *RegistryService) startStaleCleanup() {
+	if s == nil || s.registry == nil || s.staleCleanupInterval <= 0 {
+		return
+	}
+
+	s.staleCleanupStop = make(chan struct{})
+	s.staleCleanupDone = make(chan struct{})
+
+	go func() {
+		defer close(s.staleCleanupDone)
+
+		ticker := time.NewTicker(s.staleCleanupInterval)
+		defer ticker.Stop()
+
+		s.pruneStaleEntries(time.Now()) // sweep once right away, before the first tick
+		for {
+			select {
+			case <-ticker.C:
+				s.pruneStaleEntries(time.Now())
+			case <-s.staleCleanupStop:
+				return
+			}
+		}
+	}()
+}
+
+func (s *RegistryService) pruneStaleEntries(now time.Time) {
+	if s == nil || s.registry == nil {
+		return
+	}
+
+	removedKeys := s.registry.DeleteStale(now)
+	if len(removedKeys) == 0 {
+		return
+	}
+
+	for _, key := range removedKeys {
+		if strings.TrimSpace(key) == "" {
+			continue
+		}
+		s.logInfo("Discovery registry stale entry removed", zap.String("key", key))
+		if s.kv == nil { // no KV store: only the in-memory registry is pruned
+			continue
+		}
+		if err := s.kv.Delete(key); err != nil && !errors.Is(err, nats.ErrKeyNotFound) { // a key that is already gone is not reported
+			s.logWarn("Failed to delete stale discovery KV entry", zap.String("key", key), zap.Error(err))
+		}
+	}
+}
+
 func (s *RegistryService) consumeKVUpdates(watcher nats.KeyWatcher) {
 	if s == nil || watcher == nil {
 		return
-- 
2.49.1