Merge pull request 'added service reannounce' (#594) from discovery-593 into main
Some checks failed
ci/woodpecker/push/gateway_tgsettle Pipeline is pending
ci/woodpecker/push/gateway_tron Pipeline is pending
ci/woodpecker/push/ledger Pipeline is pending
ci/woodpecker/push/notification Pipeline is pending
ci/woodpecker/push/payments_methods Pipeline is pending
ci/woodpecker/push/payments_orchestrator Pipeline is pending
ci/woodpecker/push/payments_quotation Pipeline is pending
ci/woodpecker/push/billing_documents Pipeline failed
ci/woodpecker/push/bff Pipeline failed
ci/woodpecker/push/billing_fees Pipeline failed
ci/woodpecker/push/callbacks Pipeline failed
ci/woodpecker/push/discovery Pipeline failed
ci/woodpecker/push/fx_ingestor Pipeline failed
ci/woodpecker/push/fx_oracle Pipeline failed
ci/woodpecker/push/frontend Pipeline was successful
ci/woodpecker/push/gateway_mntx Pipeline failed
ci/woodpecker/push/gateway_chain Pipeline failed
Some checks failed
ci/woodpecker/push/gateway_tgsettle Pipeline is pending
ci/woodpecker/push/gateway_tron Pipeline is pending
ci/woodpecker/push/ledger Pipeline is pending
ci/woodpecker/push/notification Pipeline is pending
ci/woodpecker/push/payments_methods Pipeline is pending
ci/woodpecker/push/payments_orchestrator Pipeline is pending
ci/woodpecker/push/payments_quotation Pipeline is pending
ci/woodpecker/push/billing_documents Pipeline failed
ci/woodpecker/push/bff Pipeline failed
ci/woodpecker/push/billing_fees Pipeline failed
ci/woodpecker/push/callbacks Pipeline failed
ci/woodpecker/push/discovery Pipeline failed
ci/woodpecker/push/fx_ingestor Pipeline failed
ci/woodpecker/push/fx_oracle Pipeline failed
ci/woodpecker/push/frontend Pipeline was successful
ci/woodpecker/push/gateway_mntx Pipeline failed
ci/woodpecker/push/gateway_chain Pipeline failed
Reviewed-on: #594
This commit was merged in pull request #594.
This commit is contained in:
@@ -12,6 +12,8 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const defaultReannounceHeartbeatFactor = 6
|
||||||
|
|
||||||
type Announcer struct {
|
type Announcer struct {
|
||||||
logger mlogger.Logger
|
logger mlogger.Logger
|
||||||
producer msg.Producer
|
producer msg.Producer
|
||||||
@@ -84,22 +86,33 @@ func (a *Announcer) Stop() {
|
|||||||
|
|
||||||
func (a *Announcer) heartbeatLoop() {
|
func (a *Announcer) heartbeatLoop() {
|
||||||
defer close(a.doneCh)
|
defer close(a.doneCh)
|
||||||
interval := time.Duration(a.announce.Health.IntervalSec) * time.Second
|
heartbeatInterval := heartbeatIntervalForHealth(a.announce.Health)
|
||||||
if interval <= 0 {
|
heartbeatTicker := time.NewTicker(heartbeatInterval)
|
||||||
interval = time.Duration(DefaultHealthIntervalSec) * time.Second
|
defer heartbeatTicker.Stop()
|
||||||
}
|
heartbeatTicks := 0
|
||||||
ticker := time.NewTicker(interval)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-a.stopCh:
|
case <-a.stopCh:
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-heartbeatTicker.C:
|
||||||
|
heartbeatTicks++
|
||||||
|
if heartbeatTicks%defaultReannounceHeartbeatFactor == 0 {
|
||||||
|
a.sendAnnouncement()
|
||||||
|
} else {
|
||||||
a.sendHeartbeat()
|
a.sendHeartbeat()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func heartbeatIntervalForHealth(health HealthParams) time.Duration {
|
||||||
|
interval := time.Duration(health.IntervalSec) * time.Second
|
||||||
|
if interval <= 0 {
|
||||||
|
return time.Duration(DefaultHealthIntervalSec) * time.Second
|
||||||
|
}
|
||||||
|
return interval
|
||||||
|
}
|
||||||
|
|
||||||
func (a *Announcer) sendAnnouncement() {
|
func (a *Announcer) sendAnnouncement() {
|
||||||
env := NewServiceAnnounceEnvelope(a.sender, a.announce)
|
env := NewServiceAnnounceEnvelope(a.sender, a.announce)
|
||||||
|
|||||||
99
api/pkg/discovery/announcer_test.go
Normal file
99
api/pkg/discovery/announcer_test.go
Normal file
@@ -0,0 +1,99 @@
|
|||||||
|
package discovery
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
me "github.com/tech/sendico/pkg/messaging/envelope"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type recordingProducer struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
counts map[string]int
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRecordingProducer() *recordingProducer {
|
||||||
|
return &recordingProducer{
|
||||||
|
counts: make(map[string]int),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *recordingProducer) SendMessage(env me.Envelope) error {
|
||||||
|
if p == nil || env == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
sig := env.GetSignature()
|
||||||
|
if sig == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
p.mu.Lock()
|
||||||
|
p.counts[sig.ToString()]++
|
||||||
|
p.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *recordingProducer) count(event string) int {
|
||||||
|
if p == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
return p.counts[event]
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitForAtLeast(t *testing.T, timeout time.Duration, countFn func() int, want int, label string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
deadline := time.Now().Add(timeout)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
if countFn() >= want {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
}
|
||||||
|
t.Fatalf("timed out waiting for %s: got=%d want_at_least=%d", label, countFn(), want)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHeartbeatIntervalForHealth_Default(t *testing.T) {
|
||||||
|
got := heartbeatIntervalForHealth(HealthParams{})
|
||||||
|
want := time.Duration(DefaultHealthIntervalSec) * time.Second
|
||||||
|
if got != want {
|
||||||
|
t.Fatalf("unexpected heartbeat interval: got=%s want=%s", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReannounceIntervalForHealth_Default(t *testing.T) {
|
||||||
|
got := heartbeatIntervalForHealth(HealthParams{}) * defaultReannounceHeartbeatFactor
|
||||||
|
want := time.Duration(DefaultHealthIntervalSec*defaultReannounceHeartbeatFactor) * time.Second
|
||||||
|
if got != want {
|
||||||
|
t.Fatalf("unexpected reannounce interval: got=%s want=%s", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAnnouncerPeriodicReannounce(t *testing.T) {
|
||||||
|
producer := newRecordingProducer()
|
||||||
|
announce := Announcement{
|
||||||
|
Service: "FX_INGESTOR",
|
||||||
|
Operations: []string{OperationFXIngest},
|
||||||
|
Health: HealthParams{
|
||||||
|
IntervalSec: 1,
|
||||||
|
TimeoutSec: 2,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
announcer := NewAnnouncer(zap.NewNop(), producer, "fx_ingestor", announce)
|
||||||
|
announcer.Start()
|
||||||
|
defer announcer.Stop()
|
||||||
|
|
||||||
|
announceEvent := ServiceAnnounceEvent().ToString()
|
||||||
|
heartbeatEvent := HeartbeatEvent().ToString()
|
||||||
|
|
||||||
|
waitForAtLeast(t, 1*time.Second, func() int { return producer.count(announceEvent) }, 1, "initial announce")
|
||||||
|
waitForAtLeast(t, 1*time.Second, func() int { return producer.count(heartbeatEvent) }, 1, "initial heartbeat")
|
||||||
|
|
||||||
|
// With 1s heartbeat and factor=3, periodic re-announce should happen in ~3s.
|
||||||
|
waitForAtLeast(t, 5*time.Second, func() int { return producer.count(announceEvent) }, 2, "periodic re-announce")
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user