Fully separated payment quotation and orchestration flows
This commit is contained in:
@@ -11,6 +11,34 @@ import (
|
||||
|
||||
const quotationDiscoverySender = "payment_quotation"
|
||||
|
||||
func (i *Imp) initDiscovery(cfg *config) {
|
||||
if i == nil || cfg == nil || cfg.Messaging == nil || cfg.Messaging.Driver == "" {
|
||||
return
|
||||
}
|
||||
|
||||
logger := i.logger.Named("discovery")
|
||||
broker, err := msg.CreateMessagingBroker(logger.Named("bus"), cfg.Messaging)
|
||||
if err != nil {
|
||||
i.logger.Warn("Failed to initialise discovery broker", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
registry := discovery.NewRegistry()
|
||||
watcher, err := discovery.NewRegistryWatcher(logger, broker, registry)
|
||||
if err != nil {
|
||||
i.logger.Warn("Failed to initialise discovery registry watcher", zap.Error(err))
|
||||
return
|
||||
}
|
||||
if err := watcher.Start(); err != nil {
|
||||
i.logger.Warn("Failed to start discovery registry watcher", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
i.discoveryWatcher = watcher
|
||||
i.discoveryReg = registry
|
||||
i.logger.Info("Discovery registry watcher started")
|
||||
}
|
||||
|
||||
func (i *Imp) startDiscoveryAnnouncer(cfg *config, producer msg.Producer) {
|
||||
if i == nil || cfg == nil || producer == nil || cfg.GRPC == nil {
|
||||
return
|
||||
@@ -43,3 +71,15 @@ func (i *Imp) stopDiscoveryAnnouncer() {
|
||||
i.discoveryAnnouncer.Stop()
|
||||
i.discoveryAnnouncer = nil
|
||||
}
|
||||
|
||||
func (i *Imp) stopDiscovery() {
|
||||
if i == nil {
|
||||
return
|
||||
}
|
||||
i.stopDiscoveryAnnouncer()
|
||||
if i.discoveryWatcher != nil {
|
||||
i.discoveryWatcher.Stop()
|
||||
i.discoveryWatcher = nil
|
||||
}
|
||||
i.discoveryReg = nil
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package serverimp
|
||||
|
||||
import (
|
||||
quotesvc "github.com/tech/sendico/payments/quotation/internal/service/orchestrator"
|
||||
quotesvc "github.com/tech/sendico/payments/quotation/internal/service/quotation"
|
||||
"github.com/tech/sendico/payments/storage"
|
||||
mongostorage "github.com/tech/sendico/payments/storage/mongo"
|
||||
"github.com/tech/sendico/pkg/db"
|
||||
@@ -19,7 +19,7 @@ func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) {
|
||||
}
|
||||
|
||||
func (i *Imp) Shutdown() {
|
||||
i.stopDiscoveryAnnouncer()
|
||||
i.stopDiscovery()
|
||||
if i.service != nil {
|
||||
i.service.Shutdown()
|
||||
}
|
||||
@@ -34,6 +34,7 @@ func (i *Imp) Start() error {
|
||||
}
|
||||
i.config = cfg
|
||||
|
||||
i.initDiscovery(cfg)
|
||||
i.deps = i.initDependencies(cfg)
|
||||
|
||||
quoteRetention := cfg.quoteRetention()
|
||||
@@ -54,6 +55,9 @@ func (i *Imp) Start() error {
|
||||
opts = append(opts, quotesvc.WithChainGatewayClient(i.deps.gatewayClient))
|
||||
}
|
||||
}
|
||||
if registry := quotesvc.NewDiscoveryGatewayRegistry(logger, i.discoveryReg); registry != nil {
|
||||
opts = append(opts, quotesvc.WithGatewayRegistry(registry))
|
||||
}
|
||||
i.startDiscoveryAnnouncer(cfg, producer)
|
||||
svc := quotesvc.NewQuotationService(logger, repo, opts...)
|
||||
i.service = svc
|
||||
|
||||
@@ -33,5 +33,7 @@ type Imp struct {
|
||||
service quoteService
|
||||
deps *clientDependencies
|
||||
|
||||
discoveryWatcher *discovery.RegistryWatcher
|
||||
discoveryReg *discovery.Registry
|
||||
discoveryAnnouncer *discovery.Announcer
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user