package discovery import ( "encoding/json" "errors" "os" "strconv" "strings" "time" "github.com/nats-io/nats.go" "github.com/tech/sendico/pkg/mlogger" "go.uber.org/zap" ) const ( DefaultKVBucket = "discovery_registry" kvTTLSecondsEnv = "DISCOVERY_KV_TTL_SECONDS" defaultKVTTLSeconds = 3600 ) type KVStore struct { logger mlogger.Logger kv nats.KeyValue bucket string } func NewKVStore(logger mlogger.Logger, js nats.JetStreamContext, bucket string) (*KVStore, error) { if js == nil { return nil, errors.New("discovery kv: jetstream is nil") } if logger != nil { logger = logger.Named("discovery_kv") } bucket = strings.TrimSpace(bucket) if bucket == "" { bucket = DefaultKVBucket } ttl := kvTTL(logger) kv, err := js.KeyValue(bucket) if err != nil { if errors.Is(err, nats.ErrBucketNotFound) { kv, err = js.CreateKeyValue(&nats.KeyValueConfig{ Bucket: bucket, Description: "service discovery registry", History: 1, TTL: ttl, }) if err == nil && logger != nil { fields := []zap.Field{zap.String("bucket", bucket)} if ttl > 0 { fields = append(fields, zap.Duration("ttl", ttl)) } logger.Info("Discovery KV bucket created", fields...) } } if err != nil { return nil, err } } else if ttl > 0 { ensureKVTTL(logger, js, kv, bucket, ttl) } return &KVStore{ logger: logger, kv: kv, bucket: bucket, }, nil } func kvTTL(logger mlogger.Logger) time.Duration { raw := strings.TrimSpace(os.Getenv(kvTTLSecondsEnv)) if raw == "" { return time.Duration(defaultKVTTLSeconds) * time.Second } secs, err := strconv.Atoi(raw) if err != nil { if logger != nil { logger.Warn("Invalid discovery KV TTL seconds, using default", zap.String("env", kvTTLSecondsEnv), zap.String("value", raw), zap.Int("default_seconds", defaultKVTTLSeconds)) } return time.Duration(defaultKVTTLSeconds) * time.Second } if secs <= 0 { return 0 } return time.Duration(secs) * time.Second } func ensureKVTTL(logger mlogger.Logger, js nats.JetStreamContext, kv nats.KeyValue, bucket string, ttl time.Duration) { if kv == nil || js == nil || ttl <= 0 { return } status, err := kv.Status() if err != nil { if logger != nil { logger.Warn("Failed to read discovery KV status", zap.String("bucket", bucket), zap.Error(err)) } return } if status.TTL() == ttl { return } stream := "KV_" + bucket info, err := js.StreamInfo(stream) if err != nil { if logger != nil { logger.Warn("Failed to read discovery KV stream info", zap.String("bucket", bucket), zap.String("stream", stream), zap.Error(err)) } return } cfg := info.Config cfg.MaxAge = ttl if _, err := js.UpdateStream(&cfg); err != nil { if logger != nil { logger.Warn("Failed to update discovery KV TTL", zap.String("bucket", bucket), zap.Duration("ttl", ttl), zap.Error(err)) } return } if logger != nil { logger.Info("Discovery KV TTL updated", zap.String("bucket", bucket), zap.Duration("ttl", ttl)) } } func (s *KVStore) Put(entry RegistryEntry) error { if s == nil || s.kv == nil { return errors.New("discovery kv: not configured") } key := registryEntryKey(normalizeEntry(entry)) if key == "" { return errors.New("discovery kv: entry key is empty") } payload, err := json.Marshal(entry) if err != nil { return err } _, err = s.kv.Put(kvKeyFromRegistryKey(key), payload) if err != nil && s.logger != nil { fields := append(entryFields(entry), zap.String("bucket", s.bucket), zap.String("key", key), zap.Error(err)) s.logger.Warn("Failed to persist discovery entry", fields...) } return err } func (s *KVStore) Delete(id string) error { if s == nil || s.kv == nil { return errors.New("discovery kv: not configured") } key := kvKeyFromRegistryKey(id) if key == "" { return nil } if err := s.kv.Delete(key); err != nil && s.logger != nil { s.logger.Warn("Failed to delete discovery entry", zap.String("bucket", s.bucket), zap.String("key", key), zap.Error(err)) return err } return nil } func (s *KVStore) WatchAll() (nats.KeyWatcher, error) { if s == nil || s.kv == nil { return nil, errors.New("discovery kv: not configured") } return s.kv.WatchAll() } func (s *KVStore) Bucket() string { if s == nil { return "" } return s.bucket }