Some checks failed
ci/woodpecker/push/billing_fees Pipeline was successful
ci/woodpecker/push/bff Pipeline was successful
ci/woodpecker/push/db Pipeline was successful
ci/woodpecker/push/chain_gateway Pipeline was successful
ci/woodpecker/push/fx_ingestor Pipeline was successful
ci/woodpecker/push/fx_oracle Pipeline was successful
ci/woodpecker/push/frontend Pipeline was successful
ci/woodpecker/push/payments_orchestrator Pipeline was successful
ci/woodpecker/push/bump_version Pipeline failed
ci/woodpecker/push/nats Pipeline was successful
ci/woodpecker/push/ledger Pipeline was successful
ci/woodpecker/push/notification Pipeline was successful
274 lines
7.7 KiB
Go
274 lines
7.7 KiB
Go
package grpcapp
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"github.com/tech/sendico/pkg/api/routers"
|
|
"github.com/tech/sendico/pkg/api/routers/health"
|
|
"github.com/tech/sendico/pkg/db"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
msg "github.com/tech/sendico/pkg/messaging"
|
|
mb "github.com/tech/sendico/pkg/messaging/broker"
|
|
msgproducer "github.com/tech/sendico/pkg/messaging/producer"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type Service interface {
|
|
Register(routers.GRPC) error
|
|
}
|
|
|
|
type RepositoryFactory[T any] func(logger mlogger.Logger, conn *db.MongoConnection) (T, error)
|
|
type ServiceFactory[T any] func(logger mlogger.Logger, repo T, producer msg.Producer) (Service, error)
|
|
type ProducerFactory func(logger mlogger.Logger, broker mb.Broker) msg.Producer
|
|
|
|
type App[T any] struct {
|
|
name string
|
|
logger mlogger.Logger
|
|
config *Config
|
|
debug bool
|
|
repoFactory RepositoryFactory[T]
|
|
serviceFactory ServiceFactory[T]
|
|
producerFactory ProducerFactory
|
|
metricsCfg *MetricsConfig
|
|
|
|
grpc routers.GRPC
|
|
mongoConn *db.MongoConnection
|
|
producer msg.Producer
|
|
metricsSrv *http.Server
|
|
health routers.Health
|
|
|
|
runCtx context.Context
|
|
cancel context.CancelFunc
|
|
|
|
cleanupOnce sync.Once
|
|
}
|
|
|
|
func NewApp[T any](logger mlogger.Logger, name string, config *Config, debug bool, repoFactory RepositoryFactory[T], serviceFactory ServiceFactory[T], opts ...Option[T]) (*App[T], error) {
|
|
if logger == nil {
|
|
return nil, merrors.InvalidArgument("nil logger supplied", "logger")
|
|
}
|
|
if config == nil {
|
|
return nil, merrors.InvalidArgument("nil config supplied", "config")
|
|
}
|
|
if serviceFactory == nil {
|
|
return nil, merrors.InvalidArgument("nil service factory supplied", "serviceFactory")
|
|
}
|
|
|
|
app := &App[T]{
|
|
name: name,
|
|
logger: logger.Named(name),
|
|
config: config,
|
|
debug: debug,
|
|
repoFactory: repoFactory,
|
|
serviceFactory: serviceFactory,
|
|
producerFactory: func(l mlogger.Logger, broker mb.Broker) msg.Producer {
|
|
if broker == nil {
|
|
return nil
|
|
}
|
|
return msgproducer.NewProducer(l, broker)
|
|
},
|
|
metricsCfg: config.Metrics,
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(app)
|
|
}
|
|
|
|
return app, nil
|
|
}
|
|
|
|
type Option[T any] func(*App[T])
|
|
|
|
func WithProducerFactory[T any](factory ProducerFactory) Option[T] {
|
|
return func(app *App[T]) {
|
|
if factory != nil {
|
|
app.producerFactory = factory
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *App[T]) Start() error {
|
|
var err error
|
|
|
|
a.logger.Debug("Initialising gRPC application components")
|
|
|
|
var repo T
|
|
if a.repoFactory != nil && a.config.Database != nil {
|
|
a.mongoConn, err = db.ConnectMongo(a.logger, a.config.Database)
|
|
if err != nil {
|
|
a.logger.Error("Failed to connect to MongoDB", zap.Error(err))
|
|
return err
|
|
}
|
|
repo, err = a.repoFactory(a.logger, a.mongoConn)
|
|
if err != nil {
|
|
a.logger.Error("Failed to initialise repository", zap.Error(err))
|
|
return err
|
|
}
|
|
if dbName := a.mongoConn.Database().Name(); dbName != "" {
|
|
a.logger.Info("MongoDB connection ready", zap.String("database", dbName))
|
|
} else {
|
|
a.logger.Info("MongoDB connection ready")
|
|
}
|
|
} else if a.repoFactory != nil && a.config.Database == nil {
|
|
a.logger.Warn("Repository factory provided but database configuration missing; repository will be zero value")
|
|
}
|
|
|
|
var producer msg.Producer
|
|
if a.config.Messaging != nil && a.config.Messaging.Driver != "" {
|
|
broker, err := msg.CreateMessagingBroker(a.logger, a.config.Messaging)
|
|
if err != nil {
|
|
a.logger.Warn("Failed to initialise messaging broker", zap.Error(err))
|
|
} else {
|
|
a.logger.Info("Messaging broker initialised", zap.String("driver", string(a.config.Messaging.Driver)))
|
|
producer = a.producerFactory(a.logger, broker)
|
|
}
|
|
} else {
|
|
a.logger.Info("Messaging configuration not provided; streaming disabled")
|
|
}
|
|
if producer != nil {
|
|
a.logger.Debug("Messaging producer configured")
|
|
}
|
|
a.producer = producer
|
|
|
|
service, err := a.serviceFactory(a.logger, repo, producer)
|
|
if err != nil {
|
|
a.logger.Error("Failed to create gRPC service", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
if addr := a.metricsAddr(); addr != "" {
|
|
a.logger.Debug("Preparing metrics server", zap.String("address", addr))
|
|
router := chi.NewRouter()
|
|
router.Handle("/metrics", promhttp.Handler())
|
|
|
|
if hr, err := routers.NewHealthRouter(a.logger, router, ""); err != nil {
|
|
a.logger.Warn("Failed to initialise health router", zap.Error(err))
|
|
} else {
|
|
hr.SetStatus(health.SSStarting)
|
|
a.health = hr
|
|
}
|
|
|
|
a.metricsSrv = &http.Server{
|
|
Addr: addr,
|
|
Handler: router,
|
|
}
|
|
go func() {
|
|
a.logger.Info("Prometheus metrics server starting", zap.String("address", addr))
|
|
if err := a.metricsSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
a.logger.Error("Prometheus metrics server failed", zap.Error(err))
|
|
if a.health != nil {
|
|
a.health.SetStatus(health.SSTerminating)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
a.logger.Debug("Creating gRPC router")
|
|
a.grpc, err = routers.NewGRPCRouter(a.logger, a.config.GRPC)
|
|
if err != nil {
|
|
a.logger.Error("Failed to initialise gRPC router", zap.Error(err))
|
|
a.cleanup(context.Background())
|
|
return err
|
|
}
|
|
|
|
if err := service.Register(a.grpc); err != nil {
|
|
a.logger.Error("Failed to register gRPC service", zap.Error(err))
|
|
a.cleanup(context.Background())
|
|
return err
|
|
}
|
|
a.logger.Debug("gRPC services registered")
|
|
|
|
a.runCtx, a.cancel = context.WithCancel(context.Background())
|
|
a.logger.Debug("gRPC server context initialised")
|
|
|
|
if err := a.grpc.Start(a.runCtx); err != nil {
|
|
a.logger.Error("Failed to start gRPC server", zap.Error(err))
|
|
if a.health != nil {
|
|
a.health.SetStatus(health.SSTerminating)
|
|
}
|
|
a.cleanup(context.Background())
|
|
return err
|
|
}
|
|
|
|
if a.health != nil {
|
|
a.health.SetStatus(health.SSRunning)
|
|
}
|
|
|
|
if addr := a.grpc.Addr(); addr != nil {
|
|
a.logger.Info(fmt.Sprintf("%s gRPC server started", a.name), zap.String("network", addr.Network()), zap.String("address", addr.String()), zap.Bool("debug_mode", a.debug))
|
|
} else {
|
|
a.logger.Info(fmt.Sprintf("%s gRPC server started", a.name), zap.Bool("debug_mode", a.debug))
|
|
}
|
|
|
|
err = <-a.grpc.Done()
|
|
if err != nil && !errors.Is(err, context.Canceled) {
|
|
a.logger.Error("gRPC server stopped with error", zap.Error(err))
|
|
} else {
|
|
a.logger.Info("gRPC server finished")
|
|
}
|
|
|
|
a.cleanup(context.Background())
|
|
return err
|
|
}
|
|
|
|
func (a *App[T]) Shutdown(ctx context.Context) {
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
if a.cancel != nil {
|
|
a.cancel()
|
|
}
|
|
if a.grpc != nil {
|
|
if err := a.grpc.Finish(ctx); err != nil && !errors.Is(err, context.Canceled) {
|
|
a.logger.Warn("Failed to stop gRPC server gracefully", zap.Error(err))
|
|
} else {
|
|
a.logger.Info("gRPC server stopped")
|
|
}
|
|
}
|
|
a.cleanup(ctx)
|
|
}
|
|
|
|
func (a *App[T]) cleanup(ctx context.Context) {
|
|
a.cleanupOnce.Do(func() {
|
|
a.logger.Debug("Performing application cleanup")
|
|
if a.health != nil {
|
|
a.health.SetStatus(health.SSTerminating)
|
|
a.health.Finish()
|
|
a.health = nil
|
|
}
|
|
if a.metricsSrv != nil {
|
|
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
if err := a.metricsSrv.Shutdown(shutdownCtx); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
a.logger.Warn("Failed to stop Prometheus metrics server", zap.Error(err))
|
|
} else {
|
|
a.logger.Info("Prometheus metrics server stopped")
|
|
}
|
|
cancel()
|
|
a.metricsSrv = nil
|
|
}
|
|
if a.mongoConn != nil {
|
|
if err := a.mongoConn.Disconnect(ctx); err != nil {
|
|
a.logger.Warn("Failed to close MongoDB connection", zap.Error(err))
|
|
} else {
|
|
a.logger.Info("MongoDB connection closed")
|
|
}
|
|
a.mongoConn = nil
|
|
}
|
|
})
|
|
}
|
|
|
|
func (a *App[T]) metricsAddr() string {
|
|
if a.metricsCfg == nil {
|
|
return ""
|
|
}
|
|
return a.metricsCfg.listenAddress()
|
|
}
|