Merge pull request '+autoremoval of stale discovery records' (#562) from discovery-566 into main
Some checks failed
ci/woodpecker/push/billing_fees Pipeline was successful
ci/woodpecker/push/discovery Pipeline was successful
ci/woodpecker/push/fx_ingestor Pipeline was successful
ci/woodpecker/push/fx_oracle Pipeline failed
ci/woodpecker/push/frontend Pipeline was successful
ci/woodpecker/push/notification Pipeline is pending
ci/woodpecker/push/payments_methods Pipeline is pending
ci/woodpecker/push/payments_orchestrator Pipeline is pending
ci/woodpecker/push/payments_quotation Pipeline is pending
ci/woodpecker/push/billing_documents Pipeline was successful
ci/woodpecker/push/bff Pipeline was successful
ci/woodpecker/push/gateway_chain Pipeline failed
ci/woodpecker/push/gateway_mntx Pipeline was successful
ci/woodpecker/push/gateway_tgsettle Pipeline failed
ci/woodpecker/push/ledger Pipeline failed
ci/woodpecker/push/gateway_tron Pipeline failed
Some checks failed
ci/woodpecker/push/billing_fees Pipeline was successful
ci/woodpecker/push/discovery Pipeline was successful
ci/woodpecker/push/fx_ingestor Pipeline was successful
ci/woodpecker/push/fx_oracle Pipeline failed
ci/woodpecker/push/frontend Pipeline was successful
ci/woodpecker/push/notification Pipeline is pending
ci/woodpecker/push/payments_methods Pipeline is pending
ci/woodpecker/push/payments_orchestrator Pipeline is pending
ci/woodpecker/push/payments_quotation Pipeline is pending
ci/woodpecker/push/billing_documents Pipeline was successful
ci/woodpecker/push/bff Pipeline was successful
ci/woodpecker/push/gateway_chain Pipeline failed
ci/woodpecker/push/gateway_mntx Pipeline was successful
ci/woodpecker/push/gateway_tgsettle Pipeline failed
ci/woodpecker/push/ledger Pipeline failed
ci/woodpecker/push/gateway_tron Pipeline failed
Reviewed-on: #562
This commit was merged in pull request #562.
This commit is contained in:
@@ -181,6 +181,26 @@ func (r *Registry) Delete(key string) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteStale removes entries whose heartbeat timeout has elapsed.
|
||||||
|
func (r *Registry) DeleteStale(now time.Time) []string {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
|
||||||
|
removed := make([]string, 0)
|
||||||
|
for key, entry := range r.entries {
|
||||||
|
if entry == nil || !entry.isStaleAt(now) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
delete(r.entries, key)
|
||||||
|
r.unindexEntry(key, entry)
|
||||||
|
removed = append(removed, key)
|
||||||
|
}
|
||||||
|
if len(removed) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return removed
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Registry) List(now time.Time, onlyHealthy bool) []RegistryEntry {
|
func (r *Registry) List(now time.Time, onlyHealthy bool) []RegistryEntry {
|
||||||
r.mu.Lock()
|
r.mu.Lock()
|
||||||
defer r.mu.Unlock()
|
defer r.mu.Unlock()
|
||||||
@@ -789,3 +809,17 @@ func (e *RegistryEntry) isHealthyAt(now time.Time) bool {
|
|||||||
}
|
}
|
||||||
return now.Sub(e.LastHeartbeat) <= timeout
|
return now.Sub(e.LastHeartbeat) <= timeout
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *RegistryEntry) isStaleAt(now time.Time) bool {
|
||||||
|
if e == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if e.LastHeartbeat.IsZero() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
timeout := time.Duration(e.Health.TimeoutSec) * time.Second
|
||||||
|
if timeout <= 0 {
|
||||||
|
timeout = time.Duration(DefaultHealthTimeoutSec) * time.Second
|
||||||
|
}
|
||||||
|
return now.Sub(e.LastHeartbeat) > timeout
|
||||||
|
}
|
||||||
|
|||||||
105
api/pkg/discovery/registry_stale_test.go
Normal file
105
api/pkg/discovery/registry_stale_test.go
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
package discovery
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRegistryDeleteStale_RemovesOnlyExpiredEntries(t *testing.T) {
|
||||||
|
now := time.Date(2026, 2, 27, 12, 0, 0, 0, time.UTC)
|
||||||
|
registry := NewRegistry()
|
||||||
|
|
||||||
|
stale := RegistryEntry{
|
||||||
|
ID: "stale-gateway",
|
||||||
|
InstanceID: "stale-instance",
|
||||||
|
Service: "CARD",
|
||||||
|
Rail: RailCardPayout,
|
||||||
|
Operations: []string{OperationSend},
|
||||||
|
Version: "3.0.0",
|
||||||
|
Status: "ok",
|
||||||
|
Health: HealthParams{TimeoutSec: 30},
|
||||||
|
LastHeartbeat: now.Add(-31 * time.Second),
|
||||||
|
}
|
||||||
|
fresh := RegistryEntry{
|
||||||
|
ID: "fresh-gateway",
|
||||||
|
InstanceID: "fresh-instance",
|
||||||
|
Service: "CARD",
|
||||||
|
Rail: RailCardPayout,
|
||||||
|
Operations: []string{OperationSend},
|
||||||
|
Version: "3.0.0",
|
||||||
|
Status: "ok",
|
||||||
|
Health: HealthParams{TimeoutSec: 30},
|
||||||
|
LastHeartbeat: now.Add(-5 * time.Second),
|
||||||
|
}
|
||||||
|
unhealthyButRecent := RegistryEntry{
|
||||||
|
ID: "degraded-gateway",
|
||||||
|
InstanceID: "degraded-instance",
|
||||||
|
Service: "CARD",
|
||||||
|
Rail: RailCardPayout,
|
||||||
|
Operations: []string{OperationSend},
|
||||||
|
Version: "3.0.0",
|
||||||
|
Status: "degraded",
|
||||||
|
Health: HealthParams{TimeoutSec: 30},
|
||||||
|
LastHeartbeat: now.Add(-5 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
|
registry.UpsertEntry(stale, now)
|
||||||
|
registry.UpsertEntry(fresh, now)
|
||||||
|
registry.UpsertEntry(unhealthyButRecent, now)
|
||||||
|
|
||||||
|
removed := registry.DeleteStale(now)
|
||||||
|
if len(removed) != 1 {
|
||||||
|
t.Fatalf("unexpected removed count: got=%d want=1", len(removed))
|
||||||
|
}
|
||||||
|
|
||||||
|
wantKey := registryEntryKey(normalizeEntry(stale))
|
||||||
|
if removed[0] != wantKey {
|
||||||
|
t.Fatalf("unexpected removed key: got=%q want=%q", removed[0], wantKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
all := registry.List(now, false)
|
||||||
|
if len(all) != 2 {
|
||||||
|
t.Fatalf("unexpected remaining entries count: got=%d want=2", len(all))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryDeleteStale_UsesPerEntryTimeout(t *testing.T) {
|
||||||
|
now := time.Date(2026, 2, 27, 12, 0, 0, 0, time.UTC)
|
||||||
|
registry := NewRegistry()
|
||||||
|
|
||||||
|
shortTimeout := RegistryEntry{
|
||||||
|
ID: "short-timeout",
|
||||||
|
InstanceID: "short-timeout",
|
||||||
|
Service: "CARD",
|
||||||
|
Rail: RailCardPayout,
|
||||||
|
Operations: []string{OperationSend},
|
||||||
|
Version: "3.0.0",
|
||||||
|
Status: "ok",
|
||||||
|
Health: HealthParams{TimeoutSec: 5},
|
||||||
|
LastHeartbeat: now.Add(-21 * time.Second),
|
||||||
|
}
|
||||||
|
longTimeout := RegistryEntry{
|
||||||
|
ID: "long-timeout",
|
||||||
|
InstanceID: "long-timeout",
|
||||||
|
Service: "CARD",
|
||||||
|
Rail: RailCardPayout,
|
||||||
|
Operations: []string{OperationSend},
|
||||||
|
Version: "3.0.0",
|
||||||
|
Status: "ok",
|
||||||
|
Health: HealthParams{TimeoutSec: 60},
|
||||||
|
LastHeartbeat: now.Add(-6 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
|
registry.UpsertEntry(shortTimeout, now)
|
||||||
|
registry.UpsertEntry(longTimeout, now)
|
||||||
|
|
||||||
|
removed := registry.DeleteStale(now)
|
||||||
|
if len(removed) != 1 {
|
||||||
|
t.Fatalf("unexpected removed count: got=%d want=1", len(removed))
|
||||||
|
}
|
||||||
|
|
||||||
|
wantKey := registryEntryKey(normalizeEntry(shortTimeout))
|
||||||
|
if removed[0] != wantKey {
|
||||||
|
t.Fatalf("unexpected removed key: got=%q want=%q", removed[0], wantKey)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -3,6 +3,7 @@ package discovery
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -17,6 +18,8 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const DefaultRegistryStaleCleanupInterval = time.Duration(DefaultHealthIntervalSec) * time.Second
|
||||||
|
|
||||||
type RegistryOption func(*RegistryService)
|
type RegistryOption func(*RegistryService)
|
||||||
|
|
||||||
func WithRegistryKVTTL(ttl time.Duration) RegistryOption {
|
func WithRegistryKVTTL(ttl time.Duration) RegistryOption {
|
||||||
@@ -28,6 +31,16 @@ func WithRegistryKVTTL(ttl time.Duration) RegistryOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithRegistryStaleCleanupInterval configures periodic stale-entry eviction.
|
||||||
|
func WithRegistryStaleCleanupInterval(interval time.Duration) RegistryOption {
|
||||||
|
return func(s *RegistryService) {
|
||||||
|
if s == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.staleCleanupInterval = interval
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type RegistryService struct {
|
type RegistryService struct {
|
||||||
logger mlogger.Logger
|
logger mlogger.Logger
|
||||||
registry *Registry
|
registry *Registry
|
||||||
@@ -37,6 +50,9 @@ type RegistryService struct {
|
|||||||
kv *KVStore
|
kv *KVStore
|
||||||
kvWatcher nats.KeyWatcher
|
kvWatcher nats.KeyWatcher
|
||||||
kvOptions []KVStoreOption
|
kvOptions []KVStoreOption
|
||||||
|
staleCleanupInterval time.Duration
|
||||||
|
staleCleanupStop chan struct{}
|
||||||
|
staleCleanupDone chan struct{}
|
||||||
|
|
||||||
startOnce sync.Once
|
startOnce sync.Once
|
||||||
stopOnce sync.Once
|
stopOnce sync.Once
|
||||||
@@ -87,6 +103,7 @@ func NewRegistryService(logger mlogger.Logger, msgBroker mb.Broker, producer msg
|
|||||||
registry: registry,
|
registry: registry,
|
||||||
producer: producer,
|
producer: producer,
|
||||||
sender: sender,
|
sender: sender,
|
||||||
|
staleCleanupInterval: DefaultRegistryStaleCleanupInterval,
|
||||||
}
|
}
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
if opt != nil {
|
if opt != nil {
|
||||||
@@ -128,6 +145,7 @@ func (s *RegistryService) Start() {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
s.startKVWatch()
|
s.startKVWatch()
|
||||||
|
s.startStaleCleanup()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -144,6 +162,12 @@ func (s *RegistryService) Stop() {
|
|||||||
if s.kvWatcher != nil {
|
if s.kvWatcher != nil {
|
||||||
_ = s.kvWatcher.Stop()
|
_ = s.kvWatcher.Stop()
|
||||||
}
|
}
|
||||||
|
if s.staleCleanupStop != nil {
|
||||||
|
close(s.staleCleanupStop)
|
||||||
|
}
|
||||||
|
if s.staleCleanupDone != nil {
|
||||||
|
<-s.staleCleanupDone
|
||||||
|
}
|
||||||
s.logInfo("Discovery registry service stopped")
|
s.logInfo("Discovery registry service stopped")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -305,6 +329,56 @@ func (s *RegistryService) startKVWatch() {
|
|||||||
go s.consumeKVUpdates(watcher)
|
go s.consumeKVUpdates(watcher)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *RegistryService) startStaleCleanup() {
|
||||||
|
if s == nil || s.registry == nil || s.staleCleanupInterval <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.staleCleanupStop = make(chan struct{})
|
||||||
|
s.staleCleanupDone = make(chan struct{})
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer close(s.staleCleanupDone)
|
||||||
|
|
||||||
|
ticker := time.NewTicker(s.staleCleanupInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
s.pruneStaleEntries(time.Now())
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
s.pruneStaleEntries(time.Now())
|
||||||
|
case <-s.staleCleanupStop:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *RegistryService) pruneStaleEntries(now time.Time) {
|
||||||
|
if s == nil || s.registry == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
removedKeys := s.registry.DeleteStale(now)
|
||||||
|
if len(removedKeys) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, key := range removedKeys {
|
||||||
|
if strings.TrimSpace(key) == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
s.logInfo("Discovery registry stale entry removed", zap.String("key", key))
|
||||||
|
if s.kv == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := s.kv.Delete(key); err != nil && !errors.Is(err, nats.ErrKeyNotFound) {
|
||||||
|
s.logWarn("Failed to delete stale discovery KV entry", zap.String("key", key), zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *RegistryService) consumeKVUpdates(watcher nats.KeyWatcher) {
|
func (s *RegistryService) consumeKVUpdates(watcher nats.KeyWatcher) {
|
||||||
if s == nil || watcher == nil {
|
if s == nil || watcher == nil {
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user