package paymentapiimp import ( "context" "crypto/tls" "fmt" "os" "strings" "sync" "time" orchestratorclient "github.com/tech/sendico/payments/orchestrator/client" api "github.com/tech/sendico/pkg/api/http" "github.com/tech/sendico/pkg/auth" "github.com/tech/sendico/pkg/discovery" "github.com/tech/sendico/pkg/merrors" msg "github.com/tech/sendico/pkg/messaging" msgconsumer "github.com/tech/sendico/pkg/messaging/consumer" "github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mservice" orchestrationv2 "github.com/tech/sendico/pkg/proto/payments/orchestration/v2" quotationv2 "github.com/tech/sendico/pkg/proto/payments/quotation/v2" eapi "github.com/tech/sendico/server/interface/api" mutil "github.com/tech/sendico/server/internal/mutil/param" "go.mongodb.org/mongo-driver/v2/bson" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" ) type executionClient interface { ExecutePayment(ctx context.Context, req *orchestrationv2.ExecutePaymentRequest) (*orchestrationv2.ExecutePaymentResponse, error) ExecuteBatchPayment(ctx context.Context, req *orchestrationv2.ExecuteBatchPaymentRequest) (*orchestrationv2.ExecuteBatchPaymentResponse, error) ListPayments(ctx context.Context, req *orchestrationv2.ListPaymentsRequest) (*orchestrationv2.ListPaymentsResponse, error) Close() error } type quotationClient interface { QuotePayment(ctx context.Context, req *quotationv2.QuotePaymentRequest) (*quotationv2.QuotePaymentResponse, error) QuotePayments(ctx context.Context, req *quotationv2.QuotePaymentsRequest) (*quotationv2.QuotePaymentsResponse, error) Close() error } type PaymentAPI struct { logger mlogger.Logger execution executionClient quotation quotationClient enf auth.Enforcer oph mutil.ParamHelper discovery *discovery.Client refreshConsumer msg.Consumer refreshMu sync.RWMutex refreshEvent *discovery.RefreshEvent permissionRef bson.ObjectID } func (a *PaymentAPI) Name() mservice.Type { return mservice.Payments } func (a *PaymentAPI) Finish(ctx context.Context) error { if a.execution != nil { if err := a.execution.Close(); err != nil { a.logger.Warn("Failed to close payment orchestrator client", zap.Error(err)) } } if a.quotation != nil { if err := a.quotation.Close(); err != nil { a.logger.Warn("Failed to close payment quotation client", zap.Error(err)) } } if a.discovery != nil { a.discovery.Close() } if a.refreshConsumer != nil { a.refreshConsumer.Close() } return nil } func CreateAPI(apiCtx eapi.API) (*PaymentAPI, error) { p := &PaymentAPI{ logger: apiCtx.Logger().Named(mservice.Payments), enf: apiCtx.Permissions().Enforcer(), oph: mutil.CreatePH(mservice.Organizations), } desc, err := apiCtx.Permissions().GetPolicyDescription(context.Background(), mservice.Payments) if err != nil { p.logger.Warn("Failed to fetch payment orchestrator permission description", zap.Error(err)) return nil, err } p.permissionRef = desc.ID if err := p.initPaymentClient(apiCtx.Config().PaymentOrchestrator, apiCtx.Config().PaymentQuotation); err != nil { p.logger.Error("Failed to initialize payment orchestrator client", zap.Error(err)) return nil, err } if err := p.initDiscoveryClient(apiCtx.Config()); err != nil { p.logger.Warn("Failed to initialize discovery client", zap.Error(err)) } apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/quote"), api.Post, p.quotePayment) apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/multiquote"), api.Post, p.quotePayments) apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/immediate"), api.Post, p.initiateImmediate) apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/by-quote"), api.Post, p.initiateByQuote) apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/by-multiquote"), api.Post, p.initiatePaymentsByQuote) apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/"), api.Get, p.listPayments) apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/documents/act"), api.Get, p.getActDocument) apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/registry"), api.Get, p.listDiscoveryRegistry) apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/registry/refresh"), api.Get, p.getDiscoveryRefresh) return p, nil } func (a *PaymentAPI) initPaymentClient(cfg *eapi.PaymentOrchestratorConfig, quoteCfg *eapi.PaymentOrchestratorConfig) error { if cfg == nil { return merrors.InvalidArgument("payment orchestrator configuration is not provided") } address, err := resolveClientAddress("payment orchestrator", cfg) if err != nil { return err } quoteAddress := address quoteInsecure := cfg.Insecure quoteDialTimeout := cfg.DialTimeoutSeconds quoteCallTimeout := cfg.CallTimeoutSeconds if quoteCfg != nil { if addr := strings.TrimSpace(quoteCfg.Address); addr != "" { quoteAddress = addr } else if env := strings.TrimSpace(quoteCfg.AddressEnv); env != "" { if resolved := strings.TrimSpace(os.Getenv(env)); resolved != "" { quoteAddress = resolved } } quoteInsecure = quoteCfg.Insecure quoteDialTimeout = quoteCfg.DialTimeoutSeconds quoteCallTimeout = quoteCfg.CallTimeoutSeconds } clientCfg := orchestratorclient.Config{ Address: address, DialTimeout: time.Duration(cfg.DialTimeoutSeconds) * time.Second, CallTimeout: time.Duration(cfg.CallTimeoutSeconds) * time.Second, Insecure: cfg.Insecure, } execution, err := orchestratorclient.New(context.Background(), clientCfg) if err != nil { return err } quotation, err := newQuotationClient(context.Background(), quotationClientConfig{ Address: quoteAddress, DialTimeout: time.Duration(quoteDialTimeout) * time.Second, CallTimeout: time.Duration(quoteCallTimeout) * time.Second, Insecure: quoteInsecure, }) if err != nil { _ = execution.Close() return err } a.execution = execution a.quotation = quotation return nil } func resolveClientAddress(service string, cfg *eapi.PaymentOrchestratorConfig) (string, error) { if cfg == nil { return "", merrors.InvalidArgument(strings.TrimSpace(service) + " configuration is not provided") } address := strings.TrimSpace(cfg.Address) if address != "" { return address, nil } if env := strings.TrimSpace(cfg.AddressEnv); env != "" { if resolved := strings.TrimSpace(os.Getenv(env)); resolved != "" { return resolved, nil } return "", merrors.InvalidArgument(fmt.Sprintf("%s address is not specified and address env %s is empty", strings.TrimSpace(service), env)) } return "", merrors.InvalidArgument(strings.TrimSpace(service) + " address is not specified") } type quotationClientConfig struct { Address string DialTimeout time.Duration CallTimeout time.Duration Insecure bool } func (c *quotationClientConfig) setDefaults() { if c.DialTimeout <= 0 { c.DialTimeout = 5 * time.Second } if c.CallTimeout <= 0 { c.CallTimeout = 3 * time.Second } } type grpcQuotationClient struct { conn *grpc.ClientConn client quotationv2.QuotationServiceClient callTimeout time.Duration } func newQuotationClient(ctx context.Context, cfg quotationClientConfig, opts ...grpc.DialOption) (quotationClient, error) { cfg.setDefaults() if strings.TrimSpace(cfg.Address) == "" { return nil, merrors.InvalidArgument("payment quotation: address is required") } dialOpts := make([]grpc.DialOption, 0, len(opts)+1) dialOpts = append(dialOpts, opts...) if cfg.Insecure { dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) } else { dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))) } conn, err := grpc.NewClient(cfg.Address, dialOpts...) if err != nil { return nil, merrors.InternalWrap(err, fmt.Sprintf("payment-quotation: dial %s", cfg.Address)) } return &grpcQuotationClient{ conn: conn, client: quotationv2.NewQuotationServiceClient(conn), callTimeout: cfg.CallTimeout, }, nil } func (c *grpcQuotationClient) Close() error { if c == nil || c.conn == nil { return nil } return c.conn.Close() } func (c *grpcQuotationClient) QuotePayment(ctx context.Context, req *quotationv2.QuotePaymentRequest) (*quotationv2.QuotePaymentResponse, error) { callCtx, cancel := c.callContext(ctx) defer cancel() return c.client.QuotePayment(callCtx, req) } func (c *grpcQuotationClient) QuotePayments(ctx context.Context, req *quotationv2.QuotePaymentsRequest) (*quotationv2.QuotePaymentsResponse, error) { callCtx, cancel := c.callContext(ctx) defer cancel() return c.client.QuotePayments(callCtx, req) } func (c *grpcQuotationClient) callContext(ctx context.Context) (context.Context, context.CancelFunc) { timeout := c.callTimeout if timeout <= 0 { timeout = 3 * time.Second } return context.WithTimeout(ctx, timeout) } func (a *PaymentAPI) initDiscoveryClient(cfg *eapi.Config) error { if cfg == nil || cfg.Mw == nil { return nil } msgCfg := cfg.Mw.Messaging if msgCfg.Driver == "" { return nil } broker, err := msg.CreateMessagingBroker(a.logger.Named("discovery_bus"), &msgCfg) if err != nil { return err } client, err := discovery.NewClient(a.logger, broker, nil, string(a.Name())) if err != nil { return err } a.discovery = client refreshConsumer, err := msgconsumer.NewConsumer(a.logger, broker, discovery.RefreshUIEvent()) if err != nil { return err } a.refreshConsumer = refreshConsumer go func() { if err := refreshConsumer.ConsumeMessages(a.handleRefreshEvent); err != nil { a.logger.Warn("Discovery refresh consumer stopped", zap.Error(err)) } }() return nil }