package inprocess import ( "fmt" "sync" "github.com/tech/sendico/pkg/merrors" me "github.com/tech/sendico/pkg/messaging/envelope" "github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/model" "github.com/tech/sendico/pkg/mutil/mzap" "go.uber.org/zap" ) type MessageBroker struct { logger mlogger.Logger subscribers map[string][]chan me.Envelope lock sync.RWMutex bufferSize int } func (b *MessageBroker) Publish(envelope me.Envelope) error { topic := envelope.GetSignature().ToString() b.logger.Debug("Publishing message", mzap.Envelope(envelope)) b.lock.RLock() defer b.lock.RUnlock() if subs, ok := b.subscribers[topic]; ok { for _, sub := range subs { select { case sub <- envelope: default: } } return nil } b.logger.Warn("Topic not found", mzap.Envelope(envelope)) return merrors.NoMessagingTopic(topic) } func (b *MessageBroker) Subscribe(event model.NotificationEvent) (<-chan me.Envelope, error) { topic := event.ToString() b.logger.Info("New topic subscriber", zap.String("topic", topic)) ch := make(chan me.Envelope, b.bufferSize) // Buffered channel to avoid blocking publishers { b.lock.Lock() defer b.lock.Unlock() b.subscribers[topic] = append(b.subscribers[topic], ch) } return ch, nil } func (b *MessageBroker) Unsubscribe(event model.NotificationEvent, subChan <-chan me.Envelope) error { topic := event.ToString() b.logger.Info("Unsubscribing topic", zap.String("topic", topic)) b.lock.Lock() defer b.lock.Unlock() subs, ok := b.subscribers[topic] if !ok { b.logger.Debug("No subscribers for topic", zap.String("topic", topic)) return nil } for i, ch := range subs { if ch == subChan { b.subscribers[topic] = append(subs[:i], subs[i+1:]...) close(ch) return nil } } b.logger.Warn("No topic found", zap.String("topic", topic)) return merrors.NoMessagingTopic(topic) } func NewInProcessBroker(logger mlogger.Logger, bufferSize int) (*MessageBroker, error) { if bufferSize < 1 { return nil, merrors.InvalidArgument(fmt.Sprintf("Invelid buffer size %d. It must be greater than 1", bufferSize), "bufferSize") } logger.Info("Created in-process logger", zap.Int("buffer_size", bufferSize)) return &MessageBroker{ logger: logger.Named("in_process"), subscribers: make(map[string][]chan me.Envelope), bufferSize: bufferSize, }, nil }