diff --git a/api/fx/ingestor/.gitignore b/api/fx/ingestor/.gitignore index dc67a7e..1abe8f4 100644 --- a/api/fx/ingestor/.gitignore +++ b/api/fx/ingestor/.gitignore @@ -1,3 +1,3 @@ internal/generated .gocache -app \ No newline at end of file +/app diff --git a/api/fx/ingestor/internal/app/app.go b/api/fx/ingestor/internal/app/app.go new file mode 100644 index 0000000..28ab12d --- /dev/null +++ b/api/fx/ingestor/internal/app/app.go @@ -0,0 +1,82 @@ +package app + +import ( + "context" + "errors" + "strings" + + "github.com/tech/sendico/fx/ingestor/internal/appversion" + "github.com/tech/sendico/fx/ingestor/internal/config" + "github.com/tech/sendico/fx/ingestor/internal/fmerrors" + "github.com/tech/sendico/fx/ingestor/internal/ingestor" + "github.com/tech/sendico/fx/ingestor/internal/metrics" + mongostorage "github.com/tech/sendico/fx/storage/mongo" + "github.com/tech/sendico/pkg/api/routers/health" + "github.com/tech/sendico/pkg/db" + "github.com/tech/sendico/pkg/mlogger" + "go.uber.org/zap" +) + +const DefaultConfigPath = "config.yml" + +type App struct { + logger mlogger.Logger + cfg *config.Config +} + +func New(logger mlogger.Logger, cfgPath string) (*App, error) { + if logger == nil { + return nil, fmerrors.New("app: logger is nil") + } + path := strings.TrimSpace(cfgPath) + if path == "" { + path = DefaultConfigPath + } + cfg, err := config.Load(path) + if err != nil { + return nil, err + } + return &App{ + logger: logger, + cfg: cfg, + }, nil +} + +func (a *App) Run(ctx context.Context) error { + metricsSrv, err := metrics.NewServer(a.logger, a.cfg.MetricsConfig()) + if err != nil { + return err + } + a.logger.Debug("Metrics server initialised") + defer metricsSrv.Close(context.Background()) + + conn, err := db.ConnectMongo(a.logger, a.cfg.Database) + if err != nil { + return err + } + defer conn.Disconnect(context.Background()) + a.logger.Debug("MongoDB connection established") + + repo, err := mongostorage.New(a.logger, conn) + if err != nil { + return err + } + a.logger.Debug("Storage repository initialised") + + service, err := ingestor.New(a.logger, a.cfg, repo) + if err != nil { + return err + } + + a.logger.Info("Starting FX ingestor service", zap.String("version", appversion.Create().Info())) + metricsSrv.SetStatus(health.SSRunning) + + if err := service.Run(ctx); err != nil { + if !errors.Is(err, context.Canceled) { // ignore termination reques error + a.logger.Error("Ingestor service exited with error", zap.Error(err)) + } + return err + } + a.logger.Info("Ingestor service stopped") + return nil +}