package natsb import ( "sync" "github.com/nats-io/nats.go" me "github.com/tech/sendico/pkg/messaging/envelope" "github.com/tech/sendico/pkg/mlogger" "go.uber.org/zap" ) type TopicSubscription struct { sub *nats.Subscription consumers map[<-chan me.Envelope]chan me.Envelope mu sync.Mutex logger mlogger.Logger } func NewTopicSubscription(logger mlogger.Logger, nc *nats.Conn, subject string) (*TopicSubscription, error) { ts := &TopicSubscription{ consumers: make(map[<-chan me.Envelope]chan me.Envelope), logger: logger.Named(subject), } sub, err := nc.Subscribe(subject, ts.handleMessage) if err != nil { logger.Error("Error subscribing to subject", zap.String("subject", subject), zap.Error(err)) return nil, err } ts.sub = sub return ts, nil } func (ts *TopicSubscription) handleMessage(m *nats.Msg) { ts.logger.Debug("Received message", zap.String("subject", m.Subject)) envelope, err := me.Deserialize(m.Data) if err != nil { ts.logger.Warn("Failed to deserialize message", zap.String("subject", m.Subject), zap.Error(err)) return // Do not push invalid data to the channels } ts.mu.Lock() defer ts.mu.Unlock() for _, c := range ts.consumers { select { case c <- envelope: default: ts.logger.Warn("Consumer is slow or not receiving messages", zap.String("subject", m.Subject)) } } } func (ts *TopicSubscription) AddConsumer(messageChan chan me.Envelope) { ts.mu.Lock() ts.consumers[messageChan] = messageChan ts.mu.Unlock() } func (ts *TopicSubscription) RemoveConsumer(messageChan <-chan me.Envelope) { ts.mu.Lock() if c, ok := ts.consumers[messageChan]; ok { delete(ts.consumers, messageChan) close(c) } ts.mu.Unlock() } func (ts *TopicSubscription) HasConsumers() bool { ts.mu.Lock() defer ts.mu.Unlock() return len(ts.consumers) > 0 } func (ts *TopicSubscription) Unsubscribe() error { return ts.sub.Drain() }