Discovery: implement dropout of stale records (#577)
@@ -9,6 +9,7 @@ replace github.com/tech/sendico/gateway/common => ../common
|
||||
require (
|
||||
github.com/go-chi/chi/v5 v5.2.5
|
||||
github.com/prometheus/client_golang v1.23.2
|
||||
github.com/prometheus/client_model v0.6.2
|
||||
github.com/shopspring/decimal v1.4.0
|
||||
github.com/tech/sendico/gateway/common v0.1.0
|
||||
github.com/tech/sendico/pkg v0.1.0
|
||||
@@ -35,7 +36,6 @@ require (
|
||||
github.com/nats-io/nats.go v1.49.0 // indirect
|
||||
github.com/nats-io/nkeys v0.4.15 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/prometheus/client_model v0.6.2 // indirect
|
||||
github.com/prometheus/common v0.67.5 // indirect
|
||||
github.com/prometheus/procfs v0.20.0 // indirect
|
||||
github.com/rogpeppe/go-internal v1.12.0 // indirect
|
||||
|
||||
@@ -117,9 +117,18 @@ func (a *Announcer) sendAnnouncement() {
|
||||
}
|
||||
|
||||
func (a *Announcer) sendHeartbeat() {
|
||||
entryKey := registryKey(
|
||||
a.announce.Service,
|
||||
a.announce.Rail,
|
||||
legacyNetworkFromCurrencies(a.announce.Currencies),
|
||||
a.announce.Operations,
|
||||
a.announce.Version,
|
||||
a.announce.InstanceID,
|
||||
)
|
||||
hb := Heartbeat{
|
||||
ID: a.announce.ID,
|
||||
InstanceID: a.announce.InstanceID,
|
||||
EntryKey: entryKey,
|
||||
Status: "ok",
|
||||
TS: time.Now().Unix(),
|
||||
}
|
||||
|
||||
@@ -150,21 +150,34 @@ func (r *Registry) UpdateHeartbeat(id string, instanceID string, status string,
|
||||
if instanceID != "" && entry.InstanceID != instanceID {
|
||||
continue
|
||||
}
|
||||
wasHealthy := entry.isHealthyAt(now)
|
||||
entry.Status = status
|
||||
entry.LastHeartbeat = ts
|
||||
entry.Healthy = entry.isHealthyAt(now)
|
||||
|
||||
results = append(results, UpdateResult{
|
||||
Entry: *entry,
|
||||
IsNew: false,
|
||||
WasHealthy: wasHealthy,
|
||||
BecameHealthy: !wasHealthy && entry.Healthy,
|
||||
})
|
||||
results = append(results, updateHeartbeatEntry(entry, status, ts, now))
|
||||
}
|
||||
return results
|
||||
}
|
||||
|
||||
func (r *Registry) UpdateHeartbeatByKey(key string, status string, ts time.Time, now time.Time) (UpdateResult, bool) {
|
||||
key = strings.TrimSpace(key)
|
||||
if key == "" {
|
||||
return UpdateResult{}, false
|
||||
}
|
||||
if status == "" {
|
||||
status = "ok"
|
||||
}
|
||||
if ts.IsZero() {
|
||||
ts = now
|
||||
}
|
||||
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
entry := r.entries[key]
|
||||
if entry == nil {
|
||||
return UpdateResult{}, false
|
||||
}
|
||||
|
||||
return updateHeartbeatEntry(entry, status, ts, now), true
|
||||
}
|
||||
|
||||
func (r *Registry) Delete(key string) bool {
|
||||
key = strings.TrimSpace(key)
|
||||
if key == "" {
|
||||
@@ -181,6 +194,20 @@ func (r *Registry) Delete(key string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func updateHeartbeatEntry(entry *RegistryEntry, status string, ts time.Time, now time.Time) UpdateResult {
|
||||
wasHealthy := entry.isHealthyAt(now)
|
||||
entry.Status = status
|
||||
entry.LastHeartbeat = ts
|
||||
entry.Healthy = entry.isHealthyAt(now)
|
||||
|
||||
return UpdateResult{
|
||||
Entry: *entry,
|
||||
IsNew: false,
|
||||
WasHealthy: wasHealthy,
|
||||
BecameHealthy: !wasHealthy && entry.Healthy,
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteStale removes entries whose heartbeat timeout has elapsed.
|
||||
func (r *Registry) DeleteStale(now time.Time) []string {
|
||||
r.mu.Lock()
|
||||
|
||||
133
api/pkg/discovery/registry_heartbeat_test.go
Normal file
133
api/pkg/discovery/registry_heartbeat_test.go
Normal file
@@ -0,0 +1,133 @@
|
||||
package discovery
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestRegistryUpdateHeartbeatByKey_UpdatesOnlyTargetEntry(t *testing.T) {
|
||||
now := time.Date(2026, 2, 27, 12, 0, 0, 0, time.UTC)
|
||||
oldHeartbeat := now.Add(-45 * time.Second)
|
||||
registry := NewRegistry()
|
||||
|
||||
oldVersion := RegistryEntry{
|
||||
ID: "mcards",
|
||||
InstanceID: "inst-1",
|
||||
Service: "CARD",
|
||||
Rail: RailCardPayout,
|
||||
Operations: []string{OperationSend},
|
||||
Version: "3.0.0-old",
|
||||
Status: "ok",
|
||||
Health: HealthParams{TimeoutSec: 30},
|
||||
LastHeartbeat: oldHeartbeat,
|
||||
}
|
||||
newVersion := RegistryEntry{
|
||||
ID: "mcards",
|
||||
InstanceID: "inst-1",
|
||||
Service: "CARD",
|
||||
Rail: RailCardPayout,
|
||||
Operations: []string{OperationSend},
|
||||
Version: "3.0.0-new",
|
||||
Status: "ok",
|
||||
Health: HealthParams{TimeoutSec: 30},
|
||||
LastHeartbeat: oldHeartbeat,
|
||||
}
|
||||
|
||||
registry.UpsertEntry(oldVersion, now)
|
||||
registry.UpsertEntry(newVersion, now)
|
||||
|
||||
oldKey := registryEntryKey(normalizeEntry(oldVersion))
|
||||
newKey := registryEntryKey(normalizeEntry(newVersion))
|
||||
|
||||
result, found := registry.UpdateHeartbeatByKey(newKey, "ok", now, now)
|
||||
if !found {
|
||||
t.Fatalf("expected heartbeat target key to exist: %q", newKey)
|
||||
}
|
||||
if got := registryEntryKey(normalizeEntry(result.Entry)); got != newKey {
|
||||
t.Fatalf("unexpected updated key: got=%q want=%q", got, newKey)
|
||||
}
|
||||
|
||||
entries := registry.List(now, false)
|
||||
if len(entries) != 2 {
|
||||
t.Fatalf("unexpected entries count: got=%d want=2", len(entries))
|
||||
}
|
||||
|
||||
sawOld := false
|
||||
sawNew := false
|
||||
for _, entry := range entries {
|
||||
key := registryEntryKey(normalizeEntry(entry))
|
||||
switch key {
|
||||
case oldKey:
|
||||
sawOld = true
|
||||
if !entry.LastHeartbeat.Equal(oldHeartbeat) {
|
||||
t.Fatalf("old key heartbeat changed unexpectedly: got=%s want=%s", entry.LastHeartbeat, oldHeartbeat)
|
||||
}
|
||||
if entry.Healthy {
|
||||
t.Fatalf("old key should remain unhealthy")
|
||||
}
|
||||
case newKey:
|
||||
sawNew = true
|
||||
if !entry.LastHeartbeat.Equal(now) {
|
||||
t.Fatalf("new key heartbeat not updated: got=%s want=%s", entry.LastHeartbeat, now)
|
||||
}
|
||||
if !entry.Healthy {
|
||||
t.Fatalf("new key should be healthy after heartbeat")
|
||||
}
|
||||
}
|
||||
}
|
||||
if !sawOld {
|
||||
t.Fatalf("old key entry not found after update: %q", oldKey)
|
||||
}
|
||||
if !sawNew {
|
||||
t.Fatalf("new key entry not found after update: %q", newKey)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistryUpdateHeartbeat_LegacyFallbackUpdatesAllMatchingInstanceEntries(t *testing.T) {
|
||||
now := time.Date(2026, 2, 27, 12, 0, 0, 0, time.UTC)
|
||||
oldHeartbeat := now.Add(-45 * time.Second)
|
||||
registry := NewRegistry()
|
||||
|
||||
entries := []RegistryEntry{
|
||||
{
|
||||
ID: "mcards",
|
||||
InstanceID: "inst-1",
|
||||
Service: "CARD",
|
||||
Rail: RailCardPayout,
|
||||
Operations: []string{OperationSend},
|
||||
Version: "3.0.0-old",
|
||||
Status: "ok",
|
||||
Health: HealthParams{TimeoutSec: 30},
|
||||
LastHeartbeat: oldHeartbeat,
|
||||
},
|
||||
{
|
||||
ID: "mcards",
|
||||
InstanceID: "inst-1",
|
||||
Service: "CARD",
|
||||
Rail: RailCardPayout,
|
||||
Operations: []string{OperationSend},
|
||||
Version: "3.0.0-new",
|
||||
Status: "ok",
|
||||
Health: HealthParams{TimeoutSec: 30},
|
||||
LastHeartbeat: oldHeartbeat,
|
||||
},
|
||||
}
|
||||
for _, entry := range entries {
|
||||
registry.UpsertEntry(entry, now)
|
||||
}
|
||||
|
||||
results := registry.UpdateHeartbeat("mcards", "inst-1", "ok", now, now)
|
||||
if len(results) != 2 {
|
||||
t.Fatalf("unexpected updated count: got=%d want=2", len(results))
|
||||
}
|
||||
|
||||
all := registry.List(now, false)
|
||||
for _, entry := range all {
|
||||
if !entry.LastHeartbeat.Equal(now) {
|
||||
t.Fatalf("entry heartbeat not updated: key=%s got=%s want=%s", registryEntryKey(normalizeEntry(entry)), entry.LastHeartbeat, now)
|
||||
}
|
||||
if !entry.Healthy {
|
||||
t.Fatalf("entry should be healthy after legacy heartbeat: key=%s", registryEntryKey(normalizeEntry(entry)))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -216,23 +216,36 @@ func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) (e
|
||||
s.logWarn("Failed to decode discovery heartbeat payload", fields...)
|
||||
return err
|
||||
}
|
||||
s.logDebug("Discovery heartbeat received", append(envelopeFields(env), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID), zap.String("status", payload.Status))...)
|
||||
if strings.TrimSpace(payload.InstanceID) == "" && strings.TrimSpace(payload.ID) == "" {
|
||||
entryKey := strings.TrimSpace(payload.EntryKey)
|
||||
s.logDebug("Discovery heartbeat received", append(envelopeFields(env), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID), zap.String("entry_key", entryKey), zap.String("status", payload.Status))...)
|
||||
if entryKey == "" && strings.TrimSpace(payload.InstanceID) == "" && strings.TrimSpace(payload.ID) == "" {
|
||||
return nil
|
||||
}
|
||||
if strings.TrimSpace(payload.InstanceID) == "" {
|
||||
if entryKey == "" && strings.TrimSpace(payload.InstanceID) == "" {
|
||||
fields := append(envelopeFields(env), zap.String("id", payload.ID))
|
||||
s.logWarn("Discovery heartbeat missing instance id", fields...)
|
||||
}
|
||||
status := strings.TrimSpace(payload.Status)
|
||||
ts := time.Unix(payload.TS, 0)
|
||||
if ts.Unix() <= 0 {
|
||||
ts = time.Now()
|
||||
}
|
||||
results := s.registry.UpdateHeartbeat(payload.ID, payload.InstanceID, strings.TrimSpace(payload.Status), ts, time.Now())
|
||||
now := time.Now()
|
||||
var results []UpdateResult
|
||||
if entryKey != "" {
|
||||
result, found := s.registry.UpdateHeartbeatByKey(entryKey, status, ts, now)
|
||||
if !found {
|
||||
s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("entry_key", entryKey), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID))
|
||||
return nil
|
||||
}
|
||||
results = append(results, result)
|
||||
} else {
|
||||
results = s.registry.UpdateHeartbeat(payload.ID, payload.InstanceID, status, ts, now)
|
||||
if len(results) == 0 {
|
||||
s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
for _, result := range results {
|
||||
if result.BecameHealthy {
|
||||
s.logInfo("Discovery registry entry became healthy", append(entryFields(result.Entry), zap.String("status", result.Entry.Status))...)
|
||||
|
||||
@@ -116,6 +116,7 @@ type Announcement struct {
|
||||
// Heartbeat is the JSON payload of a discovery heartbeat message.
type Heartbeat struct {
|
||||
// ID is the announced identifier of the sender.
ID string `json:"id"`
|
||||
// InstanceID identifies the sending instance.
InstanceID string `json:"instanceId"`
|
||||
// EntryKey is the registry key of the entry this heartbeat targets. It is
// omitted from the JSON when empty; the receiver then falls back to matching
// by ID and InstanceID.
EntryKey string `json:"entryKey,omitempty"`
|
||||
// Status is the sender-reported status string (the announcer sends "ok").
Status string `json:"status"`
|
||||
// TS is the send time as Unix seconds.
TS int64 `json:"ts"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user