# Reviewer summary. Text placed before the first "diff --git" header is not part of any hunk.
#
# What this patch changes:
#   * api/gateway/mntx/go.mod
#       github.com/prometheus/client_model v0.6.2 moves from the "// indirect" require block
#       to the direct require block.
#   * api/pkg/discovery/types.go
#       Heartbeat gains an EntryKey string field (JSON "entryKey", omitempty).
#   * api/pkg/discovery/announcer.go
#       sendHeartbeat fills EntryKey with registryKey(...) built from the announcement's Service,
#       Rail, legacyNetworkFromCurrencies(Currencies), Operations, Version and InstanceID.
#   * api/pkg/discovery/registry.go
#       The per-entry heartbeat mutation is extracted into updateHeartbeatEntry, which
#       UpdateHeartbeat now calls. New UpdateHeartbeatByKey trims the key, defaults an empty status
#       to "ok" and a zero ts to now, looks up r.entries[key] while holding r.mu, and returns
#       (result, true) only when that key exists.
#   * api/pkg/discovery/service.go
#       handleHeartbeat calls UpdateHeartbeatByKey when the payload carries a non-blank EntryKey and
#       otherwise calls UpdateHeartbeat with ID/InstanceID. The "missing instance id" warning is
#       emitted only when EntryKey is blank as well. A single `now` is passed to the registry call.
#   * api/pkg/discovery/registry_heartbeat_test.go (new file)
#       One test checks that a keyed heartbeat updates only the targeted entry; one checks that
#       UpdateHeartbeat by ID/InstanceID updates every matching entry.
#
# NOTE(review): when EntryKey is non-blank but unknown to the registry, handleHeartbeat logs
#   "entry not found" and returns nil without trying the ID/InstanceID path. Confirm this is the
#   intended behaviour.
# NOTE(review): the announcer builds the key with registryKey(...), while the tests derive keys with
#   registryEntryKey(normalizeEntry(...)). Neither helper is visible in this patch; presumably both
#   yield the same string for the same entry. Verify.
#
diff --git a/api/gateway/mntx/go.mod b/api/gateway/mntx/go.mod
index 0ca8b1f3..542ea30d 100644
--- a/api/gateway/mntx/go.mod
+++ b/api/gateway/mntx/go.mod
@@ -9,6 +9,7 @@ replace github.com/tech/sendico/gateway/common => ../common
 require (
 	github.com/go-chi/chi/v5 v5.2.5
 	github.com/prometheus/client_golang v1.23.2
+	github.com/prometheus/client_model v0.6.2
 	github.com/shopspring/decimal v1.4.0
 	github.com/tech/sendico/gateway/common v0.1.0
 	github.com/tech/sendico/pkg v0.1.0
@@ -35,7 +36,6 @@ require (
 	github.com/nats-io/nats.go v1.49.0 // indirect
 	github.com/nats-io/nkeys v0.4.15 // indirect
 	github.com/nats-io/nuid v1.0.1 // indirect
-	github.com/prometheus/client_model v0.6.2 // indirect
 	github.com/prometheus/common v0.67.5 // indirect
 	github.com/prometheus/procfs v0.20.0 // indirect
 	github.com/rogpeppe/go-internal v1.12.0 // indirect
diff --git a/api/pkg/discovery/announcer.go b/api/pkg/discovery/announcer.go
index 8c082939..8678b527 100644
--- a/api/pkg/discovery/announcer.go
+++ b/api/pkg/discovery/announcer.go
@@ -117,9 +117,18 @@ func (a *Announcer) sendAnnouncement() {
 }
 
 func (a *Announcer) sendHeartbeat() {
+	entryKey := registryKey(
+		a.announce.Service,
+		a.announce.Rail,
+		legacyNetworkFromCurrencies(a.announce.Currencies),
+		a.announce.Operations,
+		a.announce.Version,
+		a.announce.InstanceID,
+	)
 	hb := Heartbeat{
 		ID:         a.announce.ID,
 		InstanceID: a.announce.InstanceID,
+		EntryKey:   entryKey,
 		Status:     "ok",
 		TS:         time.Now().Unix(),
 	}
diff --git a/api/pkg/discovery/registry.go b/api/pkg/discovery/registry.go
index c94aaa02..e0051568 100644
--- a/api/pkg/discovery/registry.go
+++ b/api/pkg/discovery/registry.go
@@ -150,21 +150,34 @@ func (r *Registry) UpdateHeartbeat(id string, instanceID string, status string,
 		if instanceID != "" && entry.InstanceID != instanceID {
 			continue
 		}
-		wasHealthy := entry.isHealthyAt(now)
-		entry.Status = status
-		entry.LastHeartbeat = ts
-		entry.Healthy = entry.isHealthyAt(now)
-
-		results = append(results, UpdateResult{
-			Entry:         *entry,
-			IsNew:         false,
-			WasHealthy:    wasHealthy,
-			BecameHealthy: !wasHealthy && entry.Healthy,
-		})
+		results = append(results, updateHeartbeatEntry(entry, status, ts, now))
 	}
 	return results
 }
 
+func (r *Registry) UpdateHeartbeatByKey(key string, status string, ts time.Time, now time.Time) (UpdateResult, bool) {
+	key = strings.TrimSpace(key)
+	if key == "" {
+		return UpdateResult{}, false
+	}
+	if status == "" {
+		status = "ok"
+	}
+	if ts.IsZero() {
+		ts = now
+	}
+
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	entry := r.entries[key]
+	if entry == nil {
+		return UpdateResult{}, false
+	}
+
+	return updateHeartbeatEntry(entry, status, ts, now), true
+}
+
 func (r *Registry) Delete(key string) bool {
 	key = strings.TrimSpace(key)
 	if key == "" {
@@ -181,6 +194,20 @@ func (r *Registry) Delete(key string) bool {
 	return true
 }
 
+func updateHeartbeatEntry(entry *RegistryEntry, status string, ts time.Time, now time.Time) UpdateResult {
+	wasHealthy := entry.isHealthyAt(now)
+	entry.Status = status
+	entry.LastHeartbeat = ts
+	entry.Healthy = entry.isHealthyAt(now)
+
+	return UpdateResult{
+		Entry:         *entry,
+		IsNew:         false,
+		WasHealthy:    wasHealthy,
+		BecameHealthy: !wasHealthy && entry.Healthy,
+	}
+}
+
 // DeleteStale removes entries whose heartbeat timeout has elapsed.
 func (r *Registry) DeleteStale(now time.Time) []string {
 	r.mu.Lock()
diff --git a/api/pkg/discovery/registry_heartbeat_test.go b/api/pkg/discovery/registry_heartbeat_test.go
new file mode 100644
index 00000000..f9459b38
--- /dev/null
+++ b/api/pkg/discovery/registry_heartbeat_test.go
@@ -0,0 +1,133 @@
+package discovery
+
+import (
+	"testing"
+	"time"
+)
+
+func TestRegistryUpdateHeartbeatByKey_UpdatesOnlyTargetEntry(t *testing.T) {
+	now := time.Date(2026, 2, 27, 12, 0, 0, 0, time.UTC)
+	oldHeartbeat := now.Add(-45 * time.Second)
+	registry := NewRegistry()
+
+	oldVersion := RegistryEntry{
+		ID:            "mcards",
+		InstanceID:    "inst-1",
+		Service:       "CARD",
+		Rail:          RailCardPayout,
+		Operations:    []string{OperationSend},
+		Version:       "3.0.0-old",
+		Status:        "ok",
+		Health:        HealthParams{TimeoutSec: 30},
+		LastHeartbeat: oldHeartbeat,
+	}
+	newVersion := RegistryEntry{
+		ID:            "mcards",
+		InstanceID:    "inst-1",
+		Service:       "CARD",
+		Rail:          RailCardPayout,
+		Operations:    []string{OperationSend},
+		Version:       "3.0.0-new",
+		Status:        "ok",
+		Health:        HealthParams{TimeoutSec: 30},
+		LastHeartbeat: oldHeartbeat,
+	}
+
+	registry.UpsertEntry(oldVersion, now)
+	registry.UpsertEntry(newVersion, now)
+
+	oldKey := registryEntryKey(normalizeEntry(oldVersion))
+	newKey := registryEntryKey(normalizeEntry(newVersion))
+
+	result, found := registry.UpdateHeartbeatByKey(newKey, "ok", now, now)
+	if !found {
+		t.Fatalf("expected heartbeat target key to exist: %q", newKey)
+	}
+	if got := registryEntryKey(normalizeEntry(result.Entry)); got != newKey {
+		t.Fatalf("unexpected updated key: got=%q want=%q", got, newKey)
+	}
+
+	entries := registry.List(now, false)
+	if len(entries) != 2 {
+		t.Fatalf("unexpected entries count: got=%d want=2", len(entries))
+	}
+
+	sawOld := false
+	sawNew := false
+	for _, entry := range entries {
+		key := registryEntryKey(normalizeEntry(entry))
+		switch key {
+		case oldKey:
+			sawOld = true
+			if !entry.LastHeartbeat.Equal(oldHeartbeat) {
+				t.Fatalf("old key heartbeat changed unexpectedly: got=%s want=%s", entry.LastHeartbeat, oldHeartbeat)
+			}
+			if entry.Healthy {
+				t.Fatalf("old key should remain unhealthy")
+			}
+		case newKey:
+			sawNew = true
+			if !entry.LastHeartbeat.Equal(now) {
+				t.Fatalf("new key heartbeat not updated: got=%s want=%s", entry.LastHeartbeat, now)
+			}
+			if !entry.Healthy {
+				t.Fatalf("new key should be healthy after heartbeat")
+			}
+		}
+	}
+	if !sawOld {
+		t.Fatalf("old key entry not found after update: %q", oldKey)
+	}
+	if !sawNew {
+		t.Fatalf("new key entry not found after update: %q", newKey)
+	}
+}
+
+func TestRegistryUpdateHeartbeat_LegacyFallbackUpdatesAllMatchingInstanceEntries(t *testing.T) {
+	now := time.Date(2026, 2, 27, 12, 0, 0, 0, time.UTC)
+	oldHeartbeat := now.Add(-45 * time.Second)
+	registry := NewRegistry()
+
+	entries := []RegistryEntry{
+		{
+			ID:            "mcards",
+			InstanceID:    "inst-1",
+			Service:       "CARD",
+			Rail:          RailCardPayout,
+			Operations:    []string{OperationSend},
+			Version:       "3.0.0-old",
+			Status:        "ok",
+			Health:        HealthParams{TimeoutSec: 30},
+			LastHeartbeat: oldHeartbeat,
+		},
+		{
+			ID:            "mcards",
+			InstanceID:    "inst-1",
+			Service:       "CARD",
+			Rail:          RailCardPayout,
+			Operations:    []string{OperationSend},
+			Version:       "3.0.0-new",
+			Status:        "ok",
+			Health:        HealthParams{TimeoutSec: 30},
+			LastHeartbeat: oldHeartbeat,
+		},
+	}
+	for _, entry := range entries {
+		registry.UpsertEntry(entry, now)
+	}
+
+	results := registry.UpdateHeartbeat("mcards", "inst-1", "ok", now, now)
+	if len(results) != 2 {
+		t.Fatalf("unexpected updated count: got=%d want=2", len(results))
+	}
+
+	all := registry.List(now, false)
+	for _, entry := range all {
+		if !entry.LastHeartbeat.Equal(now) {
+			t.Fatalf("entry heartbeat not updated: key=%s got=%s want=%s", registryEntryKey(normalizeEntry(entry)), entry.LastHeartbeat, now)
+		}
+		if !entry.Healthy {
+			t.Fatalf("entry should be healthy after legacy heartbeat: key=%s", registryEntryKey(normalizeEntry(entry)))
+		}
+	}
+}
diff --git a/api/pkg/discovery/service.go b/api/pkg/discovery/service.go
index 7e718f83..97088310 100644
--- a/api/pkg/discovery/service.go
+++ b/api/pkg/discovery/service.go
@@ -216,22 +216,35 @@ func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) (e
 		s.logWarn("Failed to decode discovery heartbeat payload", fields...)
 		return err
 	}
-	s.logDebug("Discovery heartbeat received", append(envelopeFields(env), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID), zap.String("status", payload.Status))...)
-	if strings.TrimSpace(payload.InstanceID) == "" && strings.TrimSpace(payload.ID) == "" {
+	entryKey := strings.TrimSpace(payload.EntryKey)
+	s.logDebug("Discovery heartbeat received", append(envelopeFields(env), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID), zap.String("entry_key", entryKey), zap.String("status", payload.Status))...)
+	if entryKey == "" && strings.TrimSpace(payload.InstanceID) == "" && strings.TrimSpace(payload.ID) == "" {
 		return nil
 	}
-	if strings.TrimSpace(payload.InstanceID) == "" {
+	if entryKey == "" && strings.TrimSpace(payload.InstanceID) == "" {
 		fields := append(envelopeFields(env), zap.String("id", payload.ID))
 		s.logWarn("Discovery heartbeat missing instance id", fields...)
 	}
+	status := strings.TrimSpace(payload.Status)
 	ts := time.Unix(payload.TS, 0)
 	if ts.Unix() <= 0 {
 		ts = time.Now()
 	}
-	results := s.registry.UpdateHeartbeat(payload.ID, payload.InstanceID, strings.TrimSpace(payload.Status), ts, time.Now())
-	if len(results) == 0 {
-		s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID))
-		return nil
+	now := time.Now()
+	var results []UpdateResult
+	if entryKey != "" {
+		result, found := s.registry.UpdateHeartbeatByKey(entryKey, status, ts, now)
+		if !found {
+			s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("entry_key", entryKey), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID))
+			return nil
+		}
+		results = append(results, result)
+	} else {
+		results = s.registry.UpdateHeartbeat(payload.ID, payload.InstanceID, status, ts, now)
+		if len(results) == 0 {
+			s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID))
+			return nil
+		}
 	}
 	for _, result := range results {
 		if result.BecameHealthy {
diff --git a/api/pkg/discovery/types.go b/api/pkg/discovery/types.go
index 1f904d63..76f9a91b 100644
--- a/api/pkg/discovery/types.go
+++ b/api/pkg/discovery/types.go
@@ -116,6 +116,7 @@ type Announcement struct {
 type Heartbeat struct {
 	ID         string `json:"id"`
 	InstanceID string `json:"instanceId"`
+	EntryKey   string `json:"entryKey,omitempty"`
 	Status     string `json:"status"`
 	TS         int64  `json:"ts"`
 }