refactored orchestrator and callbacks service to use pkg messsaging + envelope factory / handler

This commit is contained in:
Stephan D
2026-02-28 20:56:26 +01:00
parent 363d6474f2
commit 12c67361dd
14 changed files with 316 additions and 311 deletions

View File

@@ -4,10 +4,10 @@ import (
"context"
"time"
"github.com/nats-io/nats.go"
"github.com/tech/sendico/edge/callbacks/internal/events"
"github.com/tech/sendico/edge/callbacks/internal/storage"
"github.com/tech/sendico/edge/callbacks/internal/subscriptions"
mb "github.com/tech/sendico/pkg/messaging/broker"
"github.com/tech/sendico/pkg/mlogger"
)
@@ -16,21 +16,10 @@ type Observer interface {
ObserveIngest(result string, duration time.Duration)
}
// Config contains JetStream ingest settings.
type Config struct {
Stream string
Subject string
Durable string
BatchSize int
FetchTimeout time.Duration
IdleSleep time.Duration
}
// Dependencies configure the ingest service.
type Dependencies struct {
Logger mlogger.Logger
JetStream nats.JetStreamContext
Config Config
Broker mb.Broker
Events events.Service
Resolver subscriptions.Resolver
InboxRepo storage.InboxRepo
@@ -39,7 +28,7 @@ type Dependencies struct {
Observer Observer
}
// Service runs JetStream ingest workers.
// Service runs ingest workers.
type Service interface {
Start(ctx context.Context)
Stop()

View File

@@ -2,59 +2,86 @@ package ingest
import (
"context"
"encoding/json"
"errors"
"strings"
"sync"
"time"
"github.com/nats-io/nats.go"
"github.com/tech/sendico/edge/callbacks/internal/events"
"github.com/tech/sendico/pkg/merrors"
pkgmsg "github.com/tech/sendico/pkg/messaging"
cons "github.com/tech/sendico/pkg/messaging/consumer"
me "github.com/tech/sendico/pkg/messaging/envelope"
pon "github.com/tech/sendico/pkg/messaging/notifications/paymentorchestrator"
np "github.com/tech/sendico/pkg/messaging/notifications/processor"
"github.com/tech/sendico/pkg/mlogger"
"github.com/tech/sendico/pkg/model"
"go.uber.org/zap"
)
const (
loggerNameIngest = "ingest"
logFieldSubject = "subject"
errBrokerRequired = "ingest: broker is required"
errEventsRequired = "ingest: events service is required"
errResolverRequired = "ingest: subscriptions resolver is required"
errInboxRepoRequired = "ingest: inbox repo is required"
errTaskRepoRequired = "ingest: task repo is required"
configFieldBroker = "broker"
configFieldEvents = "events"
configFieldResolver = "resolver"
configFieldInboxRepo = "inboxRepo"
configFieldTaskRepo = "taskRepo"
logFailedStartConsumer = "Failed to start messaging consumer"
logIngestConsumerStarted = "Ingest consumer started"
logIngestConsumerStopped = "Ingest consumer stopped"
logIngestConsumerWarn = "Ingest consumer stopped with error"
ingestResultOK = "ok"
ingestResultEmptyPayload = "empty_payload"
ingestResultInvalidEvent = "invalid_event"
ingestResultPayloadError = "payload_error"
ingestResultInboxError = "inbox_error"
ingestResultDuplicate = "duplicate"
ingestResultResolveError = "resolve_error"
ingestResultNoEndpoints = "no_endpoints"
ingestResultTaskError = "task_error"
)
type service struct {
logger mlogger.Logger
js nats.JetStreamContext
cfg Config
deps Dependencies
event model.NotificationEvent
cancel context.CancelFunc
wg sync.WaitGroup
once sync.Once
stop sync.Once
mu sync.Mutex
consumer pkgmsg.Consumer
processor np.EnvelopeProcessor
}
func newService(deps Dependencies) (Service, error) {
if deps.JetStream == nil {
return nil, merrors.InvalidArgument("ingest: jetstream context is required", "jetstream")
if deps.Broker == nil {
return nil, merrors.InvalidArgument(errBrokerRequired, configFieldBroker)
}
if deps.Events == nil {
return nil, merrors.InvalidArgument("ingest: events service is required", "events")
return nil, merrors.InvalidArgument(errEventsRequired, configFieldEvents)
}
if deps.Resolver == nil {
return nil, merrors.InvalidArgument("ingest: subscriptions resolver is required", "resolver")
return nil, merrors.InvalidArgument(errResolverRequired, configFieldResolver)
}
if deps.InboxRepo == nil {
return nil, merrors.InvalidArgument("ingest: inbox repo is required", "inboxRepo")
return nil, merrors.InvalidArgument(errInboxRepoRequired, configFieldInboxRepo)
}
if deps.TaskRepo == nil {
return nil, merrors.InvalidArgument("ingest: task repo is required", "taskRepo")
}
if strings.TrimSpace(deps.Config.Subject) == "" {
return nil, merrors.InvalidArgument("ingest: subject is required", "config.subject")
}
if strings.TrimSpace(deps.Config.Durable) == "" {
return nil, merrors.InvalidArgument("ingest: durable is required", "config.durable")
}
if deps.Config.BatchSize <= 0 {
deps.Config.BatchSize = 1
}
if deps.Config.FetchTimeout <= 0 {
deps.Config.FetchTimeout = 2 * time.Second
}
if deps.Config.IdleSleep <= 0 {
deps.Config.IdleSleep = 500 * time.Millisecond
return nil, merrors.InvalidArgument(errTaskRepoRequired, configFieldTaskRepo)
}
logger := deps.Logger
@@ -62,12 +89,14 @@ func newService(deps Dependencies) (Service, error) {
logger = zap.NewNop()
}
return &service{
logger: logger.Named("ingest"),
js: deps.JetStream,
cfg: deps.Config,
svc := &service{
logger: logger.Named(loggerNameIngest),
deps: deps,
}, nil
}
svc.processor = pon.NewPaymentStatusUpdatedProcessor(svc.logger, svc.handlePaymentStatusUpdated)
svc.event = svc.processor.GetSubject()
return svc, nil
}
func (s *service) Start(ctx context.Context) {
@@ -91,114 +120,119 @@ func (s *service) Stop() {
if s.cancel != nil {
s.cancel()
}
s.closeConsumer()
s.wg.Wait()
})
}
func (s *service) run(ctx context.Context) {
subOpts := []nats.SubOpt{}
if stream := strings.TrimSpace(s.cfg.Stream); stream != "" {
subOpts = append(subOpts, nats.BindStream(stream))
}
sub, err := s.js.PullSubscribe(strings.TrimSpace(s.cfg.Subject), strings.TrimSpace(s.cfg.Durable), subOpts...)
consumer, err := cons.NewConsumer(s.logger, s.deps.Broker, s.event)
if err != nil {
s.logger.Error("Failed to start JetStream subscription", zap.String("subject", s.cfg.Subject), zap.String("durable", s.cfg.Durable), zap.Error(err))
s.logger.Error(logFailedStartConsumer, zap.String(logFieldSubject, s.event.ToString()), zap.Error(err))
return
}
s.setConsumer(consumer)
defer s.closeConsumer()
s.logger.Info("Ingest consumer started", zap.String("subject", s.cfg.Subject), zap.String("durable", s.cfg.Durable), zap.Int("batch_size", s.cfg.BatchSize))
for {
s.logger.Info(logIngestConsumerStarted, zap.String(logFieldSubject, s.event.ToString()))
if err := consumer.ConsumeMessages(func(messageCtx context.Context, envelope me.Envelope) error {
select {
case <-ctx.Done():
s.logger.Info("Ingest consumer stopped")
return
return ctx.Err()
default:
}
return s.processor.Process(messageCtx, envelope)
}); err != nil && !errors.Is(err, context.Canceled) {
s.logger.Warn(logIngestConsumerWarn, zap.String(logFieldSubject, s.event.ToString()), zap.Error(err))
}
s.logger.Info(logIngestConsumerStopped, zap.String(logFieldSubject, s.event.ToString()))
}
msgs, err := sub.Fetch(s.cfg.BatchSize, nats.MaxWait(s.cfg.FetchTimeout))
if err != nil {
if errors.Is(err, nats.ErrTimeout) {
time.Sleep(s.cfg.IdleSleep)
continue
}
if ctx.Err() != nil {
return
}
s.logger.Warn("Failed to fetch JetStream messages", zap.Error(err))
time.Sleep(s.cfg.IdleSleep)
continue
}
func (s *service) setConsumer(consumer pkgmsg.Consumer) {
s.mu.Lock()
s.consumer = consumer
s.mu.Unlock()
}
for _, msg := range msgs {
s.handleMessage(ctx, msg)
}
func (s *service) closeConsumer() {
s.mu.Lock()
consumer := s.consumer
s.consumer = nil
s.mu.Unlock()
if consumer != nil {
consumer.Close()
}
}
func (s *service) handleMessage(ctx context.Context, msg *nats.Msg) {
func (s *service) handlePaymentStatusUpdated(ctx context.Context, msg *model.PaymentStatusUpdated) error {
start := time.Now()
result := "ok"
nak := false
result := ingestResultOK
defer func() {
if s.deps.Observer != nil {
s.deps.Observer.ObserveIngest(result, time.Since(start))
}
var ackErr error
if nak {
ackErr = msg.Nak()
} else {
ackErr = msg.Ack()
}
if ackErr != nil {
s.logger.Warn("Failed to ack ingest message", zap.Bool("nak", nak), zap.Error(ackErr))
}
}()
envelope, err := s.deps.Events.Parse(msg.Data)
if err != nil {
result = "invalid_event"
nak = false
return
if msg == nil {
result = ingestResultEmptyPayload
return nil
}
if strings.TrimSpace(msg.EventID) == "" || strings.TrimSpace(msg.ClientID) == "" || msg.OccurredAt.IsZero() {
result = ingestResultInvalidEvent
return nil
}
inserted, err := s.deps.InboxRepo.TryInsert(ctx, envelope.EventID, envelope.ClientID, envelope.Type, time.Now().UTC())
eventType := strings.TrimSpace(msg.Type)
if eventType == "" {
eventType = model.PaymentStatusUpdatedType
}
data, err := json.Marshal(msg.Data)
if err != nil {
result = "inbox_error"
nak = true
return
result = ingestResultPayloadError
return err
}
parsed := &events.Envelope{
EventID: strings.TrimSpace(msg.EventID),
Type: eventType,
ClientID: strings.TrimSpace(msg.ClientID),
OccurredAt: msg.OccurredAt.UTC(),
PublishedAt: msg.PublishedAt.UTC(),
Data: data,
}
inserted, err := s.deps.InboxRepo.TryInsert(ctx, parsed.EventID, parsed.ClientID, parsed.Type, time.Now().UTC())
if err != nil {
result = ingestResultInboxError
return err
}
if !inserted {
result = "duplicate"
nak = false
return
result = ingestResultDuplicate
return nil
}
endpoints, err := s.deps.Resolver.Resolve(ctx, envelope.ClientID, envelope.Type)
endpoints, err := s.deps.Resolver.Resolve(ctx, parsed.ClientID, parsed.Type)
if err != nil {
result = "resolve_error"
nak = true
return
result = ingestResultResolveError
return err
}
if len(endpoints) == 0 {
result = "no_endpoints"
nak = false
return
result = ingestResultNoEndpoints
return nil
}
payload, err := s.deps.Events.BuildPayload(ctx, envelope)
payload, err := s.deps.Events.BuildPayload(ctx, parsed)
if err != nil {
result = "payload_error"
nak = true
return
result = ingestResultPayloadError
return err
}
if err := s.deps.TaskRepo.UpsertTasks(ctx, envelope.EventID, endpoints, payload, s.deps.TaskDefaults, time.Now().UTC()); err != nil {
result = "task_error"
nak = true
return
if err := s.deps.TaskRepo.UpsertTasks(ctx, parsed.EventID, endpoints, payload, s.deps.TaskDefaults, time.Now().UTC()); err != nil {
result = ingestResultTaskError
return err
}
return nil
}