added service reannounce #594

Merged
tech merged 1 commits from discovery-593 into main 2026-03-01 13:02:47 +00:00
2 changed files with 120 additions and 8 deletions
Showing only changes of commit ce23de94ce - Show all commits

View File

@@ -12,6 +12,8 @@ import (
"go.uber.org/zap"
)
const defaultReannounceHeartbeatFactor = 6
type Announcer struct {
logger mlogger.Logger
producer msg.Producer
@@ -84,22 +86,33 @@ func (a *Announcer) Stop() {
func (a *Announcer) heartbeatLoop() {
defer close(a.doneCh)
interval := time.Duration(a.announce.Health.IntervalSec) * time.Second
if interval <= 0 {
interval = time.Duration(DefaultHealthIntervalSec) * time.Second
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
heartbeatInterval := heartbeatIntervalForHealth(a.announce.Health)
heartbeatTicker := time.NewTicker(heartbeatInterval)
defer heartbeatTicker.Stop()
heartbeatTicks := 0
for {
select {
case <-a.stopCh:
return
case <-ticker.C:
case <-heartbeatTicker.C:
heartbeatTicks++
if heartbeatTicks%defaultReannounceHeartbeatFactor == 0 {
a.sendAnnouncement()
} else {
a.sendHeartbeat()
}
}
}
}
func heartbeatIntervalForHealth(health HealthParams) time.Duration {
interval := time.Duration(health.IntervalSec) * time.Second
if interval <= 0 {
return time.Duration(DefaultHealthIntervalSec) * time.Second
}
return interval
}
func (a *Announcer) sendAnnouncement() {
env := NewServiceAnnounceEnvelope(a.sender, a.announce)

View File

@@ -0,0 +1,99 @@
package discovery
import (
"sync"
"testing"
"time"
me "github.com/tech/sendico/pkg/messaging/envelope"
"go.uber.org/zap"
)
type recordingProducer struct {
mu sync.Mutex
counts map[string]int
}
func newRecordingProducer() *recordingProducer {
return &recordingProducer{
counts: make(map[string]int),
}
}
func (p *recordingProducer) SendMessage(env me.Envelope) error {
if p == nil || env == nil {
return nil
}
sig := env.GetSignature()
if sig == nil {
return nil
}
p.mu.Lock()
p.counts[sig.ToString()]++
p.mu.Unlock()
return nil
}
func (p *recordingProducer) count(event string) int {
if p == nil {
return 0
}
p.mu.Lock()
defer p.mu.Unlock()
return p.counts[event]
}
func waitForAtLeast(t *testing.T, timeout time.Duration, countFn func() int, want int, label string) {
t.Helper()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if countFn() >= want {
return
}
time.Sleep(20 * time.Millisecond)
}
t.Fatalf("timed out waiting for %s: got=%d want_at_least=%d", label, countFn(), want)
}
func TestHeartbeatIntervalForHealth_Default(t *testing.T) {
got := heartbeatIntervalForHealth(HealthParams{})
want := time.Duration(DefaultHealthIntervalSec) * time.Second
if got != want {
t.Fatalf("unexpected heartbeat interval: got=%s want=%s", got, want)
}
}
func TestReannounceIntervalForHealth_Default(t *testing.T) {
got := heartbeatIntervalForHealth(HealthParams{}) * defaultReannounceHeartbeatFactor
want := time.Duration(DefaultHealthIntervalSec*defaultReannounceHeartbeatFactor) * time.Second
if got != want {
t.Fatalf("unexpected reannounce interval: got=%s want=%s", got, want)
}
}
func TestAnnouncerPeriodicReannounce(t *testing.T) {
producer := newRecordingProducer()
announce := Announcement{
Service: "FX_INGESTOR",
Operations: []string{OperationFXIngest},
Health: HealthParams{
IntervalSec: 1,
TimeoutSec: 2,
},
}
announcer := NewAnnouncer(zap.NewNop(), producer, "fx_ingestor", announce)
announcer.Start()
defer announcer.Stop()
announceEvent := ServiceAnnounceEvent().ToString()
heartbeatEvent := HeartbeatEvent().ToString()
waitForAtLeast(t, 1*time.Second, func() int { return producer.count(announceEvent) }, 1, "initial announce")
waitForAtLeast(t, 1*time.Second, func() int { return producer.count(heartbeatEvent) }, 1, "initial heartbeat")
// With 1s heartbeat and factor=3, periodic re-announce should happen in ~3s.
waitForAtLeast(t, 5*time.Second, func() int { return producer.count(announceEvent) }, 2, "periodic re-announce")
}