discovery stale records dropout implemented #577

Merged
tech merged 1 commits from discovery-576 into main 2026-02-27 17:08:07 +00:00
6 changed files with 202 additions and 19 deletions

View File

@@ -9,6 +9,7 @@ replace github.com/tech/sendico/gateway/common => ../common
require (
github.com/go-chi/chi/v5 v5.2.5
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/shopspring/decimal v1.4.0
github.com/tech/sendico/gateway/common v0.1.0
github.com/tech/sendico/pkg v0.1.0
@@ -35,7 +36,6 @@ require (
github.com/nats-io/nats.go v1.49.0 // indirect
github.com/nats-io/nkeys v0.4.15 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.5 // indirect
github.com/prometheus/procfs v0.20.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect

View File

@@ -117,9 +117,18 @@ func (a *Announcer) sendAnnouncement() {
}
func (a *Announcer) sendHeartbeat() {
entryKey := registryKey(
a.announce.Service,
a.announce.Rail,
legacyNetworkFromCurrencies(a.announce.Currencies),
a.announce.Operations,
a.announce.Version,
a.announce.InstanceID,
)
hb := Heartbeat{
ID: a.announce.ID,
InstanceID: a.announce.InstanceID,
EntryKey: entryKey,
Status: "ok",
TS: time.Now().Unix(),
}

View File

@@ -150,21 +150,34 @@ func (r *Registry) UpdateHeartbeat(id string, instanceID string, status string,
if instanceID != "" && entry.InstanceID != instanceID {
continue
}
wasHealthy := entry.isHealthyAt(now)
entry.Status = status
entry.LastHeartbeat = ts
entry.Healthy = entry.isHealthyAt(now)
results = append(results, UpdateResult{
Entry: *entry,
IsNew: false,
WasHealthy: wasHealthy,
BecameHealthy: !wasHealthy && entry.Healthy,
})
results = append(results, updateHeartbeatEntry(entry, status, ts, now))
}
return results
}
func (r *Registry) UpdateHeartbeatByKey(key string, status string, ts time.Time, now time.Time) (UpdateResult, bool) {
key = strings.TrimSpace(key)
if key == "" {
return UpdateResult{}, false
}
if status == "" {
status = "ok"
}
if ts.IsZero() {
ts = now
}
r.mu.Lock()
defer r.mu.Unlock()
entry := r.entries[key]
if entry == nil {
return UpdateResult{}, false
}
return updateHeartbeatEntry(entry, status, ts, now), true
}
func (r *Registry) Delete(key string) bool {
key = strings.TrimSpace(key)
if key == "" {
@@ -181,6 +194,20 @@ func (r *Registry) Delete(key string) bool {
return true
}
func updateHeartbeatEntry(entry *RegistryEntry, status string, ts time.Time, now time.Time) UpdateResult {
wasHealthy := entry.isHealthyAt(now)
entry.Status = status
entry.LastHeartbeat = ts
entry.Healthy = entry.isHealthyAt(now)
return UpdateResult{
Entry: *entry,
IsNew: false,
WasHealthy: wasHealthy,
BecameHealthy: !wasHealthy && entry.Healthy,
}
}
// DeleteStale removes entries whose heartbeat timeout has elapsed.
func (r *Registry) DeleteStale(now time.Time) []string {
r.mu.Lock()

View File

@@ -0,0 +1,133 @@
package discovery
import (
"testing"
"time"
)
func TestRegistryUpdateHeartbeatByKey_UpdatesOnlyTargetEntry(t *testing.T) {
now := time.Date(2026, 2, 27, 12, 0, 0, 0, time.UTC)
oldHeartbeat := now.Add(-45 * time.Second)
registry := NewRegistry()
oldVersion := RegistryEntry{
ID: "mcards",
InstanceID: "inst-1",
Service: "CARD",
Rail: RailCardPayout,
Operations: []string{OperationSend},
Version: "3.0.0-old",
Status: "ok",
Health: HealthParams{TimeoutSec: 30},
LastHeartbeat: oldHeartbeat,
}
newVersion := RegistryEntry{
ID: "mcards",
InstanceID: "inst-1",
Service: "CARD",
Rail: RailCardPayout,
Operations: []string{OperationSend},
Version: "3.0.0-new",
Status: "ok",
Health: HealthParams{TimeoutSec: 30},
LastHeartbeat: oldHeartbeat,
}
registry.UpsertEntry(oldVersion, now)
registry.UpsertEntry(newVersion, now)
oldKey := registryEntryKey(normalizeEntry(oldVersion))
newKey := registryEntryKey(normalizeEntry(newVersion))
result, found := registry.UpdateHeartbeatByKey(newKey, "ok", now, now)
if !found {
t.Fatalf("expected heartbeat target key to exist: %q", newKey)
}
if got := registryEntryKey(normalizeEntry(result.Entry)); got != newKey {
t.Fatalf("unexpected updated key: got=%q want=%q", got, newKey)
}
entries := registry.List(now, false)
if len(entries) != 2 {
t.Fatalf("unexpected entries count: got=%d want=2", len(entries))
}
sawOld := false
sawNew := false
for _, entry := range entries {
key := registryEntryKey(normalizeEntry(entry))
switch key {
case oldKey:
sawOld = true
if !entry.LastHeartbeat.Equal(oldHeartbeat) {
t.Fatalf("old key heartbeat changed unexpectedly: got=%s want=%s", entry.LastHeartbeat, oldHeartbeat)
}
if entry.Healthy {
t.Fatalf("old key should remain unhealthy")
}
case newKey:
sawNew = true
if !entry.LastHeartbeat.Equal(now) {
t.Fatalf("new key heartbeat not updated: got=%s want=%s", entry.LastHeartbeat, now)
}
if !entry.Healthy {
t.Fatalf("new key should be healthy after heartbeat")
}
}
}
if !sawOld {
t.Fatalf("old key entry not found after update: %q", oldKey)
}
if !sawNew {
t.Fatalf("new key entry not found after update: %q", newKey)
}
}
func TestRegistryUpdateHeartbeat_LegacyFallbackUpdatesAllMatchingInstanceEntries(t *testing.T) {
now := time.Date(2026, 2, 27, 12, 0, 0, 0, time.UTC)
oldHeartbeat := now.Add(-45 * time.Second)
registry := NewRegistry()
entries := []RegistryEntry{
{
ID: "mcards",
InstanceID: "inst-1",
Service: "CARD",
Rail: RailCardPayout,
Operations: []string{OperationSend},
Version: "3.0.0-old",
Status: "ok",
Health: HealthParams{TimeoutSec: 30},
LastHeartbeat: oldHeartbeat,
},
{
ID: "mcards",
InstanceID: "inst-1",
Service: "CARD",
Rail: RailCardPayout,
Operations: []string{OperationSend},
Version: "3.0.0-new",
Status: "ok",
Health: HealthParams{TimeoutSec: 30},
LastHeartbeat: oldHeartbeat,
},
}
for _, entry := range entries {
registry.UpsertEntry(entry, now)
}
results := registry.UpdateHeartbeat("mcards", "inst-1", "ok", now, now)
if len(results) != 2 {
t.Fatalf("unexpected updated count: got=%d want=2", len(results))
}
all := registry.List(now, false)
for _, entry := range all {
if !entry.LastHeartbeat.Equal(now) {
t.Fatalf("entry heartbeat not updated: key=%s got=%s want=%s", registryEntryKey(normalizeEntry(entry)), entry.LastHeartbeat, now)
}
if !entry.Healthy {
t.Fatalf("entry should be healthy after legacy heartbeat: key=%s", registryEntryKey(normalizeEntry(entry)))
}
}
}

View File

@@ -216,22 +216,35 @@ func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) (e
s.logWarn("Failed to decode discovery heartbeat payload", fields...)
return err
}
s.logDebug("Discovery heartbeat received", append(envelopeFields(env), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID), zap.String("status", payload.Status))...)
if strings.TrimSpace(payload.InstanceID) == "" && strings.TrimSpace(payload.ID) == "" {
entryKey := strings.TrimSpace(payload.EntryKey)
s.logDebug("Discovery heartbeat received", append(envelopeFields(env), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID), zap.String("entry_key", entryKey), zap.String("status", payload.Status))...)
if entryKey == "" && strings.TrimSpace(payload.InstanceID) == "" && strings.TrimSpace(payload.ID) == "" {
return nil
}
if strings.TrimSpace(payload.InstanceID) == "" {
if entryKey == "" && strings.TrimSpace(payload.InstanceID) == "" {
fields := append(envelopeFields(env), zap.String("id", payload.ID))
s.logWarn("Discovery heartbeat missing instance id", fields...)
}
status := strings.TrimSpace(payload.Status)
ts := time.Unix(payload.TS, 0)
if ts.Unix() <= 0 {
ts = time.Now()
}
results := s.registry.UpdateHeartbeat(payload.ID, payload.InstanceID, strings.TrimSpace(payload.Status), ts, time.Now())
if len(results) == 0 {
s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID))
return nil
now := time.Now()
var results []UpdateResult
if entryKey != "" {
result, found := s.registry.UpdateHeartbeatByKey(entryKey, status, ts, now)
if !found {
s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("entry_key", entryKey), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID))
return nil
}
results = append(results, result)
} else {
results = s.registry.UpdateHeartbeat(payload.ID, payload.InstanceID, status, ts, now)
if len(results) == 0 {
s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID))
return nil
}
}
for _, result := range results {
if result.BecameHealthy {

View File

@@ -116,6 +116,7 @@ type Announcement struct {
type Heartbeat struct {
ID string `json:"id"`
InstanceID string `json:"instanceId"`
EntryKey string `json:"entryKey,omitempty"`
Status string `json:"status"`
TS int64 `json:"ts"`
}