diff --git a/api/edge/bff/internal/server/paymentapiimp/documents.go b/api/edge/bff/internal/server/paymentapiimp/documents.go index 909863be..8496a57c 100644 --- a/api/edge/bff/internal/server/paymentapiimp/documents.go +++ b/api/edge/bff/internal/server/paymentapiimp/documents.go @@ -105,10 +105,7 @@ func (a *PaymentAPI) getActDocument(r *http.Request, account *model.Account, _ * } func (a *PaymentAPI) fetchActDocument(ctx context.Context, invokeURI, paymentRef string) (*documentsv1.GetDocumentResponse, error) { - dialCtx, cancel := context.WithTimeout(ctx, documentsDialTimeout) - defer cancel() - - conn, err := grpc.DialContext(dialCtx, invokeURI, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient(invokeURI, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, merrors.InternalWrap(err, "dial billing documents") } diff --git a/api/edge/bff/internal/server/paymentapiimp/service.go b/api/edge/bff/internal/server/paymentapiimp/service.go index 76c18cd5..2e04706e 100644 --- a/api/edge/bff/internal/server/paymentapiimp/service.go +++ b/api/edge/bff/internal/server/paymentapiimp/service.go @@ -213,9 +213,6 @@ func newQuotationClient(ctx context.Context, cfg quotationClientConfig, opts ... return nil, merrors.InvalidArgument("payment quotation: address is required") } - dialCtx, cancel := context.WithTimeout(ctx, cfg.DialTimeout) - defer cancel() - dialOpts := make([]grpc.DialOption, 0, len(opts)+1) dialOpts = append(dialOpts, opts...) if cfg.Insecure { @@ -224,7 +221,7 @@ func newQuotationClient(ctx context.Context, cfg quotationClientConfig, opts ... dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) } - conn, err := grpc.DialContext(dialCtx, cfg.Address, dialOpts...) + conn, err := grpc.NewClient(cfg.Address, dialOpts...) if err != nil { return nil, merrors.InternalWrap(err, fmt.Sprintf("payment-quotation: dial %s", cfg.Address)) } diff --git a/api/edge/bff/internal/server/walletapiimp/balance.go b/api/edge/bff/internal/server/walletapiimp/balance.go index b71b3b28..5e081783 100644 --- a/api/edge/bff/internal/server/walletapiimp/balance.go +++ b/api/edge/bff/internal/server/walletapiimp/balance.go @@ -227,10 +227,6 @@ func (a *WalletAPI) queryBalanceFromGateways(ctx context.Context, gateways []dis } func (a *WalletAPI) queryGatewayBalance(ctx context.Context, gateway discovery.GatewaySummary, walletRef string) (*connectorv1.Balance, error) { - // Create connection with timeout - dialCtx, cancel := context.WithTimeout(ctx, a.dialTimeout) - defer cancel() - var dialOpts []grpc.DialOption if a.insecure { dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -238,7 +234,7 @@ func (a *WalletAPI) queryGatewayBalance(ctx context.Context, gateway discovery.G dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) } - conn, err := grpc.DialContext(dialCtx, gateway.InvokeURI, dialOpts...) + conn, err := grpc.NewClient(gateway.InvokeURI, dialOpts...) if err != nil { return nil, merrors.InternalWrap(err, "dial gateway") } diff --git a/api/edge/bff/internal/server/walletapiimp/create.go b/api/edge/bff/internal/server/walletapiimp/create.go index 2f49ed31..56eab75e 100644 --- a/api/edge/bff/internal/server/walletapiimp/create.go +++ b/api/edge/bff/internal/server/walletapiimp/create.go @@ -162,10 +162,6 @@ func findGatewayForNetwork(gateways []discovery.GatewaySummary, network string) } func (a *WalletAPI) createWalletOnGateway(ctx context.Context, gateway discovery.GatewaySummary, req *connectorv1.OpenAccountRequest) (string, error) { - // Create connection with timeout - dialCtx, cancel := context.WithTimeout(ctx, a.dialTimeout) - defer cancel() - var dialOpts []grpc.DialOption if a.insecure { dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -173,7 +169,7 @@ func (a *WalletAPI) createWalletOnGateway(ctx context.Context, gateway discovery dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) } - conn, err := grpc.DialContext(dialCtx, gateway.InvokeURI, dialOpts...) + conn, err := grpc.NewClient(gateway.InvokeURI, dialOpts...) if err != nil { return "", merrors.InternalWrap(err, "dial gateway") } diff --git a/api/edge/bff/internal/server/walletapiimp/list.go b/api/edge/bff/internal/server/walletapiimp/list.go index 33e8084a..38c4d8c8 100644 --- a/api/edge/bff/internal/server/walletapiimp/list.go +++ b/api/edge/bff/internal/server/walletapiimp/list.go @@ -215,10 +215,6 @@ func (a *WalletAPI) queryAllGateways(ctx context.Context, gateways []discovery.G } func (a *WalletAPI) queryGateway(ctx context.Context, gateway discovery.GatewaySummary, req *connectorv1.ListAccountsRequest) ([]*connectorv1.Account, error) { - // Create connection with timeout - dialCtx, cancel := context.WithTimeout(ctx, a.dialTimeout) - defer cancel() - var dialOpts []grpc.DialOption if a.insecure { dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -226,7 +222,7 @@ func (a *WalletAPI) queryGateway(ctx context.Context, gateway discovery.GatewayS dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) } - conn, err := grpc.DialContext(dialCtx, gateway.InvokeURI, dialOpts...) + conn, err := grpc.NewClient(gateway.InvokeURI, dialOpts...) if err != nil { return nil, merrors.InternalWrap(err, "dial gateway") } diff --git a/api/fx/oracle/client/client.go b/api/fx/oracle/client/client.go index 81453ebd..3f7dbe66 100644 --- a/api/fx/oracle/client/client.go +++ b/api/fx/oracle/client/client.go @@ -92,9 +92,6 @@ func New(ctx context.Context, cfg Config, opts ...grpc.DialOption) (Client, erro return nil, merrors.InvalidArgument("oracle: address is required") } - dialCtx, cancel := context.WithTimeout(ctx, cfg.DialTimeout) - defer cancel() - dialOpts := make([]grpc.DialOption, 0, len(opts)+1) dialOpts = append(dialOpts, opts...) @@ -104,7 +101,7 @@ func New(ctx context.Context, cfg Config, opts ...grpc.DialOption) (Client, erro dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) } - conn, err := grpc.DialContext(dialCtx, cfg.Address, dialOpts...) + conn, err := grpc.NewClient(cfg.Address, dialOpts...) if err != nil { return nil, merrors.InternalWrap(err, fmt.Sprintf("oracle: dial %s", cfg.Address)) } diff --git a/api/gateway/chain/client/client.go b/api/gateway/chain/client/client.go index 39fd0820..3581b3d2 100644 --- a/api/gateway/chain/client/client.go +++ b/api/gateway/chain/client/client.go @@ -63,9 +63,6 @@ func New(ctx context.Context, cfg Config, opts ...grpc.DialOption) (Client, erro return nil, merrors.InvalidArgument("chain-gateway: address is required") } - dialCtx, cancel := context.WithTimeout(ctx, cfg.DialTimeout) - defer cancel() - dialOpts := make([]grpc.DialOption, 0, len(opts)+1) dialOpts = append(dialOpts, opts...) @@ -75,7 +72,7 @@ func New(ctx context.Context, cfg Config, opts ...grpc.DialOption) (Client, erro dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) } - conn, err := grpc.DialContext(dialCtx, cfg.Address, dialOpts...) + conn, err := grpc.NewClient(cfg.Address, dialOpts...) if err != nil { return nil, merrors.Internal(fmt.Sprintf("chain-gateway: dial %s: %s", cfg.Address, err.Error())) } diff --git a/api/gateway/mntx/client/client.go b/api/gateway/mntx/client/client.go index 8bf36817..ee04cd5a 100644 --- a/api/gateway/mntx/client/client.go +++ b/api/gateway/mntx/client/client.go @@ -45,14 +45,11 @@ func New(ctx context.Context, cfg Config, opts ...grpc.DialOption) (Client, erro if strings.TrimSpace(cfg.Address) == "" { return nil, merrors.InvalidArgument("mntx: address is required") } - dialCtx, cancel := context.WithTimeout(ctx, cfg.DialTimeout) - defer cancel() - dialOpts := make([]grpc.DialOption, 0, len(opts)+1) dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) dialOpts = append(dialOpts, opts...) - conn, err := grpc.DialContext(dialCtx, cfg.Address, dialOpts...) + conn, err := grpc.NewClient(cfg.Address, dialOpts...) if err != nil { return nil, merrors.Internal("mntx: dial failed: " + err.Error()) } diff --git a/api/ledger/client/client.go b/api/ledger/client/client.go index 58166d2c..afb79c49 100644 --- a/api/ledger/client/client.go +++ b/api/ledger/client/client.go @@ -105,9 +105,6 @@ func New(ctx context.Context, cfg Config, opts ...grpc.DialOption) (Client, erro return nil, merrors.InvalidArgument("ledger: address is required") } - dialCtx, cancel := context.WithTimeout(ctx, cfg.DialTimeout) - defer cancel() - dialOpts := make([]grpc.DialOption, 0, len(opts)+1) dialOpts = append(dialOpts, opts...) @@ -117,7 +114,7 @@ func New(ctx context.Context, cfg Config, opts ...grpc.DialOption) (Client, erro dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) } - conn, err := grpc.DialContext(dialCtx, cfg.Address, dialOpts...) + conn, err := grpc.NewClient(cfg.Address, dialOpts...) if err != nil { return nil, merrors.InternalWrap(err, fmt.Sprintf("ledger: dial %s", cfg.Address)) } diff --git a/api/payments/methods/client/client.go b/api/payments/methods/client/client.go index 6434616d..130007ad 100644 --- a/api/payments/methods/client/client.go +++ b/api/payments/methods/client/client.go @@ -49,9 +49,6 @@ func New(ctx context.Context, cfg Config, opts ...grpc.DialOption) (Client, erro return nil, merrors.InvalidArgument("payment-methods: address is required") } - dialCtx, cancel := context.WithTimeout(ctx, cfg.DialTimeout) - defer cancel() - dialOpts := make([]grpc.DialOption, 0, len(opts)+1) dialOpts = append(dialOpts, opts...) if cfg.Insecure { @@ -60,7 +57,7 @@ func New(ctx context.Context, cfg Config, opts ...grpc.DialOption) (Client, erro dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) } - conn, err := grpc.DialContext(dialCtx, cfg.Address, dialOpts...) + conn, err := grpc.NewClient(cfg.Address, dialOpts...) if err != nil { return nil, merrors.InternalWrap(err, fmt.Sprintf("payment-methods: dial %s", cfg.Address)) } diff --git a/api/payments/orchestrator/client/client.go b/api/payments/orchestrator/client/client.go index b1b404b4..aa4229f5 100644 --- a/api/payments/orchestrator/client/client.go +++ b/api/payments/orchestrator/client/client.go @@ -56,8 +56,6 @@ func New(ctx context.Context, cfg Config, opts ...grpc.DialOption) (Client, erro } func dial(ctx context.Context, cfg Config, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - dialCtx, cancel := context.WithTimeout(ctx, cfg.DialTimeout) - defer cancel() dialOpts := make([]grpc.DialOption, 0, len(opts)+1) dialOpts = append(dialOpts, opts...) @@ -67,7 +65,7 @@ func dial(ctx context.Context, cfg Config, address string, opts ...grpc.DialOpti dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) } - conn, err := grpc.DialContext(dialCtx, address, dialOpts...) + conn, err := grpc.NewClient(address, dialOpts...) if err != nil { return nil, merrors.InternalWrap(err, fmt.Sprintf("payment-orchestrator: dial %s", address)) } diff --git a/api/payments/orchestrator/internal/server/internal/discovery_clients.go b/api/payments/orchestrator/internal/server/internal/discovery_clients.go index 6f9c4079..6e3fa5ff 100644 --- a/api/payments/orchestrator/internal/server/internal/discovery_clients.go +++ b/api/payments/orchestrator/internal/server/internal/discovery_clients.go @@ -451,5 +451,5 @@ func dialGrpc(ctx context.Context, endpoint discoveryEndpoint) (*grpc.ClientConn if ctx == nil { ctx = context.Background() } - return grpc.DialContext(ctx, endpoint.address, dialOpts...) + return grpc.NewClient(endpoint.address, dialOpts...) } diff --git a/api/payments/quotation/internal/server/internal/dependencies.go b/api/payments/quotation/internal/server/internal/dependencies.go index 2a42b116..200fa805 100644 --- a/api/payments/quotation/internal/server/internal/dependencies.go +++ b/api/payments/quotation/internal/server/internal/dependencies.go @@ -10,6 +10,7 @@ import ( feesv1 "github.com/tech/sendico/pkg/proto/billing/fees/v1" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" ) @@ -82,19 +83,33 @@ func dialGRPC(ctx context.Context, cfg clientConfig, address string) (*grpc.Clie ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) defer cancel() } + dialOpts := make([]grpc.DialOption, 0, 1) if cfg.InsecureTransport { - return grpc.DialContext( - ctx, - address, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - ) + dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } else { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) } - return grpc.DialContext( - ctx, - address, - grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})), - grpc.WithBlock(), - ) + conn, err := grpc.NewClient(address, dialOpts...) + if err != nil { + return nil, err + } + conn.Connect() + if err := waitUntilReady(ctx, conn); err != nil { + _ = conn.Close() + return nil, err + } + return conn, nil +} + +func waitUntilReady(ctx context.Context, conn *grpc.ClientConn) error { + for { + state := conn.GetState() + if state == connectivity.Ready { + return nil + } + if !conn.WaitForStateChange(ctx, state) { + return ctx.Err() + } + } } diff --git a/api/pkg/discovery/announcer.go b/api/pkg/discovery/announcer.go index 6445c8c3..67d4bf16 100644 --- a/api/pkg/discovery/announcer.go +++ b/api/pkg/discovery/announcer.go @@ -10,6 +10,7 @@ import ( msg "github.com/tech/sendico/pkg/messaging" "github.com/tech/sendico/pkg/mlogger" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) const defaultReannounceHeartbeatFactor = 6 @@ -20,10 +21,11 @@ type Announcer struct { sender string announce Announcement - startOnce sync.Once - stopOnce sync.Once - stopCh chan struct{} - doneCh chan struct{} + startOnce sync.Once + stopOnce sync.Once + stopCh chan struct{} + doneCh chan struct{} + announceLevel zapcore.Level } func NewAnnouncer(logger mlogger.Logger, producer msg.Producer, sender string, announce Announcement) *Announcer { @@ -42,12 +44,13 @@ func NewAnnouncer(logger mlogger.Logger, producer msg.Producer, sender string, a announce.ID = DefaultEntryID(announce.Service) } return &Announcer{ - logger: logger, - producer: producer, - sender: strings.TrimSpace(sender), - announce: announce, - stopCh: make(chan struct{}), - doneCh: make(chan struct{}), + logger: logger, + producer: producer, + sender: strings.TrimSpace(sender), + announce: announce, + stopCh: make(chan struct{}), + doneCh: make(chan struct{}), + announceLevel: zapcore.InfoLevel, } } @@ -126,7 +129,8 @@ func (a *Announcer) sendAnnouncement() { a.logWarn("Failed to publish discovery announce", fields...) return } - a.logInfo("Discovery announce published", append(announcementFields(a.announce), zap.String("event", event.ToString()))...) + a.logger.Log(a.announceLevel, "Discovery announce published", append(announcementFields(a.announce), zap.String("event", event.ToString()))...) + a.announceLevel = zapcore.DebugLevel } func (a *Announcer) sendHeartbeat() {