236 lines
6.2 KiB
Go
236 lines
6.2 KiB
Go
package ledger
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"net/url"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/tech/sendico/pkg/discovery"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"github.com/tech/sendico/pkg/mservice"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type DiscoveryConfig struct {
|
|
Logger mlogger.Logger
|
|
Registry *discovery.Registry
|
|
Timeout time.Duration
|
|
}
|
|
|
|
// discoveryEndpoint is a ledger endpoint derived from a registry invoke URI.
type discoveryEndpoint struct {
	// address is the dial target handed to the ledger client.
	address string
	// insecure is forwarded to the ledger client's Insecure setting.
	insecure bool
	// raw is the trimmed invoke URI the endpoint was parsed from; it is
	// only used for logging.
	raw string
}

// key returns the identity of the endpoint in the form
// "<address>|<insecure>". Two endpoints with the same key are treated as
// the same connection target; raw is intentionally not part of the key.
func (e discoveryEndpoint) key() string {
	return e.address + "|" + fmt.Sprint(e.insecure)
}
|
|
|
|
type discoveryClient struct {
|
|
logger mlogger.Logger
|
|
registry *discovery.Registry
|
|
timeout time.Duration
|
|
|
|
mu sync.Mutex
|
|
client Client
|
|
endpointKey string
|
|
}
|
|
|
|
func NewDiscoveryClient(cfg DiscoveryConfig) (Client, error) {
|
|
if cfg.Registry == nil {
|
|
return nil, merrors.InvalidArgument("treasury ledger discovery registry is required", "registry")
|
|
}
|
|
if cfg.Timeout <= 0 {
|
|
cfg.Timeout = 5 * time.Second
|
|
}
|
|
logger := cfg.Logger
|
|
if logger != nil {
|
|
logger = logger.Named("treasury_ledger_discovery")
|
|
}
|
|
return &discoveryClient{
|
|
logger: logger,
|
|
registry: cfg.Registry,
|
|
timeout: cfg.Timeout,
|
|
}, nil
|
|
}
|
|
|
|
func (c *discoveryClient) Close() error {
|
|
if c == nil {
|
|
return nil
|
|
}
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if c.client != nil {
|
|
err := c.client.Close()
|
|
c.client = nil
|
|
c.endpointKey = ""
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *discoveryClient) GetAccount(ctx context.Context, accountID string) (*Account, error) {
|
|
client, err := c.resolveClient(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return client.GetAccount(ctx, accountID)
|
|
}
|
|
|
|
func (c *discoveryClient) GetBalance(ctx context.Context, accountID string) (*Balance, error) {
|
|
client, err := c.resolveClient(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return client.GetBalance(ctx, accountID)
|
|
}
|
|
|
|
func (c *discoveryClient) ExternalCredit(ctx context.Context, req PostRequest) (*OperationResult, error) {
|
|
client, err := c.resolveClient(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return client.ExternalCredit(ctx, req)
|
|
}
|
|
|
|
func (c *discoveryClient) ExternalDebit(ctx context.Context, req PostRequest) (*OperationResult, error) {
|
|
client, err := c.resolveClient(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return client.ExternalDebit(ctx, req)
|
|
}
|
|
|
|
func (c *discoveryClient) resolveClient(_ context.Context) (Client, error) {
|
|
if c == nil || c.registry == nil {
|
|
return nil, merrors.Internal("treasury ledger discovery is unavailable")
|
|
}
|
|
endpoint, err := c.resolveEndpoint()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
key := endpoint.key()
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.client != nil && c.endpointKey == key {
|
|
return c.client, nil
|
|
}
|
|
if c.client != nil {
|
|
_ = c.client.Close()
|
|
c.client = nil
|
|
c.endpointKey = ""
|
|
}
|
|
next, err := New(Config{
|
|
Endpoint: endpoint.address,
|
|
Timeout: c.timeout,
|
|
Insecure: endpoint.insecure,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c.client = next
|
|
c.endpointKey = key
|
|
if c.logger != nil {
|
|
c.logger.Info("Discovered ledger endpoint selected",
|
|
zap.String("service", string(mservice.Ledger)),
|
|
zap.String("invoke_uri", endpoint.raw),
|
|
zap.String("address", endpoint.address),
|
|
zap.Bool("insecure", endpoint.insecure))
|
|
}
|
|
return c.client, nil
|
|
}
|
|
|
|
func (c *discoveryClient) resolveEndpoint() (discoveryEndpoint, error) {
|
|
entries := c.registry.List(time.Now(), true)
|
|
type match struct {
|
|
entry discovery.RegistryEntry
|
|
opMatch bool
|
|
}
|
|
matches := make([]match, 0, len(entries))
|
|
requiredOps := discovery.LedgerServiceOperations()
|
|
for _, entry := range entries {
|
|
if !matchesService(entry.Service, mservice.Ledger) {
|
|
continue
|
|
}
|
|
matches = append(matches, match{
|
|
entry: entry,
|
|
opMatch: discovery.HasAnyOperation(entry.Operations, requiredOps),
|
|
})
|
|
}
|
|
if len(matches) == 0 {
|
|
return discoveryEndpoint{}, merrors.NoData("discovery: ledger service unavailable")
|
|
}
|
|
sort.Slice(matches, func(i, j int) bool {
|
|
if matches[i].opMatch != matches[j].opMatch {
|
|
return matches[i].opMatch
|
|
}
|
|
if matches[i].entry.RoutingPriority != matches[j].entry.RoutingPriority {
|
|
return matches[i].entry.RoutingPriority > matches[j].entry.RoutingPriority
|
|
}
|
|
if matches[i].entry.ID != matches[j].entry.ID {
|
|
return matches[i].entry.ID < matches[j].entry.ID
|
|
}
|
|
return matches[i].entry.InstanceID < matches[j].entry.InstanceID
|
|
})
|
|
return parseDiscoveryEndpoint(matches[0].entry.InvokeURI)
|
|
}
|
|
|
|
func matchesService(service string, candidate mservice.Type) bool {
|
|
service = strings.TrimSpace(service)
|
|
if service == "" || strings.TrimSpace(string(candidate)) == "" {
|
|
return false
|
|
}
|
|
return strings.EqualFold(service, strings.TrimSpace(string(candidate)))
|
|
}
|
|
|
|
func parseDiscoveryEndpoint(raw string) (discoveryEndpoint, error) {
|
|
raw = strings.TrimSpace(raw)
|
|
if raw == "" {
|
|
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri is required")
|
|
}
|
|
|
|
if !strings.Contains(raw, "://") {
|
|
if _, _, splitErr := net.SplitHostPort(raw); splitErr != nil {
|
|
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
|
|
}
|
|
return discoveryEndpoint{address: raw, insecure: true, raw: raw}, nil
|
|
}
|
|
|
|
parsed, err := url.Parse(raw)
|
|
if err != nil || parsed.Scheme == "" {
|
|
if err != nil {
|
|
return discoveryEndpoint{}, err
|
|
}
|
|
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
|
|
}
|
|
|
|
scheme := strings.ToLower(strings.TrimSpace(parsed.Scheme))
|
|
switch scheme {
|
|
case "grpc":
|
|
address := strings.TrimSpace(parsed.Host)
|
|
if _, _, splitErr := net.SplitHostPort(address); splitErr != nil {
|
|
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
|
|
}
|
|
return discoveryEndpoint{address: address, insecure: true, raw: raw}, nil
|
|
case "grpcs":
|
|
address := strings.TrimSpace(parsed.Host)
|
|
if _, _, splitErr := net.SplitHostPort(address); splitErr != nil {
|
|
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
|
|
}
|
|
return discoveryEndpoint{address: address, insecure: false, raw: raw}, nil
|
|
case "dns", "passthrough":
|
|
return discoveryEndpoint{address: raw, insecure: true, raw: raw}, nil
|
|
default:
|
|
return discoveryEndpoint{}, merrors.InvalidArgument("discovery: unsupported invoke uri scheme")
|
|
}
|
|
}
|