discovery: +invoke url #271

Merged
tech merged 1 commits from discovery-270 into main 2026-01-19 10:07:55 +00:00
26 changed files with 170 additions and 32 deletions

View File

@@ -4,6 +4,7 @@ runtime:
grpc:
network: tcp
address: ":50060"
advertise_host: "sendico_billing_fees"
enable_reflection: true
enable_health: true

View File

@@ -129,6 +129,11 @@ func (i *Imp) Start() error {
if oracleClient != nil {
opts = append(opts, fees.WithOracleClient(oracleClient))
}
if cfg.GRPC != nil {
if invokeURI := cfg.GRPC.DiscoveryInvokeURI(); invokeURI != "" {
opts = append(opts, fees.WithDiscoveryInvokeURI(invokeURI))
}
}
svc := fees.NewService(logger, repo, producer, opts...)
i.service = svc
return svc, nil

View File

@@ -1,6 +1,8 @@
package fees
import (
"strings"
internalcalculator "github.com/tech/sendico/billing/fees/internal/service/fees/internal/calculator"
oracleclient "github.com/tech/sendico/fx/oracle/client"
clockpkg "github.com/tech/sendico/pkg/clock"
@@ -46,3 +48,10 @@ func WithFeeResolver(r FeeResolver) Option {
}
}
}
// WithDiscoveryInvokeURI sets the invoke URI used when announcing the service in discovery.
func WithDiscoveryInvokeURI(invokeURI string) Option {
return func(s *Service) {
s.invokeURI = strings.TrimSpace(invokeURI)
}
}

View File

@@ -40,6 +40,7 @@ type Service struct {
oracle oracleclient.Client
resolver FeeResolver
announcer *discovery.Announcer
invokeURI string
feesv1.UnimplementedFeeEngineServer
}
@@ -93,6 +94,7 @@ func (s *Service) startDiscoveryAnnouncer() {
announce := discovery.Announcement{
Service: "BILLING_FEES",
Operations: []string{"fee.calc"},
InvokeURI: s.invokeURI,
Version: appversion.Create().Short(),
}
s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.FeePlans), announce)

View File

@@ -4,6 +4,7 @@ runtime:
grpc:
network: tcp
address: ":50051"
advertise_host: "sendico_fx_oracle"
enable_reflection: true
enable_health: true

View File

@@ -63,7 +63,7 @@ func (i *Imp) Start() error {
}
serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) {
svc := oracle.NewService(logger, repo, producer)
svc := oracle.NewService(logger, repo, producer, cfg.GRPC.DiscoveryInvokeURI())
i.service = svc
return svc, nil
}

View File

@@ -42,15 +42,17 @@ type Service struct {
storage storage.Repository
producer pmessaging.Producer
announcer *discovery.Announcer
invokeURI string
oraclev1.UnimplementedOracleServer
}
func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer) *Service {
func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer, invokeURI string) *Service {
initMetrics()
svc := &Service{
logger: logger.Named("oracle"),
storage: repo,
producer: prod,
logger: logger.Named("oracle"),
storage: repo,
producer: prod,
invokeURI: strings.TrimSpace(invokeURI),
}
svc.startDiscoveryAnnouncer()
return svc
@@ -78,6 +80,7 @@ func (s *Service) startDiscoveryAnnouncer() {
announce := discovery.Announcement{
Service: "FX_ORACLE",
Operations: []string{"fx.quote"},
InvokeURI: s.invokeURI,
Version: appversion.Create().Short(),
}
s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.FXOracle), announce)

View File

@@ -142,7 +142,7 @@ func TestServiceGetQuoteFirm(t *testing.T) {
}
repo.currencies = currencyStoreStub{}
svc := NewService(zap.NewNop(), repo, nil)
svc := NewService(zap.NewNop(), repo, nil, "")
req := &oraclev1.GetQuoteRequest{
Meta: &oraclev1.RequestMeta{
@@ -189,7 +189,7 @@ func TestServiceGetQuoteRateNotFound(t *testing.T) {
return nil, merrors.ErrNoData
}},
}
svc := NewService(zap.NewNop(), repo, nil)
svc := NewService(zap.NewNop(), repo, nil, "")
_, err := svc.GetQuote(context.Background(), &oraclev1.GetQuoteRequest{
Pair: &fxv1.CurrencyPair{Base: "USD", Quote: "EUR"},
@@ -263,7 +263,7 @@ func TestServiceGetQuoteCrossRate(t *testing.T) {
repo.quotes = &quotesStoreStub{}
repo.currencies = currencyStoreStub{}
svc := NewService(zap.NewNop(), repo, nil)
svc := NewService(zap.NewNop(), repo, nil, "")
req := &oraclev1.GetQuoteRequest{
Pair: &fxv1.CurrencyPair{Base: "EUR", Quote: "RUB"},
@@ -352,7 +352,7 @@ func TestServiceLatestRateCross(t *testing.T) {
repo.quotes = &quotesStoreStub{}
repo.currencies = currencyStoreStub{}
svc := NewService(zap.NewNop(), repo, nil)
svc := NewService(zap.NewNop(), repo, nil, "")
resp, err := svc.LatestRate(context.Background(), &oraclev1.LatestRateRequest{
Pair: &fxv1.CurrencyPair{Base: "EUR", Quote: "RUB"},
@@ -390,7 +390,7 @@ func TestServiceValidateQuote(t *testing.T) {
},
},
}
svc := NewService(zap.NewNop(), repo, nil)
svc := NewService(zap.NewNop(), repo, nil, "")
resp, err := svc.ValidateQuote(context.Background(), &oraclev1.ValidateQuoteRequest{QuoteRef: "q1"})
if err != nil {
@@ -409,7 +409,7 @@ func TestServiceConsumeQuoteExpired(t *testing.T) {
},
},
}
svc := NewService(zap.NewNop(), repo, nil)
svc := NewService(zap.NewNop(), repo, nil, "")
_, err := svc.ConsumeQuote(context.Background(), &oraclev1.ConsumeQuoteRequest{QuoteRef: "q1", LedgerTxnRef: "ledger"})
if err == nil {
@@ -439,7 +439,7 @@ func TestServiceLatestRateSuccess(t *testing.T) {
},
},
}
svc := NewService(zap.NewNop(), repo, nil)
svc := NewService(zap.NewNop(), repo, nil, "")
resp, err := svc.LatestRate(context.Background(), &oraclev1.LatestRateRequest{Pair: &fxv1.CurrencyPair{Base: "USD", Quote: "EUR"}})
if err != nil {
@@ -456,7 +456,7 @@ func TestServiceListPairs(t *testing.T) {
return []*model.Pair{{Pair: model.CurrencyPair{Base: "USD", Quote: "EUR"}}}, nil
}},
}
svc := NewService(zap.NewNop(), repo, nil)
svc := NewService(zap.NewNop(), repo, nil, "")
resp, err := svc.ListPairs(context.Background(), &oraclev1.ListPairsRequest{})
if err != nil {

View File

@@ -4,6 +4,7 @@ runtime:
grpc:
network: tcp
address: ":50070"
advertise_host: "sendico_chain_gateway"
enable_reflection: true
enable_health: true

View File

@@ -148,7 +148,12 @@ func (i *Imp) Start() error {
}
serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) {
invokeURI := ""
if cfg.GRPC != nil {
invokeURI = cfg.GRPC.DiscoveryInvokeURI()
}
opts := []gatewayservice.Option{
gatewayservice.WithDiscoveryInvokeURI(invokeURI),
gatewayservice.WithNetworks(networkConfigs),
gatewayservice.WithServiceWallet(walletConfig),
gatewayservice.WithKeyManager(keyManager),

View File

@@ -83,3 +83,10 @@ func WithSettings(settings CacheSettings) Option {
s.settings = settings.withDefaults()
}
}
// WithDiscoveryInvokeURI sets the invoke URI used when announcing the gateway.
func WithDiscoveryInvokeURI(invokeURI string) Option {
return func(s *Service) {
s.invokeURI = strings.TrimSpace(invokeURI)
}
}

View File

@@ -51,6 +51,7 @@ type Service struct {
drivers *drivers.Registry
commands commands.Registry
announcers []*discovery.Announcer
invokeURI string
connectorv1.UnimplementedConnectorServiceServer
}
@@ -209,7 +210,7 @@ func (s *Service) startDiscoveryAnnouncers() {
Network: network.Name,
Operations: []string{"balance.read", "payin.crypto", "payout.crypto", "fee.send"},
Currencies: currencies,
InvokeURI: discovery.DefaultInvokeURI(string(mservice.ChainGateway)),
InvokeURI: s.invokeURI,
Version: version,
}
announcer := discovery.NewAnnouncer(s.logger, s.producer, string(mservice.ChainGateway), announce)

View File

@@ -4,6 +4,7 @@ runtime:
grpc:
network: tcp
address: ":50075"
advertise_host: "sendico_mntx_gateway"
enable_reflection: true
enable_health: true

View File

@@ -184,7 +184,12 @@ func (i *Imp) Start() error {
)
serviceFactory := func(logger mlogger.Logger, _ struct{}, producer msg.Producer) (grpcapp.Service, error) {
invokeURI := ""
if cfg.GRPC != nil {
invokeURI = cfg.GRPC.DiscoveryInvokeURI()
}
svc := mntxservice.NewService(logger,
mntxservice.WithDiscoveryInvokeURI(invokeURI),
mntxservice.WithProducer(producer),
mntxservice.WithMonetixConfig(monetixCfg),
mntxservice.WithGatewayDescriptor(gatewayDescriptor),

View File

@@ -2,6 +2,7 @@ package gateway
import (
"net/http"
"strings"
"github.com/tech/sendico/gateway/mntx/internal/service/monetix"
"github.com/tech/sendico/pkg/clock"
@@ -52,3 +53,10 @@ func WithGatewayDescriptor(descriptor *gatewayv1.GatewayInstanceDescriptor) Opti
}
}
}
// WithDiscoveryInvokeURI sets the invoke URI used when announcing the gateway.
func WithDiscoveryInvokeURI(invokeURI string) Option {
return func(s *Service) {
s.invokeURI = strings.TrimSpace(invokeURI)
}
}

View File

@@ -30,6 +30,7 @@ type Service struct {
card *cardPayoutProcessor
gatewayDescriptor *gatewayv1.GatewayInstanceDescriptor
announcer *discovery.Announcer
invokeURI string
connectorv1.UnimplementedConnectorServiceServer
}
@@ -145,7 +146,7 @@ func (s *Service) startDiscoveryAnnouncer() {
Service: "CARD_PAYOUT_RAIL_GATEWAY",
Rail: "CARD_PAYOUT",
Operations: []string{"payout.card"},
InvokeURI: discovery.DefaultInvokeURI(string(mservice.MntxGateway)),
InvokeURI: s.invokeURI,
Version: appversion.Create().Short(),
}
if s.gatewayDescriptor != nil {

View File

@@ -4,6 +4,7 @@ runtime:
grpc:
network: tcp
address: ":50080"
advertise_host: "sendico_payment_gateway"
enable_reflection: true
enable_health: true

View File

@@ -85,11 +85,16 @@ func (i *Imp) Start() error {
}
serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) {
invokeURI := ""
if cfg.GRPC != nil {
invokeURI = cfg.GRPC.DiscoveryInvokeURI()
}
gwCfg := gateway.Config{
Rail: cfg.Gateway.Rail,
TargetChatIDEnv: cfg.Gateway.TargetChatIDEnv,
TimeoutSeconds: cfg.Gateway.TimeoutSeconds,
AcceptedUserIDs: cfg.Gateway.AcceptedUserIDs,
InvokeURI: invokeURI,
}
svc := gateway.NewService(logger, repo, producer, broker, gwCfg)
i.service = svc

View File

@@ -50,6 +50,7 @@ type Config struct {
TargetChatIDEnv string
TimeoutSeconds int32
AcceptedUserIDs []string
InvokeURI string
}
type Service struct {
@@ -61,6 +62,7 @@ type Service struct {
rail string
chatID string
announcer *discovery.Announcer
invokeURI string
mu sync.Mutex
pending map[string]*model.PaymentGatewayIntent
@@ -81,6 +83,7 @@ func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Pro
broker: broker,
cfg: cfg,
rail: strings.TrimSpace(cfg.Rail),
invokeURI: strings.TrimSpace(cfg.InvokeURI),
pending: map[string]*model.PaymentGatewayIntent{},
}
svc.chatID = strings.TrimSpace(readEnv(cfg.TargetChatIDEnv))
@@ -411,7 +414,7 @@ func (s *Service) startAnnouncer() {
Service: string(mservice.PaymentGateway),
Rail: s.rail,
Operations: caps,
InvokeURI: discovery.DefaultInvokeURI(string(mservice.PaymentGateway)),
InvokeURI: s.invokeURI,
}
s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.PaymentGateway), announce)
s.announcer.Start()

View File

@@ -4,6 +4,7 @@ runtime:
grpc:
network: tcp
address: ":50052"
advertise_host: "sendico_ledger"
enable_reflection: true
enable_health: true

View File

@@ -116,7 +116,11 @@ func (i *Imp) Start() error {
}
serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) {
svc := ledger.NewService(logger, repo, producer, feesClient, feesTimeout)
invokeURI := ""
if cfg.GRPC != nil {
invokeURI = cfg.GRPC.DiscoveryInvokeURI()
}
svc := ledger.NewService(logger, repo, producer, feesClient, feesTimeout, invokeURI)
i.service = svc
return svc, nil
}

View File

@@ -44,6 +44,7 @@ type Service struct {
producer pmessaging.Producer
fees feesDependency
announcer *discovery.Announcer
invokeURI string
outbox struct {
once sync.Once
@@ -61,14 +62,15 @@ func (f feesDependency) available() bool {
return f.client != nil
}
func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer, feesClient feesv1.FeeEngineClient, feesTimeout time.Duration) *Service {
func NewService(logger mlogger.Logger, repo storage.Repository, prod pmessaging.Producer, feesClient feesv1.FeeEngineClient, feesTimeout time.Duration, invokeURI string) *Service {
// Initialize Prometheus metrics
initMetrics()
service := &Service{
logger: logger.Named("ledger"),
storage: repo,
producer: prod,
logger: logger.Named("ledger"),
storage: repo,
producer: prod,
invokeURI: strings.TrimSpace(invokeURI),
fees: feesDependency{
client: feesClient,
timeout: feesTimeout,
@@ -204,7 +206,7 @@ func (s *Service) startDiscoveryAnnouncer() {
announce := discovery.Announcement{
Service: "LEDGER",
Operations: []string{"balance.read", "ledger.debit", "ledger.credit"},
InvokeURI: discovery.DefaultInvokeURI(string(mservice.Ledger)),
InvokeURI: s.invokeURI,
Version: appversion.Create().Short(),
}
s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.Ledger), announce)

View File

@@ -4,6 +4,7 @@ runtime:
grpc:
network: tcp
address: ":50062"
advertise_host: "sendico_payments_orchestrator"
enable_reflection: true
enable_health: true

View File

@@ -34,6 +34,7 @@ func (i *Imp) initDiscovery(cfg *config) {
announce := discovery.Announcement{
Service: "PAYMENTS_ORCHESTRATOR",
Operations: []string{"payment.quote", "payment.initiate"},
InvokeURI: cfg.GRPC.DiscoveryInvokeURI(),
Version: appversion.Create().Short(),
}
i.discoveryAnnouncer = discovery.NewAnnouncer(i.logger, producer, string(mservice.PaymentOrchestrator), announce)

View File

@@ -1,13 +1,24 @@
package grpcimp
import (
"fmt"
"net"
"os"
"strings"
)
type Config struct {
Network string `yaml:"network"`
Address string `yaml:"address"`
EnableReflection bool `yaml:"enable_reflection"`
EnableHealth bool `yaml:"enable_health"`
MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
MaxSendMsgSize int `yaml:"max_send_msg_size"`
TLS *TLSConfig `yaml:"tls"`
Network string `yaml:"network"`
Address string `yaml:"address"`
AdvertiseHost string `yaml:"advertise_host"`
AdvertiseHostEnv string `yaml:"advertise_host_env"`
AdvertiseScheme string `yaml:"advertise_scheme"`
AdvertiseSchemeEnv string `yaml:"advertise_scheme_env"`
EnableReflection bool `yaml:"enable_reflection"`
EnableHealth bool `yaml:"enable_health"`
MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
MaxSendMsgSize int `yaml:"max_send_msg_size"`
TLS *TLSConfig `yaml:"tls"`
}
type TLSConfig struct {
@@ -16,3 +27,65 @@ type TLSConfig struct {
CAFile string `yaml:"ca_file"`
RequireClientCert bool `yaml:"require_client_cert"`
}
// DiscoveryInvokeURI builds a discovery invoke URI from the gRPC config.
func (c *Config) DiscoveryInvokeURI() string {
if c == nil {
return ""
}
address := strings.TrimSpace(c.Address)
addrHost, addrPort := splitHostPort(address)
host := strings.TrimSpace(c.AdvertiseHost)
if envKey := strings.TrimSpace(c.AdvertiseHostEnv); envKey != "" {
if value := strings.TrimSpace(os.Getenv(envKey)); value != "" {
host = value
}
}
if host != "" {
if parsedHost, parsedPort, err := net.SplitHostPort(host); err == nil && parsedPort != "" {
host = strings.TrimSpace(parsedHost)
addrPort = strings.TrimSpace(parsedPort)
}
}
if host == "" {
host = addrHost
}
if host == "" || host == "0.0.0.0" || host == "::" {
return ""
}
if addrPort == "" {
return ""
}
return fmt.Sprintf("%s://%s:%s", c.discoveryScheme(), host, addrPort)
}
func (c *Config) discoveryScheme() string {
scheme := strings.TrimSpace(c.AdvertiseScheme)
if envKey := strings.TrimSpace(c.AdvertiseSchemeEnv); envKey != "" {
if value := strings.TrimSpace(os.Getenv(envKey)); value != "" {
scheme = value
}
}
if scheme != "" {
return scheme
}
if c != nil && c.TLS != nil && strings.TrimSpace(c.TLS.CertFile) != "" && strings.TrimSpace(c.TLS.KeyFile) != "" {
return "grpcs"
}
return "grpc"
}
func splitHostPort(address string) (string, string) {
address = strings.TrimSpace(address)
if address == "" {
return "", ""
}
host, port, err := net.SplitHostPort(address)
if err != nil {
return "", ""
}
return strings.TrimSpace(host), strings.TrimSpace(port)
}

View File

@@ -39,9 +39,6 @@ func NewAnnouncer(logger mlogger.Logger, producer msg.Producer, sender string, a
if announce.ID == "" {
announce.ID = DefaultEntryID(announce.Service)
}
if announce.InvokeURI == "" && announce.Service != "" {
announce.InvokeURI = DefaultInvokeURI(announce.Service)
}
return &Announcer{
logger: logger,
producer: producer,