From ce23de94ce6229a0e6d214eda37b295fa28181b1 Mon Sep 17 00:00:00 2001 From: Stephan D Date: Sun, 1 Mar 2026 14:02:05 +0100 Subject: [PATCH] added service reannounce --- api/pkg/discovery/announcer.go | 29 ++++++--- api/pkg/discovery/announcer_test.go | 99 +++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 8 deletions(-) create mode 100644 api/pkg/discovery/announcer_test.go diff --git a/api/pkg/discovery/announcer.go b/api/pkg/discovery/announcer.go index 8678b527..6445c8c3 100644 --- a/api/pkg/discovery/announcer.go +++ b/api/pkg/discovery/announcer.go @@ -12,6 +12,8 @@ import ( "go.uber.org/zap" ) +const defaultReannounceHeartbeatFactor = 6 + type Announcer struct { logger mlogger.Logger producer msg.Producer @@ -84,23 +86,34 @@ func (a *Announcer) Stop() { func (a *Announcer) heartbeatLoop() { defer close(a.doneCh) - interval := time.Duration(a.announce.Health.IntervalSec) * time.Second - if interval <= 0 { - interval = time.Duration(DefaultHealthIntervalSec) * time.Second - } - ticker := time.NewTicker(interval) - defer ticker.Stop() + heartbeatInterval := heartbeatIntervalForHealth(a.announce.Health) + heartbeatTicker := time.NewTicker(heartbeatInterval) + defer heartbeatTicker.Stop() + heartbeatTicks := 0 for { select { case <-a.stopCh: return - case <-ticker.C: - a.sendHeartbeat() + case <-heartbeatTicker.C: + heartbeatTicks++ + if heartbeatTicks%defaultReannounceHeartbeatFactor == 0 { + a.sendAnnouncement() + } else { + a.sendHeartbeat() + } } } } +func heartbeatIntervalForHealth(health HealthParams) time.Duration { + interval := time.Duration(health.IntervalSec) * time.Second + if interval <= 0 { + return time.Duration(DefaultHealthIntervalSec) * time.Second + } + return interval +} + func (a *Announcer) sendAnnouncement() { env := NewServiceAnnounceEnvelope(a.sender, a.announce) event := ServiceAnnounceEvent() diff --git a/api/pkg/discovery/announcer_test.go b/api/pkg/discovery/announcer_test.go new file mode 100644 index 00000000..56333612 --- /dev/null +++ b/api/pkg/discovery/announcer_test.go @@ -0,0 +1,99 @@ +package discovery + +import ( + "sync" + "testing" + "time" + + me "github.com/tech/sendico/pkg/messaging/envelope" + "go.uber.org/zap" +) + +type recordingProducer struct { + mu sync.Mutex + counts map[string]int +} + +func newRecordingProducer() *recordingProducer { + return &recordingProducer{ + counts: make(map[string]int), + } +} + +func (p *recordingProducer) SendMessage(env me.Envelope) error { + if p == nil || env == nil { + return nil + } + sig := env.GetSignature() + if sig == nil { + return nil + } + + p.mu.Lock() + p.counts[sig.ToString()]++ + p.mu.Unlock() + return nil +} + +func (p *recordingProducer) count(event string) int { + if p == nil { + return 0 + } + p.mu.Lock() + defer p.mu.Unlock() + return p.counts[event] +} + +func waitForAtLeast(t *testing.T, timeout time.Duration, countFn func() int, want int, label string) { + t.Helper() + + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if countFn() >= want { + return + } + time.Sleep(20 * time.Millisecond) + } + t.Fatalf("timed out waiting for %s: got=%d want_at_least=%d", label, countFn(), want) +} + +func TestHeartbeatIntervalForHealth_Default(t *testing.T) { + got := heartbeatIntervalForHealth(HealthParams{}) + want := time.Duration(DefaultHealthIntervalSec) * time.Second + if got != want { + t.Fatalf("unexpected heartbeat interval: got=%s want=%s", got, want) + } +} + +func TestReannounceIntervalForHealth_Default(t *testing.T) { + got := heartbeatIntervalForHealth(HealthParams{}) * defaultReannounceHeartbeatFactor + want := time.Duration(DefaultHealthIntervalSec*defaultReannounceHeartbeatFactor) * time.Second + if got != want { + t.Fatalf("unexpected reannounce interval: got=%s want=%s", got, want) + } +} + +func TestAnnouncerPeriodicReannounce(t *testing.T) { + producer := newRecordingProducer() + announce := Announcement{ + Service: "FX_INGESTOR", + Operations: []string{OperationFXIngest}, + Health: HealthParams{ + IntervalSec: 1, + TimeoutSec: 2, + }, + } + + announcer := NewAnnouncer(zap.NewNop(), producer, "fx_ingestor", announce) + announcer.Start() + defer announcer.Stop() + + announceEvent := ServiceAnnounceEvent().ToString() + heartbeatEvent := HeartbeatEvent().ToString() + + waitForAtLeast(t, 1*time.Second, func() int { return producer.count(announceEvent) }, 1, "initial announce") + waitForAtLeast(t, 1*time.Second, func() int { return producer.count(heartbeatEvent) }, 1, "initial heartbeat") + + // With 1s heartbeat and factor=3, periodic re-announce should happen in ~3s. + waitForAtLeast(t, 5*time.Second, func() int { return producer.count(announceEvent) }, 2, "periodic re-announce") +}