package serverimp import ( "context" "errors" "io" "net" "net/http" "os" "strconv" "strings" "time" "github.com/go-chi/chi/v5" "github.com/tech/sendico/gateway/mntx/internal/appversion" mntxservice "github.com/tech/sendico/gateway/mntx/internal/service/gateway" "github.com/tech/sendico/gateway/mntx/internal/service/monetix" "github.com/tech/sendico/pkg/api/routers" "github.com/tech/sendico/pkg/merrors" msg "github.com/tech/sendico/pkg/messaging" "github.com/tech/sendico/pkg/mlogger" gatewayv1 "github.com/tech/sendico/pkg/proto/common/gateway/v1" "github.com/tech/sendico/pkg/server/grpcapp" "go.uber.org/zap" "gopkg.in/yaml.v3" ) type Imp struct { logger mlogger.Logger file string debug bool config *config app *grpcapp.App[struct{}] http *http.Server service *mntxservice.Service } type config struct { *grpcapp.Config `yaml:",inline"` Monetix monetixConfig `yaml:"monetix"` Gateway gatewayConfig `yaml:"gateway"` HTTP httpConfig `yaml:"http"` } type monetixConfig struct { BaseURL string `yaml:"base_url"` BaseURLEnv string `yaml:"base_url_env"` ProjectID int64 `yaml:"project_id"` ProjectIDEnv string `yaml:"project_id_env"` SecretKey string `yaml:"secret_key"` SecretKeyEnv string `yaml:"secret_key_env"` AllowedCurrencies []string `yaml:"allowed_currencies"` RequireCustomerAddress bool `yaml:"require_customer_address"` RequestTimeoutSeconds int `yaml:"request_timeout_seconds"` StatusSuccess string `yaml:"status_success"` StatusProcessing string `yaml:"status_processing"` } type gatewayConfig struct { ID string `yaml:"id"` Network string `yaml:"network"` Currencies []string `yaml:"currencies"` IsEnabled *bool `yaml:"is_enabled"` Limits limitsConfig `yaml:"limits"` } type limitsConfig struct { MinAmount string `yaml:"min_amount"` MaxAmount string `yaml:"max_amount"` PerTxMaxFee string `yaml:"per_tx_max_fee"` PerTxMinAmount string `yaml:"per_tx_min_amount"` PerTxMaxAmount string `yaml:"per_tx_max_amount"` VolumeLimit map[string]string `yaml:"volume_limit"` VelocityLimit map[string]int `yaml:"velocity_limit"` CurrencyLimits map[string]limitsOverrideCfg `yaml:"currency_limits"` } type limitsOverrideCfg struct { MaxVolume string `yaml:"max_volume"` MinAmount string `yaml:"min_amount"` MaxAmount string `yaml:"max_amount"` MaxFee string `yaml:"max_fee"` MaxOps int `yaml:"max_ops"` } type httpConfig struct { Callback callbackConfig `yaml:"callback"` } type callbackConfig struct { Address string `yaml:"address"` Path string `yaml:"path"` AllowedCIDRs []string `yaml:"allowed_cidrs"` MaxBodyBytes int64 `yaml:"max_body_bytes"` } // Create initialises the Monetix gateway server implementation. func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) { return &Imp{ logger: logger.Named("server"), file: file, debug: debug, }, nil } func (i *Imp) Shutdown() { if i.app == nil { return } timeout := 15 * time.Second if i.config != nil && i.config.Runtime != nil { timeout = i.config.Runtime.ShutdownTimeout() } ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() if i.service != nil { i.service.Shutdown() } if i.http != nil { _ = i.http.Shutdown(ctx) i.http = nil } i.app.Shutdown(ctx) } func (i *Imp) Start() error { i.logger.Info("Starting Monetix gateway", zap.String("config_file", i.file), zap.Bool("debug", i.debug)) cfg, err := i.loadConfig() if err != nil { return err } i.config = cfg i.logger.Info("Configuration loaded", zap.String("grpc_address", cfg.GRPC.Address), zap.String("metrics_address", cfg.Metrics.Address), ) monetixCfg, err := i.resolveMonetixConfig(cfg.Monetix) if err != nil { i.logger.Error("Failed to resolve Monetix configuration", zap.Error(err)) return err } callbackCfg, err := i.resolveCallbackConfig(cfg.HTTP.Callback) if err != nil { i.logger.Error("Failed to resolve callback configuration", zap.Error(err)) return err } i.logger.Info("Monetix configuration resolved", zap.Bool("base_url_set", strings.TrimSpace(monetixCfg.BaseURL) != ""), zap.Int64("project_id", monetixCfg.ProjectID), zap.Bool("secret_key_set", strings.TrimSpace(monetixCfg.SecretKey) != ""), zap.Int("allowed_currencies", len(monetixCfg.AllowedCurrencies)), zap.Bool("require_customer_address", monetixCfg.RequireCustomerAddress), zap.Duration("request_timeout", monetixCfg.RequestTimeout), zap.String("status_success", monetixCfg.SuccessStatus()), zap.String("status_processing", monetixCfg.ProcessingStatus()), ) gatewayDescriptor := resolveGatewayDescriptor(cfg.Gateway, monetixCfg) if gatewayDescriptor != nil { i.logger.Info("Gateway descriptor resolved", zap.String("id", gatewayDescriptor.GetId()), zap.String("rail", gatewayDescriptor.GetRail().String()), zap.String("network", gatewayDescriptor.GetNetwork()), zap.Int("currencies", len(gatewayDescriptor.GetCurrencies())), zap.Bool("enabled", gatewayDescriptor.GetIsEnabled()), ) } i.logger.Info("Callback configuration resolved", zap.String("address", callbackCfg.Address), zap.String("path", callbackCfg.Path), zap.Int("allowed_cidrs", len(callbackCfg.AllowedCIDRs)), zap.Int64("max_body_bytes", callbackCfg.MaxBodyBytes), ) serviceFactory := func(logger mlogger.Logger, _ struct{}, producer msg.Producer) (grpcapp.Service, error) { invokeURI := "" if cfg.GRPC != nil { invokeURI = cfg.GRPC.DiscoveryInvokeURI() } svc := mntxservice.NewService(logger, mntxservice.WithDiscoveryInvokeURI(invokeURI), mntxservice.WithProducer(producer), mntxservice.WithMonetixConfig(monetixCfg), mntxservice.WithGatewayDescriptor(gatewayDescriptor), mntxservice.WithHTTPClient(&http.Client{Timeout: monetixCfg.Timeout()}), ) i.service = svc if err := i.startHTTPCallbackServer(svc, callbackCfg); err != nil { return nil, err } return svc, nil } app, err := grpcapp.NewApp(i.logger, "monetix", cfg.Config, i.debug, nil, serviceFactory) if err != nil { return err } i.app = app return i.app.Start() } func (i *Imp) loadConfig() (*config, error) { data, err := os.ReadFile(i.file) if err != nil { i.logger.Error("Could not read configuration file", zap.String("config_file", i.file), zap.Error(err)) return nil, err } cfg := &config{ Config: &grpcapp.Config{}, } if err := yaml.Unmarshal(data, cfg); err != nil { i.logger.Error("Failed to parse configuration", zap.Error(err)) return nil, err } if cfg.Runtime == nil { cfg.Runtime = &grpcapp.RuntimeConfig{ShutdownTimeoutSeconds: 15} } if cfg.GRPC == nil { cfg.GRPC = &routers.GRPCConfig{ Network: "tcp", Address: ":50075", EnableReflection: true, EnableHealth: true, } } if cfg.Metrics == nil { cfg.Metrics = &grpcapp.MetricsConfig{Address: ":9405"} } return cfg, nil } func (i *Imp) resolveMonetixConfig(cfg monetixConfig) (monetix.Config, error) { baseURL := strings.TrimSpace(cfg.BaseURL) if env := strings.TrimSpace(cfg.BaseURLEnv); env != "" { if val := strings.TrimSpace(os.Getenv(env)); val != "" { baseURL = val } } projectID := cfg.ProjectID if projectID == 0 && strings.TrimSpace(cfg.ProjectIDEnv) != "" { raw := strings.TrimSpace(os.Getenv(cfg.ProjectIDEnv)) if raw != "" { if id, err := strconv.ParseInt(raw, 10, 64); err == nil { projectID = id } else { return monetix.Config{}, merrors.InvalidArgument("invalid project id in env "+cfg.ProjectIDEnv, "monetix.project_id") } } } secret := strings.TrimSpace(cfg.SecretKey) if env := strings.TrimSpace(cfg.SecretKeyEnv); env != "" { if val := strings.TrimSpace(os.Getenv(env)); val != "" { secret = val } } timeout := time.Duration(cfg.RequestTimeoutSeconds) * time.Second if timeout <= 0 { timeout = 15 * time.Second } statusSuccess := strings.TrimSpace(cfg.StatusSuccess) statusProcessing := strings.TrimSpace(cfg.StatusProcessing) return monetix.Config{ BaseURL: baseURL, ProjectID: projectID, SecretKey: secret, AllowedCurrencies: cfg.AllowedCurrencies, RequireCustomerAddress: cfg.RequireCustomerAddress, RequestTimeout: timeout, StatusSuccess: statusSuccess, StatusProcessing: statusProcessing, }, nil } func resolveGatewayDescriptor(cfg gatewayConfig, monetixCfg monetix.Config) *gatewayv1.GatewayInstanceDescriptor { id := strings.TrimSpace(cfg.ID) if id == "" { id = "monetix" } network := strings.ToUpper(strings.TrimSpace(cfg.Network)) currencies := normalizeCurrencies(cfg.Currencies) if len(currencies) == 0 { currencies = normalizeCurrencies(monetixCfg.AllowedCurrencies) } enabled := true if cfg.IsEnabled != nil { enabled = *cfg.IsEnabled } limits := buildGatewayLimits(cfg.Limits) if limits == nil { limits = &gatewayv1.Limits{MinAmount: "0"} } version := strings.TrimSpace(appversion.Version) return &gatewayv1.GatewayInstanceDescriptor{ Id: id, Rail: gatewayv1.Rail_RAIL_CARD_PAYOUT, Network: network, Currencies: currencies, Capabilities: &gatewayv1.RailCapabilities{ CanPayOut: true, CanPayIn: false, CanReadBalance: false, CanSendFee: false, RequiresObserveConfirm: true, }, Limits: limits, Version: version, IsEnabled: enabled, } } func normalizeCurrencies(values []string) []string { if len(values) == 0 { return nil } seen := map[string]bool{} result := make([]string, 0, len(values)) for _, value := range values { clean := strings.ToUpper(strings.TrimSpace(value)) if clean == "" || seen[clean] { continue } seen[clean] = true result = append(result, clean) } return result } func buildGatewayLimits(cfg limitsConfig) *gatewayv1.Limits { hasValue := strings.TrimSpace(cfg.MinAmount) != "" || strings.TrimSpace(cfg.MaxAmount) != "" || strings.TrimSpace(cfg.PerTxMaxFee) != "" || strings.TrimSpace(cfg.PerTxMinAmount) != "" || strings.TrimSpace(cfg.PerTxMaxAmount) != "" || len(cfg.VolumeLimit) > 0 || len(cfg.VelocityLimit) > 0 || len(cfg.CurrencyLimits) > 0 if !hasValue { return nil } limits := &gatewayv1.Limits{ MinAmount: strings.TrimSpace(cfg.MinAmount), MaxAmount: strings.TrimSpace(cfg.MaxAmount), PerTxMaxFee: strings.TrimSpace(cfg.PerTxMaxFee), PerTxMinAmount: strings.TrimSpace(cfg.PerTxMinAmount), PerTxMaxAmount: strings.TrimSpace(cfg.PerTxMaxAmount), } if len(cfg.VolumeLimit) > 0 { limits.VolumeLimit = map[string]string{} for key, value := range cfg.VolumeLimit { bucket := strings.TrimSpace(key) amount := strings.TrimSpace(value) if bucket == "" || amount == "" { continue } limits.VolumeLimit[bucket] = amount } } if len(cfg.VelocityLimit) > 0 { limits.VelocityLimit = map[string]int32{} for key, value := range cfg.VelocityLimit { bucket := strings.TrimSpace(key) if bucket == "" { continue } limits.VelocityLimit[bucket] = int32(value) } } if len(cfg.CurrencyLimits) > 0 { limits.CurrencyLimits = map[string]*gatewayv1.LimitsOverride{} for key, override := range cfg.CurrencyLimits { currency := strings.ToUpper(strings.TrimSpace(key)) if currency == "" { continue } limits.CurrencyLimits[currency] = &gatewayv1.LimitsOverride{ MaxVolume: strings.TrimSpace(override.MaxVolume), MinAmount: strings.TrimSpace(override.MinAmount), MaxAmount: strings.TrimSpace(override.MaxAmount), MaxFee: strings.TrimSpace(override.MaxFee), MaxOps: int32(override.MaxOps), } } } return limits } type callbackRuntimeConfig struct { Address string Path string AllowedCIDRs []*net.IPNet MaxBodyBytes int64 } func (i *Imp) resolveCallbackConfig(cfg callbackConfig) (callbackRuntimeConfig, error) { addr := strings.TrimSpace(cfg.Address) if addr == "" { addr = ":8084" } path := strings.TrimSpace(cfg.Path) if path == "" { path = "/monetix/callback" } maxBody := cfg.MaxBodyBytes if maxBody <= 0 { maxBody = 1 << 20 // 1MB } var cidrs []*net.IPNet for _, raw := range cfg.AllowedCIDRs { clean := strings.TrimSpace(raw) if clean == "" { continue } _, block, err := net.ParseCIDR(clean) if err != nil { i.logger.Warn("Invalid callback allowlist CIDR skipped", zap.String("cidr", clean), zap.Error(err)) continue } cidrs = append(cidrs, block) } return callbackRuntimeConfig{ Address: addr, Path: path, AllowedCIDRs: cidrs, MaxBodyBytes: maxBody, }, nil } func (i *Imp) startHTTPCallbackServer(svc *mntxservice.Service, cfg callbackRuntimeConfig) error { if svc == nil { return merrors.InvalidArgument("nil service provided for callback server") } if strings.TrimSpace(cfg.Address) == "" { i.logger.Info("Monetix callback server disabled: address is empty") return nil } router := chi.NewRouter() router.Post(cfg.Path, func(w http.ResponseWriter, r *http.Request) { log := i.logger.Named("callback_http") log.Debug("Callback request received", zap.String("remote_addr", strings.TrimSpace(r.RemoteAddr)), zap.String("path", r.URL.Path), zap.String("method", r.Method), ) if len(cfg.AllowedCIDRs) > 0 && !clientAllowed(r, cfg.AllowedCIDRs) { ip := clientIPFromRequest(r) remoteIP := "" if ip != nil { remoteIP = ip.String() } log.Warn("Callback rejected by CIDR allowlist", zap.String("remote_ip", remoteIP)) http.Error(w, "forbidden", http.StatusForbidden) return } body, err := io.ReadAll(io.LimitReader(r.Body, cfg.MaxBodyBytes)) if err != nil { log.Warn("Callback body read failed", zap.Error(err)) http.Error(w, "failed to read body", http.StatusBadRequest) return } status, err := svc.ProcessMonetixCallback(r.Context(), body) if err != nil { log.Warn("Callback processing failed", zap.Error(err), zap.Int("status", status)) http.Error(w, err.Error(), status) return } log.Debug("Callback processed", zap.Int("status", status)) w.WriteHeader(status) }) server := &http.Server{ Addr: cfg.Address, Handler: router, } ln, err := net.Listen("tcp", cfg.Address) if err != nil { return err } i.http = server go func() { if err := server.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) { i.logger.Warn("Monetix callback server stopped with error", zap.Error(err)) } }() i.logger.Info("Monetix callback server listening", zap.String("address", cfg.Address), zap.String("path", cfg.Path)) return nil } func clientAllowed(r *http.Request, cidrs []*net.IPNet) bool { if len(cidrs) == 0 { return true } host := clientIPFromRequest(r) if host == nil { return false } for _, block := range cidrs { if block.Contains(host) { return true } } return false } func clientIPFromRequest(r *http.Request) net.IP { if r == nil { return nil } if xfwd := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); xfwd != "" { parts := strings.Split(xfwd, ",") if len(parts) > 0 { if ip := net.ParseIP(strings.TrimSpace(parts[0])); ip != nil { return ip } } } host, _, err := net.SplitHostPort(r.RemoteAddr) if err != nil { return nil } return net.ParseIP(host) }