discovery service

This commit is contained in:
Stephan D
2026-01-02 02:44:01 +01:00
parent 97ba7500dc
commit ea1c69f14a
47 changed files with 2799 additions and 701 deletions

View File

@@ -9,6 +9,7 @@ import (
"github.com/google/uuid"
msg "github.com/tech/sendico/pkg/messaging"
"github.com/tech/sendico/pkg/mlogger"
"go.uber.org/zap"
)
type Announcer struct {
@@ -31,8 +32,11 @@ func NewAnnouncer(logger mlogger.Logger, producer msg.Producer, sender string, a
if announce.Service == "" {
announce.Service = strings.TrimSpace(sender)
}
if announce.InstanceID == "" {
announce.InstanceID = InstanceID()
}
if announce.ID == "" {
announce.ID = DefaultInstanceID(announce.Service)
announce.ID = DefaultEntryID(announce.Service)
}
if announce.InvokeURI == "" && announce.Service != "" {
announce.InvokeURI = DefaultInvokeURI(announce.Service)
@@ -53,15 +57,16 @@ func (a *Announcer) Start() {
}
a.startOnce.Do(func() {
if a.producer == nil {
a.logWarn("Discovery announce skipped: producer not configured")
a.logWarn("Discovery announce skipped: producer not configured", announcementFields(a.announce)...)
close(a.doneCh)
return
}
if strings.TrimSpace(a.announce.ID) == "" {
a.logWarn("Discovery announce skipped: missing instance id")
a.logWarn("Discovery announce skipped: missing instance id", announcementFields(a.announce)...)
close(a.doneCh)
return
}
a.logInfo("Discovery announcer starting", announcementFields(a.announce)...)
a.sendAnnouncement()
a.sendHeartbeat()
go a.heartbeatLoop()
@@ -75,6 +80,7 @@ func (a *Announcer) Stop() {
a.stopOnce.Do(func() {
close(a.stopCh)
<-a.doneCh
a.logInfo("Discovery announcer stopped", announcementFields(a.announce)...)
})
}
@@ -99,42 +105,47 @@ func (a *Announcer) heartbeatLoop() {
func (a *Announcer) sendAnnouncement() {
env := NewServiceAnnounceEnvelope(a.sender, a.announce)
event := ServiceAnnounceEvent()
if a.announce.Rail != "" {
env = NewGatewayAnnounceEnvelope(a.sender, a.announce)
event = GatewayAnnounceEvent()
}
if err := a.producer.SendMessage(env); err != nil {
a.logWarn("Failed to publish discovery announce: " + err.Error())
fields := append(announcementFields(a.announce), zap.String("event", event.ToString()), zap.Error(err))
a.logWarn("Failed to publish discovery announce", fields...)
return
}
a.logInfo("Discovery announce published")
a.logInfo("Discovery announce published", append(announcementFields(a.announce), zap.String("event", event.ToString()))...)
}
func (a *Announcer) sendHeartbeat() {
hb := Heartbeat{
ID: a.announce.ID,
Status: "ok",
TS: time.Now().Unix(),
ID: a.announce.ID,
InstanceID: a.announce.InstanceID,
Status: "ok",
TS: time.Now().Unix(),
}
if err := a.producer.SendMessage(NewHeartbeatEnvelope(a.sender, hb)); err != nil {
a.logWarn("Failed to publish discovery heartbeat: " + err.Error())
fields := append(announcementFields(a.announce), zap.String("event", HeartbeatEvent().ToString()), zap.Error(err))
a.logWarn("Failed to publish discovery heartbeat", fields...)
}
}
func (a *Announcer) logInfo(message string) {
func (a *Announcer) logInfo(message string, fields ...zap.Field) {
if a.logger == nil {
return
}
a.logger.Info(message)
a.logger.Info(message, fields...)
}
func (a *Announcer) logWarn(message string) {
func (a *Announcer) logWarn(message string, fields ...zap.Field) {
if a.logger == nil {
return
}
a.logger.Warn(message)
a.logger.Warn(message, fields...)
}
func DefaultInstanceID(service string) string {
func DefaultEntryID(service string) string {
clean := strings.ToLower(strings.TrimSpace(service))
if clean == "" {
clean = "service"
@@ -148,6 +159,10 @@ func DefaultInstanceID(service string) string {
return clean + "_" + host + "_" + uid
}
func DefaultInstanceID(service string) string {
return DefaultEntryID(service)
}
func DefaultInvokeURI(service string) string {
clean := strings.ToLower(strings.TrimSpace(service))
if clean == "" {

View File

@@ -9,7 +9,7 @@ import (
"github.com/google/uuid"
msg "github.com/tech/sendico/pkg/messaging"
"github.com/tech/sendico/pkg/messaging/broker"
mb "github.com/tech/sendico/pkg/messaging/broker"
cons "github.com/tech/sendico/pkg/messaging/consumer"
me "github.com/tech/sendico/pkg/messaging/envelope"
msgproducer "github.com/tech/sendico/pkg/messaging/producer"
@@ -27,22 +27,22 @@ type Client struct {
pending map[string]chan LookupResponse
}
func NewClient(logger mlogger.Logger, broker broker.Broker, producer msg.Producer, sender string) (*Client, error) {
if broker == nil {
func NewClient(logger mlogger.Logger, msgBroker mb.Broker, producer msg.Producer, sender string) (*Client, error) {
if msgBroker == nil {
return nil, errors.New("discovery client: broker is nil")
}
if logger != nil {
logger = logger.Named("discovery_client")
}
if producer == nil {
producer = msgproducer.NewProducer(logger, broker)
producer = msgproducer.NewProducer(logger, msgBroker)
}
sender = strings.TrimSpace(sender)
if sender == "" {
sender = "discovery_client"
}
consumer, err := cons.NewConsumer(logger, broker, LookupResponseEvent())
consumer, err := cons.NewConsumer(logger, msgBroker, LookupResponseEvent())
if err != nil {
return nil, err
}
@@ -57,7 +57,7 @@ func NewClient(logger mlogger.Logger, broker broker.Broker, producer msg.Produce
go func() {
if err := consumer.ConsumeMessages(client.handleLookupResponse); err != nil && client.logger != nil {
client.logger.Warn("Discovery lookup consumer stopped", zap.Error(err))
client.logger.Warn("Discovery lookup consumer stopped", zap.String("event", LookupResponseEvent().ToString()), zap.Error(err))
}
}()
@@ -112,7 +112,8 @@ func (c *Client) Lookup(ctx context.Context) (LookupResponse, error) {
func (c *Client) handleLookupResponse(_ context.Context, env me.Envelope) error {
var payload LookupResponse
if err := json.Unmarshal(env.GetData(), &payload); err != nil {
c.logWarn("Failed to decode discovery lookup response", zap.Error(err))
fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
c.logWarn("Failed to decode discovery lookup response", fields...)
return err
}
requestID := strings.TrimSpace(payload.RequestID)

View File

@@ -0,0 +1,27 @@
package discovery
import (
"strings"
"sync"
"github.com/google/uuid"
)
var (
instanceID string
instanceOnce sync.Once
instanceIDGenerator = func() string {
return uuid.NewString()
}
)
// InstanceID returns a unique, process-stable identifier for the running service instance.
func InstanceID() string {
instanceOnce.Do(func() {
instanceID = strings.TrimSpace(instanceIDGenerator())
if instanceID == "" {
instanceID = uuid.NewString()
}
})
return instanceID
}

View File

@@ -0,0 +1,53 @@
package discovery
import (
"fmt"
"sync"
"testing"
)
func resetInstanceIDForTest() {
instanceID = ""
instanceOnce = sync.Once{}
}
func TestInstanceIDStable(t *testing.T) {
resetInstanceIDForTest()
original := instanceIDGenerator
defer func() {
instanceIDGenerator = original
resetInstanceIDForTest()
}()
instanceIDGenerator = func() string {
return "fixed-id"
}
first := InstanceID()
second := InstanceID()
if first != "fixed-id" || second != "fixed-id" {
t.Fatalf("expected stable instance id, got %q and %q", first, second)
}
}
func TestInstanceIDRegeneratesAfterReset(t *testing.T) {
resetInstanceIDForTest()
original := instanceIDGenerator
defer func() {
instanceIDGenerator = original
resetInstanceIDForTest()
}()
counter := 0
instanceIDGenerator = func() string {
counter++
return fmt.Sprintf("id-%d", counter)
}
first := InstanceID()
resetInstanceIDForTest()
second := InstanceID()
if first == second {
t.Fatalf("expected new instance id after reset, got %q", first)
}
}

99
api/pkg/discovery/keys.go Normal file
View File

@@ -0,0 +1,99 @@
package discovery
import "strings"
const kvEntryPrefix = "entry."
func registryEntryKey(entry RegistryEntry) string {
return registryKey(entry.Service, entry.Rail, entry.Network, entry.Operations, entry.Version, entry.InstanceID)
}
func registryKey(service, rail, network string, operations []string, version, instanceID string) string {
service = normalizeKeyPart(service)
rail = normalizeKeyPart(rail)
op := normalizeKeyPart(firstOperation(operations))
version = normalizeKeyPart(version)
instanceID = normalizeKeyPart(instanceID)
if instanceID == "" {
return ""
}
if service == "" {
service = "service"
}
if rail == "" {
rail = "none"
}
if op == "" {
op = "none"
}
if version == "" {
version = "unknown"
}
parts := []string{service, rail, op, version, instanceID}
if network != "" {
netPart := normalizeKeyPart(network)
if netPart != "" {
parts = append(parts, netPart)
}
}
return strings.Join(parts, ".")
}
func kvKeyFromRegistryKey(key string) string {
key = strings.TrimSpace(key)
if key == "" {
return ""
}
if strings.HasPrefix(key, kvEntryPrefix) {
return key
}
return kvEntryPrefix + key
}
func registryKeyFromKVKey(key string) string {
key = strings.TrimSpace(key)
if strings.HasPrefix(key, kvEntryPrefix) {
return strings.TrimPrefix(key, kvEntryPrefix)
}
return key
}
func firstOperation(ops []string) string {
for _, op := range ops {
op = strings.TrimSpace(op)
if op != "" {
return op
}
}
return ""
}
func normalizeKeyPart(value string) string {
value = strings.ToLower(strings.TrimSpace(value))
if value == "" {
return ""
}
var b strings.Builder
b.Grow(len(value))
lastDash := false
for _, r := range value {
if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') {
b.WriteRune(r)
lastDash = false
continue
}
if r == '-' || r == '_' {
if !lastDash {
b.WriteByte('-')
lastDash = true
}
continue
}
if !lastDash {
b.WriteByte('-')
lastDash = true
}
}
out := strings.Trim(b.String(), "-")
return out
}

103
api/pkg/discovery/kv.go Normal file
View File

@@ -0,0 +1,103 @@
package discovery
import (
"encoding/json"
"errors"
"strings"
"github.com/nats-io/nats.go"
"github.com/tech/sendico/pkg/mlogger"
"go.uber.org/zap"
)
const DefaultKVBucket = "discovery_registry"
type KVStore struct {
logger mlogger.Logger
kv nats.KeyValue
bucket string
}
func NewKVStore(logger mlogger.Logger, js nats.JetStreamContext, bucket string) (*KVStore, error) {
if js == nil {
return nil, errors.New("discovery kv: jetstream is nil")
}
if logger != nil {
logger = logger.Named("discovery_kv")
}
bucket = strings.TrimSpace(bucket)
if bucket == "" {
bucket = DefaultKVBucket
}
kv, err := js.KeyValue(bucket)
if err != nil {
if errors.Is(err, nats.ErrBucketNotFound) {
kv, err = js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: bucket,
Description: "service discovery registry",
History: 1,
})
if err == nil && logger != nil {
logger.Info("Discovery KV bucket created", zap.String("bucket", bucket))
}
}
if err != nil {
return nil, err
}
}
return &KVStore{
logger: logger,
kv: kv,
bucket: bucket,
}, nil
}
func (s *KVStore) Put(entry RegistryEntry) error {
if s == nil || s.kv == nil {
return errors.New("discovery kv: not configured")
}
key := registryEntryKey(normalizeEntry(entry))
if key == "" {
return errors.New("discovery kv: entry key is empty")
}
payload, err := json.Marshal(entry)
if err != nil {
return err
}
_, err = s.kv.Put(kvKeyFromRegistryKey(key), payload)
if err != nil && s.logger != nil {
fields := append(entryFields(entry), zap.String("bucket", s.bucket), zap.String("key", key), zap.Error(err))
s.logger.Warn("Failed to persist discovery entry", fields...)
}
return err
}
func (s *KVStore) Delete(id string) error {
if s == nil || s.kv == nil {
return errors.New("discovery kv: not configured")
}
key := kvKeyFromRegistryKey(id)
if key == "" {
return nil
}
if err := s.kv.Delete(key); err != nil && s.logger != nil {
s.logger.Warn("Failed to delete discovery entry", zap.String("bucket", s.bucket), zap.String("key", key), zap.Error(err))
return err
}
return nil
}
func (s *KVStore) WatchAll() (nats.KeyWatcher, error) {
if s == nil || s.kv == nil {
return nil, errors.New("discovery kv: not configured")
}
return s.kv.WatchAll()
}
func (s *KVStore) Bucket() string {
if s == nil {
return ""
}
return s.bucket
}

View File

@@ -0,0 +1,108 @@
package discovery
import (
"strings"
me "github.com/tech/sendico/pkg/messaging/envelope"
"go.uber.org/zap"
)
func announcementFields(announce Announcement) []zap.Field {
fields := make([]zap.Field, 0, 10)
if announce.ID != "" {
fields = append(fields, zap.String("id", announce.ID))
}
if announce.InstanceID != "" {
fields = append(fields, zap.String("instance_id", announce.InstanceID))
}
if announce.Service != "" {
fields = append(fields, zap.String("service", announce.Service))
}
if announce.Rail != "" {
fields = append(fields, zap.String("rail", announce.Rail))
}
if announce.Network != "" {
fields = append(fields, zap.String("network", announce.Network))
}
if announce.InvokeURI != "" {
fields = append(fields, zap.String("invoke_uri", announce.InvokeURI))
}
if announce.Version != "" {
fields = append(fields, zap.String("version", announce.Version))
}
if announce.RoutingPriority != 0 {
fields = append(fields, zap.Int("routing_priority", announce.RoutingPriority))
}
if len(announce.Operations) > 0 {
fields = append(fields, zap.Int("ops", len(announce.Operations)))
}
if len(announce.Currencies) > 0 {
fields = append(fields, zap.Int("currencies", len(announce.Currencies)))
}
if announce.Health.IntervalSec > 0 {
fields = append(fields, zap.Int("interval_sec", announce.Health.IntervalSec))
}
if announce.Health.TimeoutSec > 0 {
fields = append(fields, zap.Int("timeout_sec", announce.Health.TimeoutSec))
}
return fields
}
func entryFields(entry RegistryEntry) []zap.Field {
fields := make([]zap.Field, 0, 12)
if entry.ID != "" {
fields = append(fields, zap.String("id", entry.ID))
}
if entry.InstanceID != "" {
fields = append(fields, zap.String("instance_id", entry.InstanceID))
}
if entry.Service != "" {
fields = append(fields, zap.String("service", entry.Service))
}
if entry.Rail != "" {
fields = append(fields, zap.String("rail", entry.Rail))
}
if entry.Network != "" {
fields = append(fields, zap.String("network", entry.Network))
}
if entry.Version != "" {
fields = append(fields, zap.String("version", entry.Version))
}
if entry.InvokeURI != "" {
fields = append(fields, zap.String("invoke_uri", entry.InvokeURI))
}
if entry.Status != "" {
fields = append(fields, zap.String("status", entry.Status))
}
if !entry.LastHeartbeat.IsZero() {
fields = append(fields, zap.Time("last_heartbeat", entry.LastHeartbeat))
}
fields = append(fields, zap.Bool("healthy", entry.Healthy))
if entry.RoutingPriority != 0 {
fields = append(fields, zap.Int("routing_priority", entry.RoutingPriority))
}
if len(entry.Operations) > 0 {
fields = append(fields, zap.Int("ops", len(entry.Operations)))
}
if len(entry.Currencies) > 0 {
fields = append(fields, zap.Int("currencies", len(entry.Currencies)))
}
return fields
}
func envelopeFields(env me.Envelope) []zap.Field {
if env == nil {
return nil
}
fields := make([]zap.Field, 0, 4)
sender := strings.TrimSpace(env.GetSender())
if sender != "" {
fields = append(fields, zap.String("sender", sender))
}
if signature := env.GetSignature(); signature != nil {
fields = append(fields, zap.String("event", signature.ToString()))
}
fields = append(fields, zap.String("message_id", env.GetMessageId().String()))
fields = append(fields, zap.Time("timestamp", env.GetTimeStamp()))
return fields
}

View File

@@ -1,5 +1,7 @@
package discovery
import "time"
type LookupRequest struct {
RequestID string `json:"requestId,omitempty"`
}
@@ -11,16 +13,18 @@ type LookupResponse struct {
}
type ServiceSummary struct {
ID string `json:"id"`
Service string `json:"service"`
Ops []string `json:"ops,omitempty"`
Version string `json:"version,omitempty"`
Healthy bool `json:"healthy,omitempty"`
InvokeURI string `json:"invokeURI,omitempty"`
ID string `json:"id"`
InstanceID string `json:"instanceId"`
Service string `json:"service"`
Ops []string `json:"ops,omitempty"`
Version string `json:"version,omitempty"`
Healthy bool `json:"healthy,omitempty"`
InvokeURI string `json:"invokeURI,omitempty"`
}
type GatewaySummary struct {
ID string `json:"id"`
InstanceID string `json:"instanceId"`
Rail string `json:"rail"`
Network string `json:"network,omitempty"`
Currencies []string `json:"currencies,omitempty"`
@@ -43,6 +47,7 @@ func (r *Registry) Lookup(now time.Time) LookupResponse {
if entry.Rail != "" {
resp.Gateways = append(resp.Gateways, GatewaySummary{
ID: entry.ID,
InstanceID: entry.InstanceID,
Rail: entry.Rail,
Network: entry.Network,
Currencies: cloneStrings(entry.Currencies),
@@ -56,12 +61,13 @@ func (r *Registry) Lookup(now time.Time) LookupResponse {
continue
}
resp.Services = append(resp.Services, ServiceSummary{
ID: entry.ID,
Service: entry.Service,
Ops: cloneStrings(entry.Operations),
Version: entry.Version,
Healthy: entry.Healthy,
InvokeURI: entry.InvokeURI,
ID: entry.ID,
InstanceID: entry.InstanceID,
Service: entry.Service,
Ops: cloneStrings(entry.Operations),
Version: entry.Version,
Healthy: entry.Healthy,
InvokeURI: entry.InvokeURI,
})
}

View File

@@ -13,6 +13,7 @@ const (
type RegistryEntry struct {
ID string `json:"id"`
InstanceID string `bson:"instanceId" json:"instanceId"`
Service string `json:"service"`
Rail string `json:"rail,omitempty"`
Network string `json:"network,omitempty"`
@@ -29,8 +30,10 @@ type RegistryEntry struct {
}
type Registry struct {
mu sync.RWMutex
entries map[string]*RegistryEntry
mu sync.RWMutex
entries map[string]*RegistryEntry
byID map[string]map[string]struct{}
byInstance map[string]map[string]struct{}
}
type UpdateResult struct {
@@ -42,23 +45,31 @@ type UpdateResult struct {
func NewRegistry() *Registry {
return &Registry{
entries: map[string]*RegistryEntry{},
entries: map[string]*RegistryEntry{},
byID: map[string]map[string]struct{}{},
byInstance: map[string]map[string]struct{}{},
}
}
func (r *Registry) UpsertFromAnnouncement(announce Announcement, now time.Time) UpdateResult {
entry := registryEntryFromAnnouncement(normalizeAnnouncement(announce), now)
key := registryEntryKey(entry)
if key == "" {
return UpdateResult{Entry: entry}
}
r.mu.Lock()
defer r.mu.Unlock()
existing, ok := r.entries[entry.ID]
existing, ok := r.entries[key]
wasHealthy := false
if ok && existing != nil {
wasHealthy = existing.isHealthyAt(now)
r.unindexEntry(key, existing)
}
entry.Healthy = entry.isHealthyAt(now)
r.entries[entry.ID] = &entry
r.entries[key] = &entry
r.indexEntry(key, &entry)
return UpdateResult{
Entry: entry,
@@ -68,10 +79,45 @@ func (r *Registry) UpsertFromAnnouncement(announce Announcement, now time.Time)
}
}
func (r *Registry) UpdateHeartbeat(id string, status string, ts time.Time, now time.Time) (UpdateResult, bool) {
func (r *Registry) UpsertEntry(entry RegistryEntry, now time.Time) UpdateResult {
entry = normalizeEntry(entry)
key := registryEntryKey(entry)
if key == "" {
return UpdateResult{Entry: entry}
}
if entry.LastHeartbeat.IsZero() {
entry.LastHeartbeat = now
}
if strings.TrimSpace(entry.Status) == "" {
entry.Status = "ok"
}
r.mu.Lock()
defer r.mu.Unlock()
existing, ok := r.entries[key]
wasHealthy := false
if ok && existing != nil {
wasHealthy = existing.isHealthyAt(now)
r.unindexEntry(key, existing)
}
entry.Healthy = entry.isHealthyAt(now)
r.entries[key] = &entry
r.indexEntry(key, &entry)
return UpdateResult{
Entry: entry,
IsNew: !ok,
WasHealthy: wasHealthy,
BecameHealthy: !wasHealthy && entry.Healthy,
}
}
func (r *Registry) UpdateHeartbeat(id string, instanceID string, status string, ts time.Time, now time.Time) []UpdateResult {
id = strings.TrimSpace(id)
if id == "" {
return UpdateResult{}, false
instanceID = strings.TrimSpace(instanceID)
if id == "" && instanceID == "" {
return nil
}
if status == "" {
status = "ok"
@@ -83,21 +129,54 @@ func (r *Registry) UpdateHeartbeat(id string, status string, ts time.Time, now t
r.mu.Lock()
defer r.mu.Unlock()
entry, ok := r.entries[id]
if !ok || entry == nil {
return UpdateResult{}, false
keys := keysFromIndex(r.byInstance[instanceID])
if len(keys) == 0 && id != "" {
keys = keysFromIndex(r.byID[id])
}
wasHealthy := entry.isHealthyAt(now)
entry.Status = status
entry.LastHeartbeat = ts
entry.Healthy = entry.isHealthyAt(now)
if len(keys) == 0 {
return nil
}
results := make([]UpdateResult, 0, len(keys))
for _, key := range keys {
entry := r.entries[key]
if entry == nil {
continue
}
if id != "" && entry.ID != id {
continue
}
if instanceID != "" && entry.InstanceID != instanceID {
continue
}
wasHealthy := entry.isHealthyAt(now)
entry.Status = status
entry.LastHeartbeat = ts
entry.Healthy = entry.isHealthyAt(now)
return UpdateResult{
Entry: *entry,
IsNew: false,
WasHealthy: wasHealthy,
BecameHealthy: !wasHealthy && entry.Healthy,
}, true
results = append(results, UpdateResult{
Entry: *entry,
IsNew: false,
WasHealthy: wasHealthy,
BecameHealthy: !wasHealthy && entry.Healthy,
})
}
return results
}
func (r *Registry) Delete(key string) bool {
key = strings.TrimSpace(key)
if key == "" {
return false
}
r.mu.Lock()
defer r.mu.Unlock()
entry, ok := r.entries[key]
if !ok {
return false
}
delete(r.entries, key)
r.unindexEntry(key, entry)
return true
}
func (r *Registry) List(now time.Time, onlyHealthy bool) []RegistryEntry {
@@ -123,6 +202,7 @@ func registryEntryFromAnnouncement(announce Announcement, now time.Time) Registr
status := "ok"
return RegistryEntry{
ID: strings.TrimSpace(announce.ID),
InstanceID: strings.TrimSpace(announce.InstanceID),
Service: strings.TrimSpace(announce.Service),
Rail: strings.ToUpper(strings.TrimSpace(announce.Rail)),
Network: strings.ToUpper(strings.TrimSpace(announce.Network)),
@@ -138,8 +218,33 @@ func registryEntryFromAnnouncement(announce Announcement, now time.Time) Registr
}
}
func normalizeEntry(entry RegistryEntry) RegistryEntry {
entry.ID = strings.TrimSpace(entry.ID)
entry.InstanceID = strings.TrimSpace(entry.InstanceID)
if entry.InstanceID == "" {
entry.InstanceID = entry.ID
}
entry.Service = strings.TrimSpace(entry.Service)
entry.Rail = strings.ToUpper(strings.TrimSpace(entry.Rail))
entry.Network = strings.ToUpper(strings.TrimSpace(entry.Network))
entry.Operations = normalizeStrings(entry.Operations, false)
entry.Currencies = normalizeStrings(entry.Currencies, true)
entry.InvokeURI = strings.TrimSpace(entry.InvokeURI)
entry.Version = strings.TrimSpace(entry.Version)
entry.Status = strings.TrimSpace(entry.Status)
entry.Health = normalizeHealth(entry.Health)
if entry.Limits != nil {
entry.Limits = normalizeLimits(*entry.Limits)
}
return entry
}
func normalizeAnnouncement(announce Announcement) Announcement {
announce.ID = strings.TrimSpace(announce.ID)
announce.InstanceID = strings.TrimSpace(announce.InstanceID)
if announce.InstanceID == "" {
announce.InstanceID = announce.ID
}
announce.Service = strings.TrimSpace(announce.Service)
announce.Rail = strings.ToUpper(strings.TrimSpace(announce.Rail))
announce.Network = strings.ToUpper(strings.TrimSpace(announce.Network))
@@ -239,6 +344,67 @@ func cloneStrings(values []string) []string {
return out
}
func (r *Registry) indexEntry(key string, entry *RegistryEntry) {
if r == nil || entry == nil || key == "" {
return
}
if entry.ID != "" {
addIndex(r.byID, entry.ID, key)
}
if entry.InstanceID != "" {
addIndex(r.byInstance, entry.InstanceID, key)
}
}
func (r *Registry) unindexEntry(key string, entry *RegistryEntry) {
if r == nil || entry == nil || key == "" {
return
}
if entry.ID != "" {
removeIndex(r.byID, entry.ID, key)
}
if entry.InstanceID != "" {
removeIndex(r.byInstance, entry.InstanceID, key)
}
}
func addIndex(index map[string]map[string]struct{}, id string, key string) {
if id == "" || key == "" {
return
}
set := index[id]
if set == nil {
set = map[string]struct{}{}
index[id] = set
}
set[key] = struct{}{}
}
func removeIndex(index map[string]map[string]struct{}, id string, key string) {
if id == "" || key == "" {
return
}
set := index[id]
if set == nil {
return
}
delete(set, key)
if len(set) == 0 {
delete(index, id)
}
}
func keysFromIndex(index map[string]struct{}) []string {
if len(index) == 0 {
return nil
}
keys := make([]string, 0, len(index))
for key := range index {
keys = append(keys, key)
}
return keys
}
func (e *RegistryEntry) isHealthyAt(now time.Time) bool {
if e == nil {
return false

View File

@@ -8,8 +8,9 @@ import (
"sync"
"time"
"github.com/nats-io/nats.go"
msg "github.com/tech/sendico/pkg/messaging"
"github.com/tech/sendico/pkg/messaging/broker"
mb "github.com/tech/sendico/pkg/messaging/broker"
cons "github.com/tech/sendico/pkg/messaging/consumer"
me "github.com/tech/sendico/pkg/messaging/envelope"
"github.com/tech/sendico/pkg/mlogger"
@@ -22,6 +23,8 @@ type RegistryService struct {
producer msg.Producer
sender string
consumers []consumerHandler
kv *KVStore
kvWatcher nats.KeyWatcher
startOnce sync.Once
stopOnce sync.Once
@@ -30,10 +33,11 @@ type RegistryService struct {
type consumerHandler struct {
consumer msg.Consumer
handler msg.MessageHandlerT
event string
}
func NewRegistryService(logger mlogger.Logger, broker broker.Broker, producer msg.Producer, registry *Registry, sender string) (*RegistryService, error) {
if broker == nil {
func NewRegistryService(logger mlogger.Logger, msgBroker mb.Broker, producer msg.Producer, registry *Registry, sender string) (*RegistryService, error) {
if msgBroker == nil {
return nil, errors.New("discovery registry: broker is nil")
}
if registry == nil {
@@ -47,19 +51,19 @@ func NewRegistryService(logger mlogger.Logger, broker broker.Broker, producer ms
sender = "discovery"
}
serviceConsumer, err := cons.NewConsumer(logger, broker, ServiceAnnounceEvent())
serviceConsumer, err := cons.NewConsumer(logger, msgBroker, ServiceAnnounceEvent())
if err != nil {
return nil, err
}
gatewayConsumer, err := cons.NewConsumer(logger, broker, GatewayAnnounceEvent())
gatewayConsumer, err := cons.NewConsumer(logger, msgBroker, GatewayAnnounceEvent())
if err != nil {
return nil, err
}
heartbeatConsumer, err := cons.NewConsumer(logger, broker, HeartbeatEvent())
heartbeatConsumer, err := cons.NewConsumer(logger, msgBroker, HeartbeatEvent())
if err != nil {
return nil, err
}
lookupConsumer, err := cons.NewConsumer(logger, broker, LookupRequestEvent())
lookupConsumer, err := cons.NewConsumer(logger, msgBroker, LookupRequestEvent())
if err != nil {
return nil, err
}
@@ -69,17 +73,18 @@ func NewRegistryService(logger mlogger.Logger, broker broker.Broker, producer ms
registry: registry,
producer: producer,
sender: sender,
consumers: []consumerHandler{
{consumer: serviceConsumer, handler: func(ctx context.Context, env me.Envelope) error {
return svc.handleAnnounce(ctx, env)
}},
{consumer: gatewayConsumer, handler: func(ctx context.Context, env me.Envelope) error {
return svc.handleAnnounce(ctx, env)
}},
{consumer: heartbeatConsumer, handler: svc.handleHeartbeat},
{consumer: lookupConsumer, handler: svc.handleLookup},
},
}
svc.consumers = []consumerHandler{
{consumer: serviceConsumer, event: ServiceAnnounceEvent().ToString(), handler: func(ctx context.Context, env me.Envelope) error {
return svc.handleAnnounce(ctx, env)
}},
{consumer: gatewayConsumer, event: GatewayAnnounceEvent().ToString(), handler: func(ctx context.Context, env me.Envelope) error {
return svc.handleAnnounce(ctx, env)
}},
{consumer: heartbeatConsumer, event: HeartbeatEvent().ToString(), handler: svc.handleHeartbeat},
{consumer: lookupConsumer, event: LookupRequestEvent().ToString(), handler: svc.handleLookup},
}
svc.initKV(msgBroker)
return svc, nil
}
@@ -88,14 +93,16 @@ func (s *RegistryService) Start() {
return
}
s.startOnce.Do(func() {
s.logInfo("Discovery registry service starting", zap.Int("consumers", len(s.consumers)), zap.Bool("kv_enabled", s.kv != nil))
for _, ch := range s.consumers {
ch := ch
go func() {
if err := ch.consumer.ConsumeMessages(ch.handler); err != nil && s.logger != nil {
s.logger.Warn("Discovery consumer stopped with error", zap.Error(err))
s.logger.Warn("Discovery consumer stopped with error", zap.String("event", ch.event), zap.Error(err))
}
}()
}
s.startKVWatch()
})
}
@@ -109,18 +116,29 @@ func (s *RegistryService) Stop() {
ch.consumer.Close()
}
}
if s.kvWatcher != nil {
_ = s.kvWatcher.Stop()
}
s.logInfo("Discovery registry service stopped")
})
}
func (s *RegistryService) handleAnnounce(_ context.Context, env me.Envelope) error {
var payload Announcement
if err := json.Unmarshal(env.GetData(), &payload); err != nil {
s.logWarn("Failed to decode discovery announce payload", zap.Error(err))
fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
s.logWarn("Failed to decode discovery announce payload", fields...)
return err
}
if strings.TrimSpace(payload.InstanceID) == "" {
fields := append(envelopeFields(env), announcementFields(payload)...)
s.logWarn("Discovery announce missing instance id", fields...)
}
now := time.Now()
result := s.registry.UpsertFromAnnouncement(payload, now)
s.persistEntry(result.Entry)
if result.IsNew || result.BecameHealthy {
s.logInfo("Discovery registry entry updated", append(entryFields(result.Entry), zap.Bool("is_new", result.IsNew), zap.Bool("became_healthy", result.BecameHealthy))...)
s.publishRefresh(result.Entry)
}
return nil
@@ -129,37 +147,48 @@ func (s *RegistryService) handleAnnounce(_ context.Context, env me.Envelope) err
func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) error {
var payload Heartbeat
if err := json.Unmarshal(env.GetData(), &payload); err != nil {
s.logWarn("Failed to decode discovery heartbeat payload", zap.Error(err))
fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
s.logWarn("Failed to decode discovery heartbeat payload", fields...)
return err
}
if payload.ID == "" {
if strings.TrimSpace(payload.InstanceID) == "" && strings.TrimSpace(payload.ID) == "" {
return nil
}
if strings.TrimSpace(payload.InstanceID) == "" {
fields := append(envelopeFields(env), zap.String("id", payload.ID))
s.logWarn("Discovery heartbeat missing instance id", fields...)
}
ts := time.Unix(payload.TS, 0)
if ts.Unix() <= 0 {
ts = time.Now()
}
result, ok := s.registry.UpdateHeartbeat(payload.ID, strings.TrimSpace(payload.Status), ts, time.Now())
if ok && result.BecameHealthy {
s.publishRefresh(result.Entry)
results := s.registry.UpdateHeartbeat(payload.ID, payload.InstanceID, strings.TrimSpace(payload.Status), ts, time.Now())
for _, result := range results {
if result.BecameHealthy {
s.logInfo("Discovery registry entry became healthy", append(entryFields(result.Entry), zap.String("status", result.Entry.Status))...)
s.publishRefresh(result.Entry)
}
s.persistEntry(result.Entry)
}
return nil
}
func (s *RegistryService) handleLookup(_ context.Context, env me.Envelope) error {
if s.producer == nil {
s.logWarn("Discovery lookup request ignored: producer not configured")
s.logWarn("Discovery lookup request ignored: producer not configured", envelopeFields(env)...)
return nil
}
var payload LookupRequest
if err := json.Unmarshal(env.GetData(), &payload); err != nil {
s.logWarn("Failed to decode discovery lookup payload", zap.Error(err))
fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
s.logWarn("Failed to decode discovery lookup payload", fields...)
return err
}
resp := s.registry.Lookup(time.Now())
resp.RequestID = strings.TrimSpace(payload.RequestID)
if err := s.producer.SendMessage(NewLookupResponseEnvelope(s.sender, resp)); err != nil {
s.logWarn("Failed to publish discovery lookup response", zap.Error(err))
fields := []zap.Field{zap.String("request_id", resp.RequestID), zap.Error(err)}
s.logWarn("Failed to publish discovery lookup response", fields...)
return err
}
return nil
@@ -170,13 +199,99 @@ func (s *RegistryService) publishRefresh(entry RegistryEntry) {
return
}
payload := RefreshEvent{
Service: entry.Service,
Rail: entry.Rail,
Network: entry.Network,
Message: "new module available",
InstanceID: entry.InstanceID,
Service: entry.Service,
Rail: entry.Rail,
Network: entry.Network,
Message: "new module available",
}
if err := s.producer.SendMessage(NewRefreshUIEnvelope(s.sender, payload)); err != nil {
s.logWarn("Failed to publish discovery refresh event", zap.Error(err))
fields := append(entryFields(entry), zap.Error(err))
s.logWarn("Failed to publish discovery refresh event", fields...)
}
}
type jetStreamProvider interface {
JetStream() nats.JetStreamContext
}
func (s *RegistryService) initKV(msgBroker mb.Broker) {
if s == nil || msgBroker == nil {
return
}
provider, ok := msgBroker.(jetStreamProvider)
if !ok {
return
}
js := provider.JetStream()
if js == nil {
return
}
store, err := NewKVStore(s.logger, js, "")
if err != nil {
s.logWarn("Failed to initialise discovery KV store", zap.Error(err))
return
}
s.kv = store
}
func (s *RegistryService) startKVWatch() {
if s == nil || s.kv == nil {
return
}
watcher, err := s.kv.WatchAll()
if err != nil {
s.logWarn("Failed to start discovery KV watch", zap.Error(err))
return
}
s.kvWatcher = watcher
if bucket := s.kv.Bucket(); bucket != "" {
s.logInfo("Discovery KV watch started", zap.String("bucket", bucket))
}
go s.consumeKVUpdates(watcher)
}
func (s *RegistryService) consumeKVUpdates(watcher nats.KeyWatcher) {
if s == nil || watcher == nil {
return
}
for entry := range watcher.Updates() {
if entry == nil {
continue
}
switch entry.Operation() {
case nats.KeyValueDelete, nats.KeyValuePurge:
key := registryKeyFromKVKey(entry.Key())
if key != "" {
if s.registry.Delete(key) {
s.logInfo("Discovery registry entry removed", zap.String("key", key))
}
}
continue
case nats.KeyValuePut:
default:
continue
}
var payload RegistryEntry
if err := json.Unmarshal(entry.Value(), &payload); err != nil {
s.logWarn("Failed to decode discovery KV entry", zap.String("key", entry.Key()), zap.Error(err))
continue
}
result := s.registry.UpsertEntry(payload, time.Now())
if result.IsNew || result.BecameHealthy {
s.logInfo("Discovery registry entry updated from KV", append(entryFields(result.Entry), zap.Bool("is_new", result.IsNew), zap.Bool("became_healthy", result.BecameHealthy))...)
s.publishRefresh(result.Entry)
}
}
}
func (s *RegistryService) persistEntry(entry RegistryEntry) {
if s == nil || s.kv == nil {
return
}
if err := s.kv.Put(entry); err != nil {
s.logWarn("Failed to persist discovery entry", append(entryFields(entry), zap.Error(err))...)
}
}
@@ -186,3 +301,10 @@ func (s *RegistryService) logWarn(message string, fields ...zap.Field) {
}
s.logger.Warn(message, fields...)
}
func (s *RegistryService) logInfo(message string, fields ...zap.Field) {
if s.logger == nil {
return
}
s.logger.Info(message, fields...)
}

View File

@@ -14,6 +14,7 @@ type Limits struct {
type Announcement struct {
ID string `json:"id"`
InstanceID string `bson:"instanceId" json:"instanceId"`
Service string `json:"service"`
Rail string `json:"rail,omitempty"`
Network string `json:"network,omitempty"`
@@ -27,14 +28,16 @@ type Announcement struct {
}
type Heartbeat struct {
ID string `json:"id"`
Status string `json:"status"`
TS int64 `json:"ts"`
ID string `json:"id"`
InstanceID string `json:"instanceId"`
Status string `json:"status"`
TS int64 `json:"ts"`
}
type RefreshEvent struct {
Service string `json:"service,omitempty"`
Rail string `json:"rail,omitempty"`
Network string `json:"network,omitempty"`
Message string `json:"message,omitempty"`
InstanceID string `json:"instanceId,omitempty"`
Service string `json:"service,omitempty"`
Rail string `json:"rail,omitempty"`
Network string `json:"network,omitempty"`
Message string `json:"message,omitempty"`
}

View File

@@ -0,0 +1,126 @@
package discovery
import (
"encoding/json"
"errors"
"sync"
"time"
"github.com/nats-io/nats.go"
mb "github.com/tech/sendico/pkg/messaging/broker"
"github.com/tech/sendico/pkg/mlogger"
"go.uber.org/zap"
)
type RegistryWatcher struct {
logger mlogger.Logger
registry *Registry
kv *KVStore
watcher nats.KeyWatcher
stopOnce sync.Once
}
func NewRegistryWatcher(logger mlogger.Logger, msgBroker mb.Broker, registry *Registry) (*RegistryWatcher, error) {
if msgBroker == nil {
return nil, errors.New("discovery watcher: broker is nil")
}
if registry == nil {
registry = NewRegistry()
}
if logger != nil {
logger = logger.Named("discovery_watcher")
}
provider, ok := msgBroker.(jetStreamProvider)
if !ok {
return nil, errors.New("discovery watcher: jetstream not available")
}
js := provider.JetStream()
if js == nil {
return nil, errors.New("discovery watcher: jetstream not configured")
}
store, err := NewKVStore(logger, js, "")
if err != nil {
return nil, err
}
return &RegistryWatcher{
logger: logger,
registry: registry,
kv: store,
}, nil
}
func (w *RegistryWatcher) Start() error {
if w == nil || w.kv == nil {
return errors.New("discovery watcher: not configured")
}
watcher, err := w.kv.WatchAll()
if err != nil {
return err
}
w.watcher = watcher
if w.logger != nil {
w.logger.Info("Discovery registry watcher started", zap.String("bucket", w.kv.Bucket()))
}
go w.consume(watcher)
return nil
}
func (w *RegistryWatcher) Stop() {
if w == nil {
return
}
w.stopOnce.Do(func() {
if w.watcher != nil {
_ = w.watcher.Stop()
}
if w.logger != nil {
w.logger.Info("Discovery registry watcher stopped")
}
})
}
func (w *RegistryWatcher) Registry() *Registry {
if w == nil {
return nil
}
return w.registry
}
func (w *RegistryWatcher) consume(watcher nats.KeyWatcher) {
if w == nil || watcher == nil {
return
}
for entry := range watcher.Updates() {
if entry == nil {
continue
}
switch entry.Operation() {
case nats.KeyValueDelete, nats.KeyValuePurge:
key := registryKeyFromKVKey(entry.Key())
if key != "" {
if w.registry.Delete(key) && w.logger != nil {
w.logger.Info("Discovery registry entry removed", zap.String("key", key))
}
}
continue
case nats.KeyValuePut:
default:
continue
}
var payload RegistryEntry
if err := json.Unmarshal(entry.Value(), &payload); err != nil {
if w.logger != nil {
w.logger.Warn("Failed to decode discovery KV entry", zap.String("key", entry.Key()), zap.Error(err))
}
continue
}
result := w.registry.UpsertEntry(payload, time.Now())
if w.logger != nil && (result.IsNew || result.BecameHealthy) {
fields := append(entryFields(result.Entry), zap.Bool("is_new", result.IsNew), zap.Bool("became_healthy", result.BecameHealthy))
w.logger.Info("Discovery registry entry updated from KV", fields...)
}
}
}

View File

@@ -6,6 +6,7 @@ import (
"net/url"
"os"
"strconv"
"strings"
"sync"
"time"
@@ -20,6 +21,7 @@ type natsSubscriotions = map[string]*TopicSubscription
type NatsBroker struct {
nc *nats.Conn
js nats.JetStreamContext
logger *zap.Logger
topicSubs natsSubscriotions
mu sync.Mutex
@@ -78,23 +80,46 @@ func loadEnv(settings *nc.Settings, l *zap.Logger) (*envConfig, error) {
func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, error) {
l := logger.Named("broker")
// Helper function to get environment variables
cfg, err := loadEnv(settings, l)
if err != nil {
return nil, err
var err error
var cfg *envConfig
var natsURL string
if settings != nil && strings.TrimSpace(settings.URLEnv) != "" {
urlVal := strings.TrimSpace(os.Getenv(settings.URLEnv))
if urlVal != "" {
natsURL = urlVal
}
}
if natsURL == "" {
// Helper function to get environment variables
cfg, err = loadEnv(settings, l)
if err != nil {
return nil, err
}
u := &url.URL{
Scheme: "nats",
Host: net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port)),
u := &url.URL{
Scheme: "nats",
Host: net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port)),
}
natsURL = u.String()
}
natsURL := u.String()
opts := []nats.Option{
nats.Name(settings.NATSName),
nats.MaxReconnects(settings.MaxReconnects),
nats.ReconnectWait(time.Duration(settings.ReconnectWait) * time.Second),
nats.UserInfo(cfg.User, cfg.Password),
}
if cfg != nil {
opts = append(opts, nats.UserInfo(cfg.User, cfg.Password))
} else if settings != nil {
userEnv := strings.TrimSpace(settings.UsernameEnv)
passEnv := strings.TrimSpace(settings.PasswordEnv)
if userEnv != "" && passEnv != "" {
user := strings.TrimSpace(os.Getenv(userEnv))
pass := strings.TrimSpace(os.Getenv(passEnv))
if user != "" || pass != "" {
opts = append(opts, nats.UserInfo(user, pass))
}
}
}
res := &NatsBroker{
@@ -106,8 +131,18 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e
l.Error("Failed to connect to NATS", zap.String("url", natsURL), zap.Error(err))
return nil, err
}
if res.js, err = res.nc.JetStream(); err != nil {
l.Warn("Failed to initialise JetStream context", zap.Error(err))
}
logger.Info("Connected to NATS", zap.String("broker", settings.NATSName),
zap.String("url", fmt.Sprintf("nats://%s@%s", cfg.User, net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port)))))
zap.String("url", natsURL))
return res, nil
}
func (b *NatsBroker) JetStream() nats.JetStreamContext {
if b == nil {
return nil
}
return b.js
}

View File

@@ -7,6 +7,7 @@ import (
"os/signal"
"syscall"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/mlogger"
lf "github.com/tech/sendico/pkg/mlogger/factory"
"github.com/tech/sendico/pkg/server"
@@ -28,6 +29,7 @@ func prepareLogger() mlogger.Logger {
func RunServer(rootLoggerName string, av version.Printer, factory server.ServerFactoryT) {
logger := prepareLogger().Named(rootLoggerName)
logger = logger.With(zap.String("instance_id", discovery.InstanceID()))
defer logger.Sync()
// Show version information