package natsb

import (
	"fmt"
	"net"
	"net/url"
	"os"
	"strconv"
	"sync"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/tech/sendico/pkg/merrors"
	nc "github.com/tech/sendico/pkg/messaging/internal/natsb/config"
	"github.com/tech/sendico/pkg/mlogger"
	"go.uber.org/zap"
)

// natsSubscriotions maps a string key to its TopicSubscription.
// NOTE(review): the name is misspelled ("Subscriptions"); it is left as-is
// because other files in the package may reference it.
type natsSubscriotions = map[string]*TopicSubscription

// NatsBroker bundles a NATS connection with the logger and the subscription
// registry that belong to it.
type NatsBroker struct {
	nc        *nats.Conn        // live connection returned by nats.Connect
	logger    *zap.Logger       // logger named "broker.nats" under the caller's logger
	topicSubs natsSubscriotions // registered subscriptions; presumably keyed by topic name
	mu        sync.Mutex        // presumably guards topicSubs; confirm against the methods using it
}

// envConfig holds the connection details resolved from the environment.
type envConfig struct {
	User, Password, Host string
	Port                 int
}

// loadEnv gathers and validates connection details from the environment
// variables named in settings (UsernameEnv, PasswordEnv, HostEnv, PortEnv).
//
// Every variable must be set and non-empty, and the port must parse as an
// integer in the range 1..65535. A nil settings pointer, a missing variable or
// an invalid port is logged through l and returned as a merrors.InvalidArgument
// error so callers can decide how to handle it.
func loadEnv(settings *nc.Settings, l *zap.Logger) (*envConfig, error) {
	// Without settings there are no variable names to look up; report it as an
	// invalid argument rather than panicking on the dereferences below.
	if settings == nil {
		l.Error("NATS settings are not provided")
		return nil, merrors.InvalidArgument("NATS settings are not provided", "settings")
	}

	// get reads one environment variable and treats an empty value the same as
	// an unset one.
	get := func(key, label string) (string, error) {
		if v := os.Getenv(key); v != "" {
			return v, nil
		}
		l.Error(fmt.Sprintf("NATS %s not found in environment", label), zap.String("env_var", key))
		return "", merrors.InvalidArgument(fmt.Sprintf("NATS %s not found in environment variable: %s", label, key), key)
	}

	user, err := get(settings.UsernameEnv, "user name")
	if err != nil {
		return nil, err
	}

	password, err := get(settings.PasswordEnv, "password")
	if err != nil {
		return nil, err
	}

	host, err := get(settings.HostEnv, "host")
	if err != nil {
		return nil, err
	}

	portStr, err := get(settings.PortEnv, "port")
	if err != nil {
		return nil, err
	}

	port, err := strconv.Atoi(portStr)
	if err != nil || port <= 0 || port > 65535 {
		l.Error("Invalid NATS port value", zap.String("port", portStr))
		return nil, merrors.InvalidArgument("Invalid NATS port: "+portStr, settings.PortEnv)
	}

	return &envConfig{
		User:     user,
		Password: password,
		Host:     host,
		Port:     port,
	}, nil
}

// NewNatsBroker connects to the NATS server described by settings and returns a
// broker wrapping that connection.
//
// Host, port and credentials are read from the environment variables named in
// settings; the connection name, reconnect limit and reconnect wait (in
// seconds) are taken from settings directly. Credentials are passed through
// nats.UserInfo rather than embedded in the connection URL, so the URL logged
// on a connection failure never contains the password.
//
// It returns the error from loadEnv when the configuration is invalid
// (including nil settings), or the error from nats.Connect unchanged when the
// connection cannot be established.
func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, error) {
	l := logger.Named("broker")

	cfg, err := loadEnv(settings, l)
	if err != nil {
		return nil, err
	}

	// JoinHostPort brackets IPv6 literals, which plain concatenation would not.
	hostPort := net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port))
	natsURL := (&url.URL{Scheme: "nats", Host: hostPort}).String()

	opts := []nats.Option{
		nats.Name(settings.NATSName),
		nats.MaxReconnects(settings.MaxReconnects),
		nats.ReconnectWait(time.Duration(settings.ReconnectWait) * time.Second),
		nats.UserInfo(cfg.User, cfg.Password),
	}

	conn, err := nats.Connect(natsURL, opts...)
	if err != nil {
		l.Error("Failed to connect to NATS", zap.String("url", natsURL), zap.Error(err))
		return nil, err
	}

	// The logged URL carries the user name but never the password. Building it
	// with url.URL keeps it well-formed when the user name contains characters
	// such as '@' or '/'.
	displayURL := (&url.URL{Scheme: "nats", User: url.User(cfg.User), Host: hostPort}).String()
	l.Info("Connected to NATS", zap.String("broker", settings.NATSName), zap.String("url", displayURL))

	return &NatsBroker{
		nc:        conn,
		logger:    l.Named("nats"),
		topicSubs: natsSubscriotions{},
	}, nil
}