// Package app assembles the FX ingestor's dependencies and runs the service.
package app

import (
	"context"
	"errors"
	"strings"

	"github.com/tech/sendico/fx/ingestor/internal/appversion"
	"github.com/tech/sendico/fx/ingestor/internal/config"
	"github.com/tech/sendico/fx/ingestor/internal/ingestor"
	"github.com/tech/sendico/fx/ingestor/internal/metrics"
	mongostorage "github.com/tech/sendico/fx/storage/mongo"
	"github.com/tech/sendico/pkg/api/routers/health"
	"github.com/tech/sendico/pkg/db"
	"github.com/tech/sendico/pkg/discovery"
	"github.com/tech/sendico/pkg/merrors"
	msg "github.com/tech/sendico/pkg/messaging"
	msgproducer "github.com/tech/sendico/pkg/messaging/producer"
	"github.com/tech/sendico/pkg/mlogger"
	"go.uber.org/zap"
)

// DefaultConfigPath is the configuration file New falls back to when it is
// given an empty or whitespace-only path.
const DefaultConfigPath = "config.yml"

// App carries the logger and the loaded configuration that Run needs.
type App struct {
	logger mlogger.Logger
	cfg    *config.Config
}

// New loads the configuration found at cfgPath and returns an App bound to
// logger.
//
// cfgPath is trimmed of surrounding whitespace first; if nothing remains,
// DefaultConfigPath is used. A nil logger yields an invalid-argument error,
// and any error from config.Load is returned unchanged.
func New(logger mlogger.Logger, cfgPath string) (*App, error) {
	if logger == nil {
		return nil, merrors.InvalidArgument("app: logger is nil")
	}

	cfgFile := DefaultConfigPath
	if trimmed := strings.TrimSpace(cfgPath); trimmed != "" {
		cfgFile = trimmed
	}

	loaded, err := config.Load(cfgFile)
	if err != nil {
		return nil, err
	}

	app := &App{logger: logger, cfg: loaded}
	return app, nil
}

// Run brings up the ingestor's dependencies in order (metrics server, MongoDB
// connection, storage repository, ingestor service), optionally starts a
// discovery announcer, and then blocks in the ingestor service's Run.
//
// Any setup error is returned as-is. Whatever service.Run returns is passed
// back to the caller; it is additionally logged at error level unless it
// matches context.Canceled. Resources acquired along the way are released by
// deferred calls when Run returns.
func (a *App) Run(ctx context.Context) error {
	logger := a.logger

	metricsSrv, err := metrics.NewServer(logger, a.cfg.MetricsConfig())
	if err != nil {
		return err
	}
	logger.Debug("Metrics server initialised")
	// The caller's ctx may already be cancelled by the time the deferred
	// cleanups run, so they are given a fresh background context.
	defer metricsSrv.Close(context.Background())

	conn, err := db.ConnectMongo(logger, a.cfg.Database)
	if err != nil {
		return err
	}
	defer conn.Disconnect(context.Background())
	logger.Debug("MongoDB connection established")

	repo, err := mongostorage.New(logger, conn)
	if err != nil {
		return err
	}
	logger.Debug("Storage repository initialised")

	service, err := ingestor.New(logger, a.cfg, repo)
	if err != nil {
		return err
	}

	// Discovery is optional: it is attempted only when a messaging driver is
	// configured, and a broker failure is logged as a warning without
	// aborting start-up.
	if mcfg := a.cfg.Messaging; mcfg != nil && mcfg.Driver != "" {
		broker, brokerErr := msg.CreateMessagingBroker(logger.Named("discovery_bus"), mcfg)
		if brokerErr == nil {
			producer := msgproducer.NewProducer(logger.Named("discovery_producer"), broker)
			announcer := discovery.NewAnnouncer(logger, producer, "fx_ingestor", discovery.Announcement{
				Service:    "FX_INGESTOR",
				Operations: []string{"fx.ingest"},
				Version:    appversion.Create().Short(),
			})
			announcer.Start()
			defer announcer.Stop()
		} else {
			logger.Warn("Failed to initialize discovery broker", zap.Error(brokerErr))
		}
	}

	logger.Info("Starting FX ingestor service", zap.String("version", appversion.Create().Info()))
	metricsSrv.SetStatus(health.SSRunning)

	switch runErr := service.Run(ctx); {
	case runErr == nil:
		logger.Info("Ingestor service stopped")
		return nil
	case errors.Is(runErr, context.Canceled):
		// A termination request is not worth an error log entry, but the
		// error is still handed back to the caller.
		return runErr
	default:
		logger.Error("Ingestor service exited with error", zap.Error(runErr))
		return runErr
	}
}