package reliable import ( "context" "errors" "time" "github.com/google/uuid" "github.com/tech/sendico/pkg/merrors" pmessaging "github.com/tech/sendico/pkg/messaging" me "github.com/tech/sendico/pkg/messaging/envelope" "github.com/tech/sendico/pkg/mlogger" "go.uber.org/zap" ) const ( defaultBatchSize = 100 defaultPollInterval = time.Second defaultMaxAttempts = 5 ) type OutboxMessage struct { Reference string EventID string Subject string Payload []byte Attempts int OrganizationRef string CreatedAt time.Time } type OutboxStore interface { Enqueue(ctx context.Context, msg OutboxMessage) error ListPending(ctx context.Context, limit int) ([]OutboxMessage, error) MarkSent(ctx context.Context, reference string, sentAt time.Time) error MarkFailed(ctx context.Context, reference string) error IncrementAttempts(ctx context.Context, reference string) error } type EnvelopeDecoder func(record OutboxMessage) (me.Envelope, error) type Option func(*ReliableProducer) type ReliableProducer struct { logger mlogger.Logger direct pmessaging.Producer outbox OutboxStore batchSize int pollInterval time.Duration maxAttempts int decode EnvelopeDecoder } func NewReliableProducer(logger mlogger.Logger, direct pmessaging.Producer, outbox OutboxStore, opts ...Option) *ReliableProducer { if logger == nil { logger = zap.NewNop() } res := &ReliableProducer{ logger: logger.Named("reliable_producer"), direct: direct, outbox: outbox, batchSize: defaultBatchSize, pollInterval: defaultPollInterval, maxAttempts: defaultMaxAttempts, decode: defaultEnvelopeDecoder, } for _, opt := range opts { if opt != nil { opt(res) } } return res } func WithBatchSize(size int) Option { return func(p *ReliableProducer) { if size > 0 { p.batchSize = size } } } func WithPollInterval(interval time.Duration) Option { return func(p *ReliableProducer) { if interval > 0 { p.pollInterval = interval } } } func WithMaxAttempts(maxAttempts int) Option { return func(p *ReliableProducer) { p.maxAttempts = maxAttempts } } func WithEnvelopeDecoder(decoder EnvelopeDecoder) Option { return func(p *ReliableProducer) { if decoder != nil { p.decode = decoder } } } func (p *ReliableProducer) SendMessage(envelope me.Envelope) error { if p == nil { return merrors.Internal("reliable producer is nil") } if p.direct == nil { return merrors.Internal("reliable producer direct publisher is not configured") } return p.direct.SendMessage(envelope) } func (p *ReliableProducer) SendWithOutbox(ctx context.Context, envelope me.Envelope) error { if p == nil { return merrors.Internal("reliable producer is nil") } if envelope == nil { return merrors.InvalidArgument("envelope is required") } if p.outbox == nil { return merrors.Internal("reliable producer outbox store is not configured") } data, err := envelope.Serialize() if err != nil { return err } eventID := envelope.GetMessageId().String() if _, err = uuid.Parse(eventID); err != nil { return merrors.InvalidArgument("envelope message id is invalid") } return p.outbox.Enqueue(ctx, OutboxMessage{ EventID: eventID, Subject: envelope.GetSignature().ToString(), Payload: data, }) } func (p *ReliableProducer) Run(ctx context.Context) { if p == nil { return } if p.outbox == nil { p.logger.Warn("Outbox dispatcher disabled: store is not configured") return } if p.direct == nil { p.logger.Warn("Outbox dispatcher disabled: direct producer is not configured") return } p.logger.Info("Outbox dispatcher started") defer p.logger.Info("Outbox dispatcher stopped") for { if ctx.Err() != nil { return } processed, err := p.DispatchPending(ctx) if err != nil && !errors.Is(err, context.Canceled) { p.logger.Warn("Failed to dispatch outbox events", zap.Error(err)) } if processed == 0 { select { case <-ctx.Done(): return case <-time.After(p.pollInterval): } } } } func (p *ReliableProducer) DispatchPending(ctx context.Context) (int, error) { if p == nil || p.outbox == nil || p.direct == nil { return 0, nil } events, err := p.outbox.ListPending(ctx, p.batchSize) if err != nil { return 0, err } for _, event := range events { if ctx.Err() != nil { return len(events), ctx.Err() } env, decodeErr := p.decode(event) if decodeErr != nil { p.logger.Warn("Failed to decode outbox envelope", zap.String("event_id", event.EventID), zap.Error(decodeErr)) p.handleFailure(ctx, event, decodeErr) continue } if sendErr := p.direct.SendMessage(env); sendErr != nil { p.logger.Warn("Failed to publish outbox event", zap.String("event_id", event.EventID), zap.String("subject", event.Subject), zap.Error(sendErr)) p.handleFailure(ctx, event, sendErr) continue } if markErr := p.outbox.MarkSent(ctx, event.Reference, time.Now().UTC()); markErr != nil { p.logger.Warn("Failed to mark outbox event sent", zap.String("event_id", event.EventID), zap.String("reference", event.Reference), zap.Error(markErr)) } } return len(events), nil } func (p *ReliableProducer) handleFailure(ctx context.Context, event OutboxMessage, _ error) { if p == nil || p.outbox == nil { return } if event.Reference == "" { p.logger.Warn("Cannot record outbox failure: missing record reference", zap.String("event_id", event.EventID)) return } if err := p.outbox.IncrementAttempts(ctx, event.Reference); err != nil && !errors.Is(err, context.Canceled) { p.logger.Warn("Failed to increment outbox attempts", zap.String("event_id", event.EventID), zap.String("reference", event.Reference), zap.Error(err)) } if p.maxAttempts > 0 && event.Attempts+1 >= p.maxAttempts { if err := p.outbox.MarkFailed(ctx, event.Reference); err != nil && !errors.Is(err, context.Canceled) { p.logger.Warn("Failed to mark outbox event failed", zap.String("event_id", event.EventID), zap.String("reference", event.Reference), zap.Error(err)) } } } func defaultEnvelopeDecoder(record OutboxMessage) (me.Envelope, error) { return me.Deserialize(record.Payload) } var _ pmessaging.ReliableProducer = (*ReliableProducer)(nil)