package gateway

import (
	"context"
	"net/http"
	"strings"

	"github.com/tech/sendico/gateway/aurora/internal/appversion"
	"github.com/tech/sendico/gateway/aurora/internal/service/provider"
	"github.com/tech/sendico/gateway/aurora/storage"
	gatewayoutbox "github.com/tech/sendico/gateway/common/outbox"
	"github.com/tech/sendico/pkg/api/routers"
	"github.com/tech/sendico/pkg/api/routers/gsresponse"
	clockpkg "github.com/tech/sendico/pkg/clock"
	"github.com/tech/sendico/pkg/discovery"
	msg "github.com/tech/sendico/pkg/messaging"
	"github.com/tech/sendico/pkg/mlogger"
	pmodel "github.com/tech/sendico/pkg/model"
	"github.com/tech/sendico/pkg/mservice"
	gatewayv1 "github.com/tech/sendico/pkg/proto/common/gateway/v1"
	connectorv1 "github.com/tech/sendico/pkg/proto/connector/v1"
	"go.uber.org/zap"
	"google.golang.org/grpc"
)

// Service is the Aurora gateway service. It is registered as the
// ConnectorService gRPC server by Register and owns the card payout
// processor, the reliable outbox runtime and the discovery announcer.
type Service struct {
	// logger is the "service"-named child of the logger given to NewService.
	logger mlogger.Logger
	// clock is the time source; NewService falls back to the system clock
	// when it is nil after options are applied.
	clock clockpkg.Clock
	// producer is handed to the card payout processor and to the discovery
	// announcer; when nil, no discovery announcer is started.
	producer msg.Producer
	// msgCfg is copied onto the card payout processor by NewService.
	msgCfg pmodel.SettingsT
	// storage is passed to the card payout processor.
	storage storage.Repository
	// config supplies, among other things, the default HTTP client timeout.
	config provider.Config
	// httpClient is passed to the card payout processor; NewService creates
	// one, or fills in a missing timeout, from config.Timeout().
	httpClient *http.Client
	// card is the card payout processor built in NewService.
	card *cardPayoutProcessor
	// outbox is the reliable outbox runtime; the card processor holds a
	// pointer to it and Shutdown stops it.
	outbox gatewayoutbox.ReliableRuntime
	// gatewayDescriptor is optional; when set it provides the announced ID
	// and currencies and is applied to the card processor.
	gatewayDescriptor *gatewayv1.GatewayInstanceDescriptor
	// announcer is created by startDiscoveryAnnouncer; it stays nil when
	// producer is nil.
	announcer *discovery.Announcer
	// invokeURI is published as InvokeURI in the discovery announcement.
	invokeURI string
	// strictIsolation makes NewService switch the card processor to the
	// strict isolated payout execution mode.
	strictIsolation bool

	// Embedded generated base type for the ConnectorService server.
	// NOTE(review): presumably supplies default implementations for RPCs
	// not overridden in this package — confirm against the generated code.
	connectorv1.UnimplementedConnectorServiceServer
}

// payoutFailure is an error that additionally exposes a reason string.
type payoutFailure interface {
	error
	Reason() string
}

// reasonedError pairs an underlying error with a reason. Its value-receiver
// methods give it the method set required by payoutFailure.
type reasonedError struct {
	reason string
	err    error
}

// Error returns the message of the wrapped error. The wrapped error must be
// non-nil: calling Error on a reasonedError with a nil err panics.
func (r reasonedError) Error() string { return r.err.Error() }

// Unwrap returns the wrapped error so errors.Is / errors.As can see through
// the wrapper.
func (r reasonedError) Unwrap() error { return r.err }

// Reason returns the stored reason string as-is.
func (r reasonedError) Reason() string { return r.reason }

// NewService constructs the Aurora gateway service skeleton.
func NewService(logger mlogger.Logger, opts ...Option) *Service { svc := &Service{ logger: logger.Named("service"), clock: clockpkg.NewSystem(), config: provider.DefaultConfig(), msgCfg: map[string]any{}, } initMetrics() for _, opt := range opts { if opt != nil { opt(svc) } } if svc.clock == nil { svc.clock = clockpkg.NewSystem() } if svc.httpClient == nil { svc.httpClient = &http.Client{Timeout: svc.config.Timeout()} } else if svc.httpClient.Timeout <= 0 { svc.httpClient.Timeout = svc.config.Timeout() } svc.card = newCardPayoutProcessor(svc.logger, svc.config, svc.clock, svc.storage, svc.httpClient, svc.producer) if svc.strictIsolation { svc.card.setExecutionMode(newStrictIsolatedPayoutExecutionMode()) } svc.card.outbox = &svc.outbox svc.card.msgCfg = svc.msgCfg if err := svc.card.startOutboxReliableProducer(); err != nil { svc.logger.Warn("Failed to initialise outbox reliable producer", zap.Error(err)) } svc.card.applyGatewayDescriptor(svc.gatewayDescriptor) svc.startDiscoveryAnnouncer() return svc } // Register wires the service onto the provided gRPC router. 
func (s *Service) Register(router routers.GRPC) error { return router.Register(func(reg grpc.ServiceRegistrar) { connectorv1.RegisterConnectorServiceServer(reg, s) }) } func (s *Service) Shutdown() { if s == nil { return } if s.card != nil { s.card.stopRetries() } s.outbox.Stop() if s.announcer != nil { s.announcer.Stop() } } func executeUnary[TReq any, TResp any](ctx context.Context, svc *Service, method string, handler func(context.Context, *TReq) gsresponse.Responder[TResp], req *TReq) (*TResp, error) { log := svc.logger.Named("rpc") log.Info("RPC request started", zap.String("method", method)) start := svc.clock.Now() resp, err := gsresponse.Unary(svc.logger, mservice.MntxGateway, handler)(ctx, req) duration := svc.clock.Now().Sub(start) observeRPC(method, err, duration) if err != nil { log.Warn("RPC request failed", zap.String("method", method), zap.Duration("duration", duration), zap.Error(err)) } else { log.Info("RPC request completed", zap.String("method", method), zap.Duration("duration", duration)) } return resp, err } func normalizeReason(reason string) string { return strings.ToLower(strings.TrimSpace(reason)) } func newPayoutError(reason string, err error) error { return reasonedError{ reason: normalizeReason(reason), err: err, } } func (s *Service) startDiscoveryAnnouncer() { if s == nil || s.producer == nil { return } announce := discovery.Announcement{ Service: mservice.MntxGateway, Rail: discovery.RailCardPayout, Operations: discovery.CardPayoutRailGatewayOperations(), InvokeURI: s.invokeURI, Version: appversion.Create().Short(), InstanceID: discovery.InstanceID(), } if s.gatewayDescriptor != nil { if id := strings.TrimSpace(s.gatewayDescriptor.GetId()); id != "" { announce.ID = id } announce.Currencies = currenciesFromDescriptor(s.gatewayDescriptor) } if strings.TrimSpace(announce.ID) == "" { announce.ID = discovery.StablePaymentGatewayID(discovery.RailCardPayout) } s.announcer = discovery.NewAnnouncer(s.logger, s.producer, 
string(mservice.MntxGateway), announce) s.announcer.Start() } func currenciesFromDescriptor(src *gatewayv1.GatewayInstanceDescriptor) []discovery.CurrencyAnnouncement { if src == nil { return nil } network := strings.TrimSpace(src.GetNetwork()) limitsCfg := src.GetLimits() values := src.GetCurrencies() if len(values) == 0 { return nil } seen := map[string]bool{} result := make([]discovery.CurrencyAnnouncement, 0, len(values)) for _, value := range values { currency := strings.ToUpper(strings.TrimSpace(value)) if currency == "" || seen[currency] { continue } seen[currency] = true result = append(result, discovery.CurrencyAnnouncement{ Currency: currency, Network: network, Limits: currencyLimitsFromDescriptor(limitsCfg, currency), }) } if len(result) == 0 { return nil } return result } func currencyLimitsFromDescriptor(src *gatewayv1.Limits, currency string) *discovery.CurrencyLimits { if src == nil { return nil } amountMin := firstNonEmpty(src.GetPerTxMinAmount(), src.GetMinAmount()) amountMax := firstNonEmpty(src.GetPerTxMaxAmount(), src.GetMaxAmount()) limits := &discovery.CurrencyLimits{} if amountMin != "" || amountMax != "" { limits.Amount = &discovery.CurrencyAmount{ Min: amountMin, Max: amountMax, } } running := &discovery.CurrencyRunningLimits{} for bucket, max := range src.GetVolumeLimit() { bucket = strings.TrimSpace(bucket) max = strings.TrimSpace(max) if bucket == "" || max == "" { continue } running.Volume = append(running.Volume, discovery.VolumeLimit{ Window: discovery.Window{ Raw: bucket, Named: bucket, }, Max: max, }) } for bucket, max := range src.GetVelocityLimit() { bucket = strings.TrimSpace(bucket) if bucket == "" || max <= 0 { continue } running.Velocity = append(running.Velocity, discovery.VelocityLimit{ Window: discovery.Window{ Raw: bucket, Named: bucket, }, Max: int(max), }) } if override := src.GetCurrencyLimits()[strings.ToUpper(strings.TrimSpace(currency))]; override != nil { if min := strings.TrimSpace(override.GetMinAmount()); min != 
"" { if limits.Amount == nil { limits.Amount = &discovery.CurrencyAmount{} } limits.Amount.Min = min } if max := strings.TrimSpace(override.GetMaxAmount()); max != "" { if limits.Amount == nil { limits.Amount = &discovery.CurrencyAmount{} } limits.Amount.Max = max } if maxVolume := strings.TrimSpace(override.GetMaxVolume()); maxVolume != "" { running.Volume = append(running.Volume, discovery.VolumeLimit{ Window: discovery.Window{ Raw: "default", Named: "default", }, Max: maxVolume, }) } if maxOps := int(override.GetMaxOps()); maxOps > 0 { running.Velocity = append(running.Velocity, discovery.VelocityLimit{ Window: discovery.Window{ Raw: "default", Named: "default", }, Max: maxOps, }) } } if len(running.Volume) > 0 || len(running.Velocity) > 0 { limits.Running = running } if limits.Amount == nil && limits.Running == nil { return nil } return limits } func firstNonEmpty(values ...string) string { for _, value := range values { clean := strings.TrimSpace(value) if clean != "" { return clean } } return "" }