277 lines
7.6 KiB
Go
277 lines
7.6 KiB
Go
package serverimp
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"net/url"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
chainclient "github.com/tech/sendico/gateway/chain/client"
|
|
"github.com/tech/sendico/pkg/discovery"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
const discoveryLogThrottle = 30 * time.Second
|
|
|
|
type discoveryEndpoint struct {
|
|
address string
|
|
insecure bool
|
|
raw string
|
|
}
|
|
|
|
func (e discoveryEndpoint) key() string {
|
|
return fmt.Sprintf("%s|%t", e.address, e.insecure)
|
|
}
|
|
|
|
type discoveryClientResolver struct {
|
|
logger mlogger.Logger
|
|
registry *discovery.Registry
|
|
|
|
mu sync.Mutex
|
|
|
|
chainClients map[string]chainclient.Client
|
|
|
|
lastSelection map[string]string
|
|
lastMissing map[string]time.Time
|
|
}
|
|
|
|
func newDiscoveryClientResolver(logger mlogger.Logger, registry *discovery.Registry) *discoveryClientResolver {
|
|
if logger != nil {
|
|
logger = logger.Named("discovery_clients")
|
|
}
|
|
return &discoveryClientResolver{
|
|
logger: logger,
|
|
registry: registry,
|
|
chainClients: map[string]chainclient.Client{},
|
|
lastSelection: map[string]string{},
|
|
lastMissing: map[string]time.Time{},
|
|
}
|
|
}
|
|
|
|
func (r *discoveryClientResolver) Close() {
|
|
if r == nil {
|
|
return
|
|
}
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
for key, client := range r.chainClients {
|
|
if client != nil {
|
|
_ = client.Close()
|
|
}
|
|
delete(r.chainClients, key)
|
|
}
|
|
}
|
|
|
|
type discoveryGatewayInvokeResolver struct {
|
|
resolver *discoveryClientResolver
|
|
}
|
|
|
|
func (r discoveryGatewayInvokeResolver) Resolve(ctx context.Context, invokeURI string) (chainclient.Client, error) {
|
|
if r.resolver == nil {
|
|
return nil, merrors.NoData("discovery: chain gateway unavailable")
|
|
}
|
|
return r.resolver.ChainClientByInvokeURI(ctx, invokeURI)
|
|
}
|
|
|
|
type discoveryChainGatewayResolver struct {
|
|
resolver *discoveryClientResolver
|
|
}
|
|
|
|
func (r discoveryChainGatewayResolver) Resolve(ctx context.Context, network string) (chainclient.Client, error) {
|
|
if r.resolver == nil {
|
|
return nil, merrors.NoData("discovery: chain gateway unavailable")
|
|
}
|
|
return r.resolver.ChainClientByNetwork(ctx, network)
|
|
}
|
|
|
|
func (r *discoveryClientResolver) ChainClientByInvokeURI(ctx context.Context, invokeURI string) (chainclient.Client, error) {
|
|
endpoint, err := parseDiscoveryEndpoint(invokeURI)
|
|
if err != nil {
|
|
r.logMissing("chain", "invalid chain gateway invoke uri", invokeURI, err)
|
|
return nil, err
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
if client, ok := r.chainClients[endpoint.key()]; ok && client != nil {
|
|
return client, nil
|
|
}
|
|
|
|
client, dialErr := chainclient.New(ctx, chainclient.Config{
|
|
Address: endpoint.address,
|
|
Insecure: endpoint.insecure,
|
|
})
|
|
if dialErr != nil {
|
|
r.logMissing("chain", "failed to dial chain gateway", endpoint.raw, dialErr)
|
|
return nil, dialErr
|
|
}
|
|
r.chainClients[endpoint.key()] = client
|
|
return client, nil
|
|
}
|
|
|
|
func (r *discoveryClientResolver) ChainClientByNetwork(ctx context.Context, network string) (chainclient.Client, error) {
|
|
entry, ok := r.findChainEntry(network)
|
|
if !ok {
|
|
if strings.TrimSpace(network) == "" {
|
|
return nil, merrors.NoData("discovery: chain gateway unavailable")
|
|
}
|
|
return nil, merrors.NoData(fmt.Sprintf("discovery: chain gateway unavailable for network %s", strings.ToUpper(strings.TrimSpace(network))))
|
|
}
|
|
return r.ChainClientByInvokeURI(ctx, entry.InvokeURI)
|
|
}
|
|
|
|
func (r *discoveryClientResolver) findChainEntry(network string) (*discovery.RegistryEntry, bool) {
|
|
if r == nil || r.registry == nil {
|
|
r.logMissing("chain", "discovery registry unavailable", "", nil)
|
|
return nil, false
|
|
}
|
|
|
|
network = strings.ToUpper(strings.TrimSpace(network))
|
|
entries := r.registry.List(time.Now(), true)
|
|
matches := make([]discovery.RegistryEntry, 0)
|
|
for _, entry := range entries {
|
|
if discovery.NormalizeRail(entry.Rail) != discovery.RailCrypto {
|
|
continue
|
|
}
|
|
if strings.TrimSpace(entry.InvokeURI) == "" {
|
|
continue
|
|
}
|
|
if network != "" && !strings.EqualFold(strings.TrimSpace(entry.Network), network) {
|
|
continue
|
|
}
|
|
matches = append(matches, entry)
|
|
}
|
|
if len(matches) == 0 {
|
|
r.logMissing("chain", "discovery chain entry missing", "", nil)
|
|
return nil, false
|
|
}
|
|
|
|
sort.Slice(matches, func(i, j int) bool {
|
|
if matches[i].RoutingPriority != matches[j].RoutingPriority {
|
|
return matches[i].RoutingPriority > matches[j].RoutingPriority
|
|
}
|
|
if matches[i].ID != matches[j].ID {
|
|
return matches[i].ID < matches[j].ID
|
|
}
|
|
return matches[i].InstanceID < matches[j].InstanceID
|
|
})
|
|
|
|
entry := matches[0]
|
|
entryKey := discoveryEntryKey(entry)
|
|
r.logSelection("chain", entryKey, entry)
|
|
return &entry, true
|
|
}
|
|
|
|
func (r *discoveryClientResolver) logSelection(key, entryKey string, entry discovery.RegistryEntry) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
r.mu.Lock()
|
|
last := r.lastSelection[key]
|
|
if last == entryKey {
|
|
r.mu.Unlock()
|
|
return
|
|
}
|
|
r.lastSelection[key] = entryKey
|
|
r.mu.Unlock()
|
|
if r.logger == nil {
|
|
return
|
|
}
|
|
r.logger.Info("Discovery endpoint selected",
|
|
zap.String("service_key", key),
|
|
zap.String("service", entry.Service),
|
|
zap.String("rail", entry.Rail),
|
|
zap.String("network", entry.Network),
|
|
zap.String("entry_id", entry.ID),
|
|
zap.String("instance_id", entry.InstanceID),
|
|
zap.String("invoke_uri", entry.InvokeURI))
|
|
}
|
|
|
|
func (r *discoveryClientResolver) logMissing(key, message, invokeURI string, err error) {
|
|
if r == nil {
|
|
return
|
|
}
|
|
now := time.Now()
|
|
r.mu.Lock()
|
|
last := r.lastMissing[key]
|
|
if !last.IsZero() && now.Sub(last) < discoveryLogThrottle {
|
|
r.mu.Unlock()
|
|
return
|
|
}
|
|
r.lastMissing[key] = now
|
|
r.mu.Unlock()
|
|
|
|
if r.logger == nil {
|
|
return
|
|
}
|
|
fields := []zap.Field{zap.String("service_key", key)}
|
|
if invokeURI != "" {
|
|
fields = append(fields, zap.String("invoke_uri", strings.TrimSpace(invokeURI)))
|
|
}
|
|
if err != nil {
|
|
fields = append(fields, zap.Error(err))
|
|
}
|
|
r.logger.Warn(message, fields...)
|
|
}
|
|
|
|
func discoveryEntryKey(entry discovery.RegistryEntry) string {
|
|
return fmt.Sprintf("%s|%s|%s|%s|%s|%s",
|
|
strings.TrimSpace(entry.Service),
|
|
strings.TrimSpace(entry.ID),
|
|
strings.TrimSpace(entry.InstanceID),
|
|
strings.TrimSpace(entry.Rail),
|
|
strings.TrimSpace(entry.Network),
|
|
strings.TrimSpace(entry.InvokeURI))
|
|
}
|
|
|
|
func parseDiscoveryEndpoint(raw string) (discoveryEndpoint, error) {
|
|
raw = strings.TrimSpace(raw)
|
|
if raw == "" {
|
|
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri is required")
|
|
}
|
|
|
|
if !strings.Contains(raw, "://") {
|
|
if _, _, splitErr := net.SplitHostPort(raw); splitErr != nil {
|
|
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
|
|
}
|
|
return discoveryEndpoint{address: raw, insecure: true, raw: raw}, nil
|
|
}
|
|
|
|
parsed, err := url.Parse(raw)
|
|
if err != nil || parsed.Scheme == "" {
|
|
if err != nil {
|
|
return discoveryEndpoint{}, err
|
|
}
|
|
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
|
|
}
|
|
|
|
switch strings.ToLower(strings.TrimSpace(parsed.Scheme)) {
|
|
case "grpc":
|
|
address := strings.TrimSpace(parsed.Host)
|
|
if _, _, splitErr := net.SplitHostPort(address); splitErr != nil {
|
|
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
|
|
}
|
|
return discoveryEndpoint{address: address, insecure: true, raw: raw}, nil
|
|
case "grpcs":
|
|
address := strings.TrimSpace(parsed.Host)
|
|
if _, _, splitErr := net.SplitHostPort(address); splitErr != nil {
|
|
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
|
|
}
|
|
return discoveryEndpoint{address: address, insecure: false, raw: raw}, nil
|
|
case "dns", "passthrough":
|
|
return discoveryEndpoint{address: raw, insecure: true, raw: raw}, nil
|
|
default:
|
|
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: unsupported invoke uri scheme")
|
|
}
|
|
}
|