package discovery

import (
	"context"
	"encoding/json"
	"strings"
	"sync"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/tech/sendico/pkg/merrors"
	msg "github.com/tech/sendico/pkg/messaging"
	mb "github.com/tech/sendico/pkg/messaging/broker"
	cons "github.com/tech/sendico/pkg/messaging/consumer"
	me "github.com/tech/sendico/pkg/messaging/envelope"
	"github.com/tech/sendico/pkg/mlogger"
	"go.uber.org/zap"
)

// RegistryOption customises a RegistryService while it is being constructed
// by NewRegistryService.
type RegistryOption func(*RegistryService)

// WithRegistryKVTTL returns an option that appends WithKVTTL(ttl) to the
// options later handed to NewKVStore. The option is a no-op on a nil service.
func WithRegistryKVTTL(ttl time.Duration) RegistryOption {
	return func(s *RegistryService) {
		if s == nil {
			return
		}
		s.kvOptions = append(s.kvOptions, WithKVTTL(ttl))
	}
}

// RegistryService feeds a Registry from discovery messages (announcements,
// heartbeats, lookup requests) and, when a KV store could be initialised,
// persists entries to it and applies updates watched from it.
type RegistryService struct {
	logger    mlogger.Logger
	registry  *Registry
	producer  msg.Producer // may be nil; lookup replies and refresh events are then skipped
	sender    string
	consumers []consumerHandler

	kv        *KVStore        // nil when the KV store is disabled or failed to initialise
	kvWatcher nats.KeyWatcher // set by startKVWatch, stopped by Stop
	kvOptions []KVStoreOption

	startOnce sync.Once
	stopOnce  sync.Once
}

// consumerHandler pairs a consumer with the handler it dispatches to and the
// event name used when logging about it.
type consumerHandler struct {
	consumer msg.Consumer
	handler  msg.MessageHandlerT
	event    string
}

// NewRegistryService builds a registry service with one consumer per
// discovery event (service announce, gateway announce, heartbeat, lookup
// request).
//
// A nil registry is replaced with NewRegistry(). A blank sender (after
// trimming) defaults to "discovery". An InvalidArgument error is returned when
// msgBroker or logger is nil; an error from cons.NewConsumer is returned
// as-is. Consumers that were already created are closed before a consumer
// creation error is returned, so a failed construction does not leak them.
func NewRegistryService(logger mlogger.Logger, msgBroker mb.Broker, producer msg.Producer, registry *Registry, sender string, opts ...RegistryOption) (*RegistryService, error) {
	if msgBroker == nil {
		return nil, merrors.InvalidArgument("discovery registry: broker is nil", "broker")
	}
	if registry == nil {
		registry = NewRegistry()
	}
	if logger == nil {
		return nil, merrors.InvalidArgument("discovery registry: no logger provided", "logger")
	}
	logger = logger.Named("discovery_registry")

	sender = strings.TrimSpace(sender)
	if sender == "" {
		sender = "discovery"
	}

	// Track every consumer created so far so that a later failure can release
	// them instead of leaking them.
	opened := make([]msg.Consumer, 0, 4)
	closeOpened := func() {
		for _, c := range opened {
			c.Close()
		}
	}

	serviceConsumer, err := cons.NewConsumer(logger, msgBroker, ServiceAnnounceEvent())
	if err != nil {
		return nil, err
	}
	opened = append(opened, serviceConsumer)

	gatewayConsumer, err := cons.NewConsumer(logger, msgBroker, GatewayAnnounceEvent())
	if err != nil {
		closeOpened()
		return nil, err
	}
	opened = append(opened, gatewayConsumer)

	heartbeatConsumer, err := cons.NewConsumer(logger, msgBroker, HeartbeatEvent())
	if err != nil {
		closeOpened()
		return nil, err
	}
	opened = append(opened, heartbeatConsumer)

	lookupConsumer, err := cons.NewConsumer(logger, msgBroker, LookupRequestEvent())
	if err != nil {
		closeOpened()
		return nil, err
	}

	svc := &RegistryService{
		logger:   logger,
		registry: registry,
		producer: producer,
		sender:   sender,
	}
	for _, opt := range opts {
		if opt != nil {
			opt(svc)
		}
	}

	svc.consumers = []consumerHandler{
		{consumer: serviceConsumer, event: ServiceAnnounceEvent().ToString(), handler: svc.handleAnnounce},
		{consumer: gatewayConsumer, event: GatewayAnnounceEvent().ToString(), handler: svc.handleAnnounce},
		{consumer: heartbeatConsumer, event: HeartbeatEvent().ToString(), handler: svc.handleHeartbeat},
		{consumer: lookupConsumer, event: LookupRequestEvent().ToString(), handler: svc.handleLookup},
	}

	// Options must be applied before this point: initKV reads svc.kvOptions.
	svc.initKV(msgBroker)
	return svc, nil
}

// Start launches one goroutine per consumer and, if a KV store is available,
// the KV watcher. Only the first call has any effect; a nil receiver is a
// no-op.
func (s *RegistryService) Start() {
	if s == nil {
		return
	}
	s.startOnce.Do(func() {
		fields := []zap.Field{
			zap.Int("consumers", len(s.consumers)),
			zap.Bool("kv_enabled", s.kv != nil),
		}
		if s.kv != nil {
			if bucket := s.kv.Bucket(); bucket != "" {
				fields = append(fields, zap.String("kv_bucket", bucket))
			}
		}
		s.logInfo("Discovery registry service starting", fields...)

		for _, ch := range s.consumers {
			ch := ch // per-iteration copy for the goroutine (needed before Go 1.22)
			go func() {
				if err := ch.consumer.ConsumeMessages(ch.handler); err != nil {
					s.logWarn("Discovery consumer stopped with error", zap.String("event", ch.event), zap.Error(err))
				}
			}()
		}
		s.startKVWatch()
	})
}

// Stop closes every consumer and stops the KV watcher if one was started.
// Only the first call has any effect; a nil receiver is a no-op.
func (s *RegistryService) Stop() {
	if s == nil {
		return
	}
	s.stopOnce.Do(func() {
		for _, ch := range s.consumers {
			if ch.consumer != nil {
				ch.consumer.Close()
			}
		}
		if s.kvWatcher != nil {
			// Shutdown is best-effort: report the failure but keep going.
			if err := s.kvWatcher.Stop(); err != nil {
				s.logWarn("Failed to stop discovery KV watch", zap.Error(err))
			}
		}
		s.logInfo("Discovery registry service stopped")
	})
}

// handleAnnounce decodes an Announcement, upserts it into the registry and
// persists the resulting entry. A refresh event is published when the entry is
// new or became healthy.
//
// A decode failure is logged and returned. An announcement with neither an ID
// nor an instance ID is logged and dropped (nil error); one missing only the
// instance ID is logged and still processed.
func (s *RegistryService) handleAnnounce(_ context.Context, env me.Envelope) error {
	var payload Announcement
	if err := json.Unmarshal(env.GetData(), &payload); err != nil {
		fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
		s.logWarn("Failed to decode discovery announce payload", fields...)
		return err
	}
	s.logDebug("Discovery announce received", append(envelopeFields(env), announcementFields(payload)...)...)

	if strings.TrimSpace(payload.InstanceID) == "" {
		fields := append(envelopeFields(env), announcementFields(payload)...)
		if strings.TrimSpace(payload.ID) == "" {
			s.logWarn("Discovery announce missing id and instance id", fields...)
			return nil
		}
		s.logWarn("Discovery announce missing instance id", fields...)
	}

	result := s.registry.UpsertFromAnnouncement(payload, time.Now())
	s.persistEntry(result.Entry)
	if result.IsNew || result.BecameHealthy {
		s.logInfo("Discovery registry entry updated", append(entryFields(result.Entry), zap.Bool("is_new", result.IsNew), zap.Bool("became_healthy", result.BecameHealthy))...)
		s.publishRefresh(result.Entry)
	}
	return nil
}

// handleHeartbeat decodes a Heartbeat and applies it to the registry. Every
// updated entry is persisted, and a refresh event is published for entries
// that became healthy.
//
// A decode failure is logged and returned. A heartbeat with neither an ID nor
// an instance ID is dropped silently; one that matches no registry entry is
// logged at debug level and ignored.
func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) error {
	var payload Heartbeat
	if err := json.Unmarshal(env.GetData(), &payload); err != nil {
		fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
		s.logWarn("Failed to decode discovery heartbeat payload", fields...)
		return err
	}
	s.logDebug("Discovery heartbeat received", append(envelopeFields(env), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID), zap.String("status", payload.Status))...)

	if strings.TrimSpace(payload.InstanceID) == "" {
		if strings.TrimSpace(payload.ID) == "" {
			return nil
		}
		fields := append(envelopeFields(env), zap.String("id", payload.ID))
		s.logWarn("Discovery heartbeat missing instance id", fields...)
	}

	// payload.TS is read as Unix seconds; a non-positive value falls back to
	// the current time.
	ts := time.Unix(payload.TS, 0)
	if ts.Unix() <= 0 {
		ts = time.Now()
	}

	results := s.registry.UpdateHeartbeat(payload.ID, payload.InstanceID, strings.TrimSpace(payload.Status), ts, time.Now())
	if len(results) == 0 {
		s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID))
		return nil
	}
	for _, result := range results {
		if result.BecameHealthy {
			s.logInfo("Discovery registry entry became healthy", append(entryFields(result.Entry), zap.String("status", result.Entry.Status))...)
			s.publishRefresh(result.Entry)
		}
		s.persistEntry(result.Entry)
	}
	return nil
}

// handleLookup answers a LookupRequest with the registry's current lookup
// result, echoing the trimmed request ID.
//
// Without a producer the request is logged and ignored (nil error). Decode and
// publish failures are logged and returned.
func (s *RegistryService) handleLookup(_ context.Context, env me.Envelope) error {
	if s.producer == nil {
		s.logWarn("Discovery lookup request ignored: producer not configured", envelopeFields(env)...)
		return nil
	}

	var payload LookupRequest
	if err := json.Unmarshal(env.GetData(), &payload); err != nil {
		fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
		s.logWarn("Failed to decode discovery lookup payload", fields...)
		return err
	}

	resp := s.registry.Lookup(time.Now())
	resp.RequestID = strings.TrimSpace(payload.RequestID)
	s.logDebug("Discovery lookup prepared", zap.String("request_id", resp.RequestID), zap.Int("services", len(resp.Services)), zap.Int("gateways", len(resp.Gateways)))

	if err := s.producer.SendMessage(NewLookupResponseEnvelope(s.sender, resp)); err != nil {
		fields := []zap.Field{zap.String("request_id", resp.RequestID), zap.Error(err)}
		s.logWarn("Failed to publish discovery lookup response", fields...)
		return err
	}
	return nil
}

// publishRefresh sends a RefreshEvent describing entry through the producer.
// It does nothing when the receiver or the producer is nil; a send failure is
// logged and not returned.
func (s *RegistryService) publishRefresh(entry RegistryEntry) {
	if s == nil || s.producer == nil {
		return
	}
	payload := RefreshEvent{
		InstanceID: entry.InstanceID,
		Service:    entry.Service,
		Rail:       entry.Rail,
		Network:    entry.Network,
		Message:    "new module available",
	}
	if err := s.producer.SendMessage(NewRefreshUIEnvelope(s.sender, payload)); err != nil {
		fields := append(entryFields(entry), zap.Error(err))
		s.logWarn("Failed to publish discovery refresh event", fields...)
	}
}

// jetStreamProvider is implemented by brokers that can expose a JetStream
// context; initKV uses it to decide whether the KV store can be enabled.
type jetStreamProvider interface {
	JetStream() nats.JetStreamContext
}

// initKV tries to create the KV store from the broker's JetStream context.
// On any failure (broker without JetStream support, nil context, store
// creation error) it logs the reason and leaves s.kv nil.
func (s *RegistryService) initKV(msgBroker mb.Broker) {
	if s == nil || msgBroker == nil {
		return
	}
	provider, ok := msgBroker.(jetStreamProvider)
	if !ok {
		s.logInfo("Discovery KV disabled: broker does not support JetStream")
		return
	}
	js := provider.JetStream()
	if js == nil {
		s.logWarn("Discovery KV disabled: JetStream not configured")
		return
	}
	store, err := NewKVStore(s.logger, js, "", s.kvOptions...)
	if err != nil {
		s.logWarn("Failed to initialise discovery KV store", zap.Error(err))
		return
	}
	s.kv = store
}

// startKVWatch opens a watcher over the KV store, records it for Stop, and
// consumes its updates in a new goroutine. It is a no-op without a KV store;
// a watch error is logged and not returned.
func (s *RegistryService) startKVWatch() {
	if s == nil || s.kv == nil {
		return
	}
	watcher, err := s.kv.WatchAll()
	if err != nil {
		s.logWarn("Failed to start discovery KV watch", zap.Error(err))
		return
	}
	s.kvWatcher = watcher
	if bucket := s.kv.Bucket(); bucket != "" {
		s.logInfo("Discovery KV watch started", zap.String("bucket", bucket))
	}
	go s.consumeKVUpdates(watcher)
}

// consumeKVUpdates applies KV watcher updates to the registry until the
// updates channel is closed. Deletes and purges remove the matching registry
// entry; puts are decoded as RegistryEntry and upserted, publishing a refresh
// event when the entry is new or became healthy.
func (s *RegistryService) consumeKVUpdates(watcher nats.KeyWatcher) {
	if s == nil || watcher == nil {
		return
	}

	initial := true
	initialCount := 0
	for entry := range watcher.Updates() {
		if entry == nil {
			// The first nil entry is treated as the end of the initial
			// snapshot; later nil entries are skipped.
			if initial {
				fields := []zap.Field{zap.Int("entries", initialCount)}
				if s.kv != nil {
					if bucket := s.kv.Bucket(); bucket != "" {
						fields = append(fields, zap.String("bucket", bucket))
					}
				}
				s.logInfo("Discovery KV initial sync complete", fields...)
				initial = false
			}
			continue
		}

		if initial && entry.Operation() == nats.KeyValuePut {
			initialCount++
		}

		switch entry.Operation() {
		case nats.KeyValueDelete, nats.KeyValuePurge:
			key := registryKeyFromKVKey(entry.Key())
			if key != "" {
				if s.registry.Delete(key) {
					s.logInfo("Discovery registry entry removed", zap.String("key", key))
				}
			}
			continue
		case nats.KeyValuePut:
			// Handled below.
		default:
			continue
		}

		var payload RegistryEntry
		if err := json.Unmarshal(entry.Value(), &payload); err != nil {
			s.logWarn("Failed to decode discovery KV entry", zap.String("key", entry.Key()), zap.Error(err))
			continue
		}
		result := s.registry.UpsertEntry(payload, time.Now())
		if result.IsNew || result.BecameHealthy {
			s.logInfo("Discovery registry entry updated from KV", append(entryFields(result.Entry), zap.Bool("is_new", result.IsNew), zap.Bool("became_healthy", result.BecameHealthy))...)
			s.publishRefresh(result.Entry)
		}
	}
}

// persistEntry writes entry to the KV store when one is configured. A write
// failure is logged and not returned.
func (s *RegistryService) persistEntry(entry RegistryEntry) {
	if s == nil || s.kv == nil {
		return
	}
	if err := s.kv.Put(entry); err != nil {
		s.logWarn("Failed to persist discovery entry", append(entryFields(entry), zap.Error(err))...)
	}
}

// logWarn logs at warn level; it is a no-op on a nil receiver.
func (s *RegistryService) logWarn(message string, fields ...zap.Field) {
	if s == nil {
		return
	}
	s.logger.Warn(message, fields...)
}

// logDebug logs at debug level; it is a no-op on a nil receiver.
func (s *RegistryService) logDebug(message string, fields ...zap.Field) {
	if s == nil {
		return
	}
	s.logger.Debug(message, fields...)
}

// logInfo logs at info level; it is a no-op on a nil receiver.
func (s *RegistryService) logInfo(message string, fields ...zap.Field) {
	if s == nil {
		return
	}
	s.logger.Info(message, fields...)
}