Files
sendico/api/gateway/chain/internal/service/gateway/service.go
2025-12-05 10:55:01 +01:00

160 lines
5.4 KiB
Go

package gateway
import (
"context"
"github.com/tech/sendico/gateway/chain/internal/keymanager"
"github.com/tech/sendico/gateway/chain/internal/service/gateway/commands"
"github.com/tech/sendico/gateway/chain/internal/service/gateway/commands/transfer"
"github.com/tech/sendico/gateway/chain/internal/service/gateway/commands/wallet"
"github.com/tech/sendico/gateway/chain/internal/service/gateway/shared"
"github.com/tech/sendico/gateway/chain/storage"
"github.com/tech/sendico/pkg/api/routers"
"github.com/tech/sendico/pkg/api/routers/gsresponse"
clockpkg "github.com/tech/sendico/pkg/clock"
msg "github.com/tech/sendico/pkg/messaging"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
chainv1 "github.com/tech/sendico/pkg/proto/gateway/chain/v1"
"google.golang.org/grpc"
)
// serviceError is a string-backed error type, which lets package-level
// sentinel errors be declared directly from string constants.
type serviceError string

// Error implements the error interface by returning the underlying string.
func (se serviceError) Error() string {
	text := string(se)
	return text
}
// Sentinel errors returned by the service.
var (
	// errStorageUnavailable is returned by ensureRepository when the Service
	// has no storage repository configured.
	errStorageUnavailable serviceError = "chain_gateway: storage not initialised"
)
// Service implements the ChainGatewayService RPC contract.
type Service struct {
logger mlogger.Logger
storage storage.Repository
producer msg.Producer
clock clockpkg.Clock
settings CacheSettings
networks map[string]shared.Network
serviceWallet shared.ServiceWallet
keyManager keymanager.Manager
executor TransferExecutor
commands commands.Registry
chainv1.UnimplementedChainGatewayServiceServer
}
// NewService constructs the chain gateway service.
//
// Construction happens in this order:
//  1. a Service is created with default clock, settings and an empty network map;
//  2. metrics are initialised via initMetrics;
//  3. every non-nil option is applied in the order given;
//  4. a nil clock or nil network map left behind by an option is replaced with
//     its default, and settings are normalised with withDefaults;
//  5. the command registry is built from the final configuration.
//
// logger is used immediately (logger.Named), so it must be usable. repo may be
// nil; in that case ensureRepository reports errStorageUnavailable.
func NewService(logger mlogger.Logger, repo storage.Repository, producer msg.Producer, opts ...Option) *Service {
	svc := &Service{
		logger:   logger.Named("service"),
		storage:  repo,
		producer: producer,
		clock:    clockpkg.System{},
		settings: defaultSettings(),
		networks: make(map[string]shared.Network),
	}

	initMetrics()

	for _, apply := range opts {
		if apply == nil {
			continue
		}
		apply(svc)
	}

	// Options may have cleared these; restore usable defaults.
	if svc.clock == nil {
		svc.clock = clockpkg.System{}
	}
	if svc.networks == nil {
		svc.networks = make(map[string]shared.Network)
	}
	svc.settings = svc.settings.withDefaults()

	// Built last so the command dependencies observe the final configuration.
	deps := commands.RegistryDeps{
		Wallet:   commandsWalletDeps(svc),
		Transfer: commandsTransferDeps(svc),
	}
	svc.commands = commands.NewRegistry(deps)

	return svc
}
// Register wires the service onto the provided gRPC router by handing it a
// callback that registers s as the ChainGatewayService server. The error
// returned by router.Register is passed through unchanged.
func (s *Service) Register(router routers.GRPC) error {
	bind := func(reg grpc.ServiceRegistrar) {
		chainv1.RegisterChainGatewayServiceServer(reg, s)
	}
	return router.Register(bind)
}
// CreateManagedWallet serves the CreateManagedWallet RPC by running the
// registry's CreateManagedWallet command through executeUnary.
func (s *Service) CreateManagedWallet(ctx context.Context, req *chainv1.CreateManagedWalletRequest) (*chainv1.CreateManagedWalletResponse, error) {
	handler := s.commands.CreateManagedWallet.Execute
	return executeUnary(ctx, s, "CreateManagedWallet", handler, req)
}
// GetManagedWallet serves the GetManagedWallet RPC by running the registry's
// GetManagedWallet command through executeUnary.
func (s *Service) GetManagedWallet(ctx context.Context, req *chainv1.GetManagedWalletRequest) (*chainv1.GetManagedWalletResponse, error) {
	handler := s.commands.GetManagedWallet.Execute
	return executeUnary(ctx, s, "GetManagedWallet", handler, req)
}
// ListManagedWallets serves the ListManagedWallets RPC by running the
// registry's ListManagedWallets command through executeUnary.
func (s *Service) ListManagedWallets(ctx context.Context, req *chainv1.ListManagedWalletsRequest) (*chainv1.ListManagedWalletsResponse, error) {
	handler := s.commands.ListManagedWallets.Execute
	return executeUnary(ctx, s, "ListManagedWallets", handler, req)
}
// GetWalletBalance serves the GetWalletBalance RPC by running the registry's
// GetWalletBalance command through executeUnary.
func (s *Service) GetWalletBalance(ctx context.Context, req *chainv1.GetWalletBalanceRequest) (*chainv1.GetWalletBalanceResponse, error) {
	handler := s.commands.GetWalletBalance.Execute
	return executeUnary(ctx, s, "GetWalletBalance", handler, req)
}
// SubmitTransfer serves the SubmitTransfer RPC by running the registry's
// SubmitTransfer command through executeUnary.
func (s *Service) SubmitTransfer(ctx context.Context, req *chainv1.SubmitTransferRequest) (*chainv1.SubmitTransferResponse, error) {
	handler := s.commands.SubmitTransfer.Execute
	return executeUnary(ctx, s, "SubmitTransfer", handler, req)
}
// GetTransfer serves the GetTransfer RPC by running the registry's GetTransfer
// command through executeUnary.
func (s *Service) GetTransfer(ctx context.Context, req *chainv1.GetTransferRequest) (*chainv1.GetTransferResponse, error) {
	handler := s.commands.GetTransfer.Execute
	return executeUnary(ctx, s, "GetTransfer", handler, req)
}
// ListTransfers serves the ListTransfers RPC by running the registry's
// ListTransfers command through executeUnary.
func (s *Service) ListTransfers(ctx context.Context, req *chainv1.ListTransfersRequest) (*chainv1.ListTransfersResponse, error) {
	handler := s.commands.ListTransfers.Execute
	return executeUnary(ctx, s, "ListTransfers", handler, req)
}
// EstimateTransferFee serves the EstimateTransferFee RPC by running the
// registry's EstimateTransfer command through executeUnary. The RPC is
// reported to executeUnary under the method name "EstimateTransferFee".
func (s *Service) EstimateTransferFee(ctx context.Context, req *chainv1.EstimateTransferFeeRequest) (*chainv1.EstimateTransferFeeResponse, error) {
	handler := s.commands.EstimateTransfer.Execute
	return executeUnary(ctx, s, "EstimateTransferFee", handler, req)
}
// ensureRepository verifies that a storage repository is configured and
// reachable. It returns errStorageUnavailable when no repository is set;
// otherwise it returns the result of pinging the repository with ctx.
func (s *Service) ensureRepository(ctx context.Context) error {
	if repo := s.storage; repo != nil {
		return repo.Ping(ctx)
	}
	return errStorageUnavailable
}
// commandsWalletDeps assembles the dependency bundle for the wallet commands
// from the current state of s. Values are read at call time, so s should be
// fully configured before this is invoked.
func commandsWalletDeps(s *Service) wallet.Deps {
	var deps wallet.Deps

	deps.Logger = s.logger.Named("command")
	deps.Networks = s.networks
	deps.KeyManager = s.keyManager
	deps.Storage = s.storage
	deps.Clock = s.clock
	deps.BalanceCacheTTL = s.settings.walletBalanceCacheTTL()
	deps.EnsureRepository = s.ensureRepository

	return deps
}
// commandsTransferDeps assembles the dependency bundle for the transfer
// commands from the current state of s. Values are read at call time, so s
// should be fully configured before this is invoked.
func commandsTransferDeps(s *Service) transfer.Deps {
	var deps transfer.Deps

	deps.Logger = s.logger.Named("transfer_cmd")
	deps.Networks = s.networks
	deps.Storage = s.storage
	deps.Clock = s.clock
	deps.EnsureRepository = s.ensureRepository
	deps.LaunchExecution = s.launchTransferExecution

	return deps
}
// executeUnary runs a unary RPC handler and records its outcome.
//
// The handler is wrapped with gsresponse.Unary (using the service logger and
// the mservice.ChainGateway identifier) and invoked with ctx and req. The
// elapsed time, measured with svc.clock, is reported to observeRPC together
// with method and the resulting error. The response and error from the wrapped
// call are returned unchanged.
func executeUnary[TReq any, TResp any](ctx context.Context, svc *Service, method string, handler func(context.Context, *TReq) gsresponse.Responder[TResp], req *TReq) (*TResp, error) {
	began := svc.clock.Now()

	call := gsresponse.Unary(svc.logger, mservice.ChainGateway, handler)
	resp, err := call(ctx, req)

	elapsed := svc.clock.Now().Sub(began)
	observeRPC(method, err, elapsed)

	return resp, err
}