package ingest import ( "context" "errors" "strings" "sync" "time" "github.com/nats-io/nats.go" "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mlogger" "go.uber.org/zap" ) type service struct { logger mlogger.Logger js nats.JetStreamContext cfg Config deps Dependencies cancel context.CancelFunc wg sync.WaitGroup once sync.Once stop sync.Once } func newService(deps Dependencies) (Service, error) { if deps.JetStream == nil { return nil, merrors.InvalidArgument("ingest: jetstream context is required", "jetstream") } if deps.Events == nil { return nil, merrors.InvalidArgument("ingest: events service is required", "events") } if deps.Resolver == nil { return nil, merrors.InvalidArgument("ingest: subscriptions resolver is required", "resolver") } if deps.InboxRepo == nil { return nil, merrors.InvalidArgument("ingest: inbox repo is required", "inboxRepo") } if deps.TaskRepo == nil { return nil, merrors.InvalidArgument("ingest: task repo is required", "taskRepo") } if strings.TrimSpace(deps.Config.Subject) == "" { return nil, merrors.InvalidArgument("ingest: subject is required", "config.subject") } if strings.TrimSpace(deps.Config.Durable) == "" { return nil, merrors.InvalidArgument("ingest: durable is required", "config.durable") } if deps.Config.BatchSize <= 0 { deps.Config.BatchSize = 1 } if deps.Config.FetchTimeout <= 0 { deps.Config.FetchTimeout = 2 * time.Second } if deps.Config.IdleSleep <= 0 { deps.Config.IdleSleep = 500 * time.Millisecond } logger := deps.Logger if logger == nil { logger = zap.NewNop() } return &service{ logger: logger.Named("ingest"), js: deps.JetStream, cfg: deps.Config, deps: deps, }, nil } func (s *service) Start(ctx context.Context) { s.once.Do(func() { runCtx := ctx if runCtx == nil { runCtx = context.Background() } runCtx, s.cancel = context.WithCancel(runCtx) s.wg.Add(1) go func() { defer s.wg.Done() s.run(runCtx) }() }) } func (s *service) Stop() { s.stop.Do(func() { if s.cancel != nil { s.cancel() } s.wg.Wait() }) } func (s *service) run(ctx context.Context) { subOpts := []nats.SubOpt{} if stream := strings.TrimSpace(s.cfg.Stream); stream != "" { subOpts = append(subOpts, nats.BindStream(stream)) } sub, err := s.js.PullSubscribe(strings.TrimSpace(s.cfg.Subject), strings.TrimSpace(s.cfg.Durable), subOpts...) if err != nil { s.logger.Error("Failed to start JetStream subscription", zap.String("subject", s.cfg.Subject), zap.String("durable", s.cfg.Durable), zap.Error(err)) return } s.logger.Info("Ingest consumer started", zap.String("subject", s.cfg.Subject), zap.String("durable", s.cfg.Durable), zap.Int("batch_size", s.cfg.BatchSize)) for { select { case <-ctx.Done(): s.logger.Info("Ingest consumer stopped") return default: } msgs, err := sub.Fetch(s.cfg.BatchSize, nats.MaxWait(s.cfg.FetchTimeout)) if err != nil { if errors.Is(err, nats.ErrTimeout) { time.Sleep(s.cfg.IdleSleep) continue } if ctx.Err() != nil { return } s.logger.Warn("Failed to fetch JetStream messages", zap.Error(err)) time.Sleep(s.cfg.IdleSleep) continue } for _, msg := range msgs { s.handleMessage(ctx, msg) } } } func (s *service) handleMessage(ctx context.Context, msg *nats.Msg) { start := time.Now() result := "ok" nak := false defer func() { if s.deps.Observer != nil { s.deps.Observer.ObserveIngest(result, time.Since(start)) } var ackErr error if nak { ackErr = msg.Nak() } else { ackErr = msg.Ack() } if ackErr != nil { s.logger.Warn("Failed to ack ingest message", zap.Bool("nak", nak), zap.Error(ackErr)) } }() envelope, err := s.deps.Events.Parse(msg.Data) if err != nil { result = "invalid_event" nak = false return } inserted, err := s.deps.InboxRepo.TryInsert(ctx, envelope.EventID, envelope.ClientID, envelope.Type, time.Now().UTC()) if err != nil { result = "inbox_error" nak = true return } if !inserted { result = "duplicate" nak = false return } endpoints, err := s.deps.Resolver.Resolve(ctx, envelope.ClientID, envelope.Type) if err != nil { result = "resolve_error" nak = true return } if len(endpoints) == 0 { result = "no_endpoints" nak = false return } payload, err := s.deps.Events.BuildPayload(ctx, envelope) if err != nil { result = "payload_error" nak = true return } if err := s.deps.TaskRepo.UpsertTasks(ctx, envelope.EventID, endpoints, payload, s.deps.TaskDefaults, time.Now().UTC()); err != nil { result = "task_error" nak = true return } }