package notifications import ( "context" "github.com/tech/sendico/pkg/db/account" me "github.com/tech/sendico/pkg/messaging/envelope" gmessaging "github.com/tech/sendico/pkg/messaging/internal/generated" mah "github.com/tech/sendico/pkg/messaging/notifications/account/handler" np "github.com/tech/sendico/pkg/messaging/notifications/processor" "github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/model" nm "github.com/tech/sendico/pkg/model/notification" "go.mongodb.org/mongo-driver/bson/primitive" "go.uber.org/zap" "google.golang.org/protobuf/proto" ) type AccoountNotificaionProcessor struct { logger mlogger.Logger handler mah.AccountHandler db account.DB event model.NotificationEvent } func (acnp *AccoountNotificaionProcessor) Process(ctx context.Context, envelope me.Envelope) error { var msg gmessaging.AccountCreatedEvent if err := proto.Unmarshal(envelope.GetData(), &msg); err != nil { acnp.logger.Warn("Failed to unmarshall envelope", zap.Error(err), zap.String("topic", acnp.event.ToString())) return err } accountRef, err := primitive.ObjectIDFromHex(msg.AccountRef) if err != nil { acnp.logger.Warn("Failed to restore object ID", zap.Error(err), zap.String("topic", acnp.event.ToString()), zap.String("account_ref", msg.AccountRef)) return err } var account model.Account if err := acnp.db.Get(ctx, accountRef, &account); err != nil { acnp.logger.Warn("Failed to fetch account", zap.Error(err), zap.String("topic", acnp.event.ToString()), zap.String("account_ref", msg.AccountRef)) return err } return acnp.handler(ctx, &account) } func (acnp *AccoountNotificaionProcessor) GetSubject() model.NotificationEvent { return acnp.event } func NewAccountMessageProcessor(logger mlogger.Logger, handler mah.AccountHandler, db account.DB, action nm.NotificationAction) np.EnvelopeProcessor { event := NewAccountNotification(action) return &AccoountNotificaionProcessor{ logger: logger.Named("message_processor"), handler: handler, db: db, event: event, } }