Files
sendico/api/pkg/discovery/kv.go
2026-01-06 14:20:08 +01:00

172 lines
4.2 KiB
Go

package discovery
import (
"encoding/json"
"errors"
"os"
"strconv"
"strings"
"time"
"github.com/nats-io/nats.go"
"github.com/tech/sendico/pkg/mlogger"
"go.uber.org/zap"
)
// Defaults and environment configuration for the discovery KV store.
const (
// DefaultKVBucket is the bucket name NewKVStore uses when the caller
// passes an empty or whitespace-only bucket name.
DefaultKVBucket = "discovery_registry"
// kvTTLSecondsEnv names the environment variable kvTTL reads to obtain
// the bucket TTL, expressed in whole seconds.
kvTTLSecondsEnv = "DISCOVERY_KV_TTL_SECONDS"
// defaultKVTTLSeconds is the TTL, in seconds, that kvTTL returns when the
// environment variable is unset or cannot be parsed as an integer.
defaultKVTTLSeconds = 3600
)
// KVStore persists discovery registry entries in a NATS key-value bucket.
// Its methods tolerate a nil receiver and a nil kv handle by returning a
// "not configured" error (or an empty string for Bucket).
type KVStore struct {
// logger receives warnings about failed operations; it may be nil, in
// which case nothing is logged.
logger mlogger.Logger
// kv is the handle to the underlying NATS key-value bucket.
kv nats.KeyValue
// bucket is the resolved bucket name, kept for logging and for Bucket().
bucket string
}
// NewKVStore opens the discovery key-value bucket on the given JetStream
// context, creating it when it does not exist yet.
//
// A blank bucket name resolves to DefaultKVBucket. The TTL comes from kvTTL:
// a newly created bucket is configured with it, while an already existing
// bucket is reconciled through ensureKVTTL only when the TTL is positive.
// logger may be nil; when set it is scoped under the name "discovery_kv".
//
// An error is returned when js is nil, when the bucket lookup fails for any
// reason other than nats.ErrBucketNotFound, or when bucket creation fails.
func NewKVStore(logger mlogger.Logger, js nats.JetStreamContext, bucket string) (*KVStore, error) {
	if js == nil {
		return nil, errors.New("discovery kv: jetstream is nil")
	}
	if logger != nil {
		logger = logger.Named("discovery_kv")
	}
	if bucket = strings.TrimSpace(bucket); bucket == "" {
		bucket = DefaultKVBucket
	}
	ttl := kvTTL(logger)

	kv, err := js.KeyValue(bucket)
	switch {
	case err == nil:
		// The bucket already exists: only its TTL may need adjusting.
		if ttl > 0 {
			ensureKVTTL(logger, js, kv, bucket, ttl)
		}
	case errors.Is(err, nats.ErrBucketNotFound):
		// First use of this bucket: create it with the configured TTL.
		kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
			Bucket:      bucket,
			Description: "service discovery registry",
			History:     1,
			TTL:         ttl,
		})
		if err != nil {
			return nil, err
		}
		if logger != nil {
			fields := []zap.Field{zap.String("bucket", bucket)}
			if ttl > 0 {
				fields = append(fields, zap.Duration("ttl", ttl))
			}
			logger.Info("Discovery KV bucket created", fields...)
		}
	default:
		return nil, err
	}

	store := &KVStore{
		logger: logger,
		kv:     kv,
		bucket: bucket,
	}
	return store, nil
}
// kvTTL resolves the TTL for the discovery KV bucket from the environment
// variable named by kvTTLSecondsEnv.
//
//   - Unset or blank: the default of defaultKVTTLSeconds seconds is returned.
//   - Not an integer, or so large that converting it to a time.Duration would
//     overflow: a warning is logged (when logger is non-nil) and the default
//     is returned.
//   - Zero or negative: 0 is returned.
//   - Otherwise: the configured number of seconds as a time.Duration.
func kvTTL(logger mlogger.Logger) time.Duration {
	fallback := time.Duration(defaultKVTTLSeconds) * time.Second

	raw := strings.TrimSpace(os.Getenv(kvTTLSecondsEnv))
	if raw == "" {
		return fallback
	}

	secs, err := strconv.Atoi(raw)
	if err == nil {
		if secs <= 0 {
			return 0
		}
		ttl := time.Duration(secs) * time.Second
		// The multiplication wraps silently on int64 overflow; dividing back
		// detects that, so a wrapped (possibly negative) TTL is never used.
		if ttl/time.Second == time.Duration(secs) {
			return ttl
		}
	}

	if logger != nil {
		logger.Warn("Invalid discovery KV TTL seconds, using default", zap.String("env", kvTTLSecondsEnv), zap.String("value", raw), zap.Int("default_seconds", defaultKVTTLSeconds))
	}
	return fallback
}
// ensureKVTTL makes a best-effort attempt to align the TTL of an existing
// bucket with ttl. It does nothing when kv or js is nil, when ttl is not
// positive, or when the bucket status already reports the same TTL.
//
// The change is applied by setting MaxAge on the stream named "KV_"+bucket
// and updating that stream. Failures are never returned to the caller; they
// are only logged as warnings when logger is non-nil.
func ensureKVTTL(logger mlogger.Logger, js nats.JetStreamContext, kv nats.KeyValue, bucket string, ttl time.Duration) {
	if ttl <= 0 || js == nil || kv == nil {
		return
	}

	status, err := kv.Status()
	if err != nil {
		if logger != nil {
			logger.Warn("Failed to read discovery KV status", zap.String("bucket", bucket), zap.Error(err))
		}
		return
	}
	if current := status.TTL(); current == ttl {
		// Already configured as requested.
		return
	}

	streamName := "KV_" + bucket
	streamInfo, err := js.StreamInfo(streamName)
	if err != nil {
		if logger != nil {
			logger.Warn("Failed to read discovery KV stream info", zap.String("bucket", bucket), zap.String("stream", streamName), zap.Error(err))
		}
		return
	}

	// Start from the current stream configuration and change only MaxAge.
	updated := streamInfo.Config
	updated.MaxAge = ttl
	_, err = js.UpdateStream(&updated)
	if err != nil {
		if logger != nil {
			logger.Warn("Failed to update discovery KV TTL", zap.String("bucket", bucket), zap.Duration("ttl", ttl), zap.Error(err))
		}
		return
	}

	if logger != nil {
		logger.Info("Discovery KV TTL updated", zap.String("bucket", bucket), zap.Duration("ttl", ttl))
	}
}
// Put stores entry in the bucket as JSON under the KV key derived from its
// registry key.
//
// It returns an error when the store is not configured, when the entry
// yields an empty registry key, when JSON encoding fails, or when the write
// to the bucket fails. A failed write is also logged as a warning when a
// logger is set.
func (s *KVStore) Put(entry RegistryEntry) error {
	if s == nil || s.kv == nil {
		return errors.New("discovery kv: not configured")
	}

	// NOTE(review): the key is computed from the normalized entry, while the
	// stored payload is the entry exactly as passed in — confirm this is
	// intended.
	registryKey := registryEntryKey(normalizeEntry(entry))
	if registryKey == "" {
		return errors.New("discovery kv: entry key is empty")
	}

	data, err := json.Marshal(entry)
	if err != nil {
		return err
	}

	if _, err = s.kv.Put(kvKeyFromRegistryKey(registryKey), data); err == nil {
		return nil
	}
	if s.logger != nil {
		fields := append(entryFields(entry), zap.String("bucket", s.bucket), zap.String("key", registryKey), zap.Error(err))
		s.logger.Warn("Failed to persist discovery entry", fields...)
	}
	return err
}
// Delete removes the entry stored under the KV key derived from id.
//
// It returns an error when the store is not configured or when the bucket
// reports a failure. An id that maps to an empty KV key is treated as a
// no-op and returns nil.
//
// The error from the bucket is returned whether or not a logger is set; the
// logger only controls whether the failure is also logged as a warning.
func (s *KVStore) Delete(id string) error {
	if s == nil || s.kv == nil {
		return errors.New("discovery kv: not configured")
	}

	key := kvKeyFromRegistryKey(id)
	if key == "" {
		return nil
	}

	err := s.kv.Delete(key)
	if err != nil && s.logger != nil {
		s.logger.Warn("Failed to delete discovery entry", zap.String("bucket", s.bucket), zap.String("key", key), zap.Error(err))
	}
	// Returned outside the logging branch so a failure is never reported as
	// success just because no logger is configured.
	return err
}
// WatchAll returns the watcher produced by the underlying bucket's WatchAll
// call. It returns an error when the store is not configured.
func (s *KVStore) WatchAll() (nats.KeyWatcher, error) {
	if s != nil && s.kv != nil {
		return s.kv.WatchAll()
	}
	return nil, errors.New("discovery kv: not configured")
}
// Bucket reports the name of the bucket this store is bound to, or the empty
// string when called on a nil store.
func (s *KVStore) Bucket() string {
	if s != nil {
		return s.bucket
	}
	return ""
}