79 lines
1.9 KiB
Go
79 lines
1.9 KiB
Go
package natsb
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
me "github.com/tech/sendico/pkg/messaging/envelope"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type TopicSubscription struct {
|
|
sub *nats.Subscription
|
|
consumers map[<-chan me.Envelope]chan me.Envelope
|
|
mu sync.Mutex
|
|
logger mlogger.Logger
|
|
}
|
|
|
|
func NewTopicSubscription(logger mlogger.Logger, nc *nats.Conn, subject string) (*TopicSubscription, error) {
|
|
ts := &TopicSubscription{
|
|
consumers: make(map[<-chan me.Envelope]chan me.Envelope),
|
|
logger: logger.Named(subject),
|
|
}
|
|
|
|
sub, err := nc.Subscribe(subject, ts.handleMessage)
|
|
if err != nil {
|
|
logger.Error("Error subscribing to subject", zap.String("subject", subject), zap.Error(err))
|
|
return nil, err
|
|
}
|
|
ts.sub = sub
|
|
|
|
return ts, nil
|
|
}
|
|
|
|
func (ts *TopicSubscription) handleMessage(m *nats.Msg) {
|
|
ts.logger.Debug("Received message", zap.String("subject", m.Subject))
|
|
|
|
envelope, err := me.Deserialize(m.Data)
|
|
if err != nil {
|
|
ts.logger.Warn("Failed to deserialize message", zap.String("subject", m.Subject), zap.Error(err))
|
|
return // Do not push invalid data to the channels
|
|
}
|
|
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
for _, c := range ts.consumers {
|
|
select {
|
|
case c <- envelope:
|
|
default:
|
|
ts.logger.Warn("Consumer is slow or not receiving messages", zap.String("subject", m.Subject))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ts *TopicSubscription) AddConsumer(messageChan chan me.Envelope) {
|
|
ts.mu.Lock()
|
|
ts.consumers[messageChan] = messageChan
|
|
ts.mu.Unlock()
|
|
}
|
|
|
|
func (ts *TopicSubscription) RemoveConsumer(messageChan <-chan me.Envelope) {
|
|
ts.mu.Lock()
|
|
if c, ok := ts.consumers[messageChan]; ok {
|
|
delete(ts.consumers, messageChan)
|
|
close(c)
|
|
}
|
|
ts.mu.Unlock()
|
|
}
|
|
|
|
func (ts *TopicSubscription) HasConsumers() bool {
|
|
ts.mu.Lock()
|
|
defer ts.mu.Unlock()
|
|
return len(ts.consumers) > 0
|
|
}
|
|
|
|
func (ts *TopicSubscription) Unsubscribe() error {
|
|
return ts.sub.Drain()
|
|
}
|