Fixed messaging buffering issue
This commit is contained in:
@@ -33,7 +33,7 @@ func (b *NatsBroker) Subscribe(event model.NotificationEvent) (<-chan me.Envelop
|
||||
b.logger.Info("Subscribing to subject", zap.String("subject", subject))
|
||||
|
||||
// Create a bidirectional channel to send messages to
|
||||
messageChan := make(chan me.Envelope)
|
||||
messageChan := make(chan me.Envelope, b.bufferSize)
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
@@ -20,11 +20,12 @@ import (
|
||||
type natsSubscriotions = map[string]*TopicSubscription
|
||||
|
||||
type NatsBroker struct {
|
||||
nc *nats.Conn
|
||||
js nats.JetStreamContext
|
||||
logger *zap.Logger
|
||||
topicSubs natsSubscriotions
|
||||
mu sync.Mutex
|
||||
nc *nats.Conn
|
||||
js nats.JetStreamContext
|
||||
logger *zap.Logger
|
||||
topicSubs natsSubscriotions
|
||||
mu sync.Mutex
|
||||
bufferSize int
|
||||
}
|
||||
|
||||
type envConfig struct {
|
||||
@@ -32,6 +33,8 @@ type envConfig struct {
|
||||
Port int
|
||||
}
|
||||
|
||||
const defaultConsumerBufferSize = 1024
|
||||
|
||||
// loadEnv gathers and validates connection details from environment variables
|
||||
// listed in the Settings struct. Invalid or missing values surface as a typed
|
||||
// InvalidArgument error so callers can decide how to handle them.
|
||||
@@ -83,6 +86,10 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e
|
||||
var err error
|
||||
var cfg *envConfig
|
||||
var natsURL string
|
||||
bufferSize := defaultConsumerBufferSize
|
||||
if settings != nil && settings.BufferSize > 0 {
|
||||
bufferSize = settings.BufferSize
|
||||
}
|
||||
if settings != nil && strings.TrimSpace(settings.URLEnv) != "" {
|
||||
urlVal := strings.TrimSpace(os.Getenv(settings.URLEnv))
|
||||
if urlVal != "" {
|
||||
@@ -123,8 +130,9 @@ func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, e
|
||||
}
|
||||
|
||||
res := &NatsBroker{
|
||||
logger: l.Named("nats"),
|
||||
topicSubs: natsSubscriotions{},
|
||||
logger: l.Named("nats"),
|
||||
topicSubs: natsSubscriotions{},
|
||||
bufferSize: bufferSize,
|
||||
}
|
||||
|
||||
if res.nc, err = nats.Connect(natsURL, opts...); err != nil {
|
||||
|
||||
@@ -9,4 +9,5 @@ type Settings struct {
|
||||
NATSName string `mapstructure:"broker_name" yaml:"broker_name"`
|
||||
MaxReconnects int `mapstructure:"max_reconnects" yaml:"max_reconnects"`
|
||||
ReconnectWait int `mapstructure:"reconnect_wait" yaml:"reconnect_wait"`
|
||||
BufferSize int `mapstructure:"buffer_size" yaml:"buffer_size"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user