383 lines
12 KiB
Go
Executable File
383 lines
12 KiB
Go
Executable File
package mongo
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"strings"
|
|
|
|
"github.com/mitchellh/mapstructure"
|
|
"github.com/tech/sendico/pkg/auth"
|
|
"github.com/tech/sendico/pkg/db/account"
|
|
"github.com/tech/sendico/pkg/db/chainassets"
|
|
"github.com/tech/sendico/pkg/db/confirmation"
|
|
"github.com/tech/sendico/pkg/db/internal/mongo/accountdb"
|
|
"github.com/tech/sendico/pkg/db/internal/mongo/chainassetsdb"
|
|
"github.com/tech/sendico/pkg/db/internal/mongo/confirmationdb"
|
|
"github.com/tech/sendico/pkg/db/internal/mongo/invitationdb"
|
|
"github.com/tech/sendico/pkg/db/internal/mongo/organizationdb"
|
|
"github.com/tech/sendico/pkg/db/internal/mongo/paymethoddb"
|
|
"github.com/tech/sendico/pkg/db/internal/mongo/policiesdb"
|
|
"github.com/tech/sendico/pkg/db/internal/mongo/recipientdb"
|
|
"github.com/tech/sendico/pkg/db/internal/mongo/refreshtokensdb"
|
|
"github.com/tech/sendico/pkg/db/internal/mongo/rolesdb"
|
|
"github.com/tech/sendico/pkg/db/internal/mongo/transactionimp"
|
|
"github.com/tech/sendico/pkg/db/invitation"
|
|
"github.com/tech/sendico/pkg/db/organization"
|
|
"github.com/tech/sendico/pkg/db/paymethod"
|
|
"github.com/tech/sendico/pkg/db/policy"
|
|
"github.com/tech/sendico/pkg/db/recipient"
|
|
"github.com/tech/sendico/pkg/db/refreshtokens"
|
|
"github.com/tech/sendico/pkg/db/repository"
|
|
"github.com/tech/sendico/pkg/db/role"
|
|
"github.com/tech/sendico/pkg/db/transaction"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"github.com/tech/sendico/pkg/model"
|
|
"github.com/tech/sendico/pkg/mservice"
|
|
mutil "github.com/tech/sendico/pkg/mutil/config"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.mongodb.org/mongo-driver/mongo/readpref"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Config represents configuration
|
|
type Config struct {
|
|
Port *string `mapstructure:"port"`
|
|
PortEnv *string `mapstructure:"port_env"`
|
|
User *string `mapstructure:"user"`
|
|
UserEnv *string `mapstructure:"user_env"`
|
|
PasswordEnv string `mapstructure:"password_env"`
|
|
Database *string `mapstructure:"database"`
|
|
DatabaseEnv *string `mapstructure:"database_env"`
|
|
Host *string `mapstructure:"host"`
|
|
HostEnv *string `mapstructure:"host_env"`
|
|
Hosts []string `mapstructure:"hosts,omitempty"`
|
|
HostsEnvPrefix *string `mapstructure:"hosts_env_prefix,omitempty"`
|
|
HostsEnvPrefixEnv *string `mapstructure:"hosts_env_prefix_env,omitempty"`
|
|
PortsEnvPrefix *string `mapstructure:"ports_env_prefix,omitempty"`
|
|
PortsEnvPrefixEnv *string `mapstructure:"ports_env_prefix_env,omitempty"`
|
|
URI *string `mapstructure:"uri,omitempty"`
|
|
URIEnv *string `mapstructure:"uri_env,omitempty"`
|
|
AuthSource *string `mapstructure:"auth_source,omitempty"`
|
|
AuthSourceEnv *string `mapstructure:"auth_source_env,omitempty"`
|
|
AuthMechanism *string `mapstructure:"auth_mechanism,omitempty"`
|
|
AuthMechanismEnv *string `mapstructure:"auth_mechanism_env,omitempty"`
|
|
ReplicaSet *string `mapstructure:"replica_set,omitempty"`
|
|
ReplicaSetEnv *string `mapstructure:"replica_set_env,omitempty"`
|
|
Enforcer *auth.Config `mapstructure:"enforcer"`
|
|
}
|
|
|
|
type DBSettings struct {
|
|
Host string
|
|
Hosts []string
|
|
Port string
|
|
User string
|
|
Password string
|
|
Database string
|
|
AuthSource string
|
|
AuthMechanism string
|
|
ReplicaSet string
|
|
URI string
|
|
}
|
|
|
|
func newProtectedDB[T any](
|
|
db *DB,
|
|
create func(ctx context.Context, logger mlogger.Logger, enforcer auth.Enforcer, pdb policy.DB, client *mongo.Database) (T, error),
|
|
) (T, error) {
|
|
pdb, err := db.NewPoliciesDB()
|
|
if err != nil {
|
|
db.logger.Warn("Failed to create policies database", zap.Error(err))
|
|
var zero T
|
|
return zero, err
|
|
}
|
|
return create(context.Background(), db.logger, db.Enforcer(), pdb, db.db())
|
|
}
|
|
|
|
func Config2DBSettings(logger mlogger.Logger, config *Config) *DBSettings {
|
|
p := new(DBSettings)
|
|
p.Port = mutil.GetConfigValue(logger, "port", "port_env", config.Port, config.PortEnv)
|
|
p.Database = mutil.GetConfigValue(logger, "database", "database_env", config.Database, config.DatabaseEnv)
|
|
p.Password = os.Getenv(config.PasswordEnv)
|
|
p.User = mutil.GetConfigValue(logger, "user", "user_env", config.User, config.UserEnv)
|
|
p.Host = mutil.GetConfigValue(logger, "host", "host_env", config.Host, config.HostEnv)
|
|
p.AuthSource = mutil.GetConfigValue(logger, "auth_source", "auth_source_env", config.AuthSource, config.AuthSourceEnv)
|
|
p.AuthMechanism = mutil.GetConfigValue(logger, "auth_mechanism", "auth_mechanism_env", config.AuthMechanism, config.AuthMechanismEnv)
|
|
p.ReplicaSet = mutil.GetConfigValue(logger, "replica_set", "replica_set_env", config.ReplicaSet, config.ReplicaSetEnv)
|
|
p.URI = mutil.GetConfigValue(logger, "uri", "uri_env", config.URI, config.URIEnv)
|
|
|
|
hostPrefix := mutil.GetConfigValue(logger, "hosts_env_prefix", "hosts_env_prefix_env", config.HostsEnvPrefix, config.HostsEnvPrefixEnv)
|
|
portPrefix := mutil.GetConfigValue(logger, "ports_env_prefix", "ports_env_prefix_env", config.PortsEnvPrefix, config.PortsEnvPrefixEnv)
|
|
|
|
if hostPrefix == "" && p.ReplicaSet != "" {
|
|
hostPrefix = "MONGO_HOSTS_"
|
|
}
|
|
if portPrefix == "" && p.ReplicaSet != "" {
|
|
portPrefix = "MONGO_PORTS_"
|
|
}
|
|
|
|
p.Hosts = collectReplicaHosts(config.Hosts, p.ReplicaSet, p.Port, hostPrefix, portPrefix)
|
|
return p
|
|
}
|
|
|
|
func decodeConfig(logger mlogger.Logger, settings model.SettingsT) (*Config, *DBSettings, error) {
|
|
var config Config
|
|
if err := mapstructure.Decode(settings, &config); err != nil {
|
|
logger.Warn("Failed to decode settings", zap.Error(err), zap.Any("settings", settings))
|
|
return nil, nil, err
|
|
}
|
|
dbSettings := Config2DBSettings(logger, &config)
|
|
return &config, dbSettings, nil
|
|
}
|
|
|
|
func dialMongo(logger mlogger.Logger, dbSettings *DBSettings) (*mongo.Client, error) {
|
|
opts := buildOptions(dbSettings)
|
|
|
|
client, err := mongo.Connect(context.Background(), opts)
|
|
if err != nil {
|
|
logger.Error("Unable to connect to database", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
if dbSettings.URI != "" {
|
|
logger.Info("Connected successfully", zap.Bool("uri_provided", true))
|
|
} else {
|
|
logger.Info("Connected successfully", zap.Strings("hosts", opts.Hosts), zap.String("replica_set", dbSettings.ReplicaSet))
|
|
}
|
|
|
|
if err := client.Ping(context.Background(), readpref.Primary()); err != nil {
|
|
logger.Error("Unable to ping database", zap.Error(err))
|
|
_ = client.Disconnect(context.Background())
|
|
return nil, err
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
func ConnectClient(logger mlogger.Logger, settings model.SettingsT) (*mongo.Client, *Config, *DBSettings, error) {
|
|
config, dbSettings, err := decodeConfig(logger, settings)
|
|
if err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
|
|
client, err := dialMongo(logger, dbSettings)
|
|
if err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
|
|
return client, config, dbSettings, nil
|
|
}
|
|
|
|
// DB represents the structure of the database
|
|
type DB struct {
|
|
logger mlogger.Logger
|
|
config *DBSettings
|
|
client *mongo.Client
|
|
enforcer auth.Enforcer
|
|
manager auth.Manager
|
|
pdb policy.DB
|
|
}
|
|
|
|
func (db *DB) db() *mongo.Database {
|
|
return db.client.Database(db.config.Database)
|
|
}
|
|
|
|
func (db *DB) NewAccountDB() (account.DB, error) {
|
|
return accountdb.Create(db.logger, db.db())
|
|
}
|
|
|
|
func (db *DB) NewConfirmationsDB() (confirmation.DB, error) {
|
|
return confirmationdb.Create(db.logger, db.db())
|
|
}
|
|
|
|
func (db *DB) NewOrganizationDB() (organization.DB, error) {
|
|
pdb, err := db.NewPoliciesDB()
|
|
if err != nil {
|
|
db.logger.Warn("Failed to create policies database", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
organizationDB, err := organizationdb.Create(context.Background(), db.logger, db.Enforcer(), pdb, db.db())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Return the concrete type - interface mismatch will be handled at runtime
|
|
// TODO: Update organization.DB interface to match implementation signatures
|
|
return organizationDB, nil
|
|
}
|
|
|
|
func (db *DB) NewRecipientsDB() (recipient.DB, error) {
|
|
pmdb, err := db.NewPaymentMethodsDB()
|
|
if err != nil {
|
|
db.logger.Warn("Failed to create payment methods database", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
create := func(ctx context.Context,
|
|
logger mlogger.Logger,
|
|
enforcer auth.Enforcer,
|
|
pdb policy.DB,
|
|
db *mongo.Database,
|
|
) (recipient.DB, error) {
|
|
return recipientdb.Create(ctx, logger, enforcer, pdb, pmdb, db)
|
|
}
|
|
|
|
return newProtectedDB(db, create)
|
|
}
|
|
|
|
func (db *DB) NewPaymentMethodsDB() (paymethod.DB, error) {
|
|
return newProtectedDB(db, paymethoddb.Create)
|
|
}
|
|
|
|
func (db *DB) NewRefreshTokensDB() (refreshtokens.DB, error) {
|
|
return refreshtokensdb.Create(db.logger, db.db())
|
|
}
|
|
|
|
func (db *DB) NewInvitationsDB() (invitation.DB, error) {
|
|
return newProtectedDB(db, invitationdb.Create)
|
|
}
|
|
|
|
func (db *DB) NewPoliciesDB() (policy.DB, error) {
|
|
return db.pdb, nil
|
|
}
|
|
|
|
func (db *DB) NewRolesDB() (role.DB, error) {
|
|
return rolesdb.Create(db.logger, db.db())
|
|
}
|
|
|
|
func (db *DB) TransactionFactory() transaction.Factory {
|
|
return transactionimp.CreateFactory(db.client)
|
|
}
|
|
|
|
func collectReplicaHosts(configuredHosts []string, replicaSet, defaultPort, hostPrefix, portPrefix string) []string {
|
|
normalize := func(host, port string) string {
|
|
host = strings.TrimSpace(host)
|
|
port = strings.TrimSpace(port)
|
|
if host == "" {
|
|
return ""
|
|
}
|
|
// If host already has a port, keep it; otherwise apply provided/default port.
|
|
if _, _, err := net.SplitHostPort(host); err == nil {
|
|
return host
|
|
}
|
|
if port != "" {
|
|
return net.JoinHostPort(host, port)
|
|
}
|
|
if defaultPort != "" {
|
|
return net.JoinHostPort(host, defaultPort)
|
|
}
|
|
return host
|
|
}
|
|
|
|
appendHost := func(list []string, host, port string) []string {
|
|
if normalized := normalize(host, port); normalized != "" {
|
|
return append(list, normalized)
|
|
}
|
|
return list
|
|
}
|
|
|
|
var hosts []string
|
|
for _, h := range configuredHosts {
|
|
hosts = appendHost(hosts, h, "")
|
|
}
|
|
|
|
if replicaSet == "" || hostPrefix == "" {
|
|
return hosts
|
|
}
|
|
|
|
index := 0
|
|
for {
|
|
hostEnv := os.Getenv(fmt.Sprintf("%s%d", hostPrefix, index))
|
|
portEnv := ""
|
|
if portPrefix != "" {
|
|
portEnv = os.Getenv(fmt.Sprintf("%s%d", portPrefix, index))
|
|
}
|
|
|
|
if hostEnv == "" && index == 0 {
|
|
hostEnv = os.Getenv(fmt.Sprintf("%s%d", hostPrefix, 1))
|
|
if portPrefix != "" {
|
|
portEnv = os.Getenv(fmt.Sprintf("%s%d", portPrefix, 1))
|
|
}
|
|
if hostEnv == "" {
|
|
break
|
|
}
|
|
index = 1
|
|
} else if hostEnv == "" {
|
|
break
|
|
}
|
|
|
|
hosts = appendHost(hosts, hostEnv, portEnv)
|
|
index++
|
|
}
|
|
|
|
return hosts
|
|
}
|
|
|
|
func (db *DB) NewChainAsstesDB() (chainassets.DB, error) {
|
|
return chainassetsdb.Create(db.logger, db.db())
|
|
}
|
|
|
|
func (db *DB) Permissions() auth.Provider {
|
|
return db
|
|
}
|
|
|
|
func (db *DB) Manager() auth.Manager {
|
|
return db.manager
|
|
}
|
|
|
|
func (db *DB) Enforcer() auth.Enforcer {
|
|
return db.enforcer
|
|
}
|
|
|
|
func (db *DB) GetPolicyDescription(ctx context.Context, resource mservice.Type) (*model.PolicyDescription, error) {
|
|
var policyDescription model.PolicyDescription
|
|
return &policyDescription, db.pdb.FindOne(ctx, repository.Filter("resourceTypes", resource), &policyDescription)
|
|
}
|
|
|
|
func (db *DB) CloseConnection() {
|
|
if err := db.client.Disconnect(context.Background()); err != nil {
|
|
db.logger.Warn("Failed to close connection", zap.Error(err))
|
|
}
|
|
db.logger.Info("Database connection closed")
|
|
}
|
|
|
|
// NewConnection creates a new database connection
|
|
func NewConnection(logger mlogger.Logger, settings model.SettingsT) (*DB, error) {
|
|
client, config, dbSettings, err := ConnectClient(logger, settings)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
db := &DB{
|
|
logger: logger.Named("db"),
|
|
config: dbSettings,
|
|
client: client,
|
|
}
|
|
|
|
cleanup := func(ctx context.Context) {
|
|
if err := client.Disconnect(ctx); err != nil {
|
|
logger.Warn("Failed to close MongoDB connection", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
rdb, err := db.NewRolesDB()
|
|
if err != nil {
|
|
db.logger.Warn("Failed to create roles database", zap.Error(err))
|
|
cleanup(context.Background())
|
|
return nil, err
|
|
}
|
|
if db.pdb, err = policiesdb.Create(db.logger, db.db()); err != nil {
|
|
db.logger.Warn("Failed to create policies database", zap.Error(err))
|
|
cleanup(context.Background())
|
|
return nil, err
|
|
}
|
|
if db.enforcer, db.manager, err = auth.CreateAuth(logger, db.client, db.db(), db.pdb, rdb, config.Enforcer); err != nil {
|
|
db.logger.Warn("Failed to create permissions enforcer", zap.Error(err))
|
|
cleanup(context.Background())
|
|
return nil, err
|
|
}
|
|
|
|
return db, nil
|
|
}
|