discovery: +invoke url #271

Merged
tech merged 1 commits from discovery-270 into main 2026-01-19 10:07:55 +00:00
26 changed files with 170 additions and 32 deletions
Showing only changes of commit 64803a21e0 - Show all commits

View File

@@ -4,6 +4,7 @@ runtime:
grpc: grpc:
network: tcp network: tcp
address: ":50060" address: ":50060"
advertise_host: "sendico_billing_fees"
enable_reflection: true enable_reflection: true
enable_health: true enable_health: true

View File

@@ -129,6 +129,11 @@ func (i *Imp) Start() error {
if oracleClient != nil { if oracleClient != nil {
opts = append(opts, fees.WithOracleClient(oracleClient)) opts = append(opts, fees.WithOracleClient(oracleClient))
} }
if cfg.GRPC != nil {
if invokeURI := cfg.GRPC.DiscoveryInvokeURI(); invokeURI != "" {
opts = append(opts, fees.WithDiscoveryInvokeURI(invokeURI))
}
}
svc := fees.NewService(logger, repo, producer, opts...) svc := fees.NewService(logger, repo, producer, opts...)
i.service = svc i.service = svc
return svc, nil return svc, nil

View File

@@ -1,6 +1,8 @@
package fees package fees
import ( import (
"strings"
internalcalculator "github.com/tech/sendico/billing/fees/internal/service/fees/internal/calculator" internalcalculator "github.com/tech/sendico/billing/fees/internal/service/fees/internal/calculator"
oracleclient "github.com/tech/sendico/fx/oracle/client" oracleclient "github.com/tech/sendico/fx/oracle/client"
clockpkg "github.com/tech/sendico/pkg/clock" clockpkg "github.com/tech/sendico/pkg/clock"
@@ -46,3 +48,10 @@ func WithFeeResolver(r FeeResolver) Option {
} }
} }
} }
// WithDiscoveryInvokeURI sets the invoke URI used when announcing the service in discovery.
func WithDiscoveryInvokeURI(invokeURI string) Option {
return func(s *Service) {
s.invokeURI = strings.TrimSpace(invokeURI)
}
}

View File

@@ -40,6 +40,7 @@ type Service struct {
oracle oracleclient.Client oracle oracleclient.Client
resolver FeeResolver resolver FeeResolver
announcer *discovery.Announcer announcer *discovery.Announcer
invokeURI string
feesv1.UnimplementedFeeEngineServer feesv1.UnimplementedFeeEngineServer
} }
@@ -93,6 +94,7 @@ func (s *Service) startDiscoveryAnnouncer() {
announce := discovery.Announcement{ announce := discovery.Announcement{
Service: "BILLING_FEES", Service: "BILLING_FEES",
Operations: []string{"fee.calc"}, Operations: []string{"fee.calc"},
InvokeURI: s.invokeURI,
Version: appversion.Create().Short(), Version: appversion.Create().Short(),
} }
s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.FeePlans), announce) s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.FeePlans), announce)

View File

@@ -4,6 +4,7 @@ runtime:
grpc: grpc:
network: tcp network: tcp
address: ":50051" address: ":50051"
advertise_host: "sendico_fx_oracle"
enable_reflection: true enable_reflection: true
enable_health: true enable_health: true

View File

@@ -63,7 +63,7 @@ func (i *Imp) Start() error {
} }
serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) { serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) {
svc := oracle.NewService(logger, repo, producer) svc := oracle.NewService(logger, repo, producer, cfg.GRPC.DiscoveryInvokeURI())
i.service = svc i.service = svc
return svc, nil return svc, nil
} }

View File

@@ -42,15 +42,17 @@ type Service struct {
storage storage.Repository storage storage.Repository
producer pmessaging.Producer producer pmessaging.Producer
announcer *discovery.Announcer announcer *discovery.Announcer
invokeURI string
oraclev1.UnimplementedOracleServer oraclev1.UnimplementedOracleServer
} }
func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer) *Service { func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer, invokeURI string) *Service {
initMetrics() initMetrics()
svc := &Service{ svc := &Service{
logger: logger.Named("oracle"), logger: logger.Named("oracle"),
storage: repo, storage: repo,
producer: prod, producer: prod,
invokeURI: strings.TrimSpace(invokeURI),
} }
svc.startDiscoveryAnnouncer() svc.startDiscoveryAnnouncer()
return svc return svc
@@ -78,6 +80,7 @@ func (s *Service) startDiscoveryAnnouncer() {
announce := discovery.Announcement{ announce := discovery.Announcement{
Service: "FX_ORACLE", Service: "FX_ORACLE",
Operations: []string{"fx.quote"}, Operations: []string{"fx.quote"},
InvokeURI: s.invokeURI,
Version: appversion.Create().Short(), Version: appversion.Create().Short(),
} }
s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.FXOracle), announce) s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.FXOracle), announce)

View File

@@ -142,7 +142,7 @@ func TestServiceGetQuoteFirm(t *testing.T) {
} }
repo.currencies = currencyStoreStub{} repo.currencies = currencyStoreStub{}
svc := NewService(zap.NewNop(), repo, nil) svc := NewService(zap.NewNop(), repo, nil, "")
req := &oraclev1.GetQuoteRequest{ req := &oraclev1.GetQuoteRequest{
Meta: &oraclev1.RequestMeta{ Meta: &oraclev1.RequestMeta{
@@ -189,7 +189,7 @@ func TestServiceGetQuoteRateNotFound(t *testing.T) {
return nil, merrors.ErrNoData return nil, merrors.ErrNoData
}}, }},
} }
svc := NewService(zap.NewNop(), repo, nil) svc := NewService(zap.NewNop(), repo, nil, "")
_, err := svc.GetQuote(context.Background(), &oraclev1.GetQuoteRequest{ _, err := svc.GetQuote(context.Background(), &oraclev1.GetQuoteRequest{
Pair: &fxv1.CurrencyPair{Base: "USD", Quote: "EUR"}, Pair: &fxv1.CurrencyPair{Base: "USD", Quote: "EUR"},
@@ -263,7 +263,7 @@ func TestServiceGetQuoteCrossRate(t *testing.T) {
repo.quotes = &quotesStoreStub{} repo.quotes = &quotesStoreStub{}
repo.currencies = currencyStoreStub{} repo.currencies = currencyStoreStub{}
svc := NewService(zap.NewNop(), repo, nil) svc := NewService(zap.NewNop(), repo, nil, "")
req := &oraclev1.GetQuoteRequest{ req := &oraclev1.GetQuoteRequest{
Pair: &fxv1.CurrencyPair{Base: "EUR", Quote: "RUB"}, Pair: &fxv1.CurrencyPair{Base: "EUR", Quote: "RUB"},
@@ -352,7 +352,7 @@ func TestServiceLatestRateCross(t *testing.T) {
repo.quotes = &quotesStoreStub{} repo.quotes = &quotesStoreStub{}
repo.currencies = currencyStoreStub{} repo.currencies = currencyStoreStub{}
svc := NewService(zap.NewNop(), repo, nil) svc := NewService(zap.NewNop(), repo, nil, "")
resp, err := svc.LatestRate(context.Background(), &oraclev1.LatestRateRequest{ resp, err := svc.LatestRate(context.Background(), &oraclev1.LatestRateRequest{
Pair: &fxv1.CurrencyPair{Base: "EUR", Quote: "RUB"}, Pair: &fxv1.CurrencyPair{Base: "EUR", Quote: "RUB"},
@@ -390,7 +390,7 @@ func TestServiceValidateQuote(t *testing.T) {
}, },
}, },
} }
svc := NewService(zap.NewNop(), repo, nil) svc := NewService(zap.NewNop(), repo, nil, "")
resp, err := svc.ValidateQuote(context.Background(), &oraclev1.ValidateQuoteRequest{QuoteRef: "q1"}) resp, err := svc.ValidateQuote(context.Background(), &oraclev1.ValidateQuoteRequest{QuoteRef: "q1"})
if err != nil { if err != nil {
@@ -409,7 +409,7 @@ func TestServiceConsumeQuoteExpired(t *testing.T) {
}, },
}, },
} }
svc := NewService(zap.NewNop(), repo, nil) svc := NewService(zap.NewNop(), repo, nil, "")
_, err := svc.ConsumeQuote(context.Background(), &oraclev1.ConsumeQuoteRequest{QuoteRef: "q1", LedgerTxnRef: "ledger"}) _, err := svc.ConsumeQuote(context.Background(), &oraclev1.ConsumeQuoteRequest{QuoteRef: "q1", LedgerTxnRef: "ledger"})
if err == nil { if err == nil {
@@ -439,7 +439,7 @@ func TestServiceLatestRateSuccess(t *testing.T) {
}, },
}, },
} }
svc := NewService(zap.NewNop(), repo, nil) svc := NewService(zap.NewNop(), repo, nil, "")
resp, err := svc.LatestRate(context.Background(), &oraclev1.LatestRateRequest{Pair: &fxv1.CurrencyPair{Base: "USD", Quote: "EUR"}}) resp, err := svc.LatestRate(context.Background(), &oraclev1.LatestRateRequest{Pair: &fxv1.CurrencyPair{Base: "USD", Quote: "EUR"}})
if err != nil { if err != nil {
@@ -456,7 +456,7 @@ func TestServiceListPairs(t *testing.T) {
return []*model.Pair{{Pair: model.CurrencyPair{Base: "USD", Quote: "EUR"}}}, nil return []*model.Pair{{Pair: model.CurrencyPair{Base: "USD", Quote: "EUR"}}}, nil
}}, }},
} }
svc := NewService(zap.NewNop(), repo, nil) svc := NewService(zap.NewNop(), repo, nil, "")
resp, err := svc.ListPairs(context.Background(), &oraclev1.ListPairsRequest{}) resp, err := svc.ListPairs(context.Background(), &oraclev1.ListPairsRequest{})
if err != nil { if err != nil {

View File

@@ -4,6 +4,7 @@ runtime:
grpc: grpc:
network: tcp network: tcp
address: ":50070" address: ":50070"
advertise_host: "sendico_chain_gateway"
enable_reflection: true enable_reflection: true
enable_health: true enable_health: true

View File

@@ -148,7 +148,12 @@ func (i *Imp) Start() error {
} }
serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) { serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) {
invokeURI := ""
if cfg.GRPC != nil {
invokeURI = cfg.GRPC.DiscoveryInvokeURI()
}
opts := []gatewayservice.Option{ opts := []gatewayservice.Option{
gatewayservice.WithDiscoveryInvokeURI(invokeURI),
gatewayservice.WithNetworks(networkConfigs), gatewayservice.WithNetworks(networkConfigs),
gatewayservice.WithServiceWallet(walletConfig), gatewayservice.WithServiceWallet(walletConfig),
gatewayservice.WithKeyManager(keyManager), gatewayservice.WithKeyManager(keyManager),

View File

@@ -83,3 +83,10 @@ func WithSettings(settings CacheSettings) Option {
s.settings = settings.withDefaults() s.settings = settings.withDefaults()
} }
} }
// WithDiscoveryInvokeURI sets the invoke URI used when announcing the gateway.
func WithDiscoveryInvokeURI(invokeURI string) Option {
return func(s *Service) {
s.invokeURI = strings.TrimSpace(invokeURI)
}
}

View File

@@ -51,6 +51,7 @@ type Service struct {
drivers *drivers.Registry drivers *drivers.Registry
commands commands.Registry commands commands.Registry
announcers []*discovery.Announcer announcers []*discovery.Announcer
invokeURI string
connectorv1.UnimplementedConnectorServiceServer connectorv1.UnimplementedConnectorServiceServer
} }
@@ -209,7 +210,7 @@ func (s *Service) startDiscoveryAnnouncers() {
Network: network.Name, Network: network.Name,
Operations: []string{"balance.read", "payin.crypto", "payout.crypto", "fee.send"}, Operations: []string{"balance.read", "payin.crypto", "payout.crypto", "fee.send"},
Currencies: currencies, Currencies: currencies,
InvokeURI: discovery.DefaultInvokeURI(string(mservice.ChainGateway)), InvokeURI: s.invokeURI,
Version: version, Version: version,
} }
announcer := discovery.NewAnnouncer(s.logger, s.producer, string(mservice.ChainGateway), announce) announcer := discovery.NewAnnouncer(s.logger, s.producer, string(mservice.ChainGateway), announce)

View File

@@ -4,6 +4,7 @@ runtime:
grpc: grpc:
network: tcp network: tcp
address: ":50075" address: ":50075"
advertise_host: "sendico_mntx_gateway"
enable_reflection: true enable_reflection: true
enable_health: true enable_health: true

View File

@@ -184,7 +184,12 @@ func (i *Imp) Start() error {
) )
serviceFactory := func(logger mlogger.Logger, _ struct{}, producer msg.Producer) (grpcapp.Service, error) { serviceFactory := func(logger mlogger.Logger, _ struct{}, producer msg.Producer) (grpcapp.Service, error) {
invokeURI := ""
if cfg.GRPC != nil {
invokeURI = cfg.GRPC.DiscoveryInvokeURI()
}
svc := mntxservice.NewService(logger, svc := mntxservice.NewService(logger,
mntxservice.WithDiscoveryInvokeURI(invokeURI),
mntxservice.WithProducer(producer), mntxservice.WithProducer(producer),
mntxservice.WithMonetixConfig(monetixCfg), mntxservice.WithMonetixConfig(monetixCfg),
mntxservice.WithGatewayDescriptor(gatewayDescriptor), mntxservice.WithGatewayDescriptor(gatewayDescriptor),

View File

@@ -2,6 +2,7 @@ package gateway
import ( import (
"net/http" "net/http"
"strings"
"github.com/tech/sendico/gateway/mntx/internal/service/monetix" "github.com/tech/sendico/gateway/mntx/internal/service/monetix"
"github.com/tech/sendico/pkg/clock" "github.com/tech/sendico/pkg/clock"
@@ -52,3 +53,10 @@ func WithGatewayDescriptor(descriptor *gatewayv1.GatewayInstanceDescriptor) Opti
} }
} }
} }
// WithDiscoveryInvokeURI sets the invoke URI used when announcing the gateway.
func WithDiscoveryInvokeURI(invokeURI string) Option {
return func(s *Service) {
s.invokeURI = strings.TrimSpace(invokeURI)
}
}

View File

@@ -30,6 +30,7 @@ type Service struct {
card *cardPayoutProcessor card *cardPayoutProcessor
gatewayDescriptor *gatewayv1.GatewayInstanceDescriptor gatewayDescriptor *gatewayv1.GatewayInstanceDescriptor
announcer *discovery.Announcer announcer *discovery.Announcer
invokeURI string
connectorv1.UnimplementedConnectorServiceServer connectorv1.UnimplementedConnectorServiceServer
} }
@@ -145,7 +146,7 @@ func (s *Service) startDiscoveryAnnouncer() {
Service: "CARD_PAYOUT_RAIL_GATEWAY", Service: "CARD_PAYOUT_RAIL_GATEWAY",
Rail: "CARD_PAYOUT", Rail: "CARD_PAYOUT",
Operations: []string{"payout.card"}, Operations: []string{"payout.card"},
InvokeURI: discovery.DefaultInvokeURI(string(mservice.MntxGateway)), InvokeURI: s.invokeURI,
Version: appversion.Create().Short(), Version: appversion.Create().Short(),
} }
if s.gatewayDescriptor != nil { if s.gatewayDescriptor != nil {

View File

@@ -4,6 +4,7 @@ runtime:
grpc: grpc:
network: tcp network: tcp
address: ":50080" address: ":50080"
advertise_host: "sendico_payment_gateway"
enable_reflection: true enable_reflection: true
enable_health: true enable_health: true

View File

@@ -85,11 +85,16 @@ func (i *Imp) Start() error {
} }
serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) { serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) {
invokeURI := ""
if cfg.GRPC != nil {
invokeURI = cfg.GRPC.DiscoveryInvokeURI()
}
gwCfg := gateway.Config{ gwCfg := gateway.Config{
Rail: cfg.Gateway.Rail, Rail: cfg.Gateway.Rail,
TargetChatIDEnv: cfg.Gateway.TargetChatIDEnv, TargetChatIDEnv: cfg.Gateway.TargetChatIDEnv,
TimeoutSeconds: cfg.Gateway.TimeoutSeconds, TimeoutSeconds: cfg.Gateway.TimeoutSeconds,
AcceptedUserIDs: cfg.Gateway.AcceptedUserIDs, AcceptedUserIDs: cfg.Gateway.AcceptedUserIDs,
InvokeURI: invokeURI,
} }
svc := gateway.NewService(logger, repo, producer, broker, gwCfg) svc := gateway.NewService(logger, repo, producer, broker, gwCfg)
i.service = svc i.service = svc

View File

@@ -50,6 +50,7 @@ type Config struct {
TargetChatIDEnv string TargetChatIDEnv string
TimeoutSeconds int32 TimeoutSeconds int32
AcceptedUserIDs []string AcceptedUserIDs []string
InvokeURI string
} }
type Service struct { type Service struct {
@@ -61,6 +62,7 @@ type Service struct {
rail string rail string
chatID string chatID string
announcer *discovery.Announcer announcer *discovery.Announcer
invokeURI string
mu sync.Mutex mu sync.Mutex
pending map[string]*model.PaymentGatewayIntent pending map[string]*model.PaymentGatewayIntent
@@ -81,6 +83,7 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro
broker: broker, broker: broker,
cfg: cfg, cfg: cfg,
rail: strings.TrimSpace(cfg.Rail), rail: strings.TrimSpace(cfg.Rail),
invokeURI: strings.TrimSpace(cfg.InvokeURI),
pending: map[string]*model.PaymentGatewayIntent{}, pending: map[string]*model.PaymentGatewayIntent{},
} }
svc.chatID = strings.TrimSpace(readEnv(cfg.TargetChatIDEnv)) svc.chatID = strings.TrimSpace(readEnv(cfg.TargetChatIDEnv))
@@ -411,7 +414,7 @@ func (s *Service) startAnnouncer() {
Service: string(mservice.PaymentGateway), Service: string(mservice.PaymentGateway),
Rail: s.rail, Rail: s.rail,
Operations: caps, Operations: caps,
InvokeURI: discovery.DefaultInvokeURI(string(mservice.PaymentGateway)), InvokeURI: s.invokeURI,
} }
s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.PaymentGateway), announce) s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.PaymentGateway), announce)
s.announcer.Start() s.announcer.Start()

View File

@@ -4,6 +4,7 @@ runtime:
grpc: grpc:
network: tcp network: tcp
address: ":50052" address: ":50052"
advertise_host: "sendico_ledger"
enable_reflection: true enable_reflection: true
enable_health: true enable_health: true

View File

@@ -116,7 +116,11 @@ func (i *Imp) Start() error {
} }
serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) { serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) {
svc := ledger.NewService(logger, repo, producer, feesClient, feesTimeout) invokeURI := ""
if cfg.GRPC != nil {
invokeURI = cfg.GRPC.DiscoveryInvokeURI()
}
svc := ledger.NewService(logger, repo, producer, feesClient, feesTimeout, invokeURI)
i.service = svc i.service = svc
return svc, nil return svc, nil
} }

View File

@@ -44,6 +44,7 @@ type Service struct {
producer pmessaging.Producer producer pmessaging.Producer
fees feesDependency fees feesDependency
announcer *discovery.Announcer announcer *discovery.Announcer
invokeURI string
outbox struct { outbox struct {
once sync.Once once sync.Once
@@ -61,14 +62,15 @@ func (f feesDependency) available() bool {
return f.client != nil return f.client != nil
} }
func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer, feesClient feesv1.FeeEngineClient, feesTimeout time.Duration) *Service { func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer, feesClient feesv1.FeeEngineClient, feesTimeout time.Duration, invokeURI string) *Service {
// Initialize Prometheus metrics // Initialize Prometheus metrics
initMetrics() initMetrics()
service := &Service{ service := &Service{
logger: logger.Named("ledger"), logger: logger.Named("ledger"),
storage: repo, storage: repo,
producer: prod, producer: prod,
invokeURI: strings.TrimSpace(invokeURI),
fees: feesDependency{ fees: feesDependency{
client: feesClient, client: feesClient,
timeout: feesTimeout, timeout: feesTimeout,
@@ -204,7 +206,7 @@ func (s *Service) startDiscoveryAnnouncer() {
announce := discovery.Announcement{ announce := discovery.Announcement{
Service: "LEDGER", Service: "LEDGER",
Operations: []string{"balance.read", "ledger.debit", "ledger.credit"}, Operations: []string{"balance.read", "ledger.debit", "ledger.credit"},
InvokeURI: discovery.DefaultInvokeURI(string(mservice.Ledger)), InvokeURI: s.invokeURI,
Version: appversion.Create().Short(), Version: appversion.Create().Short(),
} }
s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.Ledger), announce) s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.Ledger), announce)

View File

@@ -4,6 +4,7 @@ runtime:
grpc: grpc:
network: tcp network: tcp
address: ":50062" address: ":50062"
advertise_host: "sendico_payments_orchestrator"
enable_reflection: true enable_reflection: true
enable_health: true enable_health: true

View File

@@ -34,6 +34,7 @@ func (i *Imp) initDiscovery(cfg *config) {
announce := discovery.Announcement{ announce := discovery.Announcement{
Service: "PAYMENTS_ORCHESTRATOR", Service: "PAYMENTS_ORCHESTRATOR",
Operations: []string{"payment.quote", "payment.initiate"}, Operations: []string{"payment.quote", "payment.initiate"},
InvokeURI: cfg.GRPC.DiscoveryInvokeURI(),
Version: appversion.Create().Short(), Version: appversion.Create().Short(),
} }
i.discoveryAnnouncer = discovery.NewAnnouncer(i.logger, producer, string(mservice.PaymentOrchestrator), announce) i.discoveryAnnouncer = discovery.NewAnnouncer(i.logger, producer, string(mservice.PaymentOrchestrator), announce)

View File

@@ -1,13 +1,24 @@
package grpcimp package grpcimp
import (
"fmt"
"net"
"os"
"strings"
)
type Config struct { type Config struct {
Network string `yaml:"network"` Network string `yaml:"network"`
Address string `yaml:"address"` Address string `yaml:"address"`
EnableReflection bool `yaml:"enable_reflection"` AdvertiseHost string `yaml:"advertise_host"`
EnableHealth bool `yaml:"enable_health"` AdvertiseHostEnv string `yaml:"advertise_host_env"`
MaxRecvMsgSize int `yaml:"max_recv_msg_size"` AdvertiseScheme string `yaml:"advertise_scheme"`
MaxSendMsgSize int `yaml:"max_send_msg_size"` AdvertiseSchemeEnv string `yaml:"advertise_scheme_env"`
TLS *TLSConfig `yaml:"tls"` EnableReflection bool `yaml:"enable_reflection"`
EnableHealth bool `yaml:"enable_health"`
MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
MaxSendMsgSize int `yaml:"max_send_msg_size"`
TLS *TLSConfig `yaml:"tls"`
} }
type TLSConfig struct { type TLSConfig struct {
@@ -16,3 +27,65 @@ type TLSConfig struct {
CAFile string `yaml:"ca_file"` CAFile string `yaml:"ca_file"`
RequireClientCert bool `yaml:"require_client_cert"` RequireClientCert bool `yaml:"require_client_cert"`
} }
// DiscoveryInvokeURI builds a discovery invoke URI from the gRPC config.
func (c *Config) DiscoveryInvokeURI() string {
if c == nil {
return ""
}
address := strings.TrimSpace(c.Address)
addrHost, addrPort := splitHostPort(address)
host := strings.TrimSpace(c.AdvertiseHost)
if envKey := strings.TrimSpace(c.AdvertiseHostEnv); envKey != "" {
if value := strings.TrimSpace(os.Getenv(envKey)); value != "" {
host = value
}
}
if host != "" {
if parsedHost, parsedPort, err := net.SplitHostPort(host); err == nil && parsedPort != "" {
host = strings.TrimSpace(parsedHost)
addrPort = strings.TrimSpace(parsedPort)
}
}
if host == "" {
host = addrHost
}
if host == "" || host == "0.0.0.0" || host == "::" {
return ""
}
if addrPort == "" {
return ""
}
return fmt.Sprintf("%s://%s:%s", c.discoveryScheme(), host, addrPort)
}
func (c *Config) discoveryScheme() string {
scheme := strings.TrimSpace(c.AdvertiseScheme)
if envKey := strings.TrimSpace(c.AdvertiseSchemeEnv); envKey != "" {
if value := strings.TrimSpace(os.Getenv(envKey)); value != "" {
scheme = value
}
}
if scheme != "" {
return scheme
}
if c != nil && c.TLS != nil && strings.TrimSpace(c.TLS.CertFile) != "" && strings.TrimSpace(c.TLS.KeyFile) != "" {
return "grpcs"
}
return "grpc"
}
func splitHostPort(address string) (string, string) {
address = strings.TrimSpace(address)
if address == "" {
return "", ""
}
host, port, err := net.SplitHostPort(address)
if err != nil {
return "", ""
}
return strings.TrimSpace(host), strings.TrimSpace(port)
}

View File

@@ -39,9 +39,6 @@ func NewAnnouncer(logger mlogger.Logger, producer msg.Producer, sender string, a
if announce.ID == "" { if announce.ID == "" {
announce.ID = DefaultEntryID(announce.Service) announce.ID = DefaultEntryID(announce.Service)
} }
if announce.InvokeURI == "" && announce.Service != "" {
announce.InvokeURI = DefaultInvokeURI(announce.Service)
}
return &Announcer{ return &Announcer{
logger: logger, logger: logger,
producer: producer, producer: producer,