package natsb import ( "fmt" "net" "net/url" "os" "strconv" "strings" "sync" "time" "github.com/nats-io/nats.go" "github.com/tech/sendico/pkg/merrors" nc "github.com/tech/sendico/pkg/messaging/internal/natsb/config" "github.com/tech/sendico/pkg/mlogger" "go.uber.org/zap" ) type natsSubscriotions = map[string]*TopicSubscription type NatsBroker struct { logger mlogger.Logger nc *nats.Conn js nats.JetStreamContext topicSubs natsSubscriotions mu sync.Mutex bufferSize int } type envConfig struct { User, Password, Host string Port int } const defaultConsumerBufferSize = 1024 const redactedNATSPassword = "xxxxx" func buildSafePublishableNATSURL(rawURL string) string { if rawURL == "" { return rawURL } parts := strings.Split(rawURL, ",") safe := make([]string, 0, len(parts)) for _, part := range parts { trimmed := strings.TrimSpace(part) if trimmed == "" { continue } built, ok := buildSafePublishableNATSEntry(trimmed) if !ok { safe = append(safe, trimmed) continue } safe = append(safe, built) } if len(safe) == 0 { return strings.TrimSpace(rawURL) } return strings.Join(safe, ",") } func buildSafePublishableNATSEntry(raw string) (string, bool) { parsed, err := url.Parse(raw) if err == nil && parsed.Host != "" { safe := &url.URL{ Scheme: parsed.Scheme, Host: parsed.Host, Path: parsed.Path, RawPath: parsed.RawPath, RawQuery: parsed.RawQuery, Fragment: parsed.Fragment, } if safe.Scheme == "" { safe.Scheme = "nats" } if parsed.User != nil { username := parsed.User.Username() if username == "" { username = redactedNATSPassword } safe.User = url.UserPassword(username, redactedNATSPassword) } return safe.String(), true } return buildSafePublishableFromAuthority(raw) } func buildSafePublishableFromAuthority(raw string) (string, bool) { scheme := "nats" authorityAndSuffix := raw if schemeIndex := strings.Index(raw, "://"); schemeIndex >= 0 { if candidate := strings.TrimSpace(raw[:schemeIndex]); candidate != "" { scheme = candidate } authorityAndSuffix = raw[schemeIndex+3:] } authorityEnd := strings.IndexAny(authorityAndSuffix, "/?#") if authorityEnd < 0 { authorityEnd = len(authorityAndSuffix) } authority := authorityAndSuffix[:authorityEnd] suffix := authorityAndSuffix[authorityEnd:] atIndex := strings.LastIndex(authority, "@") hostPort := authority username := "" if atIndex >= 0 { userInfo := authority[:atIndex] hostPort = authority[atIndex+1:] if hostPort == "" { return "", false } username = userInfo if colonIndex := strings.Index(userInfo, ":"); colonIndex >= 0 { username = userInfo[:colonIndex] } if username == "" { username = redactedNATSPassword } } if hostPort == "" || strings.ContainsAny(hostPort, " \t\r\n") { return "", false } safe := &url.URL{ Scheme: scheme, Host: hostPort, } if username != "" { safe.User = url.UserPassword(username, redactedNATSPassword) } return safe.String() + suffix, true } // loadEnv gathers and validates connection details from environment variables // listed in the Settings struct. Invalid or missing values surface as a typed // InvalidArgument error so callers can decide how to handle them. func loadEnv(settings *nc.Settings, l mlogger.Logger) (*envConfig, error) { get := func(key, label string) (string, error) { if v := os.Getenv(key); v != "" { return v, nil } l.Error(fmt.Sprintf("NATS %s not found in environment", label), zap.String("env_var", key)) return "", merrors.InvalidArgument(fmt.Sprintf("NATS %s not found in environment variable: %s", label, key), key) } user, err := get(settings.UsernameEnv, "user name") if err != nil { return nil, err } password, err := get(settings.PasswordEnv, "password") if err != nil { return nil, err } host, err := get(settings.HostEnv, "host") if err != nil { return nil, err } portStr, err := get(settings.PortEnv, "port") if err != nil { return nil, err } port, err := strconv.Atoi(portStr) if err != nil || port <= 0 || port > 65535 { l.Error("Invalid NATS port value", zap.String("port", portStr)) return nil, merrors.InvalidArgument("Invalid NATS port: "+portStr, settings.PortEnv) } return &envConfig{ User: user, Password: password, Host: host, Port: port, }, nil } func NewNatsBroker(logger mlogger.Logger, settings *nc.Settings) (*NatsBroker, error) { l := logger.Named("broker") var err error var cfg *envConfig var natsURL string bufferSize := defaultConsumerBufferSize if settings != nil && settings.BufferSize > 0 { bufferSize = settings.BufferSize } if settings != nil && strings.TrimSpace(settings.URLEnv) != "" { urlVal := strings.TrimSpace(os.Getenv(settings.URLEnv)) if urlVal != "" { natsURL = urlVal } } if natsURL == "" { // Helper function to get environment variables cfg, err = loadEnv(settings, l) if err != nil { return nil, err } u := &url.URL{ Scheme: "nats", Host: net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port)), } natsURL = u.String() } publishableNATSURL := buildSafePublishableNATSURL(natsURL) opts := []nats.Option{ nats.Name(settings.NATSName), nats.MaxReconnects(settings.MaxReconnects), nats.ReconnectWait(time.Duration(settings.ReconnectWait) * time.Second), nats.RetryOnFailedConnect(true), nats.DisconnectErrHandler(func(conn *nats.Conn, err error) { fields := []zap.Field{ zap.String("broker", settings.NATSName), } if conn != nil { fields = append(fields, zap.String("connected_url", buildSafePublishableNATSURL(conn.ConnectedUrl()))) } if err != nil { fields = append(fields, zap.Error(err)) } l.Warn("Disconnected from NATS", fields...) }), nats.ReconnectHandler(func(conn *nats.Conn) { fields := []zap.Field{ zap.String("broker", settings.NATSName), } if conn != nil { fields = append(fields, zap.String("connected_url", buildSafePublishableNATSURL(conn.ConnectedUrl()))) } l.Info("Reconnected to NATS", fields...) }), nats.ClosedHandler(func(conn *nats.Conn) { fields := []zap.Field{ zap.String("broker", settings.NATSName), } if conn != nil { if url := conn.ConnectedUrl(); url != "" { fields = append(fields, zap.String("connected_url", buildSafePublishableNATSURL(url))) } if err := conn.LastError(); err != nil { fields = append(fields, zap.Error(err)) } } l.Warn("NATS connection closed", fields...) }), } if cfg != nil { opts = append(opts, nats.UserInfo(cfg.User, cfg.Password)) } else if settings != nil { userEnv := strings.TrimSpace(settings.UsernameEnv) passEnv := strings.TrimSpace(settings.PasswordEnv) if userEnv != "" && passEnv != "" { user := strings.TrimSpace(os.Getenv(userEnv)) pass := strings.TrimSpace(os.Getenv(passEnv)) if user != "" || pass != "" { opts = append(opts, nats.UserInfo(user, pass)) } } } res := &NatsBroker{ logger: l.Named("nats"), topicSubs: natsSubscriotions{}, bufferSize: bufferSize, } if res.nc, err = nats.Connect(natsURL, opts...); err != nil { l.Error("Failed to connect to NATS", zap.String("url", publishableNATSURL), zap.Error(err)) return nil, err } if res.js, err = res.nc.JetStream(); err != nil { l.Warn("Failed to initialise JetStream context", zap.Error(err)) } logger.Info("Connected to NATS", zap.String("broker", settings.NATSName), zap.String("url", publishableNATSURL)) return res, nil } func (b *NatsBroker) JetStream() nats.JetStreamContext { if b == nil { return nil } return b.js }