Merge pull request 'discovery stale records dropout implemented' (#577) from discovery-576 into main
All checks were successful
ci/woodpecker/push/billing_fees Pipeline was successful
ci/woodpecker/push/gateway_mntx Pipeline was successful
ci/woodpecker/push/gateway_chain Pipeline was successful
ci/woodpecker/push/gateway_tgsettle Pipeline was successful
ci/woodpecker/push/billing_documents Pipeline was successful
ci/woodpecker/push/bff Pipeline was successful
ci/woodpecker/push/discovery Pipeline was successful
ci/woodpecker/push/fx_ingestor Pipeline was successful
ci/woodpecker/push/fx_oracle Pipeline was successful
ci/woodpecker/push/frontend Pipeline was successful
ci/woodpecker/push/gateway_tron Pipeline was successful
ci/woodpecker/push/ledger Pipeline was successful
ci/woodpecker/push/notification Pipeline was successful
ci/woodpecker/push/payments_methods Pipeline was successful
ci/woodpecker/push/payments_orchestrator Pipeline was successful
ci/woodpecker/push/payments_quotation Pipeline was successful
All checks were successful
ci/woodpecker/push/billing_fees Pipeline was successful
ci/woodpecker/push/gateway_mntx Pipeline was successful
ci/woodpecker/push/gateway_chain Pipeline was successful
ci/woodpecker/push/gateway_tgsettle Pipeline was successful
ci/woodpecker/push/billing_documents Pipeline was successful
ci/woodpecker/push/bff Pipeline was successful
ci/woodpecker/push/discovery Pipeline was successful
ci/woodpecker/push/fx_ingestor Pipeline was successful
ci/woodpecker/push/fx_oracle Pipeline was successful
ci/woodpecker/push/frontend Pipeline was successful
ci/woodpecker/push/gateway_tron Pipeline was successful
ci/woodpecker/push/ledger Pipeline was successful
ci/woodpecker/push/notification Pipeline was successful
ci/woodpecker/push/payments_methods Pipeline was successful
ci/woodpecker/push/payments_orchestrator Pipeline was successful
ci/woodpecker/push/payments_quotation Pipeline was successful
Reviewed-on: #577
This commit was merged in pull request #577.
This commit is contained in:
@@ -9,6 +9,7 @@ replace github.com/tech/sendico/gateway/common => ../common
|
|||||||
require (
|
require (
|
||||||
github.com/go-chi/chi/v5 v5.2.5
|
github.com/go-chi/chi/v5 v5.2.5
|
||||||
github.com/prometheus/client_golang v1.23.2
|
github.com/prometheus/client_golang v1.23.2
|
||||||
|
github.com/prometheus/client_model v0.6.2
|
||||||
github.com/shopspring/decimal v1.4.0
|
github.com/shopspring/decimal v1.4.0
|
||||||
github.com/tech/sendico/gateway/common v0.1.0
|
github.com/tech/sendico/gateway/common v0.1.0
|
||||||
github.com/tech/sendico/pkg v0.1.0
|
github.com/tech/sendico/pkg v0.1.0
|
||||||
@@ -35,7 +36,6 @@ require (
|
|||||||
github.com/nats-io/nats.go v1.49.0 // indirect
|
github.com/nats-io/nats.go v1.49.0 // indirect
|
||||||
github.com/nats-io/nkeys v0.4.15 // indirect
|
github.com/nats-io/nkeys v0.4.15 // indirect
|
||||||
github.com/nats-io/nuid v1.0.1 // indirect
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
github.com/prometheus/client_model v0.6.2 // indirect
|
|
||||||
github.com/prometheus/common v0.67.5 // indirect
|
github.com/prometheus/common v0.67.5 // indirect
|
||||||
github.com/prometheus/procfs v0.20.0 // indirect
|
github.com/prometheus/procfs v0.20.0 // indirect
|
||||||
github.com/rogpeppe/go-internal v1.12.0 // indirect
|
github.com/rogpeppe/go-internal v1.12.0 // indirect
|
||||||
|
|||||||
@@ -117,9 +117,18 @@ func (a *Announcer) sendAnnouncement() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *Announcer) sendHeartbeat() {
|
func (a *Announcer) sendHeartbeat() {
|
||||||
|
entryKey := registryKey(
|
||||||
|
a.announce.Service,
|
||||||
|
a.announce.Rail,
|
||||||
|
legacyNetworkFromCurrencies(a.announce.Currencies),
|
||||||
|
a.announce.Operations,
|
||||||
|
a.announce.Version,
|
||||||
|
a.announce.InstanceID,
|
||||||
|
)
|
||||||
hb := Heartbeat{
|
hb := Heartbeat{
|
||||||
ID: a.announce.ID,
|
ID: a.announce.ID,
|
||||||
InstanceID: a.announce.InstanceID,
|
InstanceID: a.announce.InstanceID,
|
||||||
|
EntryKey: entryKey,
|
||||||
Status: "ok",
|
Status: "ok",
|
||||||
TS: time.Now().Unix(),
|
TS: time.Now().Unix(),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -150,21 +150,34 @@ func (r *Registry) UpdateHeartbeat(id string, instanceID string, status string,
|
|||||||
if instanceID != "" && entry.InstanceID != instanceID {
|
if instanceID != "" && entry.InstanceID != instanceID {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
wasHealthy := entry.isHealthyAt(now)
|
results = append(results, updateHeartbeatEntry(entry, status, ts, now))
|
||||||
entry.Status = status
|
|
||||||
entry.LastHeartbeat = ts
|
|
||||||
entry.Healthy = entry.isHealthyAt(now)
|
|
||||||
|
|
||||||
results = append(results, UpdateResult{
|
|
||||||
Entry: *entry,
|
|
||||||
IsNew: false,
|
|
||||||
WasHealthy: wasHealthy,
|
|
||||||
BecameHealthy: !wasHealthy && entry.Healthy,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
return results
|
return results
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Registry) UpdateHeartbeatByKey(key string, status string, ts time.Time, now time.Time) (UpdateResult, bool) {
|
||||||
|
key = strings.TrimSpace(key)
|
||||||
|
if key == "" {
|
||||||
|
return UpdateResult{}, false
|
||||||
|
}
|
||||||
|
if status == "" {
|
||||||
|
status = "ok"
|
||||||
|
}
|
||||||
|
if ts.IsZero() {
|
||||||
|
ts = now
|
||||||
|
}
|
||||||
|
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
|
||||||
|
entry := r.entries[key]
|
||||||
|
if entry == nil {
|
||||||
|
return UpdateResult{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
return updateHeartbeatEntry(entry, status, ts, now), true
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Registry) Delete(key string) bool {
|
func (r *Registry) Delete(key string) bool {
|
||||||
key = strings.TrimSpace(key)
|
key = strings.TrimSpace(key)
|
||||||
if key == "" {
|
if key == "" {
|
||||||
@@ -181,6 +194,20 @@ func (r *Registry) Delete(key string) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func updateHeartbeatEntry(entry *RegistryEntry, status string, ts time.Time, now time.Time) UpdateResult {
|
||||||
|
wasHealthy := entry.isHealthyAt(now)
|
||||||
|
entry.Status = status
|
||||||
|
entry.LastHeartbeat = ts
|
||||||
|
entry.Healthy = entry.isHealthyAt(now)
|
||||||
|
|
||||||
|
return UpdateResult{
|
||||||
|
Entry: *entry,
|
||||||
|
IsNew: false,
|
||||||
|
WasHealthy: wasHealthy,
|
||||||
|
BecameHealthy: !wasHealthy && entry.Healthy,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// DeleteStale removes entries whose heartbeat timeout has elapsed.
|
// DeleteStale removes entries whose heartbeat timeout has elapsed.
|
||||||
func (r *Registry) DeleteStale(now time.Time) []string {
|
func (r *Registry) DeleteStale(now time.Time) []string {
|
||||||
r.mu.Lock()
|
r.mu.Lock()
|
||||||
|
|||||||
133
api/pkg/discovery/registry_heartbeat_test.go
Normal file
133
api/pkg/discovery/registry_heartbeat_test.go
Normal file
@@ -0,0 +1,133 @@
|
|||||||
|
package discovery
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRegistryUpdateHeartbeatByKey_UpdatesOnlyTargetEntry(t *testing.T) {
|
||||||
|
now := time.Date(2026, 2, 27, 12, 0, 0, 0, time.UTC)
|
||||||
|
oldHeartbeat := now.Add(-45 * time.Second)
|
||||||
|
registry := NewRegistry()
|
||||||
|
|
||||||
|
oldVersion := RegistryEntry{
|
||||||
|
ID: "mcards",
|
||||||
|
InstanceID: "inst-1",
|
||||||
|
Service: "CARD",
|
||||||
|
Rail: RailCardPayout,
|
||||||
|
Operations: []string{OperationSend},
|
||||||
|
Version: "3.0.0-old",
|
||||||
|
Status: "ok",
|
||||||
|
Health: HealthParams{TimeoutSec: 30},
|
||||||
|
LastHeartbeat: oldHeartbeat,
|
||||||
|
}
|
||||||
|
newVersion := RegistryEntry{
|
||||||
|
ID: "mcards",
|
||||||
|
InstanceID: "inst-1",
|
||||||
|
Service: "CARD",
|
||||||
|
Rail: RailCardPayout,
|
||||||
|
Operations: []string{OperationSend},
|
||||||
|
Version: "3.0.0-new",
|
||||||
|
Status: "ok",
|
||||||
|
Health: HealthParams{TimeoutSec: 30},
|
||||||
|
LastHeartbeat: oldHeartbeat,
|
||||||
|
}
|
||||||
|
|
||||||
|
registry.UpsertEntry(oldVersion, now)
|
||||||
|
registry.UpsertEntry(newVersion, now)
|
||||||
|
|
||||||
|
oldKey := registryEntryKey(normalizeEntry(oldVersion))
|
||||||
|
newKey := registryEntryKey(normalizeEntry(newVersion))
|
||||||
|
|
||||||
|
result, found := registry.UpdateHeartbeatByKey(newKey, "ok", now, now)
|
||||||
|
if !found {
|
||||||
|
t.Fatalf("expected heartbeat target key to exist: %q", newKey)
|
||||||
|
}
|
||||||
|
if got := registryEntryKey(normalizeEntry(result.Entry)); got != newKey {
|
||||||
|
t.Fatalf("unexpected updated key: got=%q want=%q", got, newKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
entries := registry.List(now, false)
|
||||||
|
if len(entries) != 2 {
|
||||||
|
t.Fatalf("unexpected entries count: got=%d want=2", len(entries))
|
||||||
|
}
|
||||||
|
|
||||||
|
sawOld := false
|
||||||
|
sawNew := false
|
||||||
|
for _, entry := range entries {
|
||||||
|
key := registryEntryKey(normalizeEntry(entry))
|
||||||
|
switch key {
|
||||||
|
case oldKey:
|
||||||
|
sawOld = true
|
||||||
|
if !entry.LastHeartbeat.Equal(oldHeartbeat) {
|
||||||
|
t.Fatalf("old key heartbeat changed unexpectedly: got=%s want=%s", entry.LastHeartbeat, oldHeartbeat)
|
||||||
|
}
|
||||||
|
if entry.Healthy {
|
||||||
|
t.Fatalf("old key should remain unhealthy")
|
||||||
|
}
|
||||||
|
case newKey:
|
||||||
|
sawNew = true
|
||||||
|
if !entry.LastHeartbeat.Equal(now) {
|
||||||
|
t.Fatalf("new key heartbeat not updated: got=%s want=%s", entry.LastHeartbeat, now)
|
||||||
|
}
|
||||||
|
if !entry.Healthy {
|
||||||
|
t.Fatalf("new key should be healthy after heartbeat")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !sawOld {
|
||||||
|
t.Fatalf("old key entry not found after update: %q", oldKey)
|
||||||
|
}
|
||||||
|
if !sawNew {
|
||||||
|
t.Fatalf("new key entry not found after update: %q", newKey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegistryUpdateHeartbeat_LegacyFallbackUpdatesAllMatchingInstanceEntries(t *testing.T) {
|
||||||
|
now := time.Date(2026, 2, 27, 12, 0, 0, 0, time.UTC)
|
||||||
|
oldHeartbeat := now.Add(-45 * time.Second)
|
||||||
|
registry := NewRegistry()
|
||||||
|
|
||||||
|
entries := []RegistryEntry{
|
||||||
|
{
|
||||||
|
ID: "mcards",
|
||||||
|
InstanceID: "inst-1",
|
||||||
|
Service: "CARD",
|
||||||
|
Rail: RailCardPayout,
|
||||||
|
Operations: []string{OperationSend},
|
||||||
|
Version: "3.0.0-old",
|
||||||
|
Status: "ok",
|
||||||
|
Health: HealthParams{TimeoutSec: 30},
|
||||||
|
LastHeartbeat: oldHeartbeat,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ID: "mcards",
|
||||||
|
InstanceID: "inst-1",
|
||||||
|
Service: "CARD",
|
||||||
|
Rail: RailCardPayout,
|
||||||
|
Operations: []string{OperationSend},
|
||||||
|
Version: "3.0.0-new",
|
||||||
|
Status: "ok",
|
||||||
|
Health: HealthParams{TimeoutSec: 30},
|
||||||
|
LastHeartbeat: oldHeartbeat,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, entry := range entries {
|
||||||
|
registry.UpsertEntry(entry, now)
|
||||||
|
}
|
||||||
|
|
||||||
|
results := registry.UpdateHeartbeat("mcards", "inst-1", "ok", now, now)
|
||||||
|
if len(results) != 2 {
|
||||||
|
t.Fatalf("unexpected updated count: got=%d want=2", len(results))
|
||||||
|
}
|
||||||
|
|
||||||
|
all := registry.List(now, false)
|
||||||
|
for _, entry := range all {
|
||||||
|
if !entry.LastHeartbeat.Equal(now) {
|
||||||
|
t.Fatalf("entry heartbeat not updated: key=%s got=%s want=%s", registryEntryKey(normalizeEntry(entry)), entry.LastHeartbeat, now)
|
||||||
|
}
|
||||||
|
if !entry.Healthy {
|
||||||
|
t.Fatalf("entry should be healthy after legacy heartbeat: key=%s", registryEntryKey(normalizeEntry(entry)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -216,23 +216,36 @@ func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) (e
|
|||||||
s.logWarn("Failed to decode discovery heartbeat payload", fields...)
|
s.logWarn("Failed to decode discovery heartbeat payload", fields...)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.logDebug("Discovery heartbeat received", append(envelopeFields(env), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID), zap.String("status", payload.Status))...)
|
entryKey := strings.TrimSpace(payload.EntryKey)
|
||||||
if strings.TrimSpace(payload.InstanceID) == "" && strings.TrimSpace(payload.ID) == "" {
|
s.logDebug("Discovery heartbeat received", append(envelopeFields(env), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID), zap.String("entry_key", entryKey), zap.String("status", payload.Status))...)
|
||||||
|
if entryKey == "" && strings.TrimSpace(payload.InstanceID) == "" && strings.TrimSpace(payload.ID) == "" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if strings.TrimSpace(payload.InstanceID) == "" {
|
if entryKey == "" && strings.TrimSpace(payload.InstanceID) == "" {
|
||||||
fields := append(envelopeFields(env), zap.String("id", payload.ID))
|
fields := append(envelopeFields(env), zap.String("id", payload.ID))
|
||||||
s.logWarn("Discovery heartbeat missing instance id", fields...)
|
s.logWarn("Discovery heartbeat missing instance id", fields...)
|
||||||
}
|
}
|
||||||
|
status := strings.TrimSpace(payload.Status)
|
||||||
ts := time.Unix(payload.TS, 0)
|
ts := time.Unix(payload.TS, 0)
|
||||||
if ts.Unix() <= 0 {
|
if ts.Unix() <= 0 {
|
||||||
ts = time.Now()
|
ts = time.Now()
|
||||||
}
|
}
|
||||||
results := s.registry.UpdateHeartbeat(payload.ID, payload.InstanceID, strings.TrimSpace(payload.Status), ts, time.Now())
|
now := time.Now()
|
||||||
|
var results []UpdateResult
|
||||||
|
if entryKey != "" {
|
||||||
|
result, found := s.registry.UpdateHeartbeatByKey(entryKey, status, ts, now)
|
||||||
|
if !found {
|
||||||
|
s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("entry_key", entryKey), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
results = append(results, result)
|
||||||
|
} else {
|
||||||
|
results = s.registry.UpdateHeartbeat(payload.ID, payload.InstanceID, status, ts, now)
|
||||||
if len(results) == 0 {
|
if len(results) == 0 {
|
||||||
s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID))
|
s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
}
|
||||||
for _, result := range results {
|
for _, result := range results {
|
||||||
if result.BecameHealthy {
|
if result.BecameHealthy {
|
||||||
s.logInfo("Discovery registry entry became healthy", append(entryFields(result.Entry), zap.String("status", result.Entry.Status))...)
|
s.logInfo("Discovery registry entry became healthy", append(entryFields(result.Entry), zap.String("status", result.Entry.Status))...)
|
||||||
|
|||||||
@@ -116,6 +116,7 @@ type Announcement struct {
|
|||||||
type Heartbeat struct {
|
type Heartbeat struct {
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
InstanceID string `json:"instanceId"`
|
InstanceID string `json:"instanceId"`
|
||||||
|
EntryKey string `json:"entryKey,omitempty"`
|
||||||
Status string `json:"status"`
|
Status string `json:"status"`
|
||||||
TS int64 `json:"ts"`
|
TS int64 `json:"ts"`
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user