discovery stale records dropout implemented

This commit is contained in:
Stephan D
2026-02-27 18:07:42 +01:00
parent 077c510690
commit d47159278b
6 changed files with 202 additions and 19 deletions

View File

@@ -9,6 +9,7 @@ replace github.com/tech/sendico/gateway/common => ../common
require ( require (
github.com/go-chi/chi/v5 v5.2.5 github.com/go-chi/chi/v5 v5.2.5
github.com/prometheus/client_golang v1.23.2 github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/shopspring/decimal v1.4.0 github.com/shopspring/decimal v1.4.0
github.com/tech/sendico/gateway/common v0.1.0 github.com/tech/sendico/gateway/common v0.1.0
github.com/tech/sendico/pkg v0.1.0 github.com/tech/sendico/pkg v0.1.0
@@ -35,7 +36,6 @@ require (
github.com/nats-io/nats.go v1.49.0 // indirect github.com/nats-io/nats.go v1.49.0 // indirect
github.com/nats-io/nkeys v0.4.15 // indirect github.com/nats-io/nkeys v0.4.15 // indirect
github.com/nats-io/nuid v1.0.1 // indirect github.com/nats-io/nuid v1.0.1 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.5 // indirect github.com/prometheus/common v0.67.5 // indirect
github.com/prometheus/procfs v0.20.0 // indirect github.com/prometheus/procfs v0.20.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect

View File

@@ -117,9 +117,18 @@ func (a *Announcer) sendAnnouncement() {
} }
func (a *Announcer) sendHeartbeat() { func (a *Announcer) sendHeartbeat() {
entryKey := registryKey(
a.announce.Service,
a.announce.Rail,
legacyNetworkFromCurrencies(a.announce.Currencies),
a.announce.Operations,
a.announce.Version,
a.announce.InstanceID,
)
hb := Heartbeat{ hb := Heartbeat{
ID: a.announce.ID, ID: a.announce.ID,
InstanceID: a.announce.InstanceID, InstanceID: a.announce.InstanceID,
EntryKey: entryKey,
Status: "ok", Status: "ok",
TS: time.Now().Unix(), TS: time.Now().Unix(),
} }

View File

@@ -150,21 +150,34 @@ func (r *Registry) UpdateHeartbeat(id string, instanceID string, status string,
if instanceID != "" && entry.InstanceID != instanceID { if instanceID != "" && entry.InstanceID != instanceID {
continue continue
} }
wasHealthy := entry.isHealthyAt(now) results = append(results, updateHeartbeatEntry(entry, status, ts, now))
entry.Status = status
entry.LastHeartbeat = ts
entry.Healthy = entry.isHealthyAt(now)
results = append(results, UpdateResult{
Entry: *entry,
IsNew: false,
WasHealthy: wasHealthy,
BecameHealthy: !wasHealthy && entry.Healthy,
})
} }
return results return results
} }
func (r *Registry) UpdateHeartbeatByKey(key string, status string, ts time.Time, now time.Time) (UpdateResult, bool) {
key = strings.TrimSpace(key)
if key == "" {
return UpdateResult{}, false
}
if status == "" {
status = "ok"
}
if ts.IsZero() {
ts = now
}
r.mu.Lock()
defer r.mu.Unlock()
entry := r.entries[key]
if entry == nil {
return UpdateResult{}, false
}
return updateHeartbeatEntry(entry, status, ts, now), true
}
func (r *Registry) Delete(key string) bool { func (r *Registry) Delete(key string) bool {
key = strings.TrimSpace(key) key = strings.TrimSpace(key)
if key == "" { if key == "" {
@@ -181,6 +194,20 @@ func (r *Registry) Delete(key string) bool {
return true return true
} }
func updateHeartbeatEntry(entry *RegistryEntry, status string, ts time.Time, now time.Time) UpdateResult {
wasHealthy := entry.isHealthyAt(now)
entry.Status = status
entry.LastHeartbeat = ts
entry.Healthy = entry.isHealthyAt(now)
return UpdateResult{
Entry: *entry,
IsNew: false,
WasHealthy: wasHealthy,
BecameHealthy: !wasHealthy && entry.Healthy,
}
}
// DeleteStale removes entries whose heartbeat timeout has elapsed. // DeleteStale removes entries whose heartbeat timeout has elapsed.
func (r *Registry) DeleteStale(now time.Time) []string { func (r *Registry) DeleteStale(now time.Time) []string {
r.mu.Lock() r.mu.Lock()

View File

@@ -0,0 +1,133 @@
package discovery
import (
"testing"
"time"
)
func TestRegistryUpdateHeartbeatByKey_UpdatesOnlyTargetEntry(t *testing.T) {
now := time.Date(2026, 2, 27, 12, 0, 0, 0, time.UTC)
oldHeartbeat := now.Add(-45 * time.Second)
registry := NewRegistry()
oldVersion := RegistryEntry{
ID: "mcards",
InstanceID: "inst-1",
Service: "CARD",
Rail: RailCardPayout,
Operations: []string{OperationSend},
Version: "3.0.0-old",
Status: "ok",
Health: HealthParams{TimeoutSec: 30},
LastHeartbeat: oldHeartbeat,
}
newVersion := RegistryEntry{
ID: "mcards",
InstanceID: "inst-1",
Service: "CARD",
Rail: RailCardPayout,
Operations: []string{OperationSend},
Version: "3.0.0-new",
Status: "ok",
Health: HealthParams{TimeoutSec: 30},
LastHeartbeat: oldHeartbeat,
}
registry.UpsertEntry(oldVersion, now)
registry.UpsertEntry(newVersion, now)
oldKey := registryEntryKey(normalizeEntry(oldVersion))
newKey := registryEntryKey(normalizeEntry(newVersion))
result, found := registry.UpdateHeartbeatByKey(newKey, "ok", now, now)
if !found {
t.Fatalf("expected heartbeat target key to exist: %q", newKey)
}
if got := registryEntryKey(normalizeEntry(result.Entry)); got != newKey {
t.Fatalf("unexpected updated key: got=%q want=%q", got, newKey)
}
entries := registry.List(now, false)
if len(entries) != 2 {
t.Fatalf("unexpected entries count: got=%d want=2", len(entries))
}
sawOld := false
sawNew := false
for _, entry := range entries {
key := registryEntryKey(normalizeEntry(entry))
switch key {
case oldKey:
sawOld = true
if !entry.LastHeartbeat.Equal(oldHeartbeat) {
t.Fatalf("old key heartbeat changed unexpectedly: got=%s want=%s", entry.LastHeartbeat, oldHeartbeat)
}
if entry.Healthy {
t.Fatalf("old key should remain unhealthy")
}
case newKey:
sawNew = true
if !entry.LastHeartbeat.Equal(now) {
t.Fatalf("new key heartbeat not updated: got=%s want=%s", entry.LastHeartbeat, now)
}
if !entry.Healthy {
t.Fatalf("new key should be healthy after heartbeat")
}
}
}
if !sawOld {
t.Fatalf("old key entry not found after update: %q", oldKey)
}
if !sawNew {
t.Fatalf("new key entry not found after update: %q", newKey)
}
}
func TestRegistryUpdateHeartbeat_LegacyFallbackUpdatesAllMatchingInstanceEntries(t *testing.T) {
now := time.Date(2026, 2, 27, 12, 0, 0, 0, time.UTC)
oldHeartbeat := now.Add(-45 * time.Second)
registry := NewRegistry()
entries := []RegistryEntry{
{
ID: "mcards",
InstanceID: "inst-1",
Service: "CARD",
Rail: RailCardPayout,
Operations: []string{OperationSend},
Version: "3.0.0-old",
Status: "ok",
Health: HealthParams{TimeoutSec: 30},
LastHeartbeat: oldHeartbeat,
},
{
ID: "mcards",
InstanceID: "inst-1",
Service: "CARD",
Rail: RailCardPayout,
Operations: []string{OperationSend},
Version: "3.0.0-new",
Status: "ok",
Health: HealthParams{TimeoutSec: 30},
LastHeartbeat: oldHeartbeat,
},
}
for _, entry := range entries {
registry.UpsertEntry(entry, now)
}
results := registry.UpdateHeartbeat("mcards", "inst-1", "ok", now, now)
if len(results) != 2 {
t.Fatalf("unexpected updated count: got=%d want=2", len(results))
}
all := registry.List(now, false)
for _, entry := range all {
if !entry.LastHeartbeat.Equal(now) {
t.Fatalf("entry heartbeat not updated: key=%s got=%s want=%s", registryEntryKey(normalizeEntry(entry)), entry.LastHeartbeat, now)
}
if !entry.Healthy {
t.Fatalf("entry should be healthy after legacy heartbeat: key=%s", registryEntryKey(normalizeEntry(entry)))
}
}
}

View File

@@ -216,22 +216,35 @@ func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) (e
s.logWarn("Failed to decode discovery heartbeat payload", fields...) s.logWarn("Failed to decode discovery heartbeat payload", fields...)
return err return err
} }
s.logDebug("Discovery heartbeat received", append(envelopeFields(env), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID), zap.String("status", payload.Status))...) entryKey := strings.TrimSpace(payload.EntryKey)
if strings.TrimSpace(payload.InstanceID) == "" && strings.TrimSpace(payload.ID) == "" { s.logDebug("Discovery heartbeat received", append(envelopeFields(env), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID), zap.String("entry_key", entryKey), zap.String("status", payload.Status))...)
if entryKey == "" && strings.TrimSpace(payload.InstanceID) == "" && strings.TrimSpace(payload.ID) == "" {
return nil return nil
} }
if strings.TrimSpace(payload.InstanceID) == "" { if entryKey == "" && strings.TrimSpace(payload.InstanceID) == "" {
fields := append(envelopeFields(env), zap.String("id", payload.ID)) fields := append(envelopeFields(env), zap.String("id", payload.ID))
s.logWarn("Discovery heartbeat missing instance id", fields...) s.logWarn("Discovery heartbeat missing instance id", fields...)
} }
status := strings.TrimSpace(payload.Status)
ts := time.Unix(payload.TS, 0) ts := time.Unix(payload.TS, 0)
if ts.Unix() <= 0 { if ts.Unix() <= 0 {
ts = time.Now() ts = time.Now()
} }
results := s.registry.UpdateHeartbeat(payload.ID, payload.InstanceID, strings.TrimSpace(payload.Status), ts, time.Now()) now := time.Now()
if len(results) == 0 { var results []UpdateResult
s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID)) if entryKey != "" {
return nil result, found := s.registry.UpdateHeartbeatByKey(entryKey, status, ts, now)
if !found {
s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("entry_key", entryKey), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID))
return nil
}
results = append(results, result)
} else {
results = s.registry.UpdateHeartbeat(payload.ID, payload.InstanceID, status, ts, now)
if len(results) == 0 {
s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID))
return nil
}
} }
for _, result := range results { for _, result := range results {
if result.BecameHealthy { if result.BecameHealthy {

View File

@@ -116,6 +116,7 @@ type Announcement struct {
type Heartbeat struct { type Heartbeat struct {
ID string `json:"id"` ID string `json:"id"`
InstanceID string `json:"instanceId"` InstanceID string `json:"instanceId"`
EntryKey string `json:"entryKey,omitempty"`
Status string `json:"status"` Status string `json:"status"`
TS int64 `json:"ts"` TS int64 `json:"ts"`
} }