Files
sendico/api/pkg/messaging/internal/natsb/NATS.go
Stephan D 62a6631b9a
All checks were successful
ci/woodpecker/push/db Pipeline was successful
ci/woodpecker/push/nats Pipeline was successful
service backend
2025-11-07 18:35:26 +01:00

87 lines
2.4 KiB
Go

package natsb
import (
me "github.com/tech/sendico/pkg/messaging/envelope"
"github.com/tech/sendico/pkg/model"
"github.com/tech/sendico/pkg/mutil/mzap"
"go.uber.org/zap"
)
// Publish serializes the envelope and publishes the resulting bytes on the
// subject obtained from the envelope's signature.
//
// A serialization or publish failure is logged at error level and the
// underlying error is returned unchanged; nil is returned on success.
func (b *NatsBroker) Publish(envelope me.Envelope) error {
	// The subject is resolved up front so that it is available for the final
	// confirmation log as well as for the publish call itself.
	target := envelope.GetSignature().ToString()
	b.logger.Debug("Publishing message", mzap.Envelope(envelope))

	// Turn the envelope into its wire representation.
	payload, serializeErr := envelope.Serialize()
	if serializeErr != nil {
		b.logger.Error("Failed to serialize message", zap.Error(serializeErr), mzap.Envelope(envelope))
		return serializeErr
	}

	// Hand the payload to the underlying connection.
	publishErr := b.nc.Publish(target, payload)
	if publishErr != nil {
		b.logger.Error("Error publishing message", zap.Error(publishErr), mzap.Envelope(envelope))
		return publishErr
	}

	b.logger.Debug("Message published", zap.String("subject", target))
	return nil
}
// Subscribe registers a new consumer for the subject named by event and
// returns a receive-only channel associated with that consumer.
//
// The first consumer of a subject causes a topic subscription to be created
// and cached in b.topicSubs; later consumers of the same subject are attached
// to the cached subscription. If the topic subscription cannot be created the
// error is returned and no channel is handed out.
func (b *NatsBroker) Subscribe(event model.NotificationEvent) (<-chan me.Envelope, error) {
	subject := event.ToString()
	b.logger.Info("Subscribing to subject", zap.String("subject", subject))

	// Unbuffered, bidirectional here; callers only ever see the receive side.
	consumer := make(chan me.Envelope)

	// The lookup-or-create sequence and the consumer registration are done
	// under the broker lock so the subscription map is never observed
	// half-updated.
	b.mu.Lock()
	defer b.mu.Unlock()

	sub, found := b.topicSubs[subject]
	if !found {
		fresh, err := NewTopicSubscription(b.logger, b.nc, subject)
		if err != nil {
			return nil, err
		}
		b.topicSubs[subject] = fresh
		sub = fresh
	}

	// Attach the consumer's channel to the (possibly just created) topic.
	sub.AddConsumer(consumer)

	return consumer, nil
}
// Unsubscribe detaches the consumer identified by messageChan from the subject
// named by event.
//
// When the detached consumer was the last one on the subject, the underlying
// topic subscription is unsubscribed and removed from b.topicSubs. If no
// subscription exists for the subject a warning is logged and nil is returned.
// An error from the underlying unsubscribe is logged and returned; in that
// case the topic subscription is left in the map.
func (b *NatsBroker) Unsubscribe(event model.NotificationEvent, messageChan <-chan me.Envelope) error {
	subject := event.ToString()
	b.logger.Info("Unsubscribing from subject", zap.String("subject", subject))

	// Hold the broker lock for the entire remove / check / unsubscribe / delete
	// sequence. Subscribe adds consumers under this same lock, so releasing it
	// between the lookup and the delete would let a concurrent Subscribe attach
	// a consumer to a topic subscription that is about to be torn down, and
	// would let two concurrent Unsubscribe calls both tear it down.
	b.mu.Lock()
	defer b.mu.Unlock()

	topicSub, exists := b.topicSubs[subject]
	if !exists {
		b.logger.Warn("No subscription found for subject", zap.String("subject", subject))
		return nil
	}

	// Remove the consumer's channel from the topic subscription.
	topicSub.RemoveConsumer(messageChan)

	// Tear the topic subscription down only once nobody is listening any more.
	if !topicSub.HasConsumers() {
		if err := topicSub.Unsubscribe(); err != nil {
			b.logger.Error("Error unsubscribing from subject", zap.String("subject", subject), zap.Error(err))
			return err
		}
		delete(b.topicSubs, subject)
	}

	b.logger.Info("Unsubscribed from subject", zap.String("subject", subject))
	return nil
}