package gateway import ( "context" "github.com/tech/sendico/gateway/chain/internal/appversion" "github.com/tech/sendico/gateway/chain/internal/keymanager" "github.com/tech/sendico/gateway/chain/internal/service/gateway/commands" "github.com/tech/sendico/gateway/chain/internal/service/gateway/commands/transfer" "github.com/tech/sendico/gateway/chain/internal/service/gateway/commands/wallet" "github.com/tech/sendico/gateway/chain/internal/service/gateway/drivers" "github.com/tech/sendico/gateway/chain/internal/service/gateway/rpcclient" "github.com/tech/sendico/gateway/chain/internal/service/gateway/shared" "github.com/tech/sendico/gateway/chain/storage" gatewayoutbox "github.com/tech/sendico/gateway/common/outbox" "github.com/tech/sendico/pkg/api/routers" "github.com/tech/sendico/pkg/api/routers/gsresponse" clockpkg "github.com/tech/sendico/pkg/clock" "github.com/tech/sendico/pkg/discovery" msg "github.com/tech/sendico/pkg/messaging" "github.com/tech/sendico/pkg/mlogger" pmodel "github.com/tech/sendico/pkg/model" "github.com/tech/sendico/pkg/mservice" connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1" chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1" "go.uber.org/zap" "google.golang.org/grpc" ) type serviceError string func (e serviceError) Error() string { return string(e) } var ( errStorageUnavailable = serviceError("chain_gateway: storage not initialised") ) // Service implements the ConnectorService RPC contract for chain operations. type Service struct { logger mlogger.Logger storage storage.Repository producer msg.Producer msgCfg pmodel.SettingsT clock clockpkg.Clock settings CacheSettings outbox gatewayoutbox.ReliableRuntime networks map[pmodel.ChainNetwork]shared.Network serviceWallet shared.ServiceWallet keyManager keymanager.Manager rpcClients *rpcclient.Clients networkRegistry *rpcclient.Registry drivers *drivers.Registry commands commands.Registry announcers []*discovery.Announcer invokeURI string connectorv1.UnimplementedConnectorServiceServer } // NewService constructs the chain gateway service skeleton. func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Producer, opts ...Option) *Service { svc := &Service{ logger: logger.Named("service"), storage: repo, producer: producer, msgCfg: map[string]any{}, clock: clockpkg.System{}, settings: defaultSettings(), networks: map[pmodel.ChainNetwork]shared.Network{}, } initMetrics() for _, opt := range opts { if opt != nil { opt(svc) } } if svc.clock == nil { svc.clock = clockpkg.System{} } if svc.networks == nil { svc.networks = map[pmodel.ChainNetwork]shared.Network{} } svc.settings = svc.settings.withDefaults() svc.networkRegistry = rpcclient.NewRegistry(svc.networks, svc.rpcClients) if err := svc.startOutboxReliableProducer(context.Background()); err != nil { svc.logger.Warn("Failed to initialise outbox reliable producer", zap.Error(err)) } svc.commands = commands.NewRegistry(commands.RegistryDeps{ Wallet: commandsWalletDeps(svc), Transfer: commandsTransferDeps(svc), }) svc.startDiscoveryAnnouncers() return svc } // Register wires the service onto the provided gRPC router. func (s *Service) Register(router routers.GRPC) error { return router.Register(func(reg grpc.ServiceRegistrar) { connectorv1.RegisterConnectorServiceServer(reg, s) }) } func (s *Service) Shutdown() { if s == nil { return } s.outbox.Stop() for _, announcer := range s.announcers { if announcer != nil { announcer.Stop() } } } func (s *Service) CreateManagedWallet(ctx context.Context, req *chainv1.CreateManagedWalletRequest) (*chainv1.CreateManagedWalletResponse, error) { return executeUnary(ctx, s, "CreateManagedWallet", s.commands.CreateManagedWallet.Execute, req) } func (s *Service) GetManagedWallet(ctx context.Context, req *chainv1.GetManagedWalletRequest) (*chainv1.GetManagedWalletResponse, error) { return executeUnary(ctx, s, "GetManagedWallet", s.commands.GetManagedWallet.Execute, req) } func (s *Service) ListManagedWallets(ctx context.Context, req *chainv1.ListManagedWalletsRequest) (*chainv1.ListManagedWalletsResponse, error) { return executeUnary(ctx, s, "ListManagedWallets", s.commands.ListManagedWallets.Execute, req) } func (s *Service) GetWalletBalance(ctx context.Context, req *chainv1.GetWalletBalanceRequest) (*chainv1.GetWalletBalanceResponse, error) { return executeUnary(ctx, s, "GetWalletBalance", s.commands.GetWalletBalance.Execute, req) } func (s *Service) SubmitTransfer(ctx context.Context, req *chainv1.SubmitTransferRequest) (*chainv1.SubmitTransferResponse, error) { return executeUnary(ctx, s, "SubmitTransfer", s.commands.SubmitTransfer.Execute, req) } func (s *Service) GetTransfer(ctx context.Context, req *chainv1.GetTransferRequest) (*chainv1.GetTransferResponse, error) { return executeUnary(ctx, s, "GetTransfer", s.commands.GetTransfer.Execute, req) } func (s *Service) ListTransfers(ctx context.Context, req *chainv1.ListTransfersRequest) (*chainv1.ListTransfersResponse, error) { return executeUnary(ctx, s, "ListTransfers", s.commands.ListTransfers.Execute, req) } func (s *Service) EstimateTransferFee(ctx context.Context, req *chainv1.EstimateTransferFeeRequest) (*chainv1.EstimateTransferFeeResponse, error) { return executeUnary(ctx, s, "EstimateTransferFee", s.commands.EstimateTransfer.Execute, req) } func (s *Service) ComputeGasTopUp(ctx context.Context, req *chainv1.ComputeGasTopUpRequest) (*chainv1.ComputeGasTopUpResponse, error) { return executeUnary(ctx, s, "ComputeGasTopUp", s.commands.ComputeGasTopUp.Execute, req) } func (s *Service) EnsureGasTopUp(ctx context.Context, req *chainv1.EnsureGasTopUpRequest) (*chainv1.EnsureGasTopUpResponse, error) { return executeUnary(ctx, s, "EnsureGasTopUp", s.commands.EnsureGasTopUp.Execute, req) } func (s *Service) ensureRepository(ctx context.Context) error { if s.storage == nil { return errStorageUnavailable } return s.storage.Ping(ctx) } func commandsWalletDeps(s *Service) wallet.Deps { return wallet.Deps{ Logger: s.logger.Named("command"), Drivers: s.drivers, Networks: s.networkRegistry, KeyManager: s.keyManager, Storage: s.storage, Clock: s.clock, BalanceCacheTTL: s.settings.walletBalanceCacheTTL(), RPCTimeout: s.settings.rpcTimeout(), EnsureRepository: s.ensureRepository, } } func commandsTransferDeps(s *Service) transfer.Deps { return transfer.Deps{ Logger: s.logger.Named("transfer_cmd"), Drivers: s.drivers, Networks: s.networkRegistry, Storage: s.storage, Clock: s.clock, RPCTimeout: s.settings.rpcTimeout(), EnsureRepository: s.ensureRepository, LaunchExecution: s.launchTransferExecution, } } func executeUnary[TReq any, TResp any](ctx context.Context, svc *Service, method string, handler func(context.Context, *TReq) gsresponse.Responder[TResp], req *TReq) (*TResp, error) { start := svc.clock.Now() resp, err := gsresponse.Unary(svc.logger, mservice.ChainGateway, handler)(ctx, req) observeRPC(method, err, svc.clock.Now().Sub(start)) return resp, err } func (s *Service) startDiscoveryAnnouncers() { if s == nil || s.producer == nil || len(s.networks) == 0 { return } version := appversion.Create().Short() for _, network := range s.networks { currencies := []discovery.CurrencyAnnouncement{{ Currency: shared.NativeCurrency(network), Network: string(network.Name), }} for _, token := range network.TokenConfigs { if token.Symbol != "" { currencies = append(currencies, discovery.CurrencyAnnouncement{ Currency: token.Symbol, Network: string(network.Name), ContractAddress: token.ContractAddress, }) } } announce := discovery.Announcement{ ID: discovery.StableCryptoRailGatewayID(string(network.Name)), InstanceID: discovery.InstanceID(), Service: mservice.ChainWallets, Rail: discovery.RailCrypto, Operations: discovery.CryptoRailGatewayOperations(), Currencies: currencies, InvokeURI: s.invokeURI, Version: version, } announcer := discovery.NewAnnouncer(s.logger, s.producer, mservice.ChainGateway, announce) announcer.Start() s.announcers = append(s.announcers, announcer) } }