package ledger

import (
	"context"
	"fmt"
	"net"
	"net/url"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/tech/sendico/pkg/discovery"
	"github.com/tech/sendico/pkg/merrors"
	"github.com/tech/sendico/pkg/mlogger"
	"github.com/tech/sendico/pkg/mservice"
	"go.uber.org/zap"
)

// DiscoveryConfig carries the dependencies and settings used by
// NewDiscoveryClient.
type DiscoveryConfig struct {
	// Logger is optional; when nil, endpoint selection is not logged.
	Logger mlogger.Logger

	// Registry is the discovery registry queried for ledger entries. Required.
	Registry *discovery.Registry

	// Timeout is forwarded to the underlying client configuration. Values
	// <= 0 are replaced by a 5 second default.
	Timeout time.Duration
}

// discoveryEndpoint is a parsed invoke URI.
type discoveryEndpoint struct {
	address  string // dial target handed to the underlying client
	insecure bool   // forwarded as Config.Insecure
	raw      string // trimmed invoke URI as published in the registry
}

// key identifies the endpoint for cache comparison: two endpoints share a key
// when both the address and the insecure flag match.
func (e discoveryEndpoint) key() string {
	return fmt.Sprintf("%s|%t", e.address, e.insecure)
}

// discoveryClient is a Client that re-resolves the ledger endpoint from the
// registry on every call and keeps a single underlying Client for the most
// recently selected endpoint.
type discoveryClient struct {
	logger   mlogger.Logger
	registry *discovery.Registry
	timeout  time.Duration

	// mu guards client and endpointKey.
	mu          sync.Mutex
	client      Client
	endpointKey string
}

// NewDiscoveryClient returns a Client that locates the ledger service through
// cfg.Registry. It returns an invalid-argument error when cfg.Registry is nil.
// No connection is created here; the underlying client is built on first use.
func NewDiscoveryClient(cfg DiscoveryConfig) (Client, error) {
	if cfg.Registry == nil {
		return nil, merrors.InvalidArgument("treasury ledger discovery registry is required", "registry")
	}
	if cfg.Timeout <= 0 {
		cfg.Timeout = 5 * time.Second
	}
	logger := cfg.Logger
	if logger != nil {
		logger = logger.Named("treasury_ledger_discovery")
	}
	return &discoveryClient{
		logger:   logger,
		registry: cfg.Registry,
		timeout:  cfg.Timeout,
	}, nil
}

// Close closes the cached underlying client, if any, and forgets the cached
// endpoint. It is safe to call on a nil receiver. A later call through the
// client resolves the endpoint again and builds a fresh underlying client.
func (c *discoveryClient) Close() error {
	if c == nil {
		return nil
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.client == nil {
		return nil
	}
	err := c.client.Close()
	c.client = nil
	c.endpointKey = ""
	return err
}

// GetAccount resolves the current ledger client and forwards the call to it.
func (c *discoveryClient) GetAccount(ctx context.Context, accountID string) (*Account, error) {
	client, err := c.resolveClient(ctx)
	if err != nil {
		return nil, err
	}
	return client.GetAccount(ctx, accountID)
}

// GetBalance resolves the current ledger client and forwards the call to it.
func (c *discoveryClient) GetBalance(ctx context.Context, accountID string) (*Balance, error) {
	client, err := c.resolveClient(ctx)
	if err != nil {
		return nil, err
	}
	return client.GetBalance(ctx, accountID)
}

// ExternalCredit resolves the current ledger client and forwards the call to it.
func (c *discoveryClient) ExternalCredit(ctx context.Context, req PostRequest) (*OperationResult, error) {
	client, err := c.resolveClient(ctx)
	if err != nil {
		return nil, err
	}
	return client.ExternalCredit(ctx, req)
}

// ExternalDebit resolves the current ledger client and forwards the call to it.
func (c *discoveryClient) ExternalDebit(ctx context.Context, req PostRequest) (*OperationResult, error) {
	client, err := c.resolveClient(ctx)
	if err != nil {
		return nil, err
	}
	return client.ExternalDebit(ctx, req)
}

// resolveClient returns the underlying client for the endpoint currently
// selected by the registry. The cached client is reused while the selected
// endpoint key is unchanged; otherwise it is closed and replaced. The context
// argument is accepted for signature symmetry and is not used.
func (c *discoveryClient) resolveClient(_ context.Context) (Client, error) {
	if c == nil || c.registry == nil {
		return nil, merrors.Internal("treasury ledger discovery is unavailable")
	}
	endpoint, err := c.resolveEndpoint()
	if err != nil {
		return nil, err
	}
	key := endpoint.key()

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.client != nil && c.endpointKey == key {
		return c.client, nil
	}
	if c.client != nil {
		// Best effort: the previous endpoint is being abandoned, so a failure
		// to close its client must not block switching to the new one.
		_ = c.client.Close()
		c.client = nil
		c.endpointKey = ""
	}

	next, err := New(Config{
		Endpoint: endpoint.address,
		Timeout:  c.timeout,
		Insecure: endpoint.insecure,
	})
	if err != nil {
		return nil, err
	}
	c.client = next
	c.endpointKey = key

	if c.logger != nil {
		c.logger.Info("Discovered ledger endpoint selected",
			zap.String("service", string(mservice.Ledger)),
			zap.String("invoke_uri", endpoint.raw),
			zap.String("address", endpoint.address),
			zap.Bool("insecure", endpoint.insecure))
	}
	return c.client, nil
}

// resolveEndpoint picks the preferred ledger entry from the registry and
// parses its invoke URI. Candidates are ordered by: entries advertising any of
// the ledger service operations first, then higher RoutingPriority, then
// ascending ID, then ascending InstanceID. It returns a no-data error when the
// registry lists no ledger entry.
func (c *discoveryClient) resolveEndpoint() (discoveryEndpoint, error) {
	// NOTE(review): the boolean passed to List presumably restricts the result
	// to healthy entries; confirm against the registry API.
	entries := c.registry.List(time.Now(), true)

	type match struct {
		entry   discovery.RegistryEntry
		opMatch bool
	}
	matches := make([]match, 0, len(entries))
	requiredOps := discovery.LedgerServiceOperations()
	for _, entry := range entries {
		if !matchesService(entry.Service, mservice.Ledger) {
			continue
		}
		matches = append(matches, match{
			entry:   entry,
			opMatch: discovery.HasAnyOperation(entry.Operations, requiredOps),
		})
	}
	if len(matches) == 0 {
		return discoveryEndpoint{}, merrors.NoData("discovery: ledger service unavailable")
	}

	sort.Slice(matches, func(i, j int) bool {
		a, b := matches[i], matches[j]
		if a.opMatch != b.opMatch {
			return a.opMatch
		}
		if a.entry.RoutingPriority != b.entry.RoutingPriority {
			return a.entry.RoutingPriority > b.entry.RoutingPriority
		}
		if a.entry.ID != b.entry.ID {
			return a.entry.ID < b.entry.ID
		}
		return a.entry.InstanceID < b.entry.InstanceID
	})
	return parseDiscoveryEndpoint(matches[0].entry.InvokeURI)
}

// matchesService reports whether service names candidate, ignoring case and
// surrounding whitespace. Empty names never match.
func matchesService(service string, candidate mservice.Type) bool {
	service = strings.TrimSpace(service)
	want := strings.TrimSpace(string(candidate))
	if service == "" || want == "" {
		return false
	}
	return strings.EqualFold(service, want)
}

// validDiscoveryHostPort reports whether address has the host:port form with a
// non-empty port. net.SplitHostPort alone accepts "host:", which cannot be
// dialed.
func validDiscoveryHostPort(address string) bool {
	_, port, err := net.SplitHostPort(address)
	return err == nil && port != ""
}

// parseDiscoveryEndpoint converts a registry invoke URI into a dial target.
//
// Accepted forms:
//   - "host:port" (no scheme): used as-is, insecure.
//   - "grpc://host:port": host:port is the address, insecure.
//   - "grpcs://host:port": host:port is the address, insecure is false.
//   - "dns://..." and "passthrough://...": the whole URI is the address,
//     insecure.
//
// Any other scheme, an empty URI, or a missing host:port yields an
// invalid-argument error; a URI that fails to parse yields a wrapped
// url.Parse error.
func parseDiscoveryEndpoint(raw string) (discoveryEndpoint, error) {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri is required")
	}

	if !strings.Contains(raw, "://") {
		if !validDiscoveryHostPort(raw) {
			return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
		}
		return discoveryEndpoint{address: raw, insecure: true, raw: raw}, nil
	}

	parsed, err := url.Parse(raw)
	if err != nil {
		return discoveryEndpoint{}, fmt.Errorf("discovery: invalid invoke uri: %w", err)
	}
	if parsed.Scheme == "" {
		return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
	}

	scheme := strings.ToLower(strings.TrimSpace(parsed.Scheme))
	switch scheme {
	case "grpc", "grpcs":
		address := strings.TrimSpace(parsed.Host)
		if !validDiscoveryHostPort(address) {
			return discoveryEndpoint{}, merrors.InvalidArgument("discovery: invoke uri must include host:port")
		}
		// Only the plain "grpc" scheme is treated as insecure.
		return discoveryEndpoint{address: address, insecure: scheme == "grpc", raw: raw}, nil
	case "dns", "passthrough":
		// Resolver-style targets are handed over untouched.
		return discoveryEndpoint{address: raw, insecure: true, raw: raw}, nil
	default:
		return discoveryEndpoint{}, merrors.InvalidArgument("discovery: unsupported invoke uri scheme")
	}
}