package discovery

import (
	"encoding/json"
	"errors"
	"strings"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/tech/sendico/pkg/mlogger"
	"go.uber.org/zap"
)

// DefaultKVBucket is the bucket name NewKVStore falls back to when the caller
// passes an empty or whitespace-only bucket name.
const DefaultKVBucket = "discovery_registry"

// kvStoreOptions collects the optional settings accepted by NewKVStore.
type kvStoreOptions struct {
	// ttl is the requested bucket TTL.
	ttl time.Duration
	// ttlSet records that WithKVTTL was applied, so an explicitly requested
	// zero TTL can be told apart from "no TTL option given".
	ttlSet bool
}

// KVStoreOption mutates the option set used by NewKVStore.
type KVStoreOption func(*kvStoreOptions)

// WithKVTTL returns an option that requests the given TTL for the bucket and
// marks the TTL as explicitly set. The returned option ignores a nil target.
func WithKVTTL(ttl time.Duration) KVStoreOption {
	return func(opts *kvStoreOptions) {
		if opts != nil {
			opts.ttl = ttl
			opts.ttlSet = true
		}
	}
}

// newKVStoreOptions applies every non-nil option, in order, to a zero-valued
// kvStoreOptions and returns the result.
func newKVStoreOptions(opts ...KVStoreOption) kvStoreOptions {
	var resolved kvStoreOptions
	for _, apply := range opts {
		if apply == nil {
			continue
		}
		apply(&resolved)
	}
	return resolved
}

// KVStore persists discovery registry entries in a NATS key-value bucket.
type KVStore struct {
	logger mlogger.Logger
	kv     nats.KeyValue
	bucket string
}

// NewKVStore opens the named key-value bucket, creating it when it does not
// exist yet.
//
// A nil js is rejected with an error. A nil logger is replaced with a no-op
// logger. The bucket name is trimmed and defaults to DefaultKVBucket when
// blank. When the bucket already exists and a TTL was requested through
// WithKVTTL, the bucket TTL is reconciled on a best-effort basis (see
// ensureKVTTL); a failure to do so does not fail construction.
func NewKVStore(logger mlogger.Logger, js nats.JetStreamContext, bucket string, opts ...KVStoreOption) (*KVStore, error) {
	if js == nil {
		return nil, errors.New("discovery kv: jetstream is nil")
	}
	if logger == nil {
		logger = zap.NewNop()
	}
	logger = logger.Named("discovery_kv")

	if bucket = strings.TrimSpace(bucket); bucket == "" {
		bucket = DefaultKVBucket
	}

	settings := newKVStoreOptions(opts...)

	kv, err := js.KeyValue(bucket)
	switch {
	case err == nil:
		// Bucket already exists: only touch its TTL when one was asked for.
		if settings.ttlSet {
			ensureKVTTL(logger, js, kv, bucket, settings.ttl)
		}
	case errors.Is(err, nats.ErrBucketNotFound):
		kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
			Bucket:      bucket,
			Description: "service discovery registry",
			History:     1,
			TTL:         settings.ttl,
		})
		if err != nil {
			return nil, err
		}
		created := []zap.Field{zap.String("bucket", bucket)}
		if settings.ttlSet {
			created = append(created, zap.Duration("ttl", settings.ttl))
		}
		logger.Info("Discovery KV bucket created", created...)
	default:
		return nil, err
	}

	store := &KVStore{
		logger: logger,
		kv:     kv,
		bucket: bucket,
	}
	return store, nil
}

// ensureKVTTL makes the TTL of an existing bucket match ttl.
//
// It reads the bucket status and returns early when the TTL already matches.
// Otherwise it fetches the configuration of the stream named "KV_" + bucket,
// sets its MaxAge to ttl and submits the update. Every failure is logged at
// warn level and swallowed; nothing is returned to the caller.
func ensureKVTTL(logger mlogger.Logger, js nats.JetStreamContext, kv nats.KeyValue, bucket string, ttl time.Duration) {
	if kv == nil || js == nil {
		return
	}

	current, err := kv.Status()
	if err != nil {
		logger.Warn("Failed to read discovery KV status", zap.String("bucket", bucket), zap.Error(err))
		return
	}
	if current.TTL() == ttl {
		return
	}

	streamName := "KV_" + bucket
	streamInfo, err := js.StreamInfo(streamName)
	if err != nil {
		logger.Warn("Failed to read discovery KV stream info", zap.String("bucket", bucket), zap.String("stream", streamName), zap.Error(err))
		return
	}

	// Work on a copy of the reported config so only MaxAge differs.
	updated := streamInfo.Config
	updated.MaxAge = ttl
	_, err = js.UpdateStream(&updated)
	if err != nil {
		logger.Warn("Failed to update discovery KV TTL", zap.String("bucket", bucket), zap.Duration("ttl", ttl), zap.Error(err))
		return
	}

	logger.Info("Discovery KV TTL updated", zap.String("bucket", bucket), zap.Duration("ttl", ttl))
}

// Put stores entry in the bucket as JSON.
//
// It returns an error when the store is nil or has no bucket handle, when the
// entry yields an empty registry key, when JSON encoding fails, or when the
// bucket write fails. A failed write is also logged at warn level.
func (s *KVStore) Put(entry RegistryEntry) error {
	if s == nil || s.kv == nil {
		return errors.New("discovery kv: not configured")
	}

	// NOTE(review): the key is derived from the normalized entry while the
	// payload below is the entry exactly as passed in — confirm intentional.
	registryKey := registryEntryKey(normalizeEntry(entry))
	if registryKey == "" {
		return errors.New("discovery kv: entry key is empty")
	}

	data, err := json.Marshal(entry)
	if err != nil {
		return err
	}

	if _, err := s.kv.Put(kvKeyFromRegistryKey(registryKey), data); err != nil {
		logFields := append(entryFields(entry), zap.String("bucket", s.bucket), zap.String("key", registryKey), zap.Error(err))
		s.logger.Warn("Failed to persist discovery entry", logFields...)
		return err
	}
	return nil
}

// Delete removes the entry stored under the bucket key derived from id.
//
// It returns an error when the store is nil or has no bucket handle. An id
// that maps to an empty bucket key is treated as a no-op. A failed delete is
// logged at warn level and returned.
func (s *KVStore) Delete(id string) error {
	if s == nil || s.kv == nil {
		return errors.New("discovery kv: not configured")
	}

	kvKey := kvKeyFromRegistryKey(id)
	if kvKey == "" {
		return nil
	}

	err := s.kv.Delete(kvKey)
	if err == nil {
		return nil
	}
	s.logger.Warn("Failed to delete discovery entry", zap.String("bucket", s.bucket), zap.String("key", kvKey), zap.Error(err))
	return err
}

// WatchAll returns the bucket's watcher over all keys, or an error when the
// store is nil or has no bucket handle.
func (s *KVStore) WatchAll() (nats.KeyWatcher, error) {
	if s == nil || s.kv == nil {
		return nil, errors.New("discovery kv: not configured")
	}
	return s.kv.WatchAll()
}

// Bucket reports the name of the bucket this store is bound to. It returns
// the empty string for a nil store.
func (s *KVStore) Bucket() string {
	if s != nil {
		return s.bucket
	}
	return ""
}