Files
sendico/api/payments/methods/internal/service/methods/recipient_consumer.go
2026-02-12 21:10:33 +01:00

88 lines
2.9 KiB
Go

package methods
import (
"context"
cons "github.com/tech/sendico/pkg/messaging/consumer"
objectnotifications "github.com/tech/sendico/pkg/messaging/notifications/object"
np "github.com/tech/sendico/pkg/messaging/notifications/processor"
nm "github.com/tech/sendico/pkg/model/notification"
"github.com/tech/sendico/pkg/mservice"
"go.mongodb.org/mongo-driver/v2/bson"
"go.uber.org/zap"
)
// startRecipientConsumers registers message consumers for recipient
// "archived" and "deleted" object-changed notifications, both routed to
// onRecipientNotification.
//
// It is a no-op on a nil receiver. When no recipient broker is configured it
// logs a warning and returns without starting any consumer.
func (s *Service) startRecipientConsumers() {
	// A nil receiver has no logger to report through; touching s.logger here
	// would panic, so bail out silently.
	if s == nil {
		return
	}
	if s.recipientBroker == nil {
		s.logger.Warn("Missing broker. Recipient cascade consumers have NOT started")
		return
	}

	// One consumer per notification action.
	s.consumeRecipientProcessor(
		objectnotifications.NewObjectChangedMessageProcessor(s.logger, mservice.Recipients, nm.NAArchived, s.onRecipientNotification),
	)
	s.consumeRecipientProcessor(
		objectnotifications.NewObjectChangedMessageProcessor(s.logger, mservice.Recipients, nm.NADeleted, s.onRecipientNotification),
	)

	// NOTE(review): this is logged even if an individual consumer failed to
	// start; consumeRecipientProcessor reports those failures itself.
	s.logger.Info("Recipient cascade consumers started")
}
// consumeRecipientProcessor creates a consumer on the recipient broker for the
// processor's subject, appends it to s.recipientConsumers, and runs its
// message loop in a new goroutine with processor.Process as the handler.
//
// Errors are not returned: a failure to create the consumer, or a message loop
// that ends with an error, is logged as a warning.
func (s *Service) consumeRecipientProcessor(processor np.EnvelopeProcessor) {
	c, createErr := cons.NewConsumer(s.logger, s.recipientBroker, processor.GetSubject())
	if createErr != nil {
		s.logger.Warn(
			"Failed to create recipient consumer",
			zap.Error(createErr),
			zap.String("event", processor.GetSubject().ToString()),
		)
		return
	}

	// Keep a handle on the consumer alongside the others owned by the service.
	s.recipientConsumers = append(s.recipientConsumers, c)

	go func() {
		runErr := c.ConsumeMessages(processor.Process)
		if runErr == nil {
			return
		}
		s.logger.Warn(
			"Recipient consumer stopped",
			zap.Error(runErr),
			zap.String("event", processor.GetSubject().ToString()),
		)
	}()
}
// onRecipientNotification cascades a recipient change to the payment methods
// that reference that recipient.
//
// It returns errStoreUnavailable when the payment-method store is not set.
// Notifications for other object types, or with a nil recipient reference,
// are ignored. For nm.NAArchived the recipient's payment methods are marked
// archived; for nm.NADeleted they are deleted. Any other action is ignored.
// Store errors are logged as warnings and returned to the caller.
func (s *Service) onRecipientNotification(
	ctx context.Context,
	objectType mservice.Type,
	recipientRef, actorAccountRef bson.ObjectID,
	action nm.NotificationAction,
) error {
	if s.pmstore == nil {
		return errStoreUnavailable
	}
	// Only recipient notifications carrying a usable reference are relevant.
	if objectType != mservice.Recipients {
		return nil
	}
	if recipientRef == bson.NilObjectID {
		return nil
	}

	// Hex renderings shared by every log entry below.
	recipientHex := recipientRef.Hex()
	actorHex := actorAccountRef.Hex()

	if action == nm.NAArchived {
		count, archiveErr := s.pmstore.SetArchivedByRecipient(ctx, recipientRef, true)
		if archiveErr != nil {
			s.logger.Warn("Failed to cascade archive payment methods by recipient",
				zap.Error(archiveErr),
				zap.String("recipient_ref", recipientHex),
				zap.String("actor_account_ref", actorHex))
			return archiveErr
		}
		s.logger.Info("Recipient archive cascade applied to payment methods",
			zap.String("recipient_ref", recipientHex),
			zap.String("actor_account_ref", actorHex),
			zap.Int("updated_count", count))
		return nil
	}

	if action == nm.NADeleted {
		deleteErr := s.pmstore.DeleteByRecipient(ctx, recipientRef)
		if deleteErr != nil {
			s.logger.Warn("Failed to cascade delete payment methods by recipient",
				zap.Error(deleteErr),
				zap.String("recipient_ref", recipientHex),
				zap.String("actor_account_ref", actorHex))
			return deleteErr
		}
		s.logger.Info("Recipient delete cascade applied to payment methods",
			zap.String("recipient_ref", recipientHex),
			zap.String("actor_account_ref", actorHex))
		return nil
	}

	// Any other action needs no cascade.
	return nil
}