158 lines
5.3 KiB
Go
158 lines
5.3 KiB
Go
package paymentapiimp
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
orchestratorclient "github.com/tech/sendico/payments/orchestrator/client"
|
|
api "github.com/tech/sendico/pkg/api/http"
|
|
"github.com/tech/sendico/pkg/auth"
|
|
"github.com/tech/sendico/pkg/discovery"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
msg "github.com/tech/sendico/pkg/messaging"
|
|
msgconsumer "github.com/tech/sendico/pkg/messaging/consumer"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"github.com/tech/sendico/pkg/mservice"
|
|
orchestratorv1 "github.com/tech/sendico/pkg/proto/payments/orchestrator/v1"
|
|
eapi "github.com/tech/sendico/server/interface/api"
|
|
mutil "github.com/tech/sendico/server/internal/mutil/param"
|
|
"go.mongodb.org/mongo-driver/v2/bson"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type paymentClient interface {
|
|
QuotePayment(ctx context.Context, req *orchestratorv1.QuotePaymentRequest) (*orchestratorv1.QuotePaymentResponse, error)
|
|
QuotePayments(ctx context.Context, req *orchestratorv1.QuotePaymentsRequest) (*orchestratorv1.QuotePaymentsResponse, error)
|
|
InitiatePayments(ctx context.Context, req *orchestratorv1.InitiatePaymentsRequest) (*orchestratorv1.InitiatePaymentsResponse, error)
|
|
InitiatePayment(ctx context.Context, req *orchestratorv1.InitiatePaymentRequest) (*orchestratorv1.InitiatePaymentResponse, error)
|
|
ListPayments(ctx context.Context, req *orchestratorv1.ListPaymentsRequest) (*orchestratorv1.ListPaymentsResponse, error)
|
|
Close() error
|
|
}
|
|
|
|
type PaymentAPI struct {
|
|
logger mlogger.Logger
|
|
client paymentClient
|
|
enf auth.Enforcer
|
|
oph mutil.ParamHelper
|
|
discovery *discovery.Client
|
|
refreshConsumer msg.Consumer
|
|
refreshMu sync.RWMutex
|
|
refreshEvent *discovery.RefreshEvent
|
|
|
|
permissionRef bson.ObjectID
|
|
}
|
|
|
|
// Name reports the service type of this API, which is always mservice.Payments.
func (a *PaymentAPI) Name() mservice.Type {
	return mservice.Payments
}
|
|
|
|
func (a *PaymentAPI) Finish(ctx context.Context) error {
|
|
if a.client != nil {
|
|
if err := a.client.Close(); err != nil {
|
|
a.logger.Warn("Failed to close payment orchestrator client", zap.Error(err))
|
|
}
|
|
}
|
|
if a.discovery != nil {
|
|
a.discovery.Close()
|
|
}
|
|
if a.refreshConsumer != nil {
|
|
a.refreshConsumer.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func CreateAPI(apiCtx eapi.API) (*PaymentAPI, error) {
|
|
p := &PaymentAPI{
|
|
logger: apiCtx.Logger().Named(mservice.Payments),
|
|
enf: apiCtx.Permissions().Enforcer(),
|
|
oph: mutil.CreatePH(mservice.Organizations),
|
|
}
|
|
|
|
desc, err := apiCtx.Permissions().GetPolicyDescription(context.Background(), mservice.Payments)
|
|
if err != nil {
|
|
p.logger.Warn("Failed to fetch payment orchestrator permission description", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
p.permissionRef = desc.ID
|
|
|
|
if err := p.initPaymentClient(apiCtx.Config().PaymentOrchestrator); err != nil {
|
|
p.logger.Error("Failed to initialize payment orchestrator client", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
if err := p.initDiscoveryClient(apiCtx.Config()); err != nil {
|
|
p.logger.Warn("Failed to initialize discovery client", zap.Error(err))
|
|
}
|
|
|
|
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/quote"), api.Post, p.quotePayment)
|
|
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/multiquote"), api.Post, p.quotePayments)
|
|
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/immediate"), api.Post, p.initiateImmediate)
|
|
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/by-quote"), api.Post, p.initiateByQuote)
|
|
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/by-multiquote"), api.Post, p.initiatePaymentsByQuote)
|
|
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/"), api.Get, p.listPayments)
|
|
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/registry"), api.Get, p.listDiscoveryRegistry)
|
|
apiCtx.Register().AccountHandler(p.Name(), p.oph.AddRef("/registry/refresh"), api.Get, p.getDiscoveryRefresh)
|
|
|
|
return p, nil
|
|
}
|
|
|
|
func (a *PaymentAPI) initPaymentClient(cfg *eapi.PaymentOrchestratorConfig) error {
|
|
if cfg == nil {
|
|
return merrors.InvalidArgument("payment orchestrator configuration is not provided")
|
|
}
|
|
|
|
address := strings.TrimSpace(cfg.Address)
|
|
if address == "" {
|
|
address = strings.TrimSpace(os.Getenv(cfg.AddressEnv))
|
|
}
|
|
if address == "" {
|
|
return merrors.InvalidArgument(fmt.Sprintf("payment orchestrator address is not specified and address env %s is empty", cfg.AddressEnv))
|
|
}
|
|
|
|
clientCfg := orchestratorclient.Config{
|
|
Address: address,
|
|
DialTimeout: time.Duration(cfg.DialTimeoutSeconds) * time.Second,
|
|
CallTimeout: time.Duration(cfg.CallTimeoutSeconds) * time.Second,
|
|
Insecure: cfg.Insecure,
|
|
}
|
|
|
|
client, err := orchestratorclient.New(context.Background(), clientCfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
a.client = client
|
|
return nil
|
|
}
|
|
|
|
func (a *PaymentAPI) initDiscoveryClient(cfg *eapi.Config) error {
|
|
if cfg == nil || cfg.Mw == nil {
|
|
return nil
|
|
}
|
|
msgCfg := cfg.Mw.Messaging
|
|
if msgCfg.Driver == "" {
|
|
return nil
|
|
}
|
|
broker, err := msg.CreateMessagingBroker(a.logger.Named("discovery_bus"), &msgCfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
client, err := discovery.NewClient(a.logger, broker, nil, string(a.Name()))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
a.discovery = client
|
|
refreshConsumer, err := msgconsumer.NewConsumer(a.logger, broker, discovery.RefreshUIEvent())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
a.refreshConsumer = refreshConsumer
|
|
go func() {
|
|
if err := refreshConsumer.ConsumeMessages(a.handleRefreshEvent); err != nil {
|
|
a.logger.Warn("Discovery refresh consumer stopped", zap.Error(err))
|
|
}
|
|
}()
|
|
return nil
|
|
}
|