TG settlement service

This commit is contained in:
Stephan D
2026-01-02 14:54:18 +01:00
parent ea1c69f14a
commit 743f683d92
82 changed files with 4693 additions and 503 deletions

View File

@@ -93,7 +93,13 @@ func (s *RegistryService) Start() {
return
}
s.startOnce.Do(func() {
s.logInfo("Discovery registry service starting", zap.Int("consumers", len(s.consumers)), zap.Bool("kv_enabled", s.kv != nil))
fields := []zap.Field{zap.Int("consumers", len(s.consumers)), zap.Bool("kv_enabled", s.kv != nil)}
if s.kv != nil {
if bucket := s.kv.Bucket(); bucket != "" {
fields = append(fields, zap.String("kv_bucket", bucket))
}
}
s.logInfo("Discovery registry service starting", fields...)
for _, ch := range s.consumers {
ch := ch
go func() {
@@ -130,6 +136,12 @@ func (s *RegistryService) handleAnnounce(_ context.Context, env me.Envelope) err
s.logWarn("Failed to decode discovery announce payload", fields...)
return err
}
s.logDebug("Discovery announce received", append(envelopeFields(env), announcementFields(payload)...)...)
if strings.TrimSpace(payload.InstanceID) == "" && strings.TrimSpace(payload.ID) == "" {
fields := append(envelopeFields(env), announcementFields(payload)...)
s.logWarn("Discovery announce missing id and instance id", fields...)
return nil
}
if strings.TrimSpace(payload.InstanceID) == "" {
fields := append(envelopeFields(env), announcementFields(payload)...)
s.logWarn("Discovery announce missing instance id", fields...)
@@ -151,6 +163,7 @@ func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) er
s.logWarn("Failed to decode discovery heartbeat payload", fields...)
return err
}
s.logDebug("Discovery heartbeat received", append(envelopeFields(env), zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID), zap.String("status", payload.Status))...)
if strings.TrimSpace(payload.InstanceID) == "" && strings.TrimSpace(payload.ID) == "" {
return nil
}
@@ -163,6 +176,10 @@ func (s *RegistryService) handleHeartbeat(_ context.Context, env me.Envelope) er
ts = time.Now()
}
results := s.registry.UpdateHeartbeat(payload.ID, payload.InstanceID, strings.TrimSpace(payload.Status), ts, time.Now())
if len(results) == 0 {
s.logDebug("Discovery heartbeat ignored: entry not found", zap.String("id", payload.ID), zap.String("instance_id", payload.InstanceID))
return nil
}
for _, result := range results {
if result.BecameHealthy {
s.logInfo("Discovery registry entry became healthy", append(entryFields(result.Entry), zap.String("status", result.Entry.Status))...)
@@ -186,6 +203,7 @@ func (s *RegistryService) handleLookup(_ context.Context, env me.Envelope) error
}
resp := s.registry.Lookup(time.Now())
resp.RequestID = strings.TrimSpace(payload.RequestID)
s.logDebug("Discovery lookup prepared", zap.String("request_id", resp.RequestID), zap.Int("services", len(resp.Services)), zap.Int("gateways", len(resp.Gateways)))
if err := s.producer.SendMessage(NewLookupResponseEnvelope(s.sender, resp)); err != nil {
fields := []zap.Field{zap.String("request_id", resp.RequestID), zap.Error(err)}
s.logWarn("Failed to publish discovery lookup response", fields...)
@@ -221,10 +239,12 @@ func (s *RegistryService) initKV(msgBroker mb.Broker) {
}
provider, ok := msgBroker.(jetStreamProvider)
if !ok {
s.logInfo("Discovery KV disabled: broker does not support JetStream")
return
}
js := provider.JetStream()
if js == nil {
s.logWarn("Discovery KV disabled: JetStream not configured")
return
}
store, err := NewKVStore(s.logger, js, "")
@@ -255,10 +275,25 @@ func (s *RegistryService) consumeKVUpdates(watcher nats.KeyWatcher) {
if s == nil || watcher == nil {
return
}
initial := true
initialCount := 0
for entry := range watcher.Updates() {
if entry == nil {
if initial {
fields := []zap.Field{zap.Int("entries", initialCount)}
if s.kv != nil {
if bucket := s.kv.Bucket(); bucket != "" {
fields = append(fields, zap.String("bucket", bucket))
}
}
s.logInfo("Discovery KV initial sync complete", fields...)
initial = false
}
continue
}
if initial && entry.Operation() == nats.KeyValuePut {
initialCount++
}
switch entry.Operation() {
case nats.KeyValueDelete, nats.KeyValuePurge:
key := registryKeyFromKVKey(entry.Key())
@@ -302,6 +337,13 @@ func (s *RegistryService) logWarn(message string, fields ...zap.Field) {
s.logger.Warn(message, fields...)
}
func (s *RegistryService) logDebug(message string, fields ...zap.Field) {
if s.logger == nil {
return
}
s.logger.Debug(message, fields...)
}
func (s *RegistryService) logInfo(message string, fields ...zap.Field) {
if s.logger == nil {
return

View File

@@ -92,10 +92,23 @@ func (w *RegistryWatcher) consume(watcher nats.KeyWatcher) {
if w == nil || watcher == nil {
return
}
initial := true
initialCount := 0
for entry := range watcher.Updates() {
if entry == nil {
if initial && w.logger != nil {
fields := []zap.Field{zap.Int("entries", initialCount)}
if w.kv != nil {
fields = append(fields, zap.String("bucket", w.kv.Bucket()))
}
w.logger.Info("Discovery registry watcher initial sync complete", fields...)
initial = false
}
continue
}
if initial && entry.Operation() == nats.KeyValuePut {
initialCount++
}
switch entry.Operation() {
case nats.KeyValueDelete, nats.KeyValuePurge:
key := registryKeyFromKVKey(entry.Key())