211 lines
6.0 KiB
Go
211 lines
6.0 KiB
Go
package serverimp
|
|
|
|
import (
|
|
"context"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/tech/sendico/gateway/tgsettle/internal/service/gateway"
|
|
"github.com/tech/sendico/gateway/tgsettle/storage"
|
|
gatewaymongo "github.com/tech/sendico/gateway/tgsettle/storage/mongo"
|
|
"github.com/tech/sendico/pkg/api/routers"
|
|
"github.com/tech/sendico/pkg/db"
|
|
"github.com/tech/sendico/pkg/discovery"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
msg "github.com/tech/sendico/pkg/messaging"
|
|
mb "github.com/tech/sendico/pkg/messaging/broker"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"github.com/tech/sendico/pkg/server/grpcapp"
|
|
"go.uber.org/zap"
|
|
"gopkg.in/yaml.v3"
|
|
)
|
|
|
|
type Imp struct {
|
|
logger mlogger.Logger
|
|
file string
|
|
debug bool
|
|
|
|
config *config
|
|
app *grpcapp.App[storage.Repository]
|
|
service *gateway.Service
|
|
|
|
discoveryWatcher *discovery.RegistryWatcher
|
|
discoveryReg *discovery.Registry
|
|
}
|
|
|
|
type config struct {
|
|
*grpcapp.Config `yaml:",inline"`
|
|
Gateway gatewayConfig `yaml:"gateway"`
|
|
Treasury treasuryConfig `yaml:"treasury"`
|
|
}
|
|
|
|
// gatewayConfig is the "gateway" section of the configuration file. Apart from
// Rail, which loadConfig normalises and validates, every field is copied
// unchanged into gateway.Config by Start.
type gatewayConfig struct {
	// Rail identifies the rail served by this gateway; it is required and must
	// be one that the discovery package recognises.
	Rail string `yaml:"rail"`

	// TargetChatIDEnv is presumably the name of the environment variable that
	// holds the target chat ID — TODO confirm against the gateway service.
	TargetChatIDEnv string `yaml:"target_chat_id_env"`

	// TimeoutSeconds is a timeout expressed in seconds; how it is applied is
	// decided by the gateway service.
	TimeoutSeconds int32 `yaml:"timeout_seconds"`

	// AcceptedUserIDs lists user IDs handed to the gateway service.
	AcceptedUserIDs []string `yaml:"accepted_user_ids"`

	// SuccessReaction is handed to the gateway service as-is.
	SuccessReaction string `yaml:"success_reaction"`
}
|
|
|
|
type treasuryConfig struct {
|
|
ExecutionDelay time.Duration `yaml:"execution_delay"`
|
|
PollInterval time.Duration `yaml:"poll_interval"`
|
|
Ledger ledgerConfig `yaml:"ledger"`
|
|
Limits treasuryLimitsConfig `yaml:"limits"`
|
|
}
|
|
|
|
// treasuryLimitsConfig is the "treasury.limits" section. Both amounts are kept
// as strings and handed unparsed to gateway.TreasuryLimitsConfig.
type treasuryLimitsConfig struct {
	// MaxAmountPerOperation is read from "max_amount_per_operation".
	MaxAmountPerOperation string `yaml:"max_amount_per_operation"`

	// MaxDailyAmount is read from "max_daily_amount".
	MaxDailyAmount string `yaml:"max_daily_amount"`
}
|
|
|
|
// ledgerConfig is the "treasury.ledger" section of the configuration file.
type ledgerConfig struct {
	// Timeout defaults to 5 seconds when absent or non-positive; it is passed
	// on to gateway.LedgerConfig.
	Timeout time.Duration `yaml:"timeout"`
}
|
|
|
|
func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) {
|
|
return &Imp{
|
|
logger: logger.Named("server"),
|
|
file: file,
|
|
debug: debug,
|
|
}, nil
|
|
}
|
|
|
|
func (i *Imp) Shutdown() {
|
|
if i.app == nil {
|
|
return
|
|
}
|
|
timeout := 15 * time.Second
|
|
if i.config != nil && i.config.Runtime != nil {
|
|
timeout = i.config.Runtime.ShutdownTimeout()
|
|
}
|
|
if i.service != nil {
|
|
i.service.Shutdown()
|
|
}
|
|
if i.discoveryWatcher != nil {
|
|
i.discoveryWatcher.Stop()
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
i.app.Shutdown(ctx)
|
|
}
|
|
|
|
func (i *Imp) Start() error {
|
|
cfg, err := i.loadConfig()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
i.config = cfg
|
|
|
|
var broker mb.Broker
|
|
if cfg.Messaging != nil && cfg.Messaging.Driver != "" {
|
|
broker, err = msg.CreateMessagingBroker(i.logger, cfg.Messaging)
|
|
if err != nil {
|
|
i.logger.Warn("Failed to create messaging broker", zap.Error(err))
|
|
}
|
|
}
|
|
if broker != nil {
|
|
registry := discovery.NewRegistry()
|
|
watcher, watcherErr := discovery.NewRegistryWatcher(i.logger, broker, registry)
|
|
if watcherErr != nil {
|
|
i.logger.Warn("Failed to initialise discovery registry watcher", zap.Error(watcherErr))
|
|
} else if startErr := watcher.Start(); startErr != nil {
|
|
i.logger.Warn("Failed to start discovery registry watcher", zap.Error(startErr))
|
|
} else {
|
|
i.discoveryWatcher = watcher
|
|
i.discoveryReg = registry
|
|
i.logger.Info("Discovery registry watcher started")
|
|
}
|
|
}
|
|
|
|
repoFactory := func(logger mlogger.Logger, conn *db.MongoConnection) (storage.Repository, error) {
|
|
return gatewaymongo.New(logger, conn)
|
|
}
|
|
|
|
serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) {
|
|
invokeURI := ""
|
|
if cfg.GRPC != nil {
|
|
invokeURI = cfg.GRPC.DiscoveryInvokeURI()
|
|
}
|
|
msgSettings := map[string]any(nil)
|
|
if cfg.Messaging != nil {
|
|
msgSettings = cfg.Messaging.Settings
|
|
}
|
|
gwCfg := gateway.Config{
|
|
Rail: cfg.Gateway.Rail,
|
|
TargetChatIDEnv: cfg.Gateway.TargetChatIDEnv,
|
|
TimeoutSeconds: cfg.Gateway.TimeoutSeconds,
|
|
AcceptedUserIDs: cfg.Gateway.AcceptedUserIDs,
|
|
SuccessReaction: cfg.Gateway.SuccessReaction,
|
|
InvokeURI: invokeURI,
|
|
MessagingSettings: msgSettings,
|
|
DiscoveryRegistry: i.discoveryReg,
|
|
Treasury: gateway.TreasuryConfig{
|
|
ExecutionDelay: cfg.Treasury.ExecutionDelay,
|
|
PollInterval: cfg.Treasury.PollInterval,
|
|
Ledger: gateway.LedgerConfig{
|
|
Timeout: cfg.Treasury.Ledger.Timeout,
|
|
},
|
|
Limits: gateway.TreasuryLimitsConfig{
|
|
MaxAmountPerOperation: cfg.Treasury.Limits.MaxAmountPerOperation,
|
|
MaxDailyAmount: cfg.Treasury.Limits.MaxDailyAmount,
|
|
},
|
|
},
|
|
}
|
|
svc := gateway.NewService(logger, repo, producer, broker, gwCfg)
|
|
i.service = svc
|
|
return svc, nil
|
|
}
|
|
|
|
app, err := grpcapp.NewApp(i.logger, "tgsettle_gateway", cfg.Config, i.debug, repoFactory, serviceFactory)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
i.app = app
|
|
return i.app.Start()
|
|
}
|
|
|
|
func (i *Imp) loadConfig() (*config, error) {
|
|
data, err := os.ReadFile(i.file)
|
|
if err != nil {
|
|
i.logger.Error("Could not read configuration file", zap.String("config_file", i.file), zap.Error(err))
|
|
return nil, err
|
|
}
|
|
cfg := &config{Config: &grpcapp.Config{}}
|
|
if err := yaml.Unmarshal(data, cfg); err != nil {
|
|
i.logger.Error("Failed to parse configuration", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
if cfg.Runtime == nil {
|
|
cfg.Runtime = &grpcapp.RuntimeConfig{ShutdownTimeoutSeconds: 15}
|
|
}
|
|
if cfg.GRPC == nil {
|
|
cfg.GRPC = &routers.GRPCConfig{
|
|
Network: "tcp",
|
|
Address: ":50080",
|
|
EnableReflection: true,
|
|
EnableHealth: true,
|
|
}
|
|
}
|
|
if cfg.Metrics == nil {
|
|
cfg.Metrics = &grpcapp.MetricsConfig{Address: ":9406"}
|
|
}
|
|
if cfg.Treasury.ExecutionDelay <= 0 {
|
|
cfg.Treasury.ExecutionDelay = 30 * time.Second
|
|
}
|
|
if cfg.Treasury.PollInterval <= 0 {
|
|
cfg.Treasury.PollInterval = 30 * time.Second
|
|
}
|
|
if cfg.Treasury.Ledger.Timeout <= 0 {
|
|
cfg.Treasury.Ledger.Timeout = 5 * time.Second
|
|
}
|
|
cfg.Gateway.Rail = discovery.NormalizeRail(cfg.Gateway.Rail)
|
|
if cfg.Gateway.Rail == "" {
|
|
return nil, merrors.InvalidArgument("gateway rail is required", "gateway.rail")
|
|
}
|
|
if !discovery.IsKnownRail(cfg.Gateway.Rail) {
|
|
return nil, merrors.InvalidArgument("gateway rail must be a known token", "gateway.rail")
|
|
}
|
|
return cfg, nil
|
|
}
|