Files
sendico/api/server/internal/server/paymentapiimp/service.go
2026-02-11 18:53:27 +01:00

294 lines
9.6 KiB
Go

package paymentapiimp
import (
"context"
"crypto/tls"
"fmt"
"os"
"strings"
"sync"
"time"
orchestratorclient "github.com/tech/sendico/payments/orchestrator/client"
api "github.com/tech/sendico/pkg/api/http"
"github.com/tech/sendico/pkg/auth"
"github.com/tech/sendico/pkg/discovery"
"github.com/tech/sendico/pkg/merrors"
msg "github.com/tech/sendico/pkg/messaging"
msgconsumer "github.com/tech/sendico/pkg/messaging/consumer"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/mservice"
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestration/v1"
quotationv1 "github.com/tech/sendico/pkg/proto/payments/quotation/v1"
eapi "github.com/tech/sendico/server/interface/api"
mutil "github.com/tech/sendico/server/internal/mutil/param"
"go.mongodb.org/mongo-driver/v2/bson"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)
// executionClient lists the payment orchestrator calls this package depends on.
// PaymentAPI keeps the value returned by orchestratorclient.New behind this
// interface, which also gives tests a seam for substituting a fake.
type executionClient interface {
	// InitiatePayment submits a single-payment initiation request.
	InitiatePayment(ctx context.Context, req *orchestratorv1.InitiatePaymentRequest) (*orchestratorv1.InitiatePaymentResponse, error)
	// InitiatePayments submits a multi-payment initiation request.
	InitiatePayments(ctx context.Context, req *orchestratorv1.InitiatePaymentsRequest) (*orchestratorv1.InitiatePaymentsResponse, error)
	// ListPayments submits a payment listing request.
	ListPayments(ctx context.Context, req *orchestratorv1.ListPaymentsRequest) (*orchestratorv1.ListPaymentsResponse, error)

	// Close releases the client; PaymentAPI.Finish calls it on shutdown.
	Close() error
}
// quotationClient lists the payment quotation calls this package depends on.
// The implementation in this file is grpcQuotationClient.
type quotationClient interface {
	// QuotePayment requests a quote for a single payment.
	QuotePayment(ctx context.Context, req *quotationv1.QuotePaymentRequest) (*quotationv1.QuotePaymentResponse, error)
	// QuotePayments requests quotes for several payments in one call.
	QuotePayments(ctx context.Context, req *quotationv1.QuotePaymentsRequest) (*quotationv1.QuotePaymentsResponse, error)

	// Close releases the client; PaymentAPI.Finish calls it on shutdown.
	Close() error
}
// PaymentAPI holds the dependencies of the payments HTTP service: the
// orchestrator and quotation clients, the permission enforcer, and the
// optional discovery client together with its refresh-event consumer.
//
// Instances are built by CreateAPI and torn down by Finish.
type PaymentAPI struct {
	logger mlogger.Logger

	// Downstream payment services; both are set by initPaymentClient.
	execution executionClient
	quotation quotationClient

	// enf is the enforcer obtained from the API permissions facade, and
	// permissionRef is the ID of the policy description fetched for
	// mservice.Payments in CreateAPI.
	enf           auth.Enforcer
	permissionRef bson.ObjectID

	// oph builds the handler paths passed to the route registrations.
	oph mutil.ParamHelper

	// discovery and refreshConsumer stay nil when no messaging driver is
	// configured (see initDiscoveryClient).
	discovery       *discovery.Client
	refreshConsumer msg.Consumer

	// NOTE(review): refreshMu presumably guards refreshEvent; the code that
	// reads and writes them is outside this part of the file — confirm there.
	refreshMu    sync.RWMutex
	refreshEvent *discovery.RefreshEvent
}
// Name reports the service type of this API, which is always mservice.Payments.
func (a *PaymentAPI) Name() mservice.Type {
	return mservice.Payments
}
// Finish releases every resource the API may hold: the orchestrator client,
// the quotation client, the discovery client and the refresh consumer, in that
// order. Components that were never initialised are skipped.
//
// Close failures of the two payment clients are logged as warnings rather than
// returned, so Finish always returns nil. ctx is currently unused.
func (a *PaymentAPI) Finish(ctx context.Context) error {
	if exec := a.execution; exec != nil {
		if closeErr := exec.Close(); closeErr != nil {
			a.logger.Warn("Failed to close payment orchestrator client", zap.Error(closeErr))
		}
	}

	if quote := a.quotation; quote != nil {
		if closeErr := quote.Close(); closeErr != nil {
			a.logger.Warn("Failed to close payment quotation client", zap.Error(closeErr))
		}
	}

	if disc := a.discovery; disc != nil {
		disc.Close()
	}

	if consumer := a.refreshConsumer; consumer != nil {
		consumer.Close()
	}

	return nil
}
// CreateAPI builds the payments API and registers its account handlers.
//
// Steps, in order:
//  1. create the PaymentAPI with a named logger, the permission enforcer and
//     the path helper;
//  2. fetch the policy description for mservice.Payments and remember its ID;
//  3. connect the orchestrator and quotation clients;
//  4. try to set up the discovery client;
//  5. register the HTTP routes.
//
// Failures in steps 2 and 3 are logged and returned. A failure in step 4 is
// only logged as a warning, and the API is still returned.
func CreateAPI(apiCtx eapi.API) (*PaymentAPI, error) {
	svc := &PaymentAPI{
		logger: apiCtx.Logger().Named(mservice.Payments),
		enf:    apiCtx.Permissions().Enforcer(),
		oph:    mutil.CreatePH(mservice.Organizations),
	}

	policy, policyErr := apiCtx.Permissions().GetPolicyDescription(context.Background(), mservice.Payments)
	if policyErr != nil {
		svc.logger.Warn("Failed to fetch payment orchestrator permission description", zap.Error(policyErr))
		return nil, policyErr
	}
	svc.permissionRef = policy.ID

	if clientErr := svc.initPaymentClient(apiCtx.Config().PaymentOrchestrator, apiCtx.Config().PaymentQuotation); clientErr != nil {
		svc.logger.Error("Failed to initialize payment orchestrator client", zap.Error(clientErr))
		return nil, clientErr
	}

	// Discovery is optional: the API keeps working without it.
	if discoveryErr := svc.initDiscoveryClient(apiCtx.Config()); discoveryErr != nil {
		svc.logger.Warn("Failed to initialize discovery client", zap.Error(discoveryErr))
	}

	// Quotation routes.
	apiCtx.Register().AccountHandler(svc.Name(), svc.oph.AddRef("/quote"), api.Post, svc.quotePayment)
	apiCtx.Register().AccountHandler(svc.Name(), svc.oph.AddRef("/multiquote"), api.Post, svc.quotePayments)

	// Payment initiation routes.
	apiCtx.Register().AccountHandler(svc.Name(), svc.oph.AddRef("/immediate"), api.Post, svc.initiateImmediate)
	apiCtx.Register().AccountHandler(svc.Name(), svc.oph.AddRef("/by-quote"), api.Post, svc.initiateByQuote)
	apiCtx.Register().AccountHandler(svc.Name(), svc.oph.AddRef("/by-multiquote"), api.Post, svc.initiatePaymentsByQuote)

	// Read-only routes.
	apiCtx.Register().AccountHandler(svc.Name(), svc.oph.AddRef("/"), api.Get, svc.listPayments)
	apiCtx.Register().AccountHandler(svc.Name(), svc.oph.AddRef("/documents/act"), api.Get, svc.getActDocument)
	apiCtx.Register().AccountHandler(svc.Name(), svc.oph.AddRef("/registry"), api.Get, svc.listDiscoveryRegistry)
	apiCtx.Register().AccountHandler(svc.Name(), svc.oph.AddRef("/registry/refresh"), api.Get, svc.getDiscoveryRefresh)

	return svc, nil
}
// initPaymentClient connects the orchestrator and quotation clients and stores
// them on the receiver.
//
// cfg is mandatory and describes the orchestrator endpoint. quoteCfg is
// optional:
//   - when nil, the quotation client reuses the orchestrator address, TLS mode
//     and timeouts;
//   - when set, its TLS mode and timeouts always win, and its address wins if
//     either Address or the variable named by AddressEnv is non-blank. If
//     neither yields a value, the orchestrator address is kept.
//
// Returns an InvalidArgument error when cfg is nil or its address cannot be
// resolved, or the error from whichever client constructor fails. Nothing is
// stored on the receiver unless both clients were created.
func (a *PaymentAPI) initPaymentClient(cfg *eapi.PaymentOrchestratorConfig, quoteCfg *eapi.PaymentOrchestratorConfig) error {
	if cfg == nil {
		return merrors.InvalidArgument("payment orchestrator configuration is not provided")
	}
	address, err := resolveClientAddress("payment orchestrator", cfg)
	if err != nil {
		return err
	}

	// Start from the orchestrator's settings, then let quoteCfg override them.
	quoteSettings := quotationClientConfig{
		Address:     address,
		DialTimeout: time.Duration(cfg.DialTimeoutSeconds) * time.Second,
		CallTimeout: time.Duration(cfg.CallTimeoutSeconds) * time.Second,
		Insecure:    cfg.Insecure,
	}
	if quoteCfg != nil {
		override := strings.TrimSpace(quoteCfg.Address)
		if override == "" {
			if envName := strings.TrimSpace(quoteCfg.AddressEnv); envName != "" {
				override = strings.TrimSpace(os.Getenv(envName))
			}
		}
		if override != "" {
			quoteSettings.Address = override
		}
		quoteSettings.Insecure = quoteCfg.Insecure
		quoteSettings.DialTimeout = time.Duration(quoteCfg.DialTimeoutSeconds) * time.Second
		quoteSettings.CallTimeout = time.Duration(quoteCfg.CallTimeoutSeconds) * time.Second
	}

	execution, err := orchestratorclient.New(context.Background(), orchestratorclient.Config{
		Address:     address,
		DialTimeout: time.Duration(cfg.DialTimeoutSeconds) * time.Second,
		CallTimeout: time.Duration(cfg.CallTimeoutSeconds) * time.Second,
		Insecure:    cfg.Insecure,
	})
	if err != nil {
		return err
	}

	quotation, err := newQuotationClient(context.Background(), quoteSettings)
	if err != nil {
		// Best-effort cleanup: the quotation error is the one worth reporting.
		_ = execution.Close()
		return err
	}

	a.execution, a.quotation = execution, quotation
	return nil
}
// resolveClientAddress determines the address to dial for a client.
//
// service is a human-readable name used only in error messages. The lookup
// order is cfg.Address first, then the environment variable named by
// cfg.AddressEnv. Every value is whitespace-trimmed before use.
//
// An InvalidArgument error is returned when cfg is nil, when neither field is
// set, or when AddressEnv names a variable that is unset or blank.
func resolveClientAddress(service string, cfg *eapi.PaymentOrchestratorConfig) (string, error) {
	label := strings.TrimSpace(service)
	if cfg == nil {
		return "", merrors.InvalidArgument(label + " configuration is not provided")
	}

	if direct := strings.TrimSpace(cfg.Address); direct != "" {
		return direct, nil
	}

	envName := strings.TrimSpace(cfg.AddressEnv)
	if envName == "" {
		return "", merrors.InvalidArgument(label + " address is not specified")
	}

	fromEnv := strings.TrimSpace(os.Getenv(envName))
	if fromEnv == "" {
		return "", merrors.InvalidArgument(fmt.Sprintf("%s address is not specified and address env %s is empty", label, envName))
	}
	return fromEnv, nil
}
// quotationClientConfig carries the connection settings for the quotation
// gRPC client.
type quotationClientConfig struct {
	// Address is the gRPC target to dial.
	Address string
	// DialTimeout bounds the dial context; non-positive values mean "use the default".
	DialTimeout time.Duration
	// CallTimeout bounds each RPC; non-positive values mean "use the default".
	CallTimeout time.Duration
	// Insecure selects plaintext transport credentials instead of TLS.
	Insecure bool
}

// setDefaults replaces non-positive timeouts with the built-in defaults:
// 5 seconds for dialing and 3 seconds per call. Positive values are kept.
func (c *quotationClientConfig) setDefaults() {
	positiveOr := func(current, fallback time.Duration) time.Duration {
		if current > 0 {
			return current
		}
		return fallback
	}

	c.DialTimeout = positiveOr(c.DialTimeout, 5*time.Second)
	c.CallTimeout = positiveOr(c.CallTimeout, 3*time.Second)
}
// grpcQuotationClient is the quotationClient implementation backed by a gRPC
// connection. It is created by newQuotationClient.
type grpcQuotationClient struct {
	// conn is the underlying connection, released by Close.
	conn *grpc.ClientConn
	// client is the generated quotation service stub bound to conn.
	client quotationv1.QuotationServiceClient
	// callTimeout is the deadline callContext applies to each RPC.
	callTimeout time.Duration
}
// newQuotationClient dials the quotation service and wraps the connection in a
// quotationClient.
//
// Parameters:
//   - ctx: parent of the dial context, which is bounded by cfg.DialTimeout.
//   - cfg: connection settings; non-positive timeouts are replaced by defaults.
//     The address is whitespace-trimmed before it is validated and dialed.
//   - opts: extra dial options, applied before the transport-credentials option
//     chosen from cfg.Insecure.
//
// Returns an InvalidArgument error when the address is blank, or an internal
// error wrapping the dial failure.
//
// NOTE(review): unless a caller passes a blocking dial option, DialContext
// presumably returns before the connection is established, in which case
// DialTimeout has little effect — confirm against the gRPC version in use.
func newQuotationClient(ctx context.Context, cfg quotationClientConfig, opts ...grpc.DialOption) (quotationClient, error) {
	cfg.setDefaults()

	// Normalise once so the value that passes validation is the value dialed;
	// previously a padded address was accepted but dialed verbatim.
	target := strings.TrimSpace(cfg.Address)
	if target == "" {
		return nil, merrors.InvalidArgument("payment quotation: address is required")
	}

	dialCtx, cancel := context.WithTimeout(ctx, cfg.DialTimeout)
	defer cancel()

	dialOpts := make([]grpc.DialOption, 0, len(opts)+1)
	dialOpts = append(dialOpts, opts...)
	if cfg.Insecure {
		dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
	} else {
		dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})))
	}

	conn, err := grpc.DialContext(dialCtx, target, dialOpts...)
	if err != nil {
		return nil, merrors.InternalWrap(err, fmt.Sprintf("payment-quotation: dial %s", target))
	}

	return &grpcQuotationClient{
		conn:        conn,
		client:      quotationv1.NewQuotationServiceClient(conn),
		callTimeout: cfg.CallTimeout,
	}, nil
}
// Close shuts down the underlying gRPC connection and returns its error.
// It is a no-op returning nil on a nil receiver or when no connection is set.
func (c *grpcQuotationClient) Close() error {
	if c != nil && c.conn != nil {
		return c.conn.Close()
	}
	return nil
}
// QuotePayment forwards req to the quotation service under the per-call
// deadline produced by callContext and returns the stub's response and error
// unchanged.
func (c *grpcQuotationClient) QuotePayment(ctx context.Context, req *quotationv1.QuotePaymentRequest) (*quotationv1.QuotePaymentResponse, error) {
	rpcCtx, release := c.callContext(ctx)
	defer release()

	resp, err := c.client.QuotePayment(rpcCtx, req)
	return resp, err
}
// QuotePayments forwards the batch request to the quotation service under the
// per-call deadline produced by callContext and returns the stub's response
// and error unchanged.
func (c *grpcQuotationClient) QuotePayments(ctx context.Context, req *quotationv1.QuotePaymentsRequest) (*quotationv1.QuotePaymentsResponse, error) {
	rpcCtx, release := c.callContext(ctx)
	defer release()

	resp, err := c.client.QuotePayments(rpcCtx, req)
	return resp, err
}
// callContext derives a context for a single RPC from ctx, bounded by the
// client's callTimeout, or by 3 seconds when callTimeout is not positive.
// The caller must invoke the returned cancel function.
func (c *grpcQuotationClient) callContext(ctx context.Context) (context.Context, context.CancelFunc) {
	if c.callTimeout > 0 {
		return context.WithTimeout(ctx, c.callTimeout)
	}
	return context.WithTimeout(ctx, 3*time.Second)
}
// initDiscoveryClient wires the optional discovery integration.
//
// It does nothing and returns nil when cfg or cfg.Mw is nil, or when no
// messaging driver is configured. Otherwise it creates a messaging broker, a
// discovery client on top of it, and a consumer for the discovery refresh
// event, then starts a goroutine that feeds consumed messages to
// a.handleRefreshEvent.
//
// The first constructor error is returned. If the consumer cannot be created,
// a.discovery has already been assigned, so Finish will still close it.
//
// The goroutine runs until ConsumeMessages returns; a non-nil error from it is
// logged as a warning.
func (a *PaymentAPI) initDiscoveryClient(cfg *eapi.Config) error {
	if cfg == nil || cfg.Mw == nil {
		return nil
	}

	// Work on a local copy so the broker receives a pointer it can own.
	busCfg := cfg.Mw.Messaging
	if busCfg.Driver == "" {
		return nil
	}

	bus, err := msg.CreateMessagingBroker(a.logger.Named("discovery_bus"), &busCfg)
	if err != nil {
		return err
	}

	disc, err := discovery.NewClient(a.logger, bus, nil, string(a.Name()))
	if err != nil {
		return err
	}
	a.discovery = disc

	consumer, err := msgconsumer.NewConsumer(a.logger, bus, discovery.RefreshUIEvent())
	if err != nil {
		return err
	}
	a.refreshConsumer = consumer

	go func() {
		consumeErr := consumer.ConsumeMessages(a.handleRefreshEvent)
		if consumeErr != nil {
			a.logger.Warn("Discovery refresh consumer stopped", zap.Error(consumeErr))
		}
	}()

	return nil
}