package discovery import ( "context" "encoding/json" "errors" "strings" "sync" "time" "github.com/nats-io/nats.go" "github.com/tech/sendico/pkg/merrors" msg "github.com/tech/sendico/pkg/messaging" mb "github.com/tech/sendico/pkg/messaging/broker" cons "github.com/tech/sendico/pkg/messaging/consumer" me "github.com/tech/sendico/pkg/messaging/envelope" "github.com/tech/sendico/pkg/mlogger" "go.uber.org/zap" ) const DefaultRegistryStaleCleanupInterval = time.Duration(DefaultHealthIntervalSec) * time.Second type RegistryOption func(*RegistryService) func WithRegistryKVTTL(ttl time.Duration) RegistryOption { return func(s *RegistryService) { if s == nil { return } s.kvOptions = append(s.kvOptions, WithKVTTL(ttl)) } } // WithRegistryStaleCleanupInterval configures periodic stale-entry eviction. func WithRegistryStaleCleanupInterval(interval time.Duration) RegistryOption { return func(s *RegistryService) { if s == nil { return } s.staleCleanupInterval = interval } } type RegistryService struct { logger mlogger.Logger registry *Registry producer msg.Producer sender string consumers []consumerHandler kv *KVStore kvWatcher nats.KeyWatcher kvOptions []KVStoreOption staleCleanupInterval time.Duration staleCleanupStop chan struct{} staleCleanupDone chan struct{} startOnce sync.Once stopOnce sync.Once } type consumerHandler struct { consumer msg.Consumer handler msg.MessageHandlerT event string } func NewRegistryService(logger mlogger.Logger, msgBroker mb.Broker, producer msg.Producer, registry *Registry, sender string, opts ...RegistryOption) (*RegistryService, error) { if msgBroker == nil { return nil, merrors.InvalidArgument("discovery registry: broker is nil", "broker") } if registry == nil { registry = NewRegistry() } if logger == nil { return nil, merrors.InvalidArgument("discovery registry: no logger provided", "logger") } logger = logger.Named("discovery_registry") initMetrics() sender = strings.TrimSpace(sender) if sender == "" { sender = "discovery" } serviceConsumer, err := cons.NewConsumer(logger, msgBroker, ServiceAnnounceEvent()) if err != nil { return nil, err } gatewayConsumer, err := cons.NewConsumer(logger, msgBroker, GatewayAnnounceEvent()) if err != nil { return nil, err } heartbeatConsumer, err := cons.NewConsumer(logger, msgBroker, HeartbeatEvent()) if err != nil { return nil, err } lookupConsumer, err := cons.NewConsumer(logger, msgBroker, LookupRequestEvent()) if err != nil { return nil, err } svc := &RegistryService{ logger: logger, registry: registry, producer: producer, sender: sender, staleCleanupInterval: DefaultRegistryStaleCleanupInterval, } for _, opt := range opts { if opt != nil { opt(svc) } } svc.consumers = []consumerHandler{ {consumer: serviceConsumer, event: ServiceAnnounceEvent().ToString(), handler: func(ctx context.Context, env me.Envelope) error { return svc.handleAnnounce(ctx, env) }}, {consumer: gatewayConsumer, event: GatewayAnnounceEvent().ToString(), handler: func(ctx context.Context, env me.Envelope) error { return svc.handleAnnounce(ctx, env) }}, {consumer: heartbeatConsumer, event: HeartbeatEvent().ToString(), handler: svc.handleHeartbeat}, {consumer: lookupConsumer, event: LookupRequestEvent().ToString(), handler: svc.handleLookup}, } svc.initKV(msgBroker) return svc, nil } func (s *RegistryService) Start() { if s == nil { return } s.startOnce.Do(func() { fields := []zap.Field{zap.Int("consumers", len(s.consumers)), zap.Bool("kv_enabled", s.kv != nil)} if s.kv != nil { if bucket := s.kv.Bucket(); bucket != "" { fields = append(fields, zap.String("kv_bucket", bucket)) } } s.logInfo("Discovery registry service starting", fields...) for _, ch := range s.consumers { ch := ch go func() { if err := ch.consumer.ConsumeMessages(ch.handler); err != nil { s.logger.Warn("Discovery consumer stopped with error", zap.String("event", ch.event), zap.Error(err)) } }() } s.startKVWatch() s.startStaleCleanup() }) } func (s *RegistryService) Stop() { if s == nil { return } s.stopOnce.Do(func() { for _, ch := range s.consumers { if ch.consumer != nil { ch.consumer.Close() } } if s.kvWatcher != nil { _ = s.kvWatcher.Stop() } if s.staleCleanupStop != nil { close(s.staleCleanupStop) } if s.staleCleanupDone != nil { <-s.staleCleanupDone } s.logInfo("Discovery registry service stopped") }) } func (s *RegistryService) handleAnnounce(_ context.Context, env me.Envelope) (err error) { start := time.Now() defer func() { observeEvent("announce", err, time.Since(start)) }() var payload Announcement if err = json.Unmarshal(env.GetData(), &payload); err != nil { fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err)) s.logWarn("Failed to decode discovery announce payload", fields...) return err } s.logDebug("Discovery announce received", append(envelopeFields(env), announcementFields(payload)...)...) if strings.TrimSpace(payload.InstanceID) == "" && strings.TrimSpace(payload.ID) == "" { fields := append(envelopeFields(env), announcementFields(payload)...) s.logWarn("Discovery announce missing id and instance id", fields...) return nil } if strings.TrimSpace(payload.InstanceID) == "" { fields := append(envelopeFields(env), announcementFields(payload)...) s.logWarn("Discovery announce missing instance id", fields...) } now := time.Now() result := s.registry.UpsertFromAnnouncement(payload, now) s.persistEntry(result.Entry) if result.IsNew || result.BecameHealthy { s.logInfo("Discovery registry entry updated", append(entryFields(result.Entry), zap.Bool("is_new", result.IsNew), zap.Bool("became_healthy", result.BecameHealthy))...) s.publishRefresh(result.Entry) } return nil } func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) (err error) { start := time.Now() defer func() { observeEvent("heartbeat", err, time.Since(start)) }() var payload Heartbeat if err = json.Unmarshal(env.GetData(), &payload); err != nil { fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err)) s.logWarn("Failed to decode discovery heartbeat payload", fields...) return err } entryKey := strings.TrimSpace(payload.EntryKey) s.logDebug("Discovery heartbeat received", append(envelopeFields(env), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID), zap.String("entry_key", entryKey), zap.String("status", payload.Status))...) if entryKey == "" && strings.TrimSpace(payload.InstanceID) == "" && strings.TrimSpace(payload.ID) == "" { return nil } if entryKey == "" && strings.TrimSpace(payload.InstanceID) == "" { fields := append(envelopeFields(env), zap.String("id", payload.ID)) s.logWarn("Discovery heartbeat missing instance id", fields...) } status := strings.TrimSpace(payload.Status) ts := time.Unix(payload.TS, 0) if ts.Unix() <= 0 { ts = time.Now() } now := time.Now() var results []UpdateResult if entryKey != "" { result, found := s.registry.UpdateHeartbeatByKey(entryKey, status, ts, now) if !found { s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("entry_key", entryKey), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID)) return nil } results = append(results, result) } else { results = s.registry.UpdateHeartbeat(payload.ID, payload.InstanceID, status, ts, now) if len(results) == 0 { s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID)) return nil } } for _, result := range results { if result.BecameHealthy { s.logInfo("Discovery registry entry became healthy", append(entryFields(result.Entry), zap.String("status", result.Entry.Status))...) s.publishRefresh(result.Entry) } s.persistEntry(result.Entry) } return nil } func (s *RegistryService) handleLookup(_ context.Context, env me.Envelope) (err error) { start := time.Now() defer func() { observeEvent("lookup", err, time.Since(start)) }() if s.producer == nil { s.logWarn("Discovery lookup request ignored: producer not configured", envelopeFields(env)...) return nil } var payload LookupRequest if err = json.Unmarshal(env.GetData(), &payload); err != nil { fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err)) s.logWarn("Failed to decode discovery lookup payload", fields...) return err } resp := s.registry.Lookup(time.Now()) resp.RequestID = strings.TrimSpace(payload.RequestID) s.logDebug("Discovery lookup prepared", zap.String("request_id", resp.RequestID), zap.Int("services", len(resp.Services)), zap.Int("gateways", len(resp.Gateways))) if err = s.producer.SendMessage(NewLookupResponseEnvelope(s.sender, resp)); err != nil { fields := []zap.Field{zap.String("request_id", resp.RequestID), zap.Error(err)} s.logWarn("Failed to publish discovery lookup response", fields...) return err } return nil } func (s *RegistryService) publishRefresh(entry RegistryEntry) { if s == nil || s.producer == nil { return } payload := RefreshEvent{ InstanceID: entry.InstanceID, Service: entry.Service, Rail: entry.Rail, Network: entry.Network, Message: "new module available", } if err := s.producer.SendMessage(NewRefreshUIEnvelope(s.sender, payload)); err != nil { fields := append(entryFields(entry), zap.Error(err)) s.logWarn("Failed to publish discovery refresh event", fields...) } } type jetStreamProvider interface { JetStream() nats.JetStreamContext } func (s *RegistryService) initKV(msgBroker mb.Broker) { if s == nil || msgBroker == nil { return } provider, ok := msgBroker.(jetStreamProvider) if !ok { s.logInfo("Discovery KV disabled: broker does not support JetStream") return } js := provider.JetStream() if js == nil { s.logWarn("Discovery KV disabled: JetStream not configured") return } store, err := NewKVStore(s.logger, js, "", s.kvOptions...) if err != nil { s.logWarn("Failed to initialise discovery KV store", zap.Error(err)) return } s.kv = store } func (s *RegistryService) startKVWatch() { if s == nil || s.kv == nil { return } watcher, err := s.kv.WatchAll() if err != nil { s.logWarn("Failed to start discovery KV watch", zap.Error(err)) return } s.kvWatcher = watcher if bucket := s.kv.Bucket(); bucket != "" { s.logInfo("Discovery KV watch started", zap.String("bucket", bucket)) } go s.consumeKVUpdates(watcher) } func (s *RegistryService) startStaleCleanup() { if s == nil || s.registry == nil || s.staleCleanupInterval <= 0 { return } s.staleCleanupStop = make(chan struct{}) s.staleCleanupDone = make(chan struct{}) go func() { defer close(s.staleCleanupDone) ticker := time.NewTicker(s.staleCleanupInterval) defer ticker.Stop() s.pruneStaleEntries(time.Now()) for { select { case <-ticker.C: s.pruneStaleEntries(time.Now()) case <-s.staleCleanupStop: return } } }() } func (s *RegistryService) pruneStaleEntries(now time.Time) { if s == nil || s.registry == nil { return } removedKeys := s.registry.DeleteStale(now) if len(removedKeys) == 0 { return } for _, key := range removedKeys { if strings.TrimSpace(key) == "" { continue } s.logInfo("Discovery registry stale entry removed", zap.String("key", key)) if s.kv == nil { continue } if err := s.kv.Delete(key); err != nil && !errors.Is(err, nats.ErrKeyNotFound) { s.logWarn("Failed to delete stale discovery KV entry", zap.String("key", key), zap.Error(err)) } } } func (s *RegistryService) consumeKVUpdates(watcher nats.KeyWatcher) { if s == nil || watcher == nil { return } initial := true initialCount := 0 for entry := range watcher.Updates() { if entry == nil { if initial { fields := []zap.Field{zap.Int("entries", initialCount)} if s.kv != nil { if bucket := s.kv.Bucket(); bucket != "" { fields = append(fields, zap.String("bucket", bucket)) } } s.logInfo("Discovery KV initial sync complete", fields...) initial = false } continue } if initial && entry.Operation() == nats.KeyValuePut { initialCount++ } switch entry.Operation() { case nats.KeyValueDelete, nats.KeyValuePurge: key := registryKeyFromKVKey(entry.Key()) if key != "" { if s.registry.Delete(key) { s.logInfo("Discovery registry entry removed", zap.String("key", key)) } } continue case nats.KeyValuePut: default: continue } var payload RegistryEntry if err := json.Unmarshal(entry.Value(), &payload); err != nil { s.logWarn("Failed to decode discovery KV entry", zap.String("key", entry.Key()), zap.Error(err)) continue } result := s.registry.UpsertEntry(payload, time.Now()) if result.IsNew || result.BecameHealthy { s.logInfo("Discovery registry entry updated from KV", append(entryFields(result.Entry), zap.Bool("is_new", result.IsNew), zap.Bool("became_healthy", result.BecameHealthy))...) s.publishRefresh(result.Entry) } } } func (s *RegistryService) persistEntry(entry RegistryEntry) { if s == nil || s.kv == nil { return } if err := s.kv.Put(entry); err != nil { s.logWarn("Failed to persist discovery entry", append(entryFields(entry), zap.Error(err))...) } } func (s *RegistryService) logWarn(message string, fields ...zap.Field) { if s == nil { return } s.logger.Warn(message, fields...) } func (s *RegistryService) logDebug(message string, fields ...zap.Field) { if s == nil { return } s.logger.Debug(message, fields...) } func (s *RegistryService) logInfo(message string, fields ...zap.Field) { if s == nil { return } s.logger.Info(message, fields...) }