240 lines
6.1 KiB
Go
240 lines
6.1 KiB
Go
package ingest
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/tech/sendico/edge/callbacks/internal/events"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
pkgmsg "github.com/tech/sendico/pkg/messaging"
|
|
cons "github.com/tech/sendico/pkg/messaging/consumer"
|
|
me "github.com/tech/sendico/pkg/messaging/envelope"
|
|
pon "github.com/tech/sendico/pkg/messaging/notifications/paymentorchestrator"
|
|
np "github.com/tech/sendico/pkg/messaging/notifications/processor"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"github.com/tech/sendico/pkg/model"
|
|
"go.mongodb.org/mongo-driver/v2/bson"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Logger name and structured-log field keys.
const (
	loggerNameIngest = "ingest"
	logFieldSubject  = "subject"
)

// Messages returned by newService when a required dependency is missing.
const (
	errBrokerRequired    = "ingest: broker is required"
	errEventsRequired    = "ingest: events service is required"
	errResolverRequired  = "ingest: subscriptions resolver is required"
	errInboxRepoRequired = "ingest: inbox repo is required"
	errTaskRepoRequired  = "ingest: task repo is required"
)

// Field names reported alongside the missing-dependency errors above.
const (
	configFieldBroker    = "broker"
	configFieldEvents    = "events"
	configFieldResolver  = "resolver"
	configFieldInboxRepo = "inboxRepo"
	configFieldTaskRepo  = "taskRepo"
)

// Log messages emitted over the lifetime of the consume loop.
const (
	logFailedStartConsumer   = "Failed to start messaging consumer"
	logIngestConsumerStarted = "Ingest consumer started"
	logIngestConsumerStopped = "Ingest consumer stopped"
	logIngestConsumerWarn    = "Ingest consumer stopped with error"
)

// Outcome labels handed to the ingest observer, one per exit path of the
// payment-status handler.
const (
	ingestResultOK           = "ok"
	ingestResultEmptyPayload = "empty_payload"
	ingestResultInvalidEvent = "invalid_event"
	ingestResultPayloadError = "payload_error"
	ingestResultInboxError   = "inbox_error"
	ingestResultDuplicate    = "duplicate"
	ingestResultResolveError = "resolve_error"
	ingestResultNoEndpoints  = "no_endpoints"
	ingestResultTaskError    = "task_error"
)
|
|
|
|
type service struct {
|
|
logger mlogger.Logger
|
|
deps Dependencies
|
|
event model.NotificationEvent
|
|
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
once sync.Once
|
|
stop sync.Once
|
|
|
|
mu sync.Mutex
|
|
consumer pkgmsg.Consumer
|
|
|
|
processor np.EnvelopeProcessor
|
|
}
|
|
|
|
func newService(deps Dependencies) (Service, error) {
|
|
if deps.Broker == nil {
|
|
return nil, merrors.InvalidArgument(errBrokerRequired, configFieldBroker)
|
|
}
|
|
if deps.Events == nil {
|
|
return nil, merrors.InvalidArgument(errEventsRequired, configFieldEvents)
|
|
}
|
|
if deps.Resolver == nil {
|
|
return nil, merrors.InvalidArgument(errResolverRequired, configFieldResolver)
|
|
}
|
|
if deps.InboxRepo == nil {
|
|
return nil, merrors.InvalidArgument(errInboxRepoRequired, configFieldInboxRepo)
|
|
}
|
|
if deps.TaskRepo == nil {
|
|
return nil, merrors.InvalidArgument(errTaskRepoRequired, configFieldTaskRepo)
|
|
}
|
|
|
|
logger := deps.Logger
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
|
|
svc := &service{
|
|
logger: logger.Named(loggerNameIngest),
|
|
deps: deps,
|
|
}
|
|
svc.processor = pon.NewPaymentStatusUpdatedProcessor(svc.logger, svc.handlePaymentStatusUpdated)
|
|
svc.event = svc.processor.GetSubject()
|
|
|
|
return svc, nil
|
|
}
|
|
|
|
func (s *service) Start(ctx context.Context) {
|
|
s.once.Do(func() {
|
|
runCtx := ctx
|
|
if runCtx == nil {
|
|
runCtx = context.Background()
|
|
}
|
|
runCtx, s.cancel = context.WithCancel(runCtx)
|
|
|
|
s.wg.Add(1)
|
|
go func() {
|
|
defer s.wg.Done()
|
|
s.run(runCtx)
|
|
}()
|
|
})
|
|
}
|
|
|
|
func (s *service) Stop() {
|
|
s.stop.Do(func() {
|
|
if s.cancel != nil {
|
|
s.cancel()
|
|
}
|
|
s.closeConsumer()
|
|
s.wg.Wait()
|
|
})
|
|
}
|
|
|
|
func (s *service) run(ctx context.Context) {
|
|
consumer, err := cons.NewConsumer(s.logger, s.deps.Broker, s.event)
|
|
if err != nil {
|
|
s.logger.Error(logFailedStartConsumer, zap.String(logFieldSubject, s.event.ToString()), zap.Error(err))
|
|
return
|
|
}
|
|
s.setConsumer(consumer)
|
|
defer s.closeConsumer()
|
|
|
|
s.logger.Info(logIngestConsumerStarted, zap.String(logFieldSubject, s.event.ToString()))
|
|
if err := consumer.ConsumeMessages(func(messageCtx context.Context, envelope me.Envelope) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
return s.processor.Process(messageCtx, envelope)
|
|
}); err != nil && !errors.Is(err, context.Canceled) {
|
|
s.logger.Warn(logIngestConsumerWarn, zap.String(logFieldSubject, s.event.ToString()), zap.Error(err))
|
|
}
|
|
s.logger.Info(logIngestConsumerStopped, zap.String(logFieldSubject, s.event.ToString()))
|
|
}
|
|
|
|
func (s *service) setConsumer(consumer pkgmsg.Consumer) {
|
|
s.mu.Lock()
|
|
s.consumer = consumer
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
func (s *service) closeConsumer() {
|
|
s.mu.Lock()
|
|
consumer := s.consumer
|
|
s.consumer = nil
|
|
s.mu.Unlock()
|
|
if consumer != nil {
|
|
consumer.Close()
|
|
}
|
|
}
|
|
|
|
func (s *service) handlePaymentStatusUpdated(ctx context.Context, msg *model.PaymentStatusUpdated) error {
|
|
start := time.Now()
|
|
result := ingestResultOK
|
|
|
|
defer func() {
|
|
if s.deps.Observer != nil {
|
|
s.deps.Observer.ObserveIngest(result, time.Since(start))
|
|
}
|
|
}()
|
|
|
|
if msg == nil {
|
|
result = ingestResultEmptyPayload
|
|
return nil
|
|
}
|
|
if strings.TrimSpace(msg.EventID) == "" || msg.Data.OrganizationRef == bson.NilObjectID || msg.OccurredAt.IsZero() {
|
|
result = ingestResultInvalidEvent
|
|
return nil
|
|
}
|
|
|
|
eventType := strings.TrimSpace(msg.Type)
|
|
if eventType == "" {
|
|
eventType = model.PaymentStatusUpdatedType
|
|
}
|
|
|
|
data, err := json.Marshal(msg.Data)
|
|
if err != nil {
|
|
result = ingestResultPayloadError
|
|
return err
|
|
}
|
|
|
|
parsed := &events.Envelope{
|
|
EventID: strings.TrimSpace(msg.EventID),
|
|
Type: eventType,
|
|
OrganizationRef: msg.Data.OrganizationRef,
|
|
OccurredAt: msg.OccurredAt.UTC(),
|
|
PublishedAt: msg.PublishedAt.UTC(),
|
|
Data: data,
|
|
}
|
|
|
|
inserted, err := s.deps.InboxRepo.TryInsert(ctx, parsed.EventID, parsed.Type, parsed.OrganizationRef, time.Now().UTC())
|
|
if err != nil {
|
|
result = ingestResultInboxError
|
|
return err
|
|
}
|
|
if !inserted {
|
|
result = ingestResultDuplicate
|
|
return nil
|
|
}
|
|
|
|
endpoints, err := s.deps.Resolver.Resolve(ctx, parsed.Type, parsed.OrganizationRef)
|
|
if err != nil {
|
|
result = ingestResultResolveError
|
|
return err
|
|
}
|
|
if len(endpoints) == 0 {
|
|
result = ingestResultNoEndpoints
|
|
return nil
|
|
}
|
|
|
|
payload, err := s.deps.Events.BuildPayload(ctx, parsed)
|
|
if err != nil {
|
|
result = ingestResultPayloadError
|
|
return err
|
|
}
|
|
|
|
if err := s.deps.TaskRepo.UpsertTasks(ctx, parsed.EventID, endpoints, payload, s.deps.TaskDefaults, time.Now().UTC()); err != nil {
|
|
result = ingestResultTaskError
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|