new payment methods service
This commit is contained in:
28
api/payments/methods/internal/appversion/version.go
Normal file
28
api/payments/methods/internal/appversion/version.go
Normal file
@@ -0,0 +1,28 @@
|
||||
package appversion
|
||||
|
||||
import (
|
||||
"github.com/tech/sendico/pkg/version"
|
||||
vf "github.com/tech/sendico/pkg/version/factory"
|
||||
)
|
||||
|
||||
// Build information populated via ldflags.
|
||||
var (
|
||||
Version string
|
||||
Revision string
|
||||
Branch string
|
||||
BuildUser string
|
||||
BuildDate string
|
||||
)
|
||||
|
||||
// Create returns a printer configured for the payment methods service.
|
||||
func Create() version.Printer {
|
||||
vi := version.Info{
|
||||
Program: "Sendico Payment Methods Service",
|
||||
Revision: Revision,
|
||||
Branch: Branch,
|
||||
BuildUser: BuildUser,
|
||||
BuildDate: BuildDate,
|
||||
Version: Version,
|
||||
}
|
||||
return vf.Create(&vi)
|
||||
}
|
||||
62
api/payments/methods/internal/server/internal/config.go
Normal file
62
api/payments/methods/internal/server/internal/config.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package serverimp
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/tech/sendico/pkg/api/routers"
|
||||
"github.com/tech/sendico/pkg/db"
|
||||
"github.com/tech/sendico/pkg/server/grpcapp"
|
||||
"go.uber.org/zap"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
*grpcapp.Config `yaml:",inline"`
|
||||
|
||||
// PermissionsDatabase points to the authorization store (policies/roles/assignments).
|
||||
// If omitted, startup falls back to Database for backward compatibility.
|
||||
PermissionsDatabase *db.Config `yaml:"permissions_database"`
|
||||
}
|
||||
|
||||
func (i *Imp) loadConfig() (*config, error) {
|
||||
data, err := os.ReadFile(i.file)
|
||||
if err != nil {
|
||||
i.logger.Error("Could not read configuration file", zap.String("config_file", i.file), zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg := &config{Config: &grpcapp.Config{}}
|
||||
if err := yaml.Unmarshal(data, cfg); err != nil {
|
||||
i.logger.Error("Failed to parse configuration", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cfg.Runtime == nil {
|
||||
cfg.Runtime = &grpcapp.RuntimeConfig{ShutdownTimeoutSeconds: 15}
|
||||
}
|
||||
|
||||
if cfg.GRPC == nil {
|
||||
cfg.GRPC = &routers.GRPCConfig{
|
||||
Network: "tcp",
|
||||
Address: ":50066",
|
||||
EnableReflection: true,
|
||||
EnableHealth: true,
|
||||
}
|
||||
} else {
|
||||
if strings.TrimSpace(cfg.GRPC.Address) == "" {
|
||||
cfg.GRPC.Address = ":50066"
|
||||
}
|
||||
if strings.TrimSpace(cfg.GRPC.Network) == "" {
|
||||
cfg.GRPC.Network = "tcp"
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.Metrics == nil {
|
||||
cfg.Metrics = &grpcapp.MetricsConfig{Address: ":9416"}
|
||||
} else if strings.TrimSpace(cfg.Metrics.Address) == "" {
|
||||
cfg.Metrics.Address = ":9416"
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
88
api/payments/methods/internal/server/internal/discovery.go
Normal file
88
api/payments/methods/internal/server/internal/discovery.go
Normal file
@@ -0,0 +1,88 @@
|
||||
package serverimp
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/tech/sendico/payments/methods/internal/appversion"
|
||||
"github.com/tech/sendico/pkg/discovery"
|
||||
msg "github.com/tech/sendico/pkg/messaging"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const methodsDiscoverySender = "payment_methods"
|
||||
|
||||
func (i *Imp) initDiscovery(cfg *config) {
|
||||
if i == nil || cfg == nil || cfg.Messaging == nil || cfg.Messaging.Driver == "" {
|
||||
return
|
||||
}
|
||||
|
||||
logger := i.logger.Named("discovery")
|
||||
broker, err := msg.CreateMessagingBroker(logger.Named("bus"), cfg.Messaging)
|
||||
if err != nil {
|
||||
i.logger.Warn("Failed to initialise discovery broker", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
registry := discovery.NewRegistry()
|
||||
watcher, err := discovery.NewRegistryWatcher(logger, broker, registry)
|
||||
if err != nil {
|
||||
i.logger.Warn("Failed to initialise discovery registry watcher", zap.Error(err))
|
||||
return
|
||||
}
|
||||
if err := watcher.Start(); err != nil {
|
||||
i.logger.Warn("Failed to start discovery registry watcher", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
i.discoveryWatcher = watcher
|
||||
i.discoveryReg = registry
|
||||
i.logger.Info("Discovery registry watcher started")
|
||||
}
|
||||
|
||||
func (i *Imp) startDiscoveryAnnouncer(cfg *config, producer msg.Producer) {
|
||||
if i == nil || cfg == nil || producer == nil || cfg.GRPC == nil {
|
||||
return
|
||||
}
|
||||
|
||||
invokeURI := strings.TrimSpace(cfg.GRPC.DiscoveryInvokeURI())
|
||||
if invokeURI == "" {
|
||||
i.logger.Warn("Skipping discovery announcement: missing advertise host/port in gRPC config")
|
||||
return
|
||||
}
|
||||
|
||||
announce := discovery.Announcement{
|
||||
Service: "PAYMENTS_METHODS",
|
||||
Operations: []string{
|
||||
"payment_methods.manage",
|
||||
"payment_methods.read",
|
||||
},
|
||||
InvokeURI: invokeURI,
|
||||
Version: appversion.Create().Short(),
|
||||
}
|
||||
|
||||
i.discoveryAnnouncer = discovery.NewAnnouncer(i.logger, producer, methodsDiscoverySender, announce)
|
||||
i.discoveryAnnouncer.Start()
|
||||
i.logger.Info("Discovery announcer started",
|
||||
zap.String("service", announce.Service),
|
||||
zap.String("invoke_uri", announce.InvokeURI))
|
||||
}
|
||||
|
||||
func (i *Imp) stopDiscoveryAnnouncer() {
|
||||
if i == nil || i.discoveryAnnouncer == nil {
|
||||
return
|
||||
}
|
||||
i.discoveryAnnouncer.Stop()
|
||||
i.discoveryAnnouncer = nil
|
||||
}
|
||||
|
||||
func (i *Imp) stopDiscovery() {
|
||||
if i == nil {
|
||||
return
|
||||
}
|
||||
i.stopDiscoveryAnnouncer()
|
||||
if i.discoveryWatcher != nil {
|
||||
i.discoveryWatcher.Stop()
|
||||
i.discoveryWatcher = nil
|
||||
}
|
||||
i.discoveryReg = nil
|
||||
}
|
||||
16
api/payments/methods/internal/server/internal/lifecycle.go
Normal file
16
api/payments/methods/internal/server/internal/lifecycle.go
Normal file
@@ -0,0 +1,16 @@
|
||||
package serverimp
|
||||
|
||||
import "context"
|
||||
|
||||
func (i *Imp) shutdownApp() {
|
||||
if i == nil || i.app == nil {
|
||||
return
|
||||
}
|
||||
|
||||
timeout := i.config.Runtime.ShutdownTimeout()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
i.app.Shutdown(ctx)
|
||||
i.app = nil
|
||||
}
|
||||
117
api/payments/methods/internal/server/internal/serverimp.go
Normal file
117
api/payments/methods/internal/server/internal/serverimp.go
Normal file
@@ -0,0 +1,117 @@
|
||||
package serverimp
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tech/sendico/payments/methods/internal/service/methods"
|
||||
"github.com/tech/sendico/payments/storage"
|
||||
mongostorage "github.com/tech/sendico/payments/storage/mongo"
|
||||
"github.com/tech/sendico/pkg/db"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
msg "github.com/tech/sendico/pkg/messaging"
|
||||
mb "github.com/tech/sendico/pkg/messaging/broker"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
"github.com/tech/sendico/pkg/mservice"
|
||||
"github.com/tech/sendico/pkg/server/grpcapp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func Create(logger mlogger.Logger, file string, debug bool) (*Imp, error) {
|
||||
return &Imp{
|
||||
logger: logger.Named("server"),
|
||||
file: file,
|
||||
debug: debug,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (i *Imp) Shutdown() {
|
||||
i.stopDiscovery()
|
||||
if i.service != nil {
|
||||
i.service.Shutdown()
|
||||
}
|
||||
i.shutdownApp()
|
||||
if i.dbFactory != nil {
|
||||
i.dbFactory.CloseConnection()
|
||||
i.dbFactory = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (i *Imp) Start() error {
|
||||
cfg, err := i.loadConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
i.config = cfg
|
||||
|
||||
i.initDiscovery(cfg)
|
||||
|
||||
if cfg.Database == nil {
|
||||
return merrors.InvalidArgument("database configuration is required")
|
||||
}
|
||||
|
||||
permissionsDB := cfg.PermissionsDatabase
|
||||
if permissionsDB == nil {
|
||||
i.logger.Info("permissions_database is not configured, falling back to database settings")
|
||||
permissionsDB = cfg.Database
|
||||
}
|
||||
|
||||
i.dbFactory, err = db.NewConnection(i.logger, permissionsDB)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
policy, err := i.dbFactory.Permissions().GetPolicyDescription(context.Background(), mservice.PaymentMethods)
|
||||
if err != nil {
|
||||
i.dbFactory.CloseConnection()
|
||||
i.dbFactory = nil
|
||||
return err
|
||||
}
|
||||
|
||||
var broker mb.Broker
|
||||
if cfg.Messaging != nil && cfg.Messaging.Driver != "" {
|
||||
broker, err = msg.CreateMessagingBroker(i.logger, cfg.Messaging)
|
||||
if err != nil {
|
||||
i.logger.Warn("Failed to create recipient notifications broker", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
repoFactory := func(logger mlogger.Logger, conn *db.MongoConnection) (storage.Repository, error) {
|
||||
return mongostorage.New(
|
||||
logger,
|
||||
conn,
|
||||
mongostorage.WithPaymentMethodsAuth(i.dbFactory.Permissions().Enforcer(), policy.ID),
|
||||
)
|
||||
}
|
||||
|
||||
serviceFactory := func(logger mlogger.Logger, repo storage.Repository, producer msg.Producer) (grpcapp.Service, error) {
|
||||
opts := []methods.Option{}
|
||||
if broker != nil {
|
||||
opts = append(opts, methods.WithRecipientEventsBroker(broker))
|
||||
}
|
||||
|
||||
i.startDiscoveryAnnouncer(cfg, producer)
|
||||
svc, err := methods.NewService(logger, repo, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
i.service = svc
|
||||
return svc, nil
|
||||
}
|
||||
|
||||
app, err := grpcapp.NewApp(i.logger, "payments_methods", cfg.Config, i.debug, repoFactory, serviceFactory)
|
||||
if err != nil {
|
||||
i.dbFactory.CloseConnection()
|
||||
i.dbFactory = nil
|
||||
return err
|
||||
}
|
||||
i.app = app
|
||||
|
||||
if err := i.app.Start(); err != nil {
|
||||
if i.dbFactory != nil {
|
||||
i.dbFactory.CloseConnection()
|
||||
i.dbFactory = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
29
api/payments/methods/internal/server/internal/types.go
Normal file
29
api/payments/methods/internal/server/internal/types.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package serverimp
|
||||
|
||||
import (
|
||||
"github.com/tech/sendico/payments/storage"
|
||||
"github.com/tech/sendico/pkg/db"
|
||||
"github.com/tech/sendico/pkg/discovery"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
"github.com/tech/sendico/pkg/server/grpcapp"
|
||||
)
|
||||
|
||||
type methodsService interface {
|
||||
grpcapp.Service
|
||||
Shutdown()
|
||||
}
|
||||
|
||||
type Imp struct {
|
||||
logger mlogger.Logger
|
||||
file string
|
||||
debug bool
|
||||
|
||||
config *config
|
||||
app *grpcapp.App[storage.Repository]
|
||||
service methodsService
|
||||
dbFactory db.Factory
|
||||
|
||||
discoveryWatcher *discovery.RegistryWatcher
|
||||
discoveryReg *discovery.Registry
|
||||
discoveryAnnouncer *discovery.Announcer
|
||||
}
|
||||
12
api/payments/methods/internal/server/server.go
Normal file
12
api/payments/methods/internal/server/server.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
serverimp "github.com/tech/sendico/payments/methods/internal/server/internal"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
"github.com/tech/sendico/pkg/server"
|
||||
)
|
||||
|
||||
// Create initialises the payment methods server implementation.
|
||||
func Create(logger mlogger.Logger, file string, debug bool) (server.Application, error) {
|
||||
return serverimp.Create(logger, file, debug)
|
||||
}
|
||||
36
api/payments/methods/internal/service/methods/archive.go
Normal file
36
api/payments/methods/internal/service/methods/archive.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package methods
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
methodsv1 "github.com/tech/sendico/pkg/proto/payments/methods/v1"
|
||||
)
|
||||
|
||||
func (s *Service) SetPaymentMethodArchived(ctx context.Context, req *methodsv1.SetPaymentMethodArchivedRequest) (*methodsv1.SetPaymentMethodArchivedResponse, error) {
|
||||
if req == nil {
|
||||
return autoError[methodsv1.SetPaymentMethodArchivedResponse](ctx, s.logger, merrors.InvalidArgument("request is required"))
|
||||
}
|
||||
if s.pmstore == nil {
|
||||
return autoError[methodsv1.SetPaymentMethodArchivedResponse](ctx, s.logger, errStoreUnavailable)
|
||||
}
|
||||
|
||||
accountRef, err := parseObjectID(req.GetAccountRef(), "account_ref")
|
||||
if err != nil {
|
||||
return autoError[methodsv1.SetPaymentMethodArchivedResponse](ctx, s.logger, err)
|
||||
}
|
||||
organizationRef, err := parseObjectID(req.GetOrganizationRef(), "organization_ref")
|
||||
if err != nil {
|
||||
return autoError[methodsv1.SetPaymentMethodArchivedResponse](ctx, s.logger, err)
|
||||
}
|
||||
methodRef, err := parseObjectID(req.GetPaymentMethodRef(), "payment_method_ref")
|
||||
if err != nil {
|
||||
return autoError[methodsv1.SetPaymentMethodArchivedResponse](ctx, s.logger, err)
|
||||
}
|
||||
|
||||
if err := s.pmstore.SetArchived(ctx, accountRef, organizationRef, methodRef, req.GetArchived(), req.GetCascade()); err != nil {
|
||||
return autoError[methodsv1.SetPaymentMethodArchivedResponse](ctx, s.logger, err)
|
||||
}
|
||||
|
||||
return &methodsv1.SetPaymentMethodArchivedResponse{}, nil
|
||||
}
|
||||
41
api/payments/methods/internal/service/methods/create.go
Normal file
41
api/payments/methods/internal/service/methods/create.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package methods
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
methodsv1 "github.com/tech/sendico/pkg/proto/payments/methods/v1"
|
||||
)
|
||||
|
||||
func (s *Service) CreatePaymentMethod(ctx context.Context, req *methodsv1.CreatePaymentMethodRequest) (*methodsv1.CreatePaymentMethodResponse, error) {
|
||||
if req == nil {
|
||||
return autoError[methodsv1.CreatePaymentMethodResponse](ctx, s.logger, merrors.InvalidArgument("request is required"))
|
||||
}
|
||||
if s.pmstore == nil {
|
||||
return autoError[methodsv1.CreatePaymentMethodResponse](ctx, s.logger, errStoreUnavailable)
|
||||
}
|
||||
|
||||
accountRef, err := parseObjectID(req.GetAccountRef(), "account_ref")
|
||||
if err != nil {
|
||||
return autoError[methodsv1.CreatePaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
organizationRef, err := parseObjectID(req.GetOrganizationRef(), "organization_ref")
|
||||
if err != nil {
|
||||
return autoError[methodsv1.CreatePaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
|
||||
pm, err := decodePaymentMethod(req.GetPaymentMethodJson())
|
||||
if err != nil {
|
||||
return autoError[methodsv1.CreatePaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
if err := s.pmstore.Create(ctx, accountRef, organizationRef, pm); err != nil {
|
||||
return autoError[methodsv1.CreatePaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
|
||||
payload, err := encodePaymentMethod(pm)
|
||||
if err != nil {
|
||||
return autoError[methodsv1.CreatePaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
|
||||
return &methodsv1.CreatePaymentMethodResponse{PaymentMethodJson: payload}, nil
|
||||
}
|
||||
37
api/payments/methods/internal/service/methods/delete.go
Normal file
37
api/payments/methods/internal/service/methods/delete.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package methods
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
methodsv1 "github.com/tech/sendico/pkg/proto/payments/methods/v1"
|
||||
)
|
||||
|
||||
func (s *Service) DeletePaymentMethod(ctx context.Context, req *methodsv1.DeletePaymentMethodRequest) (*methodsv1.DeletePaymentMethodResponse, error) {
|
||||
if req == nil {
|
||||
return autoError[methodsv1.DeletePaymentMethodResponse](ctx, s.logger, merrors.InvalidArgument("request is required"))
|
||||
}
|
||||
if s.pmstore == nil {
|
||||
return autoError[methodsv1.DeletePaymentMethodResponse](ctx, s.logger, errStoreUnavailable)
|
||||
}
|
||||
|
||||
accountRef, err := parseObjectID(req.GetAccountRef(), "account_ref")
|
||||
if err != nil {
|
||||
return autoError[methodsv1.DeletePaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
methodRef, err := parseObjectID(req.GetPaymentMethodRef(), "payment_method_ref")
|
||||
if err != nil {
|
||||
return autoError[methodsv1.DeletePaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
|
||||
if req.GetCascade() {
|
||||
err = s.pmstore.DeleteCascade(ctx, accountRef, methodRef)
|
||||
} else {
|
||||
err = s.pmstore.Delete(ctx, accountRef, methodRef)
|
||||
}
|
||||
if err != nil {
|
||||
return autoError[methodsv1.DeletePaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
|
||||
return &methodsv1.DeletePaymentMethodResponse{}, nil
|
||||
}
|
||||
38
api/payments/methods/internal/service/methods/get.go
Normal file
38
api/payments/methods/internal/service/methods/get.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package methods
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
methodsv1 "github.com/tech/sendico/pkg/proto/payments/methods/v1"
|
||||
)
|
||||
|
||||
func (s *Service) GetPaymentMethod(ctx context.Context, req *methodsv1.GetPaymentMethodRequest) (*methodsv1.GetPaymentMethodResponse, error) {
|
||||
if req == nil {
|
||||
return autoError[methodsv1.GetPaymentMethodResponse](ctx, s.logger, merrors.InvalidArgument("request is required"))
|
||||
}
|
||||
if s.pmstore == nil {
|
||||
return autoError[methodsv1.GetPaymentMethodResponse](ctx, s.logger, errStoreUnavailable)
|
||||
}
|
||||
|
||||
accountRef, err := parseObjectID(req.GetAccountRef(), "account_ref")
|
||||
if err != nil {
|
||||
return autoError[methodsv1.GetPaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
methodRef, err := parseObjectID(req.GetPaymentMethodRef(), "payment_method_ref")
|
||||
if err != nil {
|
||||
return autoError[methodsv1.GetPaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
|
||||
pm, err := s.pmstore.Get(ctx, accountRef, methodRef)
|
||||
if err != nil {
|
||||
return autoError[methodsv1.GetPaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
|
||||
payload, err := encodePaymentMethod(pm)
|
||||
if err != nil {
|
||||
return autoError[methodsv1.GetPaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
|
||||
return &methodsv1.GetPaymentMethodResponse{PaymentMethodJson: payload}, nil
|
||||
}
|
||||
48
api/payments/methods/internal/service/methods/list.go
Normal file
48
api/payments/methods/internal/service/methods/list.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package methods
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
methodsv1 "github.com/tech/sendico/pkg/proto/payments/methods/v1"
|
||||
)
|
||||
|
||||
func (s *Service) ListPaymentMethods(ctx context.Context, req *methodsv1.ListPaymentMethodsRequest) (*methodsv1.ListPaymentMethodsResponse, error) {
|
||||
if req == nil {
|
||||
return autoError[methodsv1.ListPaymentMethodsResponse](ctx, s.logger, merrors.InvalidArgument("request is required"))
|
||||
}
|
||||
if s.pmstore == nil {
|
||||
return autoError[methodsv1.ListPaymentMethodsResponse](ctx, s.logger, errStoreUnavailable)
|
||||
}
|
||||
|
||||
accountRef, err := parseObjectID(req.GetAccountRef(), "account_ref")
|
||||
if err != nil {
|
||||
return autoError[methodsv1.ListPaymentMethodsResponse](ctx, s.logger, err)
|
||||
}
|
||||
organizationRef, err := parseObjectID(req.GetOrganizationRef(), "organization_ref")
|
||||
if err != nil {
|
||||
return autoError[methodsv1.ListPaymentMethodsResponse](ctx, s.logger, err)
|
||||
}
|
||||
recipientRef, err := parseObjectID(req.GetRecipientRef(), "recipient_ref")
|
||||
if err != nil {
|
||||
return autoError[methodsv1.ListPaymentMethodsResponse](ctx, s.logger, err)
|
||||
}
|
||||
|
||||
items, err := s.pmstore.List(ctx, accountRef, organizationRef, recipientRef, toModelCursor(req.GetCursor()))
|
||||
if err != nil {
|
||||
return autoError[methodsv1.ListPaymentMethodsResponse](ctx, s.logger, err)
|
||||
}
|
||||
|
||||
result := make([][]byte, 0, len(items))
|
||||
for i := range items {
|
||||
payload, err := encodePaymentMethod(&items[i])
|
||||
if err != nil {
|
||||
return autoError[methodsv1.ListPaymentMethodsResponse](ctx, s.logger, err)
|
||||
}
|
||||
result = append(result, payload)
|
||||
}
|
||||
|
||||
return &methodsv1.ListPaymentMethodsResponse{
|
||||
PaymentMethodsJson: result,
|
||||
}, nil
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
package methods
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
cons "github.com/tech/sendico/pkg/messaging/consumer"
|
||||
objectnotifications "github.com/tech/sendico/pkg/messaging/notifications/object"
|
||||
np "github.com/tech/sendico/pkg/messaging/notifications/processor"
|
||||
nm "github.com/tech/sendico/pkg/model/notification"
|
||||
"github.com/tech/sendico/pkg/mservice"
|
||||
"go.mongodb.org/mongo-driver/v2/bson"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (s *Service) startRecipientConsumers() {
|
||||
if s == nil || s.recipientBroker == nil {
|
||||
s.logger.Warn("Missing broker. Recipient cascade consumers have NOT started")
|
||||
return
|
||||
}
|
||||
|
||||
s.consumeRecipientProcessor(
|
||||
objectnotifications.NewObjectChangedMessageProcessor(s.logger, mservice.Recipients, nm.NAArchived, s.onRecipientNotification),
|
||||
)
|
||||
s.consumeRecipientProcessor(
|
||||
objectnotifications.NewObjectChangedMessageProcessor(s.logger, mservice.Recipients, nm.NADeleted, s.onRecipientNotification),
|
||||
)
|
||||
|
||||
s.logger.Info("Recipient cascade consumers started")
|
||||
}
|
||||
|
||||
func (s *Service) consumeRecipientProcessor(processor np.EnvelopeProcessor) {
|
||||
consumer, err := cons.NewConsumer(s.logger, s.recipientBroker, processor.GetSubject())
|
||||
if err != nil {
|
||||
s.logger.Warn("Failed to create recipient consumer", zap.Error(err), zap.String("event", processor.GetSubject().ToString()))
|
||||
return
|
||||
}
|
||||
s.recipientConsumers = append(s.recipientConsumers, consumer)
|
||||
|
||||
go func() {
|
||||
if err := consumer.ConsumeMessages(processor.Process); err != nil {
|
||||
s.logger.Warn("Recipient consumer stopped", zap.Error(err), zap.String("event", processor.GetSubject().ToString()))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Service) onRecipientNotification(
|
||||
ctx context.Context,
|
||||
objectType mservice.Type,
|
||||
recipientRef, actorAccountRef bson.ObjectID,
|
||||
action nm.NotificationAction,
|
||||
) error {
|
||||
if s.pmstore == nil {
|
||||
return errStoreUnavailable
|
||||
}
|
||||
if objectType != mservice.Recipients || recipientRef == bson.NilObjectID {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch action {
|
||||
case nm.NAArchived:
|
||||
updated, err := s.pmstore.SetArchivedByRecipient(ctx, recipientRef, true)
|
||||
if err != nil {
|
||||
s.logger.Warn("Failed to cascade archive payment methods by recipient",
|
||||
zap.Error(err),
|
||||
zap.String("recipient_ref", recipientRef.Hex()),
|
||||
zap.String("actor_account_ref", actorAccountRef.Hex()))
|
||||
return err
|
||||
}
|
||||
s.logger.Info("Recipient archive cascade applied to payment methods",
|
||||
zap.String("recipient_ref", recipientRef.Hex()),
|
||||
zap.String("actor_account_ref", actorAccountRef.Hex()),
|
||||
zap.Int("updated_count", updated))
|
||||
case nm.NADeleted:
|
||||
if err := s.pmstore.DeleteByRecipient(ctx, recipientRef); err != nil {
|
||||
s.logger.Warn("Failed to cascade delete payment methods by recipient",
|
||||
zap.Error(err),
|
||||
zap.String("recipient_ref", recipientRef.Hex()),
|
||||
zap.String("actor_account_ref", actorAccountRef.Hex()))
|
||||
return err
|
||||
}
|
||||
s.logger.Info("Recipient delete cascade applied to payment methods",
|
||||
zap.String("recipient_ref", recipientRef.Hex()),
|
||||
zap.String("actor_account_ref", actorAccountRef.Hex()))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
90
api/payments/methods/internal/service/methods/service.go
Normal file
90
api/payments/methods/internal/service/methods/service.go
Normal file
@@ -0,0 +1,90 @@
|
||||
package methods
|
||||
|
||||
import (
|
||||
"github.com/tech/sendico/payments/storage"
|
||||
"github.com/tech/sendico/pkg/api/routers"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
msg "github.com/tech/sendico/pkg/messaging"
|
||||
mb "github.com/tech/sendico/pkg/messaging/broker"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
methodsv1 "github.com/tech/sendico/pkg/proto/payments/methods/v1"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
var errStoreUnavailable = merrors.Internal("payment-methods: storage is not initialised")
|
||||
|
||||
// Option configures service dependencies.
|
||||
type Option func(*Service)
|
||||
|
||||
// WithRecipientEventsBroker wires the broker used to consume recipient events.
|
||||
func WithRecipientEventsBroker(broker mb.Broker) Option {
|
||||
return func(s *Service) {
|
||||
if broker != nil {
|
||||
s.recipientBroker = broker
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Service implements payments.methods.v1.PaymentMethodsService.
|
||||
type Service struct {
|
||||
logger mlogger.Logger
|
||||
storage storage.Repository
|
||||
pmstore storage.PaymentMethodsStore
|
||||
|
||||
recipientBroker mb.Broker
|
||||
recipientConsumers []msg.Consumer
|
||||
|
||||
methodsv1.UnimplementedPaymentMethodsServiceServer
|
||||
}
|
||||
|
||||
// NewService creates a payment methods gRPC service.
|
||||
func NewService(logger mlogger.Logger, repo storage.Repository, opts ...Option) (*Service, error) {
|
||||
if logger == nil {
|
||||
return nil, merrors.InvalidArgument("payment-methods: logger is required")
|
||||
}
|
||||
if repo == nil {
|
||||
return nil, merrors.InvalidArgument("payment-methods: storage repository is required")
|
||||
}
|
||||
|
||||
pmstore := repo.PaymentMethods()
|
||||
if pmstore == nil {
|
||||
return nil, errStoreUnavailable
|
||||
}
|
||||
|
||||
svc := &Service{
|
||||
logger: logger.Named("payment_methods"),
|
||||
storage: repo,
|
||||
pmstore: pmstore,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
if opt != nil {
|
||||
opt(svc)
|
||||
}
|
||||
}
|
||||
|
||||
svc.startRecipientConsumers()
|
||||
return svc, nil
|
||||
}
|
||||
|
||||
// Register attaches the service to the supplied gRPC router.
|
||||
func (s *Service) Register(router routers.GRPC) error {
|
||||
return router.Register(func(reg grpc.ServiceRegistrar) {
|
||||
methodsv1.RegisterPaymentMethodsServiceServer(reg, s)
|
||||
})
|
||||
}
|
||||
|
||||
// Shutdown releases underlying resources.
|
||||
func (s *Service) Shutdown() {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
for _, consumer := range s.recipientConsumers {
|
||||
if consumer != nil {
|
||||
consumer.Close()
|
||||
}
|
||||
}
|
||||
s.recipientConsumers = nil
|
||||
s.pmstore = nil
|
||||
s.storage = nil
|
||||
}
|
||||
37
api/payments/methods/internal/service/methods/update.go
Normal file
37
api/payments/methods/internal/service/methods/update.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package methods
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
methodsv1 "github.com/tech/sendico/pkg/proto/payments/methods/v1"
|
||||
)
|
||||
|
||||
func (s *Service) UpdatePaymentMethod(ctx context.Context, req *methodsv1.UpdatePaymentMethodRequest) (*methodsv1.UpdatePaymentMethodResponse, error) {
|
||||
if req == nil {
|
||||
return autoError[methodsv1.UpdatePaymentMethodResponse](ctx, s.logger, merrors.InvalidArgument("request is required"))
|
||||
}
|
||||
if s.pmstore == nil {
|
||||
return autoError[methodsv1.UpdatePaymentMethodResponse](ctx, s.logger, errStoreUnavailable)
|
||||
}
|
||||
|
||||
accountRef, err := parseObjectID(req.GetAccountRef(), "account_ref")
|
||||
if err != nil {
|
||||
return autoError[methodsv1.UpdatePaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
|
||||
pm, err := decodePaymentMethod(req.GetPaymentMethodJson())
|
||||
if err != nil {
|
||||
return autoError[methodsv1.UpdatePaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
if err := s.pmstore.Update(ctx, accountRef, pm); err != nil {
|
||||
return autoError[methodsv1.UpdatePaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
|
||||
payload, err := encodePaymentMethod(pm)
|
||||
if err != nil {
|
||||
return autoError[methodsv1.UpdatePaymentMethodResponse](ctx, s.logger, err)
|
||||
}
|
||||
|
||||
return &methodsv1.UpdatePaymentMethodResponse{PaymentMethodJson: payload}, nil
|
||||
}
|
||||
83
api/payments/methods/internal/service/methods/util.go
Normal file
83
api/payments/methods/internal/service/methods/util.go
Normal file
@@ -0,0 +1,83 @@
|
||||
package methods
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/tech/sendico/pkg/api/routers/gsresponse"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
"github.com/tech/sendico/pkg/model"
|
||||
"github.com/tech/sendico/pkg/mservice"
|
||||
methodsv1 "github.com/tech/sendico/pkg/proto/payments/methods/v1"
|
||||
"go.mongodb.org/mongo-driver/v2/bson"
|
||||
)
|
||||
|
||||
func autoError[T any](ctx context.Context, logger mlogger.Logger, err error) (*T, error) {
|
||||
return gsresponse.Execute(ctx, gsresponse.Auto[T](logger, mservice.PaymentMethods, err))
|
||||
}
|
||||
|
||||
func parseObjectID(value, field string) (bson.ObjectID, error) {
|
||||
trimmed := strings.TrimSpace(value)
|
||||
if trimmed == "" {
|
||||
return bson.NilObjectID, merrors.InvalidArgument(fmt.Sprintf("%s is required", field), field)
|
||||
}
|
||||
ref, err := bson.ObjectIDFromHex(trimmed)
|
||||
if err != nil {
|
||||
return bson.NilObjectID, merrors.InvalidArgument(fmt.Sprintf("%s must be a valid object id", field), field)
|
||||
}
|
||||
return ref, nil
|
||||
}
|
||||
|
||||
func decodePaymentMethod(data []byte) (*model.PaymentMethod, error) {
|
||||
if len(data) == 0 {
|
||||
return nil, merrors.InvalidArgument("payment_method_json is required", "payment_method_json")
|
||||
}
|
||||
res := &model.PaymentMethod{}
|
||||
if err := json.Unmarshal(data, res); err != nil {
|
||||
return nil, merrors.InvalidArgumentWrap(err, "failed to decode payment method", "payment_method_json")
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func encodePaymentMethod(pm *model.PaymentMethod) ([]byte, error) {
|
||||
if pm == nil {
|
||||
return nil, merrors.InvalidArgument("payment method is required")
|
||||
}
|
||||
payload, err := json.Marshal(pm)
|
||||
if err != nil {
|
||||
return nil, merrors.InternalWrap(err, "failed to encode payment method")
|
||||
}
|
||||
return payload, nil
|
||||
}
|
||||
|
||||
func toModelCursor(cursor *methodsv1.ViewCursor) *model.ViewCursor {
|
||||
if cursor == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
res := &model.ViewCursor{}
|
||||
hasAny := false
|
||||
|
||||
if limit := cursor.GetLimit(); limit != nil {
|
||||
v := limit.GetValue()
|
||||
res.Limit = &v
|
||||
hasAny = true
|
||||
}
|
||||
if offset := cursor.GetOffset(); offset != nil {
|
||||
v := offset.GetValue()
|
||||
res.Offset = &v
|
||||
hasAny = true
|
||||
}
|
||||
if archived := cursor.GetIsArchived(); archived != nil {
|
||||
v := archived.GetValue()
|
||||
res.IsArchived = &v
|
||||
hasAny = true
|
||||
}
|
||||
if !hasAny {
|
||||
return nil
|
||||
}
|
||||
return res
|
||||
}
|
||||
Reference in New Issue
Block a user