Files
sendico/api/pkg/messaging/reliable/producer.go
2026-02-18 01:35:28 +01:00

239 lines
6.1 KiB
Go

package reliable
import (
"context"
"errors"
"time"
"github.com/google/uuid"
"github.com/tech/sendico/pkg/merrors"
pmessaging "github.com/tech/sendico/pkg/messaging"
me "github.com/tech/sendico/pkg/messaging/envelope"
"github.com/tech/sendico/pkg/mlogger"
"go.uber.org/zap"
)
// Defaults that NewReliableProducer applies before any Option overrides them.
const (
// defaultBatchSize is the limit passed to OutboxStore.ListPending on each dispatch pass.
defaultBatchSize = 100
// defaultPollInterval is how long Run pauses before polling the outbox again when it decides to wait.
defaultPollInterval = time.Second
// defaultMaxAttempts is the attempt count at which a failing record is marked as failed.
defaultMaxAttempts = 5
)
// OutboxMessage is a single record exchanged with an OutboxStore.
type OutboxMessage struct {
// Reference is the key handed back to the store (MarkSent, MarkFailed,
// IncrementAttempts) to identify this record. SendWithOutbox leaves it empty.
Reference string
// EventID is the envelope message id in string form; SendWithOutbox only
// enqueues values that parse as a UUID.
EventID string
// Subject is the string form of the envelope signature.
Subject string
// Payload is the serialized envelope; the default decoder passes it to me.Deserialize.
Payload []byte
// Attempts is the number of delivery attempts already recorded; it is
// compared against the producer's maxAttempts when a delivery fails.
Attempts int
// OrganizationRef is neither read nor written in this file.
// NOTE(review): presumably identifies the owning organization — confirm against the store implementation.
OrganizationRef string
// CreatedAt is neither read nor written in this file.
// NOTE(review): presumably the enqueue time assigned by the store — confirm.
CreatedAt time.Time
}
// OutboxStore is the persistence contract the producer relies on for the outbox.
// The descriptions below state how this file calls each method; storage
// semantics are up to the implementation.
type OutboxStore interface {
// Enqueue stores a new message; called by SendWithOutbox.
Enqueue(ctx context.Context, msg OutboxMessage) error
// ListPending returns up to limit records awaiting dispatch; called with the
// producer's batch size on every dispatch pass.
ListPending(ctx context.Context, limit int) ([]OutboxMessage, error)
// MarkSent is called with the record reference and the current UTC time after
// the record has been published successfully.
MarkSent(ctx context.Context, reference string, sentAt time.Time) error
// MarkFailed is called once a record's attempt count reaches the configured maximum.
MarkFailed(ctx context.Context, reference string) error
// IncrementAttempts is called after every failed decode or publish of a record.
IncrementAttempts(ctx context.Context, reference string) error
}
// EnvelopeDecoder turns a stored outbox record back into an envelope that can
// be published. A decode error is treated as a failed delivery attempt.
type EnvelopeDecoder func(record OutboxMessage) (me.Envelope, error)
// Option customizes a ReliableProducer; options are applied by NewReliableProducer.
type Option func(*ReliableProducer)
// ReliableProducer publishes envelopes either directly or through an outbox
// that is drained by Run / DispatchPending.
type ReliableProducer struct {
// logger receives dispatcher diagnostics; the constructor names it "reliable_producer".
logger mlogger.Logger
// direct is the underlying producer used for immediate sends and for
// publishing records read from the outbox.
direct pmessaging.Producer
// outbox is the store that holds messages awaiting dispatch.
outbox OutboxStore
// batchSize is the limit passed to OutboxStore.ListPending.
batchSize int
// pollInterval is how long Run pauses between polls when it waits.
pollInterval time.Duration
// maxAttempts is the attempt count at which a record is marked failed;
// values <= 0 mean records are never marked failed.
maxAttempts int
// decode converts an outbox record into an envelope before publishing.
decode EnvelopeDecoder
}
// NewReliableProducer builds a ReliableProducer around the given direct
// producer and outbox store.
//
// A nil logger is replaced with a no-op zap logger. The producer starts with
// the package defaults (batch size, poll interval, max attempts, default
// envelope decoder); opts are then applied in order, skipping nil entries.
func NewReliableProducer(logger mlogger.Logger, direct pmessaging.Producer, outbox OutboxStore, opts ...Option) *ReliableProducer {
	if logger == nil {
		logger = zap.NewNop()
	}

	producer := &ReliableProducer{
		logger:       logger.Named("reliable_producer"),
		direct:       direct,
		outbox:       outbox,
		batchSize:    defaultBatchSize,
		pollInterval: defaultPollInterval,
		maxAttempts:  defaultMaxAttempts,
		decode:       defaultEnvelopeDecoder,
	}

	for i := range opts {
		apply := opts[i]
		if apply == nil {
			continue
		}
		apply(producer)
	}

	return producer
}
// WithBatchSize sets how many pending records are requested from the outbox
// per dispatch pass. Non-positive sizes are ignored and leave the current
// value in place.
func WithBatchSize(size int) Option {
	return func(p *ReliableProducer) {
		if size <= 0 {
			return
		}
		p.batchSize = size
	}
}
// WithPollInterval sets how long Run pauses between outbox polls.
// Non-positive intervals are ignored and leave the current value in place.
func WithPollInterval(interval time.Duration) Option {
	return func(p *ReliableProducer) {
		if interval <= 0 {
			return
		}
		p.pollInterval = interval
	}
}
// WithMaxAttempts sets the attempt count at which a failing record is marked
// as failed. The value is stored unvalidated: a value <= 0 means records are
// never marked failed, because the failure path only compares against a
// positive maximum.
func WithMaxAttempts(maxAttempts int) Option {
	apply := func(p *ReliableProducer) {
		p.maxAttempts = maxAttempts
	}
	return apply
}
// WithEnvelopeDecoder replaces the function used to turn outbox records back
// into envelopes. A nil decoder is ignored and the current one is kept.
func WithEnvelopeDecoder(decoder EnvelopeDecoder) Option {
	return func(p *ReliableProducer) {
		if decoder == nil {
			return
		}
		p.decode = decoder
	}
}
// SendMessage publishes the envelope immediately through the direct producer,
// bypassing the outbox. It returns an internal error when the receiver is nil
// or no direct producer is configured; otherwise it returns whatever the
// direct producer returns.
func (p *ReliableProducer) SendMessage(envelope me.Envelope) error {
	switch {
	case p == nil:
		return merrors.Internal("reliable producer is nil")
	case p.direct == nil:
		return merrors.Internal("reliable producer direct publisher is not configured")
	}
	return p.direct.SendMessage(envelope)
}
// SendWithOutbox serializes the envelope and enqueues it in the outbox store
// instead of publishing it directly.
//
// It returns an internal error when the receiver or the outbox store is nil,
// an invalid-argument error when the envelope is nil or its message id does
// not parse as a UUID, the serialization error unchanged, or the error
// returned by the store's Enqueue.
func (p *ReliableProducer) SendWithOutbox(ctx context.Context, envelope me.Envelope) error {
	switch {
	case p == nil:
		return merrors.Internal("reliable producer is nil")
	case envelope == nil:
		return merrors.InvalidArgument("envelope is required")
	case p.outbox == nil:
		return merrors.Internal("reliable producer outbox store is not configured")
	}

	payload, serializeErr := envelope.Serialize()
	if serializeErr != nil {
		return serializeErr
	}

	// The message id is validated after serialization, so a serialization
	// failure takes precedence over an invalid id.
	messageID := envelope.GetMessageId().String()
	if _, parseErr := uuid.Parse(messageID); parseErr != nil {
		return merrors.InvalidArgument("envelope message id is invalid")
	}

	record := OutboxMessage{
		EventID: messageID,
		Subject: envelope.GetSignature().ToString(),
		Payload: payload,
	}
	return p.outbox.Enqueue(ctx, record)
}
// Run drains the outbox in a loop until ctx is cancelled.
//
// It returns immediately when the receiver is nil, and logs a warning and
// returns when the outbox store or the direct producer is not configured.
// Each iteration dispatches one batch. When a batch completes at least one
// record (published and marked sent) the next batch is fetched right away;
// otherwise Run waits for the poll interval or for cancellation.
//
// Waiting whenever a pass makes no progress — not only when the outbox is
// empty — keeps a broker or store outage from retrying the same records in a
// tight loop, which would otherwise exhaust their attempt budget within
// moments and flood the log.
func (p *ReliableProducer) Run(ctx context.Context) {
	if p == nil {
		return
	}
	if p.outbox == nil {
		p.logger.Warn("Outbox dispatcher disabled: store is not configured")
		return
	}
	if p.direct == nil {
		p.logger.Warn("Outbox dispatcher disabled: direct producer is not configured")
		return
	}

	p.logger.Info("Outbox dispatcher started")
	defer p.logger.Info("Outbox dispatcher stopped")

	for {
		if ctx.Err() != nil {
			return
		}

		_, completed, err := p.dispatchPendingBatch(ctx)
		if err != nil && !errors.Is(err, context.Canceled) {
			p.logger.Warn("Failed to dispatch outbox events", zap.Error(err))
		}
		if completed > 0 {
			// Progress was made; there may be more pending records.
			continue
		}

		// Use an explicit timer so it can be released as soon as ctx is
		// cancelled instead of lingering until the interval elapses.
		timer := time.NewTimer(p.pollInterval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}

// DispatchPending performs a single dispatch pass over the outbox.
//
// It returns the number of records fetched from the store for this pass
// (regardless of whether each one was delivered) and an error when listing
// fails or ctx is cancelled mid-pass. Per-record decode, publish and
// mark-sent failures are logged and do not produce an error. It returns
// (0, nil) when the receiver, the outbox store or the direct producer is nil.
func (p *ReliableProducer) DispatchPending(ctx context.Context) (int, error) {
	listed, _, err := p.dispatchPendingBatch(ctx)
	return listed, err
}

// dispatchPendingBatch lists one batch of pending records and tries to publish
// each of them. It returns the number of records listed, the number that were
// both published and marked sent, and any list/cancellation error.
func (p *ReliableProducer) dispatchPendingBatch(ctx context.Context) (int, int, error) {
	if p == nil || p.outbox == nil || p.direct == nil {
		return 0, 0, nil
	}

	events, err := p.outbox.ListPending(ctx, p.batchSize)
	if err != nil {
		return 0, 0, err
	}

	completed := 0
	for _, event := range events {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return len(events), completed, ctxErr
		}

		env, decodeErr := p.decode(event)
		if decodeErr != nil {
			p.logger.Warn("Failed to decode outbox envelope", zap.String("event_id", event.EventID), zap.Error(decodeErr))
			p.handleFailure(ctx, event, decodeErr)
			continue
		}

		if sendErr := p.direct.SendMessage(env); sendErr != nil {
			p.logger.Warn("Failed to publish outbox event", zap.String("event_id", event.EventID), zap.String("subject", event.Subject), zap.Error(sendErr))
			p.handleFailure(ctx, event, sendErr)
			continue
		}

		// A record that was published but could not be marked sent stays
		// pending and will be published again, so it does not count as
		// completed.
		if markErr := p.outbox.MarkSent(ctx, event.Reference, time.Now().UTC()); markErr != nil {
			p.logger.Warn("Failed to mark outbox event sent", zap.String("event_id", event.EventID), zap.String("reference", event.Reference), zap.Error(markErr))
			continue
		}
		completed++
	}

	return len(events), completed, nil
}
// handleFailure records a failed delivery attempt for event.
//
// It does nothing when the receiver or the outbox store is nil, and only logs
// a warning when the record has no reference to address it by. Otherwise it
// increments the stored attempt counter and, when a positive maxAttempts is
// configured and this attempt reaches it, marks the record as failed. Store
// errors are logged (except context cancellation) and never returned. The
// triggering error is accepted for call-site symmetry but is not used.
func (p *ReliableProducer) handleFailure(ctx context.Context, event OutboxMessage, _ error) {
	if p == nil || p.outbox == nil {
		return
	}

	ref := event.Reference
	if ref == "" {
		p.logger.Warn("Cannot record outbox failure: missing record reference", zap.String("event_id", event.EventID))
		return
	}

	incErr := p.outbox.IncrementAttempts(ctx, ref)
	if incErr != nil && !errors.Is(incErr, context.Canceled) {
		p.logger.Warn("Failed to increment outbox attempts", zap.String("event_id", event.EventID), zap.String("reference", event.Reference), zap.Error(incErr))
	}

	// event.Attempts is the count before this failure, hence the +1.
	exhausted := p.maxAttempts > 0 && event.Attempts+1 >= p.maxAttempts
	if !exhausted {
		return
	}

	failErr := p.outbox.MarkFailed(ctx, ref)
	if failErr != nil && !errors.Is(failErr, context.Canceled) {
		p.logger.Warn("Failed to mark outbox event failed", zap.String("event_id", event.EventID), zap.String("reference", event.Reference), zap.Error(failErr))
	}
}
// defaultEnvelopeDecoder is the EnvelopeDecoder installed by
// NewReliableProducer; it deserializes the record payload into an envelope.
func defaultEnvelopeDecoder(record OutboxMessage) (me.Envelope, error) {
	payload := record.Payload
	return me.Deserialize(payload)
}
// Compile-time check that *ReliableProducer implements pmessaging.ReliableProducer.
var _ pmessaging.ReliableProducer = (*ReliableProducer)(nil)