package ingest

import (
	"context"
	"encoding/json"
	"errors"
	"strings"
	"sync"
	"time"

	"github.com/tech/sendico/edge/callbacks/internal/events"
	"github.com/tech/sendico/pkg/merrors"
	pkgmsg "github.com/tech/sendico/pkg/messaging"
	cons "github.com/tech/sendico/pkg/messaging/consumer"
	me "github.com/tech/sendico/pkg/messaging/envelope"
	pon "github.com/tech/sendico/pkg/messaging/notifications/paymentorchestrator"
	np "github.com/tech/sendico/pkg/messaging/notifications/processor"
	"github.com/tech/sendico/pkg/mlogger"
	"github.com/tech/sendico/pkg/model"
	"go.mongodb.org/mongo-driver/v2/bson"
	"go.uber.org/zap"
)

const (
	// Logger name and structured-log field keys.
	loggerNameIngest = "ingest"
	logFieldSubject  = "subject"

	// Messages for the invalid-argument errors returned by newService.
	errBrokerRequired    = "ingest: broker is required"
	errEventsRequired    = "ingest: events service is required"
	errResolverRequired  = "ingest: subscriptions resolver is required"
	errInboxRepoRequired = "ingest: inbox repo is required"
	errTaskRepoRequired  = "ingest: task repo is required"

	// Dependency field names passed alongside the messages above.
	configFieldBroker    = "broker"
	configFieldEvents    = "events"
	configFieldResolver  = "resolver"
	configFieldInboxRepo = "inboxRepo"
	configFieldTaskRepo  = "taskRepo"

	// Log messages emitted by the consumer loop.
	logFailedStartConsumer   = "Failed to start messaging consumer"
	logIngestConsumerStarted = "Ingest consumer started"
	logIngestConsumerStopped = "Ingest consumer stopped"
	logIngestConsumerWarn    = "Ingest consumer stopped with error"

	// Result labels reported to the observer by handlePaymentStatusUpdated.
	ingestResultOK            = "ok"
	ingestResultEmptyPayload  = "empty_payload"
	ingestResultInvalidEvent  = "invalid_event"
	ingestResultPayloadError  = "payload_error"
	ingestResultInboxError    = "inbox_error"
	ingestResultDuplicate     = "duplicate"
	ingestResultResolveError  = "resolve_error"
	ingestResultNoEndpoints   = "no_endpoints"
	ingestResultTaskError     = "task_error"
)

// service consumes payment-status-updated notifications from the broker and
// turns each one into tasks via the configured repositories.
type service struct {
	logger mlogger.Logger
	deps   Dependencies
	event  model.NotificationEvent // subject the consumer is created for

	cancel context.CancelFunc // cancels the run context; set by Start
	wg     sync.WaitGroup     // tracks the goroutine started by Start
	once   sync.Once          // guards Start
	stop   sync.Once          // guards Stop

	mu       sync.Mutex // protects consumer
	consumer pkgmsg.Consumer

	processor np.EnvelopeProcessor
}

// newService validates deps and builds the ingest service.
//
// An invalid-argument error is returned when Broker, Events, Resolver,
// InboxRepo or TaskRepo is nil. A nil Logger is replaced with a no-op logger.
func newService(deps Dependencies) (Service, error) {
	if deps.Broker == nil {
		return nil, merrors.InvalidArgument(errBrokerRequired, configFieldBroker)
	}
	if deps.Events == nil {
		return nil, merrors.InvalidArgument(errEventsRequired, configFieldEvents)
	}
	if deps.Resolver == nil {
		return nil, merrors.InvalidArgument(errResolverRequired, configFieldResolver)
	}
	if deps.InboxRepo == nil {
		return nil, merrors.InvalidArgument(errInboxRepoRequired, configFieldInboxRepo)
	}
	if deps.TaskRepo == nil {
		return nil, merrors.InvalidArgument(errTaskRepoRequired, configFieldTaskRepo)
	}

	logger := deps.Logger
	if logger == nil {
		logger = zap.NewNop()
	}

	svc := &service{
		logger: logger.Named(loggerNameIngest),
		deps:   deps,
	}
	// The processor decodes envelopes and hands them to
	// handlePaymentStatusUpdated; the subject to consume comes from it.
	svc.processor = pon.NewPaymentStatusUpdatedProcessor(svc.logger, svc.handlePaymentStatusUpdated)
	svc.event = svc.processor.GetSubject()

	return svc, nil
}

// Start launches the consumer loop in a background goroutine. Only the first
// call has any effect. A nil ctx is replaced with context.Background().
func (s *service) Start(ctx context.Context) {
	s.once.Do(func() {
		runCtx := ctx
		if runCtx == nil {
			runCtx = context.Background()
		}
		runCtx, s.cancel = context.WithCancel(runCtx)

		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			s.run(runCtx)
		}()
	})
}

// Stop cancels the run context, closes the active consumer (if any) and waits
// for the consumer goroutine to exit. Only the first call does any work.
func (s *service) Stop() {
	s.stop.Do(func() {
		if s.cancel != nil {
			s.cancel()
		}
		s.closeConsumer()
		s.wg.Wait()
	})
}

// run creates the consumer for the service subject and processes messages
// until consumption ends. It is executed on the goroutine started by Start.
func (s *service) run(ctx context.Context) {
	consumer, err := cons.NewConsumer(s.logger, s.deps.Broker, s.event)
	if err != nil {
		s.logger.Error(logFailedStartConsumer, zap.String(logFieldSubject, s.event.ToString()), zap.Error(err))
		return
	}

	s.setConsumer(consumer)
	defer s.closeConsumer()

	// Stop cancels ctx and then closes whatever consumer is registered. If it
	// ran before setConsumer above, it closed nothing, so bail out here rather
	// than block in ConsumeMessages on a consumer nobody else will close. If
	// ctx is cancelled after this check, Stop's closeConsumer sees the
	// registered consumer and closes it.
	if ctx.Err() != nil {
		return
	}

	s.logger.Info(logIngestConsumerStarted, zap.String(logFieldSubject, s.event.ToString()))

	err = consumer.ConsumeMessages(func(messageCtx context.Context, envelope me.Envelope) error {
		// Refuse further work once the service has been asked to stop.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		return s.processor.Process(messageCtx, envelope)
	})
	if err != nil && !errors.Is(err, context.Canceled) {
		s.logger.Warn(logIngestConsumerWarn, zap.String(logFieldSubject, s.event.ToString()), zap.Error(err))
	}

	s.logger.Info(logIngestConsumerStopped, zap.String(logFieldSubject, s.event.ToString()))
}

// setConsumer records consumer as the active one so that Stop can close it.
func (s *service) setConsumer(consumer pkgmsg.Consumer) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.consumer = consumer
}

// closeConsumer detaches the active consumer, if any, and closes it. The
// consumer is closed outside the lock, and because the field is cleared first
// a second call is a no-op.
func (s *service) closeConsumer() {
	s.mu.Lock()
	consumer := s.consumer
	s.consumer = nil
	s.mu.Unlock()

	if consumer != nil {
		consumer.Close()
	}
}

// handlePaymentStatusUpdated processes one payment-status-updated message.
//
// It validates the message, records it in the inbox, resolves the endpoints
// for the event, builds the payload and upserts one task set for those
// endpoints. Nil messages, invalid events, duplicates and events with no
// endpoints return nil; errors from marshalling, the inbox, the resolver, the
// payload builder or the task repo are returned unchanged. When an observer is
// configured, the result label and elapsed time are reported on every exit.
func (s *service) handlePaymentStatusUpdated(ctx context.Context, msg *model.PaymentStatusUpdated) error {
	start := time.Now()
	result := ingestResultOK
	defer func() {
		if s.deps.Observer != nil {
			s.deps.Observer.ObserveIngest(result, time.Since(start))
		}
	}()

	if msg == nil {
		result = ingestResultEmptyPayload
		return nil
	}

	eventID := strings.TrimSpace(msg.EventID)
	if eventID == "" || msg.Data.OrganizationRef == bson.NilObjectID || msg.OccurredAt.IsZero() {
		result = ingestResultInvalidEvent
		return nil
	}

	// A blank type falls back to the default payment-status-updated type.
	eventType := strings.TrimSpace(msg.Type)
	if eventType == "" {
		eventType = model.PaymentStatusUpdatedType
	}

	data, err := json.Marshal(msg.Data)
	if err != nil {
		result = ingestResultPayloadError
		return err
	}

	parsed := &events.Envelope{
		EventID:         eventID,
		Type:            eventType,
		OrganizationRef: msg.Data.OrganizationRef,
		OccurredAt:      msg.OccurredAt.UTC(),
		PublishedAt:     msg.PublishedAt.UTC(),
		Data:            data,
	}

	// NOTE(review): the inbox row is written before the steps below that can
	// fail. If the broker redelivers after such a failure, TryInsert will
	// presumably report a duplicate and the event is dropped without tasks —
	// confirm against the InboxRepo and broker redelivery semantics.
	inserted, err := s.deps.InboxRepo.TryInsert(ctx, parsed.EventID, parsed.Type, parsed.OrganizationRef, time.Now().UTC())
	if err != nil {
		result = ingestResultInboxError
		return err
	}
	if !inserted {
		result = ingestResultDuplicate
		return nil
	}

	endpoints, err := s.deps.Resolver.Resolve(ctx, parsed.Type, parsed.OrganizationRef)
	if err != nil {
		result = ingestResultResolveError
		return err
	}
	if len(endpoints) == 0 {
		result = ingestResultNoEndpoints
		return nil
	}

	payload, err := s.deps.Events.BuildPayload(ctx, parsed)
	if err != nil {
		result = ingestResultPayloadError
		return err
	}

	if err := s.deps.TaskRepo.UpsertTasks(ctx, parsed.EventID, endpoints, payload, s.deps.TaskDefaults, time.Now().UTC()); err != nil {
		result = ingestResultTaskError
		return err
	}

	return nil
}