service backend
This commit is contained in:
86
api/pkg/messaging/internal/natsb/NATS.go
Normal file
86
api/pkg/messaging/internal/natsb/NATS.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package natsb
|
||||
|
||||
import (
|
||||
me "github.com/tech/sendico/pkg/messaging/envelope"
|
||||
"github.com/tech/sendico/pkg/model"
|
||||
"github.com/tech/sendico/pkg/mutil/mzap"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (b *NatsBroker) Publish(envelope me.Envelope) error {
|
||||
subject := envelope.GetSignature().ToString()
|
||||
b.logger.Debug("Publishing message", mzap.Envelope(envelope))
|
||||
|
||||
// Serialize the message
|
||||
data, err := envelope.Serialize()
|
||||
if err != nil {
|
||||
b.logger.Error("Failed to serialize message", zap.Error(err), mzap.Envelope(envelope))
|
||||
return err
|
||||
}
|
||||
|
||||
if err := b.nc.Publish(subject, data); err != nil {
|
||||
b.logger.Error("Error publishing message", zap.Error(err), mzap.Envelope(envelope))
|
||||
return err
|
||||
}
|
||||
|
||||
b.logger.Debug("Message published", zap.String("subject", subject))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Subscribe subscribes to a NATS subject and returns a channel for messages
|
||||
func (b *NatsBroker) Subscribe(event model.NotificationEvent) (<-chan me.Envelope, error) {
|
||||
subject := event.ToString()
|
||||
b.logger.Info("Subscribing to subject", zap.String("subject", subject))
|
||||
|
||||
// Create a bidirectional channel to send messages to
|
||||
messageChan := make(chan me.Envelope)
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
topicSub, exists := b.topicSubs[subject]
|
||||
if !exists {
|
||||
var err error
|
||||
topicSub, err = NewTopicSubscription(b.logger, b.nc, subject)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b.topicSubs[subject] = topicSub
|
||||
}
|
||||
|
||||
// Add the consumer's channel to the topic subscription
|
||||
topicSub.AddConsumer(messageChan)
|
||||
|
||||
// Return the channel as a receive-only channel
|
||||
return messageChan, nil
|
||||
}
|
||||
|
||||
// Unsubscribe unsubscribes a consumer from a NATS subject
|
||||
func (b *NatsBroker) Unsubscribe(event model.NotificationEvent, messageChan <-chan me.Envelope) error {
|
||||
subject := event.ToString()
|
||||
b.logger.Info("Unsubscribing from subject", zap.String("subject", subject))
|
||||
|
||||
b.mu.Lock()
|
||||
topicSub, exists := b.topicSubs[subject]
|
||||
b.mu.Unlock()
|
||||
if !exists {
|
||||
b.logger.Warn("No subscription found for subject", zap.String("subject", subject))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove the consumer's channel from the topic subscription
|
||||
topicSub.RemoveConsumer(messageChan)
|
||||
if !topicSub.HasConsumers() {
|
||||
if err := topicSub.Unsubscribe(); err != nil {
|
||||
b.logger.Error("Error unsubscribing from subject", zap.String("subject", subject), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
b.mu.Lock()
|
||||
delete(b.topicSubs, subject)
|
||||
b.mu.Unlock()
|
||||
}
|
||||
|
||||
b.logger.Info("Unsubscribed from subject", zap.String("subject", subject))
|
||||
return nil
|
||||
}
|
||||
113
api/pkg/messaging/internal/natsb/broker.go
Normal file
113
api/pkg/messaging/internal/natsb/broker.go
Normal file
@@ -0,0 +1,113 @@
|
||||
package natsb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
nc "github.com/tech/sendico/pkg/messaging/internal/natsb/config"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type natsSubscriotions = map[string]*TopicSubscription
|
||||
|
||||
type NatsBroker struct {
|
||||
nc *nats.Conn
|
||||
logger *zap.Logger
|
||||
topicSubs natsSubscriotions
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
type envConfig struct {
|
||||
User, Password, Host string
|
||||
Port int
|
||||
}
|
||||
|
||||
// loadEnv gathers and validates connection details from environment variables
|
||||
// listed in the Settings struct. Invalid or missing values surface as a typed
|
||||
// InvalidArgument error so callers can decide how to handle them.
|
||||
func loadEnv(settings *nc.Settings, l *zap.Logger) (*envConfig, error) {
|
||||
get := func(key, label string) (string, error) {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
return v, nil
|
||||
}
|
||||
l.Error(fmt.Sprintf("NATS %s not found in environment", label), zap.String("env_var", key))
|
||||
return "", merrors.InvalidArgument(fmt.Sprintf("NATS %s not found in environment variable: %s", label, key))
|
||||
}
|
||||
|
||||
user, err := get(settings.UsernameEnv, "user name")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
password, err := get(settings.PasswordEnv, "password")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
host, err := get(settings.HostEnv, "host")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
portStr, err := get(settings.PortEnv, "port")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
port, err := strconv.Atoi(portStr)
|
||||
if err != nil || port <= 0 || port > 65535 {
|
||||
l.Error("Invalid NATS port value", zap.String("port", portStr))
|
||||
return nil, merrors.InvalidArgument("Invalid NATS port: " + portStr)
|
||||
}
|
||||
|
||||
return &envConfig{
|
||||
User: user,
|
||||
Password: password,
|
||||
Host: host,
|
||||
Port: port,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, error) {
|
||||
l := logger.Named("broker")
|
||||
// Helper function to get environment variables
|
||||
cfg, err := loadEnv(settings, l)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
u := &url.URL{
|
||||
Scheme: "nats",
|
||||
Host: net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port)),
|
||||
}
|
||||
natsURL := u.String()
|
||||
|
||||
opts := []nats.Option{
|
||||
nats.Name(settings.NATSName),
|
||||
nats.MaxReconnects(settings.MaxReconnects),
|
||||
nats.ReconnectWait(time.Duration(settings.ReconnectWait) * time.Second),
|
||||
nats.UserInfo(cfg.User, cfg.Password),
|
||||
}
|
||||
|
||||
res := &NatsBroker{
|
||||
logger: l.Named("nats"),
|
||||
topicSubs: natsSubscriotions{},
|
||||
}
|
||||
|
||||
if res.nc, err = nats.Connect(natsURL, opts...); err != nil {
|
||||
l.Error("Failed to connect to NATS", zap.String("url", natsURL), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logger.Info("Connected to NATS", zap.String("broker", settings.NATSName),
|
||||
zap.String("url", fmt.Sprintf("nats://%s@%s", cfg.User, net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port)))))
|
||||
return res, nil
|
||||
}
|
||||
12
api/pkg/messaging/internal/natsb/config/config.go
Normal file
12
api/pkg/messaging/internal/natsb/config/config.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package natsb
|
||||
|
||||
type Settings struct {
|
||||
URLEnv string `mapstructure:"url_env" yaml:"url_env"`
|
||||
HostEnv string `mapstructure:"host_env" yaml:"host_env"`
|
||||
PortEnv string `mapstructure:"port_env" yaml:"port_env"`
|
||||
UsernameEnv string `mapstructure:"username_env" yaml:"username_env"`
|
||||
PasswordEnv string `mapstructure:"password_env" yaml:"password_env"`
|
||||
NATSName string `mapstructure:"broker_name" yaml:"broker_name"`
|
||||
MaxReconnects int `mapstructure:"max_reconnects" yaml:"max_reconnects"`
|
||||
ReconnectWait int `mapstructure:"reconnect_wait" yaml:"reconnect_wait"`
|
||||
}
|
||||
78
api/pkg/messaging/internal/natsb/subscription.go
Normal file
78
api/pkg/messaging/internal/natsb/subscription.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package natsb
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
me "github.com/tech/sendico/pkg/messaging/envelope"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type TopicSubscription struct {
|
||||
sub *nats.Subscription
|
||||
consumers map[<-chan me.Envelope]chan me.Envelope
|
||||
mu sync.Mutex
|
||||
logger mlogger.Logger
|
||||
}
|
||||
|
||||
func NewTopicSubscription(logger mlogger.Logger, nc *nats.Conn, subject string) (*TopicSubscription, error) {
|
||||
ts := &TopicSubscription{
|
||||
consumers: make(map[<-chan me.Envelope]chan me.Envelope),
|
||||
logger: logger.Named(subject),
|
||||
}
|
||||
|
||||
sub, err := nc.Subscribe(subject, ts.handleMessage)
|
||||
if err != nil {
|
||||
logger.Error("Error subscribing to subject", zap.String("subject", subject), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
ts.sub = sub
|
||||
|
||||
return ts, nil
|
||||
}
|
||||
|
||||
func (ts *TopicSubscription) handleMessage(m *nats.Msg) {
|
||||
ts.logger.Debug("Received message", zap.String("subject", m.Subject))
|
||||
|
||||
envelope, err := me.Deserialize(m.Data)
|
||||
if err != nil {
|
||||
ts.logger.Warn("Failed to deserialize message", zap.String("subject", m.Subject), zap.Error(err))
|
||||
return // Do not push invalid data to the channels
|
||||
}
|
||||
|
||||
ts.mu.Lock()
|
||||
defer ts.mu.Unlock()
|
||||
for _, c := range ts.consumers {
|
||||
select {
|
||||
case c <- envelope:
|
||||
default:
|
||||
ts.logger.Warn("Consumer is slow or not receiving messages", zap.String("subject", m.Subject))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ts *TopicSubscription) AddConsumer(messageChan chan me.Envelope) {
|
||||
ts.mu.Lock()
|
||||
ts.consumers[messageChan] = messageChan
|
||||
ts.mu.Unlock()
|
||||
}
|
||||
|
||||
func (ts *TopicSubscription) RemoveConsumer(messageChan <-chan me.Envelope) {
|
||||
ts.mu.Lock()
|
||||
if c, ok := ts.consumers[messageChan]; ok {
|
||||
delete(ts.consumers, messageChan)
|
||||
close(c)
|
||||
}
|
||||
ts.mu.Unlock()
|
||||
}
|
||||
|
||||
func (ts *TopicSubscription) HasConsumers() bool {
|
||||
ts.mu.Lock()
|
||||
defer ts.mu.Unlock()
|
||||
return len(ts.consumers) > 0
|
||||
}
|
||||
|
||||
func (ts *TopicSubscription) Unsubscribe() error {
|
||||
return ts.sub.Drain()
|
||||
}
|
||||
Reference in New Issue
Block a user