Files
sendico/api/gateway/mntx/internal/server/internal/serverimp.go
2025-12-05 09:37:51 +01:00

347 lines
8.3 KiB
Go

package serverimp
import (
"context"
"errors"
"io"
"net"
"net/http"
"os"
"strconv"
"strings"
"time"
"github.com/go-chi/chi/v5"
mntxservice "github.com/tech/sendico/gateway/mntx/internal/service/gateway"
"github.com/tech/sendico/gateway/mntx/internal/service/monetix"
"github.com/tech/sendico/pkg/api/routers"
"github.com/tech/sendico/pkg/merrors"
msg "github.com/tech/sendico/pkg/messaging"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/server/grpcapp"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
)
// Imp is the Monetix gateway server implementation. It owns the gRPC
// application and the HTTP server that receives Monetix callbacks.
type Imp struct {
// logger is the "server"-named logger assigned in Create.
logger mlogger.Logger
// file is the path of the YAML configuration file read by loadConfig.
file string
// debug is forwarded to grpcapp.NewApp when the application is built.
debug bool
// config is the configuration loaded by Start; nil until Start has run.
config *config
// app is the gRPC application created by Start; nil until then.
app *grpcapp.App[struct{}]
// http is the callback server set by startHTTPCallbackServer and cleared
// again by Shutdown.
http *http.Server
}
// config is the root of the YAML configuration document decoded by loadConfig.
type config struct {
// Embedded gRPC application settings (Runtime, GRPC, Metrics), inlined at
// the top level of the YAML document.
*grpcapp.Config `yaml:",inline"`
// Monetix holds the raw Monetix settings found under the "monetix" key.
Monetix monetixConfig `yaml:"monetix"`
// HTTP holds the HTTP-side settings found under the "http" key.
HTTP httpConfig `yaml:"http"`
}
// monetixConfig is the raw, file-level Monetix configuration. It is turned
// into a monetix.Config by resolveMonetixConfig.
type monetixConfig struct {
// BaseURL is the Monetix base URL; surrounding whitespace is trimmed.
BaseURL string `yaml:"base_url"`
// BaseURLEnv names an environment variable whose non-empty value replaces
// BaseURL.
BaseURLEnv string `yaml:"base_url_env"`
// ProjectID is the Monetix project identifier.
ProjectID int64 `yaml:"project_id"`
// ProjectIDEnv names an environment variable that is consulted only when
// ProjectID is zero; its value must parse as a base-10 int64.
ProjectIDEnv string `yaml:"project_id_env"`
// SecretKey is the Monetix secret; surrounding whitespace is trimmed.
SecretKey string `yaml:"secret_key"`
// SecretKeyEnv names an environment variable whose non-empty value
// replaces SecretKey.
SecretKeyEnv string `yaml:"secret_key_env"`
// AllowedCurrencies is passed through to monetix.Config unchanged.
AllowedCurrencies []string `yaml:"allowed_currencies"`
// RequireCustomerAddress is passed through to monetix.Config unchanged.
RequireCustomerAddress bool `yaml:"require_customer_address"`
// RequestTimeoutSeconds is the request timeout in seconds; a value of
// zero or less falls back to 15 seconds.
RequestTimeoutSeconds int `yaml:"request_timeout_seconds"`
// StatusSuccess is trimmed and passed through to monetix.Config.
// NOTE(review): presumably the provider status string treated as success — confirm.
StatusSuccess string `yaml:"status_success"`
// StatusProcessing is trimmed and passed through to monetix.Config.
// NOTE(review): presumably the provider status string treated as in-progress — confirm.
StatusProcessing string `yaml:"status_processing"`
}
// httpConfig groups the settings found under the "http" key of the
// configuration file.
type httpConfig struct {
// Callback configures the HTTP endpoint that receives Monetix callbacks.
Callback callbackConfig `yaml:"callback"`
}
// callbackConfig is the raw, file-level configuration of the Monetix callback
// endpoint. It is normalised by resolveCallbackConfig.
type callbackConfig struct {
// Address is the TCP listen address; when blank it defaults to ":8084".
Address string `yaml:"address"`
// Path is the route of the callback endpoint; when blank it defaults to
// "/monetix/callback".
Path string `yaml:"path"`
// AllowedCIDRs lists the networks allowed to call the endpoint. Blank
// entries are ignored and entries that fail net.ParseCIDR are skipped with
// a warning. An empty resulting list disables the client IP check.
AllowedCIDRs []string `yaml:"allowed_cidrs"`
// MaxBodyBytes limits the size of the callback request body; a value of
// zero or less falls back to 1 MiB.
MaxBodyBytes int64 `yaml:"max_body_bytes"`
}
// Create builds a Monetix gateway server bound to the given configuration
// file. Nothing is read or started here; that happens in Start. The logger is
// scoped under the "server" name. The returned error is always nil.
func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) {
	imp := new(Imp)
	imp.logger = logger.Named("server")
	imp.file = file
	imp.debug = debug
	return imp, nil
}
// Shutdown stops the HTTP callback server (when one is running) and then the
// gRPC application. Both share one deadline: the configured runtime shutdown
// timeout, or 15 seconds when no configuration has been loaded.
//
// It is a no-op when neither the application nor the callback server exists.
func (i *Imp) Shutdown() {
	if i.app == nil && i.http == nil {
		return
	}

	timeout := 15 * time.Second
	if i.config != nil && i.config.Config != nil && i.config.Runtime != nil {
		timeout = i.config.Runtime.ShutdownTimeout()
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// The callback server is stopped independently of the gRPC app so that a
	// listener which was opened before the app became available is not leaked.
	if srv := i.http; srv != nil {
		i.http = nil
		if err := srv.Shutdown(ctx); err != nil {
			// Typically the deadline expired before connections drained.
			i.logger.Warn("Monetix callback server shutdown failed: " + err.Error())
		}
	}

	if i.app != nil {
		i.app.Shutdown(ctx)
	}
}
// Start loads the configuration file, resolves the Monetix and callback
// settings, builds the "monetix" gRPC application and starts it.
//
// The service factory handed to grpcapp.NewApp creates the gateway service
// and, as part of the same step, starts the HTTP callback server for it.
// The first error encountered along the way is returned unchanged.
func (i *Imp) Start() error {
	loaded, err := i.loadConfig()
	if err != nil {
		return err
	}
	i.config = loaded

	gatewayCfg, err := i.resolveMonetixConfig(loaded.Monetix)
	if err != nil {
		return err
	}

	cbCfg, err := i.resolveCallbackConfig(loaded.HTTP.Callback)
	if err != nil {
		return err
	}

	buildService := func(logger mlogger.Logger, _ struct{}, producer msg.Producer) (grpcapp.Service, error) {
		// Outbound Monetix calls share one client bounded by the resolved timeout.
		client := &http.Client{Timeout: gatewayCfg.Timeout()}
		service := mntxservice.NewService(
			logger,
			mntxservice.WithProducer(producer),
			mntxservice.WithMonetixConfig(gatewayCfg),
			mntxservice.WithHTTPClient(client),
		)
		if startErr := i.startHTTPCallbackServer(service, cbCfg); startErr != nil {
			return nil, startErr
		}
		return service, nil
	}

	application, err := grpcapp.NewApp(i.logger, "monetix", loaded.Config, i.debug, nil, buildService)
	if err != nil {
		return err
	}
	i.app = application

	return application.Start()
}
// loadConfig reads the YAML file at i.file and decodes it into a config.
// Any of the Runtime, GRPC and Metrics sections left unset by the file get
// defaults: a 15 second shutdown timeout, a TCP gRPC listener on ":50075"
// with reflection and health enabled, and metrics on ":9405".
//
// Read and parse failures are logged and returned unchanged.
func (i *Imp) loadConfig() (*config, error) {
	raw, readErr := os.ReadFile(i.file)
	if readErr != nil {
		i.logger.Error("could not read configuration file", zap.String("config_file", i.file), zap.Error(readErr))
		return nil, readErr
	}

	// The embedded pointer is allocated up front so the decoder has a target
	// for the inlined fields.
	var parsed config
	parsed.Config = &grpcapp.Config{}
	if parseErr := yaml.Unmarshal(raw, &parsed); parseErr != nil {
		i.logger.Error("failed to parse configuration", zap.Error(parseErr))
		return nil, parseErr
	}

	if parsed.Metrics == nil {
		parsed.Metrics = &grpcapp.MetricsConfig{Address: ":9405"}
	}
	if parsed.GRPC == nil {
		grpcDefaults := routers.GRPCConfig{
			Network:          "tcp",
			Address:          ":50075",
			EnableReflection: true,
			EnableHealth:     true,
		}
		parsed.GRPC = &grpcDefaults
	}
	if parsed.Runtime == nil {
		parsed.Runtime = &grpcapp.RuntimeConfig{ShutdownTimeoutSeconds: 15}
	}

	return &parsed, nil
}
// resolveMonetixConfig turns the file-level Monetix settings into a
// monetix.Config, applying environment overrides and defaults.
//
//   - BaseURL and SecretKey are trimmed; a non-empty value of the environment
//     variable named by BaseURLEnv / SecretKeyEnv replaces them.
//   - ProjectID is taken from the environment variable named by ProjectIDEnv
//     only when the configured value is zero.
//   - A request timeout of zero or less falls back to 15 seconds.
//
// It returns an invalid-argument error when the project id environment
// variable is set but does not parse as a base-10 int64.
func (i *Imp) resolveMonetixConfig(cfg monetixConfig) (monetix.Config, error) {
	// fromEnv returns the trimmed value of the named variable, or "" when the
	// name is blank. The name itself is trimmed so that all three *_env
	// settings are looked up the same way.
	fromEnv := func(name string) string {
		name = strings.TrimSpace(name)
		if name == "" {
			return ""
		}
		return strings.TrimSpace(os.Getenv(name))
	}

	baseURL := strings.TrimSpace(cfg.BaseURL)
	if val := fromEnv(cfg.BaseURLEnv); val != "" {
		baseURL = val
	}

	projectID := cfg.ProjectID
	if projectID == 0 {
		if raw := fromEnv(cfg.ProjectIDEnv); raw != "" {
			id, err := strconv.ParseInt(raw, 10, 64)
			if err != nil {
				return monetix.Config{}, merrors.InvalidArgument("invalid project id in env "+strings.TrimSpace(cfg.ProjectIDEnv), "monetix.project_id")
			}
			projectID = id
		}
	}

	secret := strings.TrimSpace(cfg.SecretKey)
	if val := fromEnv(cfg.SecretKeyEnv); val != "" {
		secret = val
	}

	timeout := time.Duration(cfg.RequestTimeoutSeconds) * time.Second
	if timeout <= 0 {
		timeout = 15 * time.Second
	}

	return monetix.Config{
		BaseURL:                baseURL,
		ProjectID:              projectID,
		SecretKey:              secret,
		AllowedCurrencies:      cfg.AllowedCurrencies,
		RequireCustomerAddress: cfg.RequireCustomerAddress,
		RequestTimeout:         timeout,
		StatusSuccess:          strings.TrimSpace(cfg.StatusSuccess),
		StatusProcessing:       strings.TrimSpace(cfg.StatusProcessing),
	}, nil
}
// callbackRuntimeConfig is the normalised callback configuration produced by
// resolveCallbackConfig and consumed by startHTTPCallbackServer.
type callbackRuntimeConfig struct {
// Address is the TCP address the callback server listens on.
Address string
// Path is the route on which callbacks are accepted via POST.
Path string
// AllowedCIDRs holds the parsed allowlist networks; when empty, the
// callback handler does not restrict clients by IP.
AllowedCIDRs []*net.IPNet
// MaxBodyBytes is the limit applied to the callback request body size.
MaxBodyBytes int64
}
// resolveCallbackConfig normalises the file-level callback settings.
//
// Defaults: address ":8084", path "/monetix/callback", body limit 1 MiB.
// A path without a leading slash gets one prepended, because the router
// rejects patterns that do not start with "/".
//
// Blank allowlist entries are ignored and entries that fail net.ParseCIDR are
// skipped with a warning. If entries were configured but none of them is
// valid, an error is returned: an empty allowlist means "allow everyone" to
// the callback handler, so continuing would silently disable the restriction
// the operator asked for.
func (i *Imp) resolveCallbackConfig(cfg callbackConfig) (callbackRuntimeConfig, error) {
	addr := strings.TrimSpace(cfg.Address)
	if addr == "" {
		addr = ":8084"
	}

	path := strings.TrimSpace(cfg.Path)
	if path == "" {
		path = "/monetix/callback"
	}
	if !strings.HasPrefix(path, "/") {
		path = "/" + path
	}

	maxBody := cfg.MaxBodyBytes
	if maxBody <= 0 {
		maxBody = 1 << 20 // 1MB
	}

	var cidrs []*net.IPNet
	configured := 0 // non-blank allowlist entries seen, valid or not
	for _, raw := range cfg.AllowedCIDRs {
		clean := strings.TrimSpace(raw)
		if clean == "" {
			continue
		}
		configured++
		_, block, err := net.ParseCIDR(clean)
		if err != nil {
			i.logger.Warn("invalid callback allowlist CIDR skipped", zap.String("cidr", clean), zap.Error(err))
			continue
		}
		cidrs = append(cidrs, block)
	}
	if configured > 0 && len(cidrs) == 0 {
		return callbackRuntimeConfig{}, errors.New("callback allowlist is configured but contains no valid CIDR")
	}

	return callbackRuntimeConfig{
		Address:      addr,
		Path:         path,
		AllowedCIDRs: cidrs,
		MaxBodyBytes: maxBody,
	}, nil
}
// startHTTPCallbackServer binds cfg.Address and serves the Monetix callback
// endpoint (POST cfg.Path) in a background goroutine. The server is stored in
// i.http so Shutdown can stop it.
//
// Each request is checked against the CIDR allowlist (when one is set), its
// body is read up to cfg.MaxBodyBytes, and the payload is handed to
// svc.ProcessMonetixCallback, whose status code becomes the HTTP response
// status. Bodies larger than the limit are rejected with 413 rather than
// being truncated and processed.
//
// It returns an error when svc is nil or the listener cannot be opened, and
// returns nil without starting anything when cfg.Address is blank.
func (i *Imp) startHTTPCallbackServer(svc *mntxservice.Service, cfg callbackRuntimeConfig) error {
	if svc == nil {
		return errors.New("nil service provided for callback server")
	}
	if strings.TrimSpace(cfg.Address) == "" {
		i.logger.Info("Monetix callback server disabled: address is empty")
		return nil
	}

	router := chi.NewRouter()
	router.Post(cfg.Path, func(w http.ResponseWriter, r *http.Request) {
		if len(cfg.AllowedCIDRs) > 0 && !clientAllowed(r, cfg.AllowedCIDRs) {
			http.Error(w, "forbidden", http.StatusForbidden)
			return
		}

		// Read one byte past the limit so a payload that exceeds it can be
		// told apart from one that exactly fills it.
		body, err := io.ReadAll(io.LimitReader(r.Body, cfg.MaxBodyBytes+1))
		if err != nil {
			http.Error(w, "failed to read body", http.StatusBadRequest)
			return
		}
		if int64(len(body)) > cfg.MaxBodyBytes {
			http.Error(w, "request body too large", http.StatusRequestEntityTooLarge)
			return
		}

		status, err := svc.ProcessMonetixCallback(r.Context(), body)
		if err != nil {
			http.Error(w, err.Error(), status)
			return
		}
		w.WriteHeader(status)
	})

	// Timeouts keep slow or stalled clients from pinning connections on this
	// externally reachable endpoint.
	server := &http.Server{
		Addr:              cfg.Address,
		Handler:           router,
		ReadHeaderTimeout: 5 * time.Second,
		ReadTimeout:       15 * time.Second,
		WriteTimeout:      30 * time.Second,
		IdleTimeout:       60 * time.Second,
	}

	// Listen synchronously so a bind failure is reported to the caller
	// instead of surfacing only in the background goroutine.
	ln, err := net.Listen("tcp", cfg.Address)
	if err != nil {
		return err
	}
	i.http = server

	go func() {
		if err := server.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
			i.logger.Error("Monetix callback server stopped with error", zap.Error(err))
		}
	}()

	i.logger.Info("Monetix callback server listening", zap.String("address", cfg.Address), zap.String("path", cfg.Path))
	return nil
}
// clientAllowed reports whether the request's client IP falls inside at least
// one of the given networks. An empty allowlist admits every request; a
// request whose client IP cannot be determined is rejected.
func clientAllowed(r *http.Request, cidrs []*net.IPNet) bool {
	if len(cidrs) == 0 {
		return true
	}

	addr := clientIPFromRequest(r)
	if addr == nil {
		return false
	}

	// Stop at the first network that contains the address.
	matched := false
	for idx := 0; idx < len(cidrs) && !matched; idx++ {
		matched = cidrs[idx].Contains(addr)
	}
	return matched
}
// clientIPFromRequest extracts the caller's IP address from r.
//
// The first entry of the X-Forwarded-For header wins when it parses as an IP.
// Otherwise the request's RemoteAddr is used, accepted either as "host:port"
// or as a bare IP (the latter is what RemoteAddr looks like once a middleware
// has rewritten it to a plain address). It returns nil when r is nil or no
// address can be parsed.
//
// SECURITY NOTE(review): X-Forwarded-For is supplied by the client unless a
// trusted reverse proxy overwrites it, so a caller that reaches this server
// directly can choose the address returned here. Confirm the endpoint is only
// reachable through such a proxy, or restrict header trust to known proxies.
func clientIPFromRequest(r *http.Request) net.IP {
	if r == nil {
		return nil
	}

	if xfwd := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); xfwd != "" {
		first, _, _ := strings.Cut(xfwd, ",")
		if ip := net.ParseIP(strings.TrimSpace(first)); ip != nil {
			return ip
		}
	}

	host, _, err := net.SplitHostPort(r.RemoteAddr)
	if err != nil {
		// No port present: treat the whole value as the host.
		host = r.RemoteAddr
	}
	return net.ParseIP(host)
}