205 lines
4.6 KiB
Go
205 lines
4.6 KiB
Go
package ingest
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type service struct {
|
|
logger mlogger.Logger
|
|
js nats.JetStreamContext
|
|
cfg Config
|
|
deps Dependencies
|
|
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
once sync.Once
|
|
stop sync.Once
|
|
}
|
|
|
|
func newService(deps Dependencies) (Service, error) {
|
|
if deps.JetStream == nil {
|
|
return nil, merrors.InvalidArgument("ingest: jetstream context is required", "jetstream")
|
|
}
|
|
if deps.Events == nil {
|
|
return nil, merrors.InvalidArgument("ingest: events service is required", "events")
|
|
}
|
|
if deps.Resolver == nil {
|
|
return nil, merrors.InvalidArgument("ingest: subscriptions resolver is required", "resolver")
|
|
}
|
|
if deps.InboxRepo == nil {
|
|
return nil, merrors.InvalidArgument("ingest: inbox repo is required", "inboxRepo")
|
|
}
|
|
if deps.TaskRepo == nil {
|
|
return nil, merrors.InvalidArgument("ingest: task repo is required", "taskRepo")
|
|
}
|
|
if strings.TrimSpace(deps.Config.Subject) == "" {
|
|
return nil, merrors.InvalidArgument("ingest: subject is required", "config.subject")
|
|
}
|
|
if strings.TrimSpace(deps.Config.Durable) == "" {
|
|
return nil, merrors.InvalidArgument("ingest: durable is required", "config.durable")
|
|
}
|
|
if deps.Config.BatchSize <= 0 {
|
|
deps.Config.BatchSize = 1
|
|
}
|
|
if deps.Config.FetchTimeout <= 0 {
|
|
deps.Config.FetchTimeout = 2 * time.Second
|
|
}
|
|
if deps.Config.IdleSleep <= 0 {
|
|
deps.Config.IdleSleep = 500 * time.Millisecond
|
|
}
|
|
|
|
logger := deps.Logger
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
|
|
return &service{
|
|
logger: logger.Named("ingest"),
|
|
js: deps.JetStream,
|
|
cfg: deps.Config,
|
|
deps: deps,
|
|
}, nil
|
|
}
|
|
|
|
func (s *service) Start(ctx context.Context) {
|
|
s.once.Do(func() {
|
|
runCtx := ctx
|
|
if runCtx == nil {
|
|
runCtx = context.Background()
|
|
}
|
|
runCtx, s.cancel = context.WithCancel(runCtx)
|
|
|
|
s.wg.Add(1)
|
|
go func() {
|
|
defer s.wg.Done()
|
|
s.run(runCtx)
|
|
}()
|
|
})
|
|
}
|
|
|
|
func (s *service) Stop() {
|
|
s.stop.Do(func() {
|
|
if s.cancel != nil {
|
|
s.cancel()
|
|
}
|
|
s.wg.Wait()
|
|
})
|
|
}
|
|
|
|
func (s *service) run(ctx context.Context) {
|
|
subOpts := []nats.SubOpt{}
|
|
if stream := strings.TrimSpace(s.cfg.Stream); stream != "" {
|
|
subOpts = append(subOpts, nats.BindStream(stream))
|
|
}
|
|
|
|
sub, err := s.js.PullSubscribe(strings.TrimSpace(s.cfg.Subject), strings.TrimSpace(s.cfg.Durable), subOpts...)
|
|
if err != nil {
|
|
s.logger.Error("Failed to start JetStream subscription", zap.String("subject", s.cfg.Subject), zap.String("durable", s.cfg.Durable), zap.Error(err))
|
|
return
|
|
}
|
|
|
|
s.logger.Info("Ingest consumer started", zap.String("subject", s.cfg.Subject), zap.String("durable", s.cfg.Durable), zap.Int("batch_size", s.cfg.BatchSize))
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
s.logger.Info("Ingest consumer stopped")
|
|
return
|
|
default:
|
|
}
|
|
|
|
msgs, err := sub.Fetch(s.cfg.BatchSize, nats.MaxWait(s.cfg.FetchTimeout))
|
|
if err != nil {
|
|
if errors.Is(err, nats.ErrTimeout) {
|
|
time.Sleep(s.cfg.IdleSleep)
|
|
continue
|
|
}
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
s.logger.Warn("Failed to fetch JetStream messages", zap.Error(err))
|
|
time.Sleep(s.cfg.IdleSleep)
|
|
continue
|
|
}
|
|
|
|
for _, msg := range msgs {
|
|
s.handleMessage(ctx, msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *service) handleMessage(ctx context.Context, msg *nats.Msg) {
|
|
start := time.Now()
|
|
result := "ok"
|
|
nak := false
|
|
|
|
defer func() {
|
|
if s.deps.Observer != nil {
|
|
s.deps.Observer.ObserveIngest(result, time.Since(start))
|
|
}
|
|
|
|
var ackErr error
|
|
if nak {
|
|
ackErr = msg.Nak()
|
|
} else {
|
|
ackErr = msg.Ack()
|
|
}
|
|
if ackErr != nil {
|
|
s.logger.Warn("Failed to ack ingest message", zap.Bool("nak", nak), zap.Error(ackErr))
|
|
}
|
|
}()
|
|
|
|
envelope, err := s.deps.Events.Parse(msg.Data)
|
|
if err != nil {
|
|
result = "invalid_event"
|
|
nak = false
|
|
return
|
|
}
|
|
|
|
inserted, err := s.deps.InboxRepo.TryInsert(ctx, envelope.EventID, envelope.ClientID, envelope.Type, time.Now().UTC())
|
|
if err != nil {
|
|
result = "inbox_error"
|
|
nak = true
|
|
return
|
|
}
|
|
if !inserted {
|
|
result = "duplicate"
|
|
nak = false
|
|
return
|
|
}
|
|
|
|
endpoints, err := s.deps.Resolver.Resolve(ctx, envelope.ClientID, envelope.Type)
|
|
if err != nil {
|
|
result = "resolve_error"
|
|
nak = true
|
|
return
|
|
}
|
|
if len(endpoints) == 0 {
|
|
result = "no_endpoints"
|
|
nak = false
|
|
return
|
|
}
|
|
|
|
payload, err := s.deps.Events.BuildPayload(ctx, envelope)
|
|
if err != nil {
|
|
result = "payload_error"
|
|
nak = true
|
|
return
|
|
}
|
|
|
|
if err := s.deps.TaskRepo.UpsertTasks(ctx, envelope.EventID, endpoints, payload, s.deps.TaskDefaults, time.Now().UTC()); err != nil {
|
|
result = "task_error"
|
|
nak = true
|
|
return
|
|
}
|
|
}
|