253 lines
6.1 KiB
Go
253 lines
6.1 KiB
Go
package serverimp
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/tech/sendico/edge/callbacks/internal/config"
|
|
"github.com/tech/sendico/edge/callbacks/internal/delivery"
|
|
"github.com/tech/sendico/edge/callbacks/internal/events"
|
|
"github.com/tech/sendico/edge/callbacks/internal/ingest"
|
|
"github.com/tech/sendico/edge/callbacks/internal/ops"
|
|
"github.com/tech/sendico/edge/callbacks/internal/retry"
|
|
"github.com/tech/sendico/edge/callbacks/internal/secrets"
|
|
"github.com/tech/sendico/edge/callbacks/internal/security"
|
|
"github.com/tech/sendico/edge/callbacks/internal/signing"
|
|
"github.com/tech/sendico/edge/callbacks/internal/storage"
|
|
"github.com/tech/sendico/edge/callbacks/internal/subscriptions"
|
|
"github.com/tech/sendico/pkg/api/routers/health"
|
|
"github.com/tech/sendico/pkg/db"
|
|
msg "github.com/tech/sendico/pkg/messaging"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"github.com/tech/sendico/pkg/vault/kv"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// defaultShutdownTimeout is the fallback returned by shutdownTimeout when the
// loaded configuration (or its Runtime section) is not available.
const defaultShutdownTimeout = 15 * time.Second
|
|
|
|
func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) {
|
|
return &Imp{
|
|
logger: logger.Named("server"),
|
|
file: file,
|
|
debug: debug,
|
|
}, nil
|
|
}
|
|
|
|
func (i *Imp) Start() error {
|
|
i.initStopChannels()
|
|
defer i.closeDone()
|
|
|
|
loader := config.New(i.logger)
|
|
cfg, err := loader.Load(i.file)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
i.config = cfg
|
|
|
|
observer := ops.NewObserver()
|
|
metricsSrv, err := ops.NewHTTPServer(i.logger, ops.HTTPServerConfig{Address: cfg.Metrics.ListenAddress()})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
i.opServer = metricsSrv
|
|
i.opServer.SetStatus(health.SSStarting)
|
|
|
|
conn, err := db.ConnectMongo(i.logger.Named("mongo"), cfg.Database)
|
|
if err != nil {
|
|
i.shutdownRuntime(context.Background())
|
|
return err
|
|
}
|
|
i.mongoConn = conn
|
|
|
|
repo, err := storage.New(i.logger, conn)
|
|
if err != nil {
|
|
i.shutdownRuntime(context.Background())
|
|
return err
|
|
}
|
|
|
|
resolver, err := subscriptions.New(subscriptions.Dependencies{
|
|
EndpointRepo: repo.Endpoints(),
|
|
Logger: i.logger,
|
|
})
|
|
if err != nil {
|
|
i.shutdownRuntime(context.Background())
|
|
return err
|
|
}
|
|
|
|
securityValidator := security.New(i.logger, security.Config{
|
|
RequireHTTPS: cfg.Security.RequireHTTPS,
|
|
AllowedHosts: cfg.Security.AllowedHosts,
|
|
AllowedPorts: cfg.Security.AllowedPorts,
|
|
DNSResolveTimeout: int(cfg.Security.DNSResolveTimeoutMS() / time.Millisecond),
|
|
})
|
|
|
|
secretProvider, err := secrets.New(secrets.Options{
|
|
Logger: i.logger,
|
|
Static: cfg.Secrets.Static,
|
|
CacheTTL: cfg.Secrets.CacheTTL(),
|
|
Vault: secrets.VaultOptions{
|
|
Config: kv.Config{
|
|
Address: cfg.Secrets.Vault.Address,
|
|
TokenEnv: cfg.Secrets.Vault.TokenEnv,
|
|
Namespace: cfg.Secrets.Vault.Namespace,
|
|
MountPath: cfg.Secrets.Vault.MountPath,
|
|
},
|
|
DefaultField: cfg.Secrets.Vault.DefaultField,
|
|
},
|
|
})
|
|
if err != nil {
|
|
i.shutdownRuntime(context.Background())
|
|
return err
|
|
}
|
|
signer, err := signing.New(signing.Dependencies{Logger: i.logger, Provider: secretProvider})
|
|
if err != nil {
|
|
i.shutdownRuntime(context.Background())
|
|
return err
|
|
}
|
|
|
|
retryPolicy := retry.New()
|
|
eventSvc := events.New(i.logger)
|
|
|
|
broker, err := msg.CreateMessagingBroker(i.logger.Named("messaging"), cfg.Messaging)
|
|
if err != nil {
|
|
i.shutdownRuntime(context.Background())
|
|
return err
|
|
}
|
|
i.broker = broker
|
|
|
|
ingestSvc, err := ingest.New(ingest.Dependencies{
|
|
Logger: i.logger,
|
|
Broker: broker,
|
|
Events: eventSvc,
|
|
Resolver: resolver,
|
|
InboxRepo: repo.Inbox(),
|
|
TaskRepo: repo.Tasks(),
|
|
TaskDefaults: deliveryTaskDefaults(cfg),
|
|
Observer: observer,
|
|
})
|
|
if err != nil {
|
|
i.shutdownRuntime(context.Background())
|
|
return err
|
|
}
|
|
i.ingest = ingestSvc
|
|
|
|
deliverySvc, err := delivery.New(delivery.Dependencies{
|
|
Logger: i.logger,
|
|
Config: delivery.Config{
|
|
WorkerConcurrency: cfg.Delivery.WorkerConcurrency,
|
|
WorkerPoll: cfg.Delivery.WorkerPollInterval(),
|
|
LockTTL: cfg.Delivery.LockTTL(),
|
|
RequestTimeout: cfg.Delivery.RequestTimeout(),
|
|
JitterRatio: cfg.Delivery.JitterRatio,
|
|
},
|
|
Tasks: repo.Tasks(),
|
|
Retry: retryPolicy,
|
|
Security: securityValidator,
|
|
Signer: signer,
|
|
Observer: observer,
|
|
})
|
|
if err != nil {
|
|
i.shutdownRuntime(context.Background())
|
|
return err
|
|
}
|
|
i.delivery = deliverySvc
|
|
|
|
runCtx, cancel := context.WithCancel(context.Background())
|
|
i.runCancel = cancel
|
|
i.ingest.Start(runCtx)
|
|
i.delivery.Start(runCtx)
|
|
i.opServer.SetStatus(health.SSRunning)
|
|
|
|
i.logger.Info("Callbacks service ready",
|
|
zap.Int("workers", cfg.Delivery.WorkerConcurrency),
|
|
)
|
|
|
|
<-i.stopCh
|
|
i.logger.Info("Callbacks service stop signal received")
|
|
i.shutdownRuntime(context.Background())
|
|
|
|
return nil
|
|
}
|
|
|
|
func (i *Imp) Shutdown() {
|
|
i.signalStop()
|
|
if i.doneCh != nil {
|
|
<-i.doneCh
|
|
}
|
|
}
|
|
|
|
func (i *Imp) initStopChannels() {
|
|
if i.stopCh == nil {
|
|
i.stopCh = make(chan struct{})
|
|
}
|
|
if i.doneCh == nil {
|
|
i.doneCh = make(chan struct{})
|
|
}
|
|
}
|
|
|
|
func (i *Imp) signalStop() {
|
|
i.stopOnce.Do(func() {
|
|
if i.stopCh != nil {
|
|
close(i.stopCh)
|
|
}
|
|
})
|
|
}
|
|
|
|
func (i *Imp) closeDone() {
|
|
i.doneOnce.Do(func() {
|
|
if i.doneCh != nil {
|
|
close(i.doneCh)
|
|
}
|
|
})
|
|
}
|
|
|
|
func (i *Imp) shutdownRuntime(ctx context.Context) {
|
|
i.shutdown.Do(func() {
|
|
if i.opServer != nil {
|
|
i.opServer.SetStatus(health.SSTerminating)
|
|
}
|
|
if i.runCancel != nil {
|
|
i.runCancel()
|
|
}
|
|
if i.ingest != nil {
|
|
i.ingest.Stop()
|
|
}
|
|
if i.delivery != nil {
|
|
i.delivery.Stop()
|
|
}
|
|
if i.opServer != nil {
|
|
i.opServer.Close(ctx)
|
|
i.opServer = nil
|
|
}
|
|
|
|
if i.mongoConn != nil {
|
|
timeout := i.shutdownTimeout()
|
|
shutdownCtx, cancel := context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
if err := i.mongoConn.Disconnect(shutdownCtx); err != nil {
|
|
i.logger.Warn("Failed to close MongoDB connection", zap.Error(err))
|
|
}
|
|
i.mongoConn = nil
|
|
}
|
|
})
|
|
}
|
|
|
|
func (i *Imp) shutdownTimeout() time.Duration {
|
|
if i.config != nil && i.config.Runtime != nil {
|
|
return i.config.Runtime.ShutdownTimeout()
|
|
}
|
|
return defaultShutdownTimeout
|
|
}
|
|
|
|
func deliveryTaskDefaults(cfg *config.Config) storage.TaskDefaults {
|
|
if cfg == nil {
|
|
return storage.TaskDefaults{}
|
|
}
|
|
return storage.TaskDefaults{
|
|
MaxAttempts: cfg.Delivery.MaxAttempts,
|
|
MinDelay: cfg.Delivery.MinDelay(),
|
|
MaxDelay: cfg.Delivery.MaxDelay(),
|
|
RequestTimeout: cfg.Delivery.RequestTimeout(),
|
|
}
|
|
}
|