Files
sendico/api/pkg/messaging/internal/natsb/broker.go
Stephan D 717dafc673
Some checks failed
ci/woodpecker/push/billing_fees Pipeline was successful
ci/woodpecker/push/bff Pipeline was successful
ci/woodpecker/push/db Pipeline was successful
ci/woodpecker/push/chain_gateway Pipeline was successful
ci/woodpecker/push/fx_ingestor Pipeline was successful
ci/woodpecker/push/fx_oracle Pipeline was successful
ci/woodpecker/push/frontend Pipeline was successful
ci/woodpecker/push/payments_orchestrator Pipeline was successful
ci/woodpecker/push/bump_version Pipeline failed
ci/woodpecker/push/nats Pipeline was successful
ci/woodpecker/push/ledger Pipeline was successful
ci/woodpecker/push/notification Pipeline was successful
better message formatting
2025-11-19 13:54:25 +01:00

114 lines
2.8 KiB
Go

package natsb
import (
"fmt"
"net"
"net/url"
"os"
"strconv"
"sync"
"time"
"github.com/nats-io/nats.go"
"github.com/tech/sendico/pkg/merrors"
nc "github.com/tech/sendico/pkg/messaging/internal/natsb/config"
"github.com/tech/sendico/pkg/mlogger"
"go.uber.org/zap"
)
// natsSubscriotions is an alias for a map from a string key to the
// *TopicSubscription registered under it (presumably keyed by topic name —
// confirm against the code that fills the map).
// NOTE(review): the identifier is misspelled ("Subscriotions"); fixing it means
// renaming every reference in the package, so it is left unchanged here.
type natsSubscriotions = map[string]*TopicSubscription
// NatsBroker bundles a NATS connection with a logger and a table of
// subscriptions. Instances are created by NewNatsBroker.
type NatsBroker struct {
// nc is the connection obtained from nats.Connect in NewNatsBroker.
nc *nats.Conn
// logger is the broker's logger; NewNatsBroker sets it to a child named "nats".
logger *zap.Logger
// topicSubs holds the broker's subscriptions; it starts out empty.
topicSubs natsSubscriotions
// NOTE(review): mu presumably guards topicSubs — confirm against the methods
// that lock it. Because of this field the struct must not be copied.
mu sync.Mutex
}
// envConfig carries the NATS connection parameters that loadEnv resolves from
// the environment.
type envConfig struct {
	// User is the account name passed to the server for authentication.
	User string
	// Password is the secret that goes with User.
	Password string
	// Host is the server host name or address.
	Host string
	// Port is the server port; loadEnv only accepts values in 1..65535.
	Port int
}
// loadEnv resolves the NATS connection parameters from the environment.
//
// The names of the variables to read come from settings (UsernameEnv,
// PasswordEnv, HostEnv and PortEnv) and are looked up in that order. The first
// variable that is unset or empty stops the lookup: the problem is logged
// through l and reported with merrors.InvalidArgument. The port must also parse
// as a decimal integer within 1..65535, otherwise it is rejected the same way.
func loadEnv(settings *nc.Settings, l *zap.Logger) (*envConfig, error) {
	cfg := &envConfig{}
	var rawPort string

	// One entry per required variable; target is where the value is stored.
	required := []struct {
		envVar string
		label  string
		target *string
	}{
		{settings.UsernameEnv, "user name", &cfg.User},
		{settings.PasswordEnv, "password", &cfg.Password},
		{settings.HostEnv, "host", &cfg.Host},
		{settings.PortEnv, "port", &rawPort},
	}

	for _, r := range required {
		value := os.Getenv(r.envVar)
		if value == "" {
			l.Error(fmt.Sprintf("NATS %s not found in environment", r.label), zap.String("env_var", r.envVar))
			return nil, merrors.InvalidArgument(fmt.Sprintf("NATS %s not found in environment variable: %s", r.label, r.envVar), r.envVar)
		}
		*r.target = value
	}

	// The port arrives as text and has to be a valid TCP port number.
	port, convErr := strconv.Atoi(rawPort)
	if convErr != nil || port < 1 || port > 65535 {
		l.Error("Invalid NATS port value", zap.String("port", rawPort))
		return nil, merrors.InvalidArgument("Invalid NATS port: "+rawPort, settings.PortEnv)
	}
	cfg.Port = port

	return cfg, nil
}
// NewNatsBroker connects to the NATS server described by settings and returns
// a broker wrapping that connection.
//
// Host, port and credentials are read from the environment variables named in
// settings (see loadEnv). The client name, the reconnect limit and the
// reconnect wait (interpreted as seconds) are taken from settings directly.
//
// An error is returned when settings is nil, when the environment is missing
// or invalid, or when the connection attempt fails; in the last case the error
// from nats.Connect is returned unchanged.
func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, error) {
	l := logger.Named("broker")

	// Reject a missing configuration up front instead of panicking on the
	// first field access.
	if settings == nil {
		l.Error("NATS settings are not provided")
		return nil, merrors.InvalidArgument("NATS settings are not provided", "settings")
	}

	cfg, err := loadEnv(settings, l)
	if err != nil {
		return nil, err
	}

	// Credentials are supplied through nats.UserInfo below, so the connection
	// URL carries only scheme, host and port.
	u := &url.URL{
		Scheme: "nats",
		Host:   net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port)),
	}
	natsURL := u.String()

	opts := []nats.Option{
		nats.Name(settings.NATSName),
		nats.MaxReconnects(settings.MaxReconnects),
		nats.ReconnectWait(time.Duration(settings.ReconnectWait) * time.Second),
		nats.UserInfo(cfg.User, cfg.Password),
	}

	res := &NatsBroker{
		logger:    l.Named("nats"),
		topicSubs: natsSubscriotions{},
	}
	if res.nc, err = nats.Connect(natsURL, opts...); err != nil {
		l.Error("Failed to connect to NATS", zap.String("url", natsURL), zap.Error(err))
		return nil, err
	}

	// Log the target together with the user name but never the password.
	// Building it with url.URL reuses the host:port computed above and keeps
	// the user name correctly escaped.
	display := url.URL{Scheme: u.Scheme, User: url.User(cfg.User), Host: u.Host}
	l.Info("Connected to NATS",
		zap.String("broker", settings.NATSName),
		zap.String("url", display.String()))

	return res, nil
}