196 lines
5.1 KiB
Go
196 lines
5.1 KiB
Go
package gateway
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"strings"
|
|
|
|
"github.com/tech/sendico/gateway/mntx/internal/appversion"
|
|
"github.com/tech/sendico/gateway/mntx/internal/service/monetix"
|
|
"github.com/tech/sendico/pkg/api/routers"
|
|
"github.com/tech/sendico/pkg/api/routers/gsresponse"
|
|
clockpkg "github.com/tech/sendico/pkg/clock"
|
|
"github.com/tech/sendico/pkg/discovery"
|
|
msg "github.com/tech/sendico/pkg/messaging"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"github.com/tech/sendico/pkg/mservice"
|
|
gatewayv1 "github.com/tech/sendico/pkg/proto/common/gateway/v1"
|
|
unifiedv1 "github.com/tech/sendico/pkg/proto/gateway/unified/v1"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
type Service struct {
|
|
logger mlogger.Logger
|
|
clock clockpkg.Clock
|
|
producer msg.Producer
|
|
cardStore *cardPayoutStore
|
|
config monetix.Config
|
|
httpClient *http.Client
|
|
card *cardPayoutProcessor
|
|
gatewayDescriptor *gatewayv1.GatewayInstanceDescriptor
|
|
announcer *discovery.Announcer
|
|
|
|
unifiedv1.UnimplementedUnifiedGatewayServiceServer
|
|
}
|
|
|
|
// payoutFailure is an error that additionally exposes a reason string
// describing why the payout failed.
type payoutFailure interface {
	// Reason returns the reason attached to the failure.
	Reason() string

	error
}
|
|
|
|
// reasonedError pairs an underlying error with a reason string so callers
// can read the failure reason via Reason() while still reaching the cause
// through Unwrap().
type reasonedError struct {
	reason string
	err    error
}

// Error returns the message of the wrapped error. When no underlying error
// was supplied it falls back to the reason string instead of dereferencing
// a nil error, which would panic.
func (r reasonedError) Error() string {
	if r.err == nil {
		return r.reason
	}
	return r.err.Error()
}

// Unwrap returns the wrapped error (nil when none was supplied).
func (r reasonedError) Unwrap() error {
	return r.err
}

// Reason returns the reason string attached to the error.
func (r reasonedError) Reason() string {
	return r.reason
}
|
|
|
|
// NewService constructs the Monetix gateway service skeleton.
|
|
func NewService(logger mlogger.Logger, opts ...Option) *Service {
|
|
svc := &Service{
|
|
logger: logger.Named("service"),
|
|
clock: clockpkg.NewSystem(),
|
|
cardStore: newCardPayoutStore(),
|
|
config: monetix.DefaultConfig(),
|
|
}
|
|
|
|
initMetrics()
|
|
|
|
for _, opt := range opts {
|
|
if opt != nil {
|
|
opt(svc)
|
|
}
|
|
}
|
|
|
|
if svc.clock == nil {
|
|
svc.clock = clockpkg.NewSystem()
|
|
}
|
|
|
|
if svc.httpClient == nil {
|
|
svc.httpClient = &http.Client{Timeout: svc.config.Timeout()}
|
|
} else if svc.httpClient.Timeout <= 0 {
|
|
svc.httpClient.Timeout = svc.config.Timeout()
|
|
}
|
|
|
|
if svc.cardStore == nil {
|
|
svc.cardStore = newCardPayoutStore()
|
|
}
|
|
|
|
svc.card = newCardPayoutProcessor(svc.logger, svc.config, svc.clock, svc.cardStore, svc.httpClient, svc.producer)
|
|
svc.startDiscoveryAnnouncer()
|
|
|
|
return svc
|
|
}
|
|
|
|
// Register wires the service onto the provided gRPC router.
|
|
func (s *Service) Register(router routers.GRPC) error {
|
|
return router.Register(func(reg grpc.ServiceRegistrar) {
|
|
unifiedv1.RegisterUnifiedGatewayServiceServer(reg, s)
|
|
})
|
|
}
|
|
|
|
func (s *Service) Shutdown() {
|
|
if s == nil {
|
|
return
|
|
}
|
|
if s.announcer != nil {
|
|
s.announcer.Stop()
|
|
}
|
|
}
|
|
|
|
func executeUnary[TReq any, TResp any](ctx context.Context, svc *Service, method string, handler func(context.Context, *TReq) gsresponse.Responder[TResp], req *TReq) (*TResp, error) {
|
|
log := svc.logger.Named("rpc")
|
|
log.Info("RPC request started", zap.String("method", method))
|
|
|
|
start := svc.clock.Now()
|
|
resp, err := gsresponse.Unary(svc.logger, mservice.MntxGateway, handler)(ctx, req)
|
|
duration := svc.clock.Now().Sub(start)
|
|
observeRPC(method, err, duration)
|
|
|
|
if err != nil {
|
|
log.Warn("RPC request failed", zap.String("method", method), zap.Duration("duration", duration), zap.Error(err))
|
|
} else {
|
|
log.Info("RPC request completed", zap.String("method", method), zap.Duration("duration", duration))
|
|
}
|
|
return resp, err
|
|
}
|
|
|
|
// normalizeReason strips surrounding whitespace from reason and lower-cases
// the result.
func normalizeReason(reason string) string {
	trimmed := strings.TrimSpace(reason)
	return strings.ToLower(trimmed)
}
|
|
|
|
func newPayoutError(reason string, err error) error {
|
|
return reasonedError{
|
|
reason: normalizeReason(reason),
|
|
err: err,
|
|
}
|
|
}
|
|
|
|
func (s *Service) startDiscoveryAnnouncer() {
|
|
if s == nil || s.producer == nil {
|
|
return
|
|
}
|
|
announce := discovery.Announcement{
|
|
Service: "CARD_PAYOUT_RAIL_GATEWAY",
|
|
Rail: "CARD_PAYOUT",
|
|
Operations: []string{"payout.card"},
|
|
InvokeURI: discovery.DefaultInvokeURI(string(mservice.MntxGateway)),
|
|
Version: appversion.Create().Short(),
|
|
}
|
|
if s.gatewayDescriptor != nil {
|
|
if id := strings.TrimSpace(s.gatewayDescriptor.GetId()); id != "" {
|
|
announce.ID = id
|
|
}
|
|
announce.Network = strings.TrimSpace(s.gatewayDescriptor.GetNetwork())
|
|
announce.Currencies = append([]string(nil), s.gatewayDescriptor.GetCurrencies()...)
|
|
announce.Limits = limitsFromDescriptor(s.gatewayDescriptor.GetLimits())
|
|
}
|
|
s.announcer = discovery.NewAnnouncer(s.logger, s.producer, string(mservice.MntxGateway), announce)
|
|
s.announcer.Start()
|
|
}
|
|
|
|
func limitsFromDescriptor(src *gatewayv1.Limits) *discovery.Limits {
|
|
if src == nil {
|
|
return nil
|
|
}
|
|
limits := &discovery.Limits{
|
|
MinAmount: strings.TrimSpace(src.GetMinAmount()),
|
|
MaxAmount: strings.TrimSpace(src.GetMaxAmount()),
|
|
VolumeLimit: map[string]string{},
|
|
VelocityLimit: map[string]int{},
|
|
}
|
|
for key, value := range src.GetVolumeLimit() {
|
|
k := strings.TrimSpace(key)
|
|
v := strings.TrimSpace(value)
|
|
if k == "" || v == "" {
|
|
continue
|
|
}
|
|
limits.VolumeLimit[k] = v
|
|
}
|
|
for key, value := range src.GetVelocityLimit() {
|
|
k := strings.TrimSpace(key)
|
|
if k == "" {
|
|
continue
|
|
}
|
|
limits.VelocityLimit[k] = int(value)
|
|
}
|
|
if len(limits.VolumeLimit) == 0 {
|
|
limits.VolumeLimit = nil
|
|
}
|
|
if len(limits.VelocityLimit) == 0 {
|
|
limits.VelocityLimit = nil
|
|
}
|
|
return limits
|
|
}
|