Files
sendico/api/pkg/discovery/announcer.go
2026-03-03 22:29:03 +01:00

197 lines
4.8 KiB
Go

package discovery
import (
"os"
"strings"
"sync"
"time"
"github.com/google/uuid"
msg "github.com/tech/sendico/pkg/messaging"
"github.com/tech/sendico/pkg/mlogger"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// defaultReannounceHeartbeatFactor controls how often the heartbeat loop
// re-sends the full announcement: on every tick whose running count is a
// multiple of this value an announcement is published instead of a heartbeat.
const defaultReannounceHeartbeatFactor = 6
// Announcer publishes a discovery announcement for one service entry and then
// keeps it alive with periodic heartbeats until Stop is called.
type Announcer struct {
// logger receives all announcer log output; NewAnnouncer names it "discovery".
logger mlogger.Logger
// producer publishes announce and heartbeat envelopes. When it is nil,
// Start logs a warning and does not launch the heartbeat loop.
producer msg.Producer
// sender is the whitespace-trimmed sender passed to the envelope constructors.
sender string
// announce is the announcement payload after normalization and defaulting
// in NewAnnouncer.
announce Announcement
// startOnce and stopOnce make Start and Stop run their bodies at most once.
startOnce sync.Once
stopOnce sync.Once
// stopCh is closed by Stop to ask the heartbeat loop to exit.
stopCh chan struct{}
// doneCh is closed when the heartbeat loop returns, or directly by Start
// when it decides not to launch the loop. Stop waits on it.
doneCh chan struct{}
// announceLevel is the level used for the "announce published" log line.
// It starts at Info and is lowered to Debug after the first successful
// publish.
announceLevel zapcore.Level
}
// NewAnnouncer builds an Announcer for the given announcement.
//
// A nil logger is replaced with a no-op logger, and the logger is named
// "discovery". The announcement is normalized and then defaulted: an empty
// Service falls back to the trimmed sender, an empty InstanceID to
// InstanceID(), and an empty ID to DefaultEntryID of the resolved service.
// The returned Announcer is idle until Start is called.
func NewAnnouncer(logger mlogger.Logger, producer msg.Producer, sender string, announce Announcement) *Announcer {
	if logger == nil {
		logger = zap.NewNop()
	}
	logger = logger.Named("discovery")

	trimmedSender := strings.TrimSpace(sender)

	// Fill in identity fields the caller left blank. Order matters: the
	// entry ID default is derived from the (possibly defaulted) service name.
	announce = normalizeAnnouncement(announce)
	if announce.Service == "" {
		announce.Service = trimmedSender
	}
	if announce.InstanceID == "" {
		announce.InstanceID = InstanceID()
	}
	if announce.ID == "" {
		announce.ID = DefaultEntryID(announce.Service)
	}

	announcer := &Announcer{
		stopCh:        make(chan struct{}),
		doneCh:        make(chan struct{}),
		announceLevel: zapcore.InfoLevel,
	}
	announcer.logger = logger
	announcer.producer = producer
	announcer.sender = trimmedSender
	announcer.announce = announce
	return announcer
}
// Start publishes the initial announcement and heartbeat synchronously and
// then launches the background heartbeat loop. It is safe to call on a nil
// receiver and only the first call has any effect.
//
// If no producer is configured, or the announcement ID is blank, Start logs a
// warning, closes doneCh so that Stop does not block, and returns without
// publishing anything.
func (a *Announcer) Start() {
	if a == nil {
		return
	}
	a.startOnce.Do(func() {
		// Work out whether announcing has to be skipped, and why.
		// NOTE(review): the second message says "instance id" although the
		// check is on announce.ID rather than announce.InstanceID — confirm
		// the wording is intended.
		var skipReason string
		switch {
		case a.producer == nil:
			skipReason = "Discovery announce skipped: producer not configured"
		case strings.TrimSpace(a.announce.ID) == "":
			skipReason = "Discovery announce skipped: missing instance id"
		}
		if skipReason != "" {
			a.logWarn(skipReason, announcementFields(a.announce)...)
			// No loop will run, so signal completion here.
			close(a.doneCh)
			return
		}

		a.logInfo("Discovery announcer starting", announcementFields(a.announce)...)
		a.sendAnnouncement()
		a.sendHeartbeat()
		go a.heartbeatLoop()
	})
}
// Stop signals the heartbeat loop to exit and waits until it has finished.
// It is safe to call on a nil receiver and only the first call has any effect.
//
// Stop may be called on an Announcer that was never started: in that case it
// returns immediately and any later call to Start becomes a no-op.
func (a *Announcer) Stop() {
	if a == nil {
		return
	}
	a.stopOnce.Do(func() {
		// doneCh is only ever closed by Start (skip paths) or by the
		// heartbeat loop that Start launches. If Start never ran, nothing
		// would close it and the wait below would block forever. Consuming
		// startOnce here closes doneCh ourselves in that case; if Start is
		// running concurrently, Do blocks until it has finished, after which
		// the loop (or Start's skip path) owns closing doneCh.
		a.startOnce.Do(func() {
			close(a.doneCh)
		})

		close(a.stopCh)
		<-a.doneCh
		a.logInfo("Discovery announcer stopped", announcementFields(a.announce)...)
	})
}
// heartbeatLoop runs until stopCh is closed, publishing on every ticker tick.
// Every defaultReannounceHeartbeatFactor-th tick re-sends the full
// announcement; all other ticks send a heartbeat. doneCh is closed on exit so
// that Stop can return.
func (a *Announcer) heartbeatLoop() {
	defer close(a.doneCh)

	ticker := time.NewTicker(heartbeatIntervalForHealth(a.announce.Health))
	defer ticker.Stop()

	// tick is the 1-based count of the tick currently being awaited.
	for tick := 1; ; tick++ {
		select {
		case <-a.stopCh:
			return
		case <-ticker.C:
		}

		if tick%defaultReannounceHeartbeatFactor == 0 {
			a.sendAnnouncement()
			continue
		}
		a.sendHeartbeat()
	}
}
// heartbeatIntervalForHealth converts health.IntervalSec to a duration.
// When the resulting duration is not positive, it falls back to
// DefaultHealthIntervalSec seconds.
func heartbeatIntervalForHealth(health HealthParams) time.Duration {
	if configured := time.Duration(health.IntervalSec) * time.Second; configured > 0 {
		return configured
	}
	return time.Duration(DefaultHealthIntervalSec) * time.Second
}
// sendAnnouncement publishes the announcement envelope. A gateway announce
// envelope and event are used when announce.Rail is set; otherwise the service
// announce variants are used.
//
// Publish failures are logged at warn level and not returned. A successful
// publish is logged at announceLevel, which is then lowered to Debug so that
// later re-announcements are quieter.
func (a *Announcer) sendAnnouncement() {
	envelope := NewServiceAnnounceEnvelope(a.sender, a.announce)
	kind := ServiceAnnounceEvent()
	if a.announce.Rail != "" {
		envelope = NewGatewayAnnounceEnvelope(a.sender, a.announce)
		kind = GatewayAnnounceEvent()
	}

	sendErr := a.producer.SendMessage(envelope)

	// Fields shared by the success and failure log lines.
	logFields := append(announcementFields(a.announce), zap.String("event", kind.ToString()))
	if sendErr != nil {
		a.logWarn("Failed to publish discovery announce", append(logFields, zap.Error(sendErr))...)
		return
	}

	a.logger.Log(a.announceLevel, "Discovery announce published", logFields...)
	a.announceLevel = zapcore.DebugLevel
}
// sendHeartbeat publishes a heartbeat for the announced entry. The heartbeat
// carries the entry ID, the instance ID, the registry key derived from the
// announcement, status "ok" and the current Unix time in seconds.
//
// Publish failures are logged at warn level and not returned.
func (a *Announcer) sendHeartbeat() {
	ann := a.announce

	beat := Heartbeat{
		ID:         ann.ID,
		InstanceID: ann.InstanceID,
		EntryKey: registryKey(
			ann.Service,
			ann.Rail,
			legacyNetworkFromCurrencies(ann.Currencies),
			ann.Operations,
			ann.Version,
			ann.InstanceID,
		),
		Status: "ok",
		TS:     time.Now().Unix(),
	}

	err := a.producer.SendMessage(NewHeartbeatEnvelope(a.sender, beat))
	if err == nil {
		return
	}

	logFields := append(announcementFields(a.announce), zap.String("event", HeartbeatEvent().ToString()), zap.Error(err))
	a.logWarn("Failed to publish discovery heartbeat", logFields...)
}
// logInfo writes an info-level entry through the announcer's logger.
// It does nothing on a nil receiver.
func (a *Announcer) logInfo(message string, fields ...zap.Field) {
	if a != nil {
		a.logger.Info(message, fields...)
	}
}
// logWarn writes a warn-level entry through the announcer's logger.
// It does nothing on a nil receiver.
func (a *Announcer) logWarn(message string, fields ...zap.Field) {
	if a != nil {
		a.logger.Warn(message, fields...)
	}
}
// DefaultEntryID returns a fresh entry identifier of the form
// "<service>_<host>_<uuid>", with every part lower-cased and trimmed.
// A blank service is replaced by "service"; the host part is omitted when the
// hostname is blank. A new UUID is generated on every call, so two calls never
// return the same value.
func DefaultEntryID(service string) string {
	parts := make([]string, 0, 3)

	name := strings.ToLower(strings.TrimSpace(service))
	if name == "" {
		name = "service"
	}
	parts = append(parts, name)

	// The hostname is best-effort: on error it is empty and simply left out.
	hostname, _ := os.Hostname()
	if hostname = strings.ToLower(strings.TrimSpace(hostname)); hostname != "" {
		parts = append(parts, hostname)
	}

	parts = append(parts, uuid.NewString())
	return strings.Join(parts, "_")
}
// DefaultInstanceID returns a default instance identifier for service.
// It delegates to DefaultEntryID, so the result has the same format and is
// different on every call.
func DefaultInstanceID(service string) string {
return DefaultEntryID(service)
}
// DefaultInvokeURI returns the default invoke URI for service: the service
// name trimmed of surrounding whitespace and lower-cased. A blank service
// yields the empty string.
func DefaultInvokeURI(service string) string {
	return strings.ToLower(strings.TrimSpace(service))
}