outbox for gateways
This commit is contained in:
238
api/pkg/messaging/reliable/producer.go
Normal file
238
api/pkg/messaging/reliable/producer.go
Normal file
@@ -0,0 +1,238 @@
|
||||
package reliable
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/tech/sendico/pkg/merrors"
|
||||
pmessaging "github.com/tech/sendico/pkg/messaging"
|
||||
me "github.com/tech/sendico/pkg/messaging/envelope"
|
||||
"github.com/tech/sendico/pkg/mlogger"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultBatchSize = 100
|
||||
defaultPollInterval = time.Second
|
||||
defaultMaxAttempts = 5
|
||||
)
|
||||
|
||||
type OutboxMessage struct {
|
||||
Reference string
|
||||
EventID string
|
||||
Subject string
|
||||
Payload []byte
|
||||
Attempts int
|
||||
OrganizationRef string
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
type OutboxStore interface {
|
||||
Enqueue(ctx context.Context, msg OutboxMessage) error
|
||||
ListPending(ctx context.Context, limit int) ([]OutboxMessage, error)
|
||||
MarkSent(ctx context.Context, reference string, sentAt time.Time) error
|
||||
MarkFailed(ctx context.Context, reference string) error
|
||||
IncrementAttempts(ctx context.Context, reference string) error
|
||||
}
|
||||
|
||||
type EnvelopeDecoder func(record OutboxMessage) (me.Envelope, error)
|
||||
|
||||
type Option func(*ReliableProducer)
|
||||
|
||||
type ReliableProducer struct {
|
||||
logger mlogger.Logger
|
||||
direct pmessaging.Producer
|
||||
outbox OutboxStore
|
||||
batchSize int
|
||||
pollInterval time.Duration
|
||||
maxAttempts int
|
||||
decode EnvelopeDecoder
|
||||
}
|
||||
|
||||
func NewReliableProducer(logger mlogger.Logger, direct pmessaging.Producer, outbox OutboxStore, opts ...Option) *ReliableProducer {
|
||||
if logger == nil {
|
||||
logger = zap.NewNop()
|
||||
}
|
||||
res := &ReliableProducer{
|
||||
logger: logger.Named("reliable_producer"),
|
||||
direct: direct,
|
||||
outbox: outbox,
|
||||
batchSize: defaultBatchSize,
|
||||
pollInterval: defaultPollInterval,
|
||||
maxAttempts: defaultMaxAttempts,
|
||||
decode: defaultEnvelopeDecoder,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
if opt != nil {
|
||||
opt(res)
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func WithBatchSize(size int) Option {
|
||||
return func(p *ReliableProducer) {
|
||||
if size > 0 {
|
||||
p.batchSize = size
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func WithPollInterval(interval time.Duration) Option {
|
||||
return func(p *ReliableProducer) {
|
||||
if interval > 0 {
|
||||
p.pollInterval = interval
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func WithMaxAttempts(maxAttempts int) Option {
|
||||
return func(p *ReliableProducer) {
|
||||
p.maxAttempts = maxAttempts
|
||||
}
|
||||
}
|
||||
|
||||
func WithEnvelopeDecoder(decoder EnvelopeDecoder) Option {
|
||||
return func(p *ReliableProducer) {
|
||||
if decoder != nil {
|
||||
p.decode = decoder
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ReliableProducer) SendMessage(envelope me.Envelope) error {
|
||||
if p == nil {
|
||||
return merrors.Internal("reliable producer is nil")
|
||||
}
|
||||
if p.direct == nil {
|
||||
return merrors.Internal("reliable producer direct publisher is not configured")
|
||||
}
|
||||
return p.direct.SendMessage(envelope)
|
||||
}
|
||||
|
||||
func (p *ReliableProducer) SendWithOutbox(ctx context.Context, envelope me.Envelope) error {
|
||||
if p == nil {
|
||||
return merrors.Internal("reliable producer is nil")
|
||||
}
|
||||
if envelope == nil {
|
||||
return merrors.InvalidArgument("envelope is required")
|
||||
}
|
||||
if p.outbox == nil {
|
||||
return merrors.Internal("reliable producer outbox store is not configured")
|
||||
}
|
||||
|
||||
data, err := envelope.Serialize()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
eventID := envelope.GetMessageId().String()
|
||||
if _, err = uuid.Parse(eventID); err != nil {
|
||||
return merrors.InvalidArgument("envelope message id is invalid")
|
||||
}
|
||||
|
||||
return p.outbox.Enqueue(ctx, OutboxMessage{
|
||||
EventID: eventID,
|
||||
Subject: envelope.GetSignature().ToString(),
|
||||
Payload: data,
|
||||
})
|
||||
}
|
||||
|
||||
func (p *ReliableProducer) Run(ctx context.Context) {
|
||||
if p == nil {
|
||||
return
|
||||
}
|
||||
if p.outbox == nil {
|
||||
p.logger.Warn("Outbox dispatcher disabled: store is not configured")
|
||||
return
|
||||
}
|
||||
if p.direct == nil {
|
||||
p.logger.Warn("Outbox dispatcher disabled: direct producer is not configured")
|
||||
return
|
||||
}
|
||||
|
||||
p.logger.Info("Outbox dispatcher started")
|
||||
defer p.logger.Info("Outbox dispatcher stopped")
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
processed, err := p.DispatchPending(ctx)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
p.logger.Warn("Failed to dispatch outbox events", zap.Error(err))
|
||||
}
|
||||
if processed == 0 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(p.pollInterval):
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ReliableProducer) DispatchPending(ctx context.Context) (int, error) {
|
||||
if p == nil || p.outbox == nil || p.direct == nil {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
events, err := p.outbox.ListPending(ctx, p.batchSize)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
for _, event := range events {
|
||||
if ctx.Err() != nil {
|
||||
return len(events), ctx.Err()
|
||||
}
|
||||
|
||||
env, decodeErr := p.decode(event)
|
||||
if decodeErr != nil {
|
||||
p.logger.Warn("Failed to decode outbox envelope", zap.String("event_id", event.EventID), zap.Error(decodeErr))
|
||||
p.handleFailure(ctx, event, decodeErr)
|
||||
continue
|
||||
}
|
||||
|
||||
if sendErr := p.direct.SendMessage(env); sendErr != nil {
|
||||
p.logger.Warn("Failed to publish outbox event", zap.String("event_id", event.EventID), zap.String("subject", event.Subject), zap.Error(sendErr))
|
||||
p.handleFailure(ctx, event, sendErr)
|
||||
continue
|
||||
}
|
||||
|
||||
if markErr := p.outbox.MarkSent(ctx, event.Reference, time.Now().UTC()); markErr != nil {
|
||||
p.logger.Warn("Failed to mark outbox event sent", zap.String("event_id", event.EventID), zap.String("reference", event.Reference), zap.Error(markErr))
|
||||
}
|
||||
}
|
||||
|
||||
return len(events), nil
|
||||
}
|
||||
|
||||
func (p *ReliableProducer) handleFailure(ctx context.Context, event OutboxMessage, _ error) {
|
||||
if p == nil || p.outbox == nil {
|
||||
return
|
||||
}
|
||||
if event.Reference == "" {
|
||||
p.logger.Warn("Cannot record outbox failure: missing record reference", zap.String("event_id", event.EventID))
|
||||
return
|
||||
}
|
||||
|
||||
if err := p.outbox.IncrementAttempts(ctx, event.Reference); err != nil && !errors.Is(err, context.Canceled) {
|
||||
p.logger.Warn("Failed to increment outbox attempts", zap.String("event_id", event.EventID), zap.String("reference", event.Reference), zap.Error(err))
|
||||
}
|
||||
|
||||
if p.maxAttempts > 0 && event.Attempts+1 >= p.maxAttempts {
|
||||
if err := p.outbox.MarkFailed(ctx, event.Reference); err != nil && !errors.Is(err, context.Canceled) {
|
||||
p.logger.Warn("Failed to mark outbox event failed", zap.String("event_id", event.EventID), zap.String("reference", event.Reference), zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func defaultEnvelopeDecoder(record OutboxMessage) (me.Envelope, error) {
|
||||
return me.Deserialize(record.Payload)
|
||||
}
|
||||
|
||||
var _ pmessaging.ReliableProducer = (*ReliableProducer)(nil)
|
||||
Reference in New Issue
Block a user