Some checks failed
ci/woodpecker/push/billing_fees Pipeline was successful
ci/woodpecker/push/bff Pipeline was successful
ci/woodpecker/push/db Pipeline was successful
ci/woodpecker/push/chain_gateway Pipeline was successful
ci/woodpecker/push/fx_ingestor Pipeline was successful
ci/woodpecker/push/fx_oracle Pipeline was successful
ci/woodpecker/push/frontend Pipeline was successful
ci/woodpecker/push/payments_orchestrator Pipeline was successful
ci/woodpecker/push/bump_version Pipeline failed
ci/woodpecker/push/nats Pipeline was successful
ci/woodpecker/push/ledger Pipeline was successful
ci/woodpecker/push/notification Pipeline was successful
88 lines
2.3 KiB
Go
88 lines
2.3 KiB
Go
package inprocess
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
me "github.com/tech/sendico/pkg/messaging/envelope"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"github.com/tech/sendico/pkg/model"
|
|
"github.com/tech/sendico/pkg/mutil/mzap"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type MessageBroker struct {
|
|
logger mlogger.Logger
|
|
subscribers map[string][]chan me.Envelope
|
|
lock sync.RWMutex
|
|
bufferSize int
|
|
}
|
|
|
|
func (b *MessageBroker) Publish(envelope me.Envelope) error {
|
|
topic := envelope.GetSignature().ToString()
|
|
b.logger.Debug("Publishing message", mzap.Envelope(envelope))
|
|
b.lock.RLock()
|
|
defer b.lock.RUnlock()
|
|
if subs, ok := b.subscribers[topic]; ok {
|
|
for _, sub := range subs {
|
|
select {
|
|
case sub <- envelope:
|
|
default:
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
b.logger.Warn("Topic not found", mzap.Envelope(envelope))
|
|
return merrors.NoMessagingTopic(topic)
|
|
}
|
|
|
|
func (b *MessageBroker) Subscribe(event model.NotificationEvent) (<-chan me.Envelope, error) {
|
|
topic := event.ToString()
|
|
b.logger.Info("New topic subscriber", zap.String("topic", topic))
|
|
ch := make(chan me.Envelope, b.bufferSize) // Buffered channel to avoid blocking publishers
|
|
{
|
|
b.lock.Lock()
|
|
defer b.lock.Unlock()
|
|
b.subscribers[topic] = append(b.subscribers[topic], ch)
|
|
}
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
func (b *MessageBroker) Unsubscribe(event model.NotificationEvent, subChan <-chan me.Envelope) error {
|
|
topic := event.ToString()
|
|
b.logger.Info("Unsubscribing topic", zap.String("topic", topic))
|
|
b.lock.Lock()
|
|
defer b.lock.Unlock()
|
|
|
|
subs, ok := b.subscribers[topic]
|
|
if !ok {
|
|
b.logger.Debug("No subscribers for topic", zap.String("topic", topic))
|
|
return nil
|
|
}
|
|
|
|
for i, ch := range subs {
|
|
if ch == subChan {
|
|
b.subscribers[topic] = append(subs[:i], subs[i+1:]...)
|
|
close(ch)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
b.logger.Warn("No topic found", zap.String("topic", topic))
|
|
return merrors.NoMessagingTopic(topic)
|
|
}
|
|
|
|
func NewInProcessBroker(logger mlogger.Logger, bufferSize int) (*MessageBroker, error) {
|
|
if bufferSize < 1 {
|
|
return nil, merrors.InvalidArgument(fmt.Sprintf("Invelid buffer size %d. It must be greater than 1", bufferSize), "bufferSize")
|
|
}
|
|
logger.Info("Created in-process logger", zap.Int("buffer_size", bufferSize))
|
|
return &MessageBroker{
|
|
logger: logger.Named("in_process"),
|
|
subscribers: make(map[string][]chan me.Envelope),
|
|
bufferSize: bufferSize,
|
|
}, nil
|
|
}
|