migration to replicaset connection
Some checks failed
ci/woodpecker/push/ledger Pipeline is pending
ci/woodpecker/push/nats Pipeline is pending
ci/woodpecker/push/notification Pipeline is pending
ci/woodpecker/push/payments_orchestrator Pipeline is pending
ci/woodpecker/push/billing_fees Pipeline was successful
ci/woodpecker/push/bff Pipeline was successful
ci/woodpecker/push/db Pipeline was successful
ci/woodpecker/push/chain_gateway Pipeline was successful
ci/woodpecker/push/fx_ingestor Pipeline was successful
ci/woodpecker/push/fx_oracle Pipeline failed
ci/woodpecker/push/bump_version unknown status
ci/woodpecker/push/frontend Pipeline failed

This commit is contained in:
Stephan D
2025-11-24 19:10:07 +01:00
parent cd79355e69
commit 72271cfc9a
11 changed files with 204 additions and 36 deletions

View File

@@ -2,7 +2,10 @@ package mongo
import (
"context"
"fmt"
"net"
"os"
"strings"
"github.com/mitchellh/mapstructure"
"github.com/tech/sendico/pkg/auth"
@@ -28,33 +31,40 @@ import (
"github.com/tech/sendico/pkg/mservice"
mutil "github.com/tech/sendico/pkg/mutil/config"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.uber.org/zap"
)
// Config represents configuration
type Config struct {
Port *string `mapstructure:"port"`
PortEnv *string `mapstructure:"port_env"`
User *string `mapstructure:"user"`
UserEnv *string `mapstructure:"user_env"`
PasswordEnv string `mapstructure:"password_env"`
Database *string `mapstructure:"database"`
DatabaseEnv *string `mapstructure:"database_env"`
Host *string `mapstructure:"host"`
HostEnv *string `mapstructure:"host_env"`
AuthSource *string `mapstructure:"auth_source,omitempty"`
AuthSourceEnv *string `mapstructure:"auth_source_env,omitempty"`
AuthMechanism *string `mapstructure:"auth_mechanism,omitempty"`
AuthMechanismEnv *string `mapstructure:"auth_mechanism_env,omitempty"`
ReplicaSet *string `mapstructure:"replica_set,omitempty"`
ReplicaSetEnv *string `mapstructure:"replica_set_env,omitempty"`
Enforcer *auth.Config `mapstructure:"enforcer"`
Port *string `mapstructure:"port"`
PortEnv *string `mapstructure:"port_env"`
User *string `mapstructure:"user"`
UserEnv *string `mapstructure:"user_env"`
PasswordEnv string `mapstructure:"password_env"`
Database *string `mapstructure:"database"`
DatabaseEnv *string `mapstructure:"database_env"`
Host *string `mapstructure:"host"`
HostEnv *string `mapstructure:"host_env"`
Hosts []string `mapstructure:"hosts,omitempty"`
HostsEnvPrefix *string `mapstructure:"hosts_env_prefix,omitempty"`
HostsEnvPrefixEnv *string `mapstructure:"hosts_env_prefix_env,omitempty"`
PortsEnvPrefix *string `mapstructure:"ports_env_prefix,omitempty"`
PortsEnvPrefixEnv *string `mapstructure:"ports_env_prefix_env,omitempty"`
URI *string `mapstructure:"uri,omitempty"`
URIEnv *string `mapstructure:"uri_env,omitempty"`
AuthSource *string `mapstructure:"auth_source,omitempty"`
AuthSourceEnv *string `mapstructure:"auth_source_env,omitempty"`
AuthMechanism *string `mapstructure:"auth_mechanism,omitempty"`
AuthMechanismEnv *string `mapstructure:"auth_mechanism_env,omitempty"`
ReplicaSet *string `mapstructure:"replica_set,omitempty"`
ReplicaSetEnv *string `mapstructure:"replica_set_env,omitempty"`
Enforcer *auth.Config `mapstructure:"enforcer"`
}
type DBSettings struct {
Host string
Hosts []string
Port string
User string
Password string
@@ -62,6 +72,7 @@ type DBSettings struct {
AuthSource string
AuthMechanism string
ReplicaSet string
URI string
}
func newProtectedDB[T any](
@@ -87,6 +98,19 @@ func Config2DBSettings(logger mlogger.Logger, config *Config) *DBSettings {
p.AuthSource = mutil.GetConfigValue(logger, "auth_source", "auth_source_env", config.AuthSource, config.AuthSourceEnv)
p.AuthMechanism = mutil.GetConfigValue(logger, "auth_mechanism", "auth_mechanism_env", config.AuthMechanism, config.AuthMechanismEnv)
p.ReplicaSet = mutil.GetConfigValue(logger, "replica_set", "replica_set_env", config.ReplicaSet, config.ReplicaSetEnv)
p.URI = mutil.GetConfigValue(logger, "uri", "uri_env", config.URI, config.URIEnv)
hostPrefix := mutil.GetConfigValue(logger, "hosts_env_prefix", "hosts_env_prefix_env", config.HostsEnvPrefix, config.HostsEnvPrefixEnv)
portPrefix := mutil.GetConfigValue(logger, "ports_env_prefix", "ports_env_prefix_env", config.PortsEnvPrefix, config.PortsEnvPrefixEnv)
if hostPrefix == "" && p.ReplicaSet != "" {
hostPrefix = "MONGO_HOSTS_"
}
if portPrefix == "" && p.ReplicaSet != "" {
portPrefix = "MONGO_PORTS_"
}
p.Hosts = collectReplicaHosts(config.Hosts, p.ReplicaSet, p.Port, hostPrefix, portPrefix)
return p
}
@@ -101,21 +125,19 @@ func decodeConfig(logger mlogger.Logger, settings model.SettingsT) (*Config, *DB
}
func dialMongo(logger mlogger.Logger, dbSettings *DBSettings) (*mongo.Client, error) {
cred := options.Credential{
AuthMechanism: dbSettings.AuthMechanism,
AuthSource: dbSettings.AuthSource,
Username: dbSettings.User,
Password: dbSettings.Password,
}
dbURI := buildURI(dbSettings)
opts := buildOptions(dbSettings)
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(dbURI).SetAuth(cred))
client, err := mongo.Connect(context.Background(), opts)
if err != nil {
logger.Error("Unable to connect to database", zap.Error(err))
return nil, err
}
logger.Info("Connected successfully", zap.String("uri", dbURI))
if dbSettings.URI != "" {
logger.Info("Connected successfully", zap.Bool("uri_provided", true))
} else {
logger.Info("Connected successfully", zap.Strings("hosts", opts.Hosts), zap.String("replica_set", dbSettings.ReplicaSet))
}
if err := client.Ping(context.Background(), readpref.Primary()); err != nil {
logger.Error("Unable to ping database", zap.Error(err))
@@ -199,6 +221,70 @@ func (db *DB) TransactionFactory() transaction.Factory {
return transactionimp.CreateFactory(db.client)
}
func collectReplicaHosts(configuredHosts []string, replicaSet, defaultPort, hostPrefix, portPrefix string) []string {
normalize := func(host, port string) string {
host = strings.TrimSpace(host)
port = strings.TrimSpace(port)
if host == "" {
return ""
}
// If host already has a port, keep it; otherwise apply provided/default port.
if _, _, err := net.SplitHostPort(host); err == nil {
return host
}
if port != "" {
return net.JoinHostPort(host, port)
}
if defaultPort != "" {
return net.JoinHostPort(host, defaultPort)
}
return host
}
appendHost := func(list []string, host, port string) []string {
if normalized := normalize(host, port); normalized != "" {
return append(list, normalized)
}
return list
}
var hosts []string
for _, h := range configuredHosts {
hosts = appendHost(hosts, h, "")
}
if replicaSet == "" || hostPrefix == "" {
return hosts
}
index := 0
for {
hostEnv := os.Getenv(fmt.Sprintf("%s%d", hostPrefix, index))
portEnv := ""
if portPrefix != "" {
portEnv = os.Getenv(fmt.Sprintf("%s%d", portPrefix, index))
}
if hostEnv == "" && index == 0 {
hostEnv = os.Getenv(fmt.Sprintf("%s%d", hostPrefix, 1))
if portPrefix != "" {
portEnv = os.Getenv(fmt.Sprintf("%s%d", portPrefix, 1))
}
if hostEnv == "" {
break
}
index = 1
} else if hostEnv == "" {
break
}
hosts = appendHost(hosts, hostEnv, portEnv)
index++
}
return hosts
}
func (db *DB) Permissions() auth.Provider {
return db
}

View File

@@ -1,22 +1,49 @@
package mongo
import (
"net/url"
"net"
"strings"
"go.mongodb.org/mongo-driver/mongo/options"
)
func buildURI(s *DBSettings) string {
u := &url.URL{
Scheme: "mongodb",
Host: s.Host,
Path: "/" + url.PathEscape(s.Database), // /my%20db
func buildOptions(s *DBSettings) *options.ClientOptions {
opts := options.Client()
if s.URI != "" {
return opts.ApplyURI(s.URI)
}
hosts := make([]string, 0, len(s.Hosts)+1)
for _, h := range s.Hosts {
if trimmed := strings.TrimSpace(h); trimmed != "" {
hosts = append(hosts, trimmed)
}
}
if len(hosts) == 0 && s.Host != "" {
host := s.Host
if _, _, err := net.SplitHostPort(host); err != nil && s.Port != "" {
host = net.JoinHostPort(host, s.Port)
}
hosts = append(hosts, host)
}
if len(hosts) > 0 {
opts.SetHosts(hosts)
}
q := url.Values{}
if s.ReplicaSet != "" {
q.Set("replicaSet", s.ReplicaSet)
opts.SetReplicaSet(s.ReplicaSet)
}
u.RawQuery = q.Encode()
cred := options.Credential{
AuthMechanism: s.AuthMechanism,
AuthSource: s.AuthSource,
Username: s.User,
Password: s.Password,
}
opts.SetAuth(cred)
return u.String()
return opts
}