87 lines
2.4 KiB
Go
87 lines
2.4 KiB
Go
package natsb
|
|
|
|
import (
|
|
me "github.com/tech/sendico/pkg/messaging/envelope"
|
|
"github.com/tech/sendico/pkg/model"
|
|
"github.com/tech/sendico/pkg/mutil/mzap"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
func (b *NatsBroker) Publish(envelope me.Envelope) error {
|
|
subject := envelope.GetSignature().ToString()
|
|
b.logger.Debug("Publishing message", mzap.Envelope(envelope))
|
|
|
|
// Serialize the message
|
|
data, err := envelope.Serialize()
|
|
if err != nil {
|
|
b.logger.Error("Failed to serialize message", zap.Error(err), mzap.Envelope(envelope))
|
|
return err
|
|
}
|
|
|
|
if err := b.nc.Publish(subject, data); err != nil {
|
|
b.logger.Error("Error publishing message", zap.Error(err), mzap.Envelope(envelope))
|
|
return err
|
|
}
|
|
|
|
b.logger.Debug("Message published", zap.String("subject", subject))
|
|
return nil
|
|
}
|
|
|
|
// Subscribe subscribes to a NATS subject and returns a channel for messages
|
|
func (b *NatsBroker) Subscribe(event model.NotificationEvent) (<-chan me.Envelope, error) {
|
|
subject := event.ToString()
|
|
b.logger.Info("Subscribing to subject", zap.String("subject", subject))
|
|
|
|
// Create a bidirectional channel to send messages to
|
|
messageChan := make(chan me.Envelope)
|
|
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
topicSub, exists := b.topicSubs[subject]
|
|
if !exists {
|
|
var err error
|
|
topicSub, err = NewTopicSubscription(b.logger, b.nc, subject)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
b.topicSubs[subject] = topicSub
|
|
}
|
|
|
|
// Add the consumer's channel to the topic subscription
|
|
topicSub.AddConsumer(messageChan)
|
|
|
|
// Return the channel as a receive-only channel
|
|
return messageChan, nil
|
|
}
|
|
|
|
// Unsubscribe unsubscribes a consumer from a NATS subject
|
|
func (b *NatsBroker) Unsubscribe(event model.NotificationEvent, messageChan <-chan me.Envelope) error {
|
|
subject := event.ToString()
|
|
b.logger.Info("Unsubscribing from subject", zap.String("subject", subject))
|
|
|
|
b.mu.Lock()
|
|
topicSub, exists := b.topicSubs[subject]
|
|
b.mu.Unlock()
|
|
if !exists {
|
|
b.logger.Warn("No subscription found for subject", zap.String("subject", subject))
|
|
return nil
|
|
}
|
|
|
|
// Remove the consumer's channel from the topic subscription
|
|
topicSub.RemoveConsumer(messageChan)
|
|
if !topicSub.HasConsumers() {
|
|
if err := topicSub.Unsubscribe(); err != nil {
|
|
b.logger.Error("Error unsubscribing from subject", zap.String("subject", subject), zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
b.mu.Lock()
|
|
delete(b.topicSubs, subject)
|
|
b.mu.Unlock()
|
|
}
|
|
|
|
b.logger.Info("Unsubscribed from subject", zap.String("subject", subject))
|
|
return nil
|
|
}
|