improved ledger account discovery

This commit is contained in:
Stephan D
2026-01-22 20:05:27 +01:00
parent c3226cb59e
commit 980c9fc9c7
23 changed files with 480 additions and 53 deletions

View File

@@ -0,0 +1,65 @@
package discovery
import (
"errors"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/tech/sendico/pkg/merrors"
)
var (
metricsOnce sync.Once
eventLatency *prometheus.HistogramVec
eventStatus *prometheus.CounterVec
)
func initMetrics() {
metricsOnce.Do(func() {
eventLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "sendico",
Subsystem: "discovery",
Name: "event_latency_seconds",
Help: "Latency distribution for discovery event handling.",
Buckets: prometheus.DefBuckets,
}, []string{"event"})
eventStatus = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "sendico",
Subsystem: "discovery",
Name: "event_requests_total",
Help: "Total number of discovery events handled, grouped by event and status.",
}, []string{"event", "status"})
})
}
func observeEvent(event string, err error, duration time.Duration) {
if eventLatency != nil {
eventLatency.WithLabelValues(event).Observe(duration.Seconds())
}
if eventStatus != nil {
eventStatus.WithLabelValues(event, statusLabel(err)).Inc()
}
}
func statusLabel(err error) string {
switch {
case err == nil:
return "ok"
case errors.Is(err, merrors.ErrInvalidArg):
return "invalid_argument"
case errors.Is(err, merrors.ErrNoData):
return "not_found"
case errors.Is(err, merrors.ErrDataConflict):
return "conflict"
case errors.Is(err, merrors.ErrAccessDenied):
return "denied"
case errors.Is(err, merrors.ErrInternal):
return "internal"
default:
return "error"
}
}

View File

@@ -59,6 +59,7 @@ func NewRegistryService(logger mlogger.Logger, msgBroker mb.Broker, producer msg
return nil, merrors.InvalidArgument("discovery registry: no logger provided", "logger")
}
logger = logger.Named("discovery_registry")
initMetrics()
sender = strings.TrimSpace(sender)
if sender == "" {
sender = "discovery"
@@ -147,9 +148,14 @@ func (s *RegistryService) Stop() {
})
}
func (s *RegistryService) handleAnnounce(_ context.Context, env me.Envelope) error {
func (s *RegistryService) handleAnnounce(_ context.Context, env me.Envelope) (err error) {
start := time.Now()
defer func() {
observeEvent("announce", err, time.Since(start))
}()
var payload Announcement
if err := json.Unmarshal(env.GetData(), &payload); err != nil {
if err = json.Unmarshal(env.GetData(), &payload); err != nil {
fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
s.logWarn("Failed to decode discovery announce payload", fields...)
return err
@@ -174,9 +180,14 @@ func (s *RegistryService) handleAnnounce(_ context.Context, env me.Envelope) err
return nil
}
func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) error {
func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) (err error) {
start := time.Now()
defer func() {
observeEvent("heartbeat", err, time.Since(start))
}()
var payload Heartbeat
if err := json.Unmarshal(env.GetData(), &payload); err != nil {
if err = json.Unmarshal(env.GetData(), &payload); err != nil {
fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
s.logWarn("Failed to decode discovery heartbeat payload", fields...)
return err
@@ -208,13 +219,18 @@ func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) er
return nil
}
func (s *RegistryService) handleLookup(_ context.Context, env me.Envelope) error {
func (s *RegistryService) handleLookup(_ context.Context, env me.Envelope) (err error) {
start := time.Now()
defer func() {
observeEvent("lookup", err, time.Since(start))
}()
if s.producer == nil {
s.logWarn("Discovery lookup request ignored: producer not configured", envelopeFields(env)...)
return nil
}
var payload LookupRequest
if err := json.Unmarshal(env.GetData(), &payload); err != nil {
if err = json.Unmarshal(env.GetData(), &payload); err != nil {
fields := append(envelopeFields(env), zap.Int("data_len", len(env.GetData())), zap.Error(err))
s.logWarn("Failed to decode discovery lookup payload", fields...)
return err
@@ -222,7 +238,7 @@ func (s *RegistryService) handleLookup(_ context.Context, env me.Envelope) error
resp := s.registry.Lookup(time.Now())
resp.RequestID = strings.TrimSpace(payload.RequestID)
s.logDebug("Discovery lookup prepared", zap.String("request_id", resp.RequestID), zap.Int("services", len(resp.Services)), zap.Int("gateways", len(resp.Gateways)))
if err := s.producer.SendMessage(NewLookupResponseEnvelope(s.sender, resp)); err != nil {
if err = s.producer.SendMessage(NewLookupResponseEnvelope(s.sender, resp)); err != nil {
fields := []zap.Field{zap.String("request_id", resp.RequestID), zap.Error(err)}
s.logWarn("Failed to publish discovery lookup response", fields...)
return err