122 lines
3.4 KiB
Go
122 lines
3.4 KiB
Go
package serverimp
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"strings"
|
|
"time"
|
|
|
|
oracleclient "github.com/tech/sendico/fx/oracle/client"
|
|
feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/connectivity"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
)
|
|
|
|
func (i *Imp) initDependencies(cfg *config) *clientDependencies {
|
|
deps := &clientDependencies{}
|
|
if cfg == nil {
|
|
return deps
|
|
}
|
|
|
|
if feesAddress := cfg.Fees.resolveAddress(); feesAddress != "" {
|
|
dialCtx, cancel := context.WithTimeout(context.Background(), cfg.Fees.dialTimeout())
|
|
conn, err := dialGRPC(dialCtx, cfg.Fees, feesAddress)
|
|
cancel()
|
|
if err != nil {
|
|
i.logger.Warn("Failed to dial fee engine", zap.Error(err), zap.String("address", feesAddress))
|
|
} else {
|
|
deps.feesConn = conn
|
|
deps.feesClient = feesv1.NewFeeEngineClient(conn)
|
|
}
|
|
}
|
|
|
|
if oracleAddress := cfg.Oracle.resolveAddress(); oracleAddress != "" {
|
|
client, err := oracleclient.New(context.Background(), oracleclient.Config{
|
|
Address: oracleAddress,
|
|
DialTimeout: cfg.Oracle.dialTimeout(),
|
|
CallTimeout: cfg.Oracle.callTimeout(),
|
|
Insecure: cfg.Oracle.InsecureTransport,
|
|
})
|
|
if err != nil {
|
|
i.logger.Warn("Failed to initialise oracle client", zap.Error(err), zap.String("address", oracleAddress))
|
|
} else {
|
|
deps.oracleClient = client
|
|
}
|
|
}
|
|
|
|
if i != nil && i.discoveryReg != nil {
|
|
i.discoveryClients = newDiscoveryClientResolver(i.logger, i.discoveryReg)
|
|
deps.gatewayResolver = discoveryChainGatewayResolver{resolver: i.discoveryClients}
|
|
deps.gatewayInvokeResolver = discoveryGatewayInvokeResolver{resolver: i.discoveryClients}
|
|
ledgerClient, ledgerErr := i.discoveryClients.LedgerClient(context.Background())
|
|
if ledgerErr != nil {
|
|
i.logger.Warn("Failed to initialise ledger client from discovery", zap.Error(ledgerErr))
|
|
} else {
|
|
deps.ledgerClient = ledgerClient
|
|
}
|
|
} else if i != nil && i.logger != nil {
|
|
i.logger.Warn("Discovery registry unavailable; chain gateway clients disabled")
|
|
}
|
|
|
|
return deps
|
|
}
|
|
|
|
func (i *Imp) closeDependencies() {
|
|
if i.deps == nil {
|
|
return
|
|
}
|
|
if i.deps.oracleClient != nil {
|
|
_ = i.deps.oracleClient.Close()
|
|
i.deps.oracleClient = nil
|
|
}
|
|
if i.discoveryClients != nil {
|
|
i.discoveryClients.Close()
|
|
i.discoveryClients = nil
|
|
}
|
|
if i.deps.feesConn != nil {
|
|
_ = i.deps.feesConn.Close()
|
|
i.deps.feesConn = nil
|
|
}
|
|
}
|
|
|
|
func dialGRPC(ctx context.Context, cfg clientConfig, address string) (*grpc.ClientConn, error) {
|
|
address = strings.TrimSpace(address)
|
|
if ctx == nil {
|
|
var cancel context.CancelFunc
|
|
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
}
|
|
dialOpts := make([]grpc.DialOption, 0, 1)
|
|
if cfg.InsecureTransport {
|
|
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
} else {
|
|
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
|
|
}
|
|
|
|
conn, err := grpc.NewClient(address, dialOpts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
conn.Connect()
|
|
if err := waitUntilReady(ctx, conn); err != nil {
|
|
_ = conn.Close()
|
|
return nil, err
|
|
}
|
|
return conn, nil
|
|
}
|
|
|
|
func waitUntilReady(ctx context.Context, conn *grpc.ClientConn) error {
|
|
for {
|
|
state := conn.GetState()
|
|
if state == connectivity.Ready {
|
|
return nil
|
|
}
|
|
if !conn.WaitForStateChange(ctx, state) {
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
}
|