331 lines
8.3 KiB
Go
331 lines
8.3 KiB
Go
package outbox
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
me "github.com/tech/sendico/pkg/messaging/envelope"
|
|
pmessagingreliable "github.com/tech/sendico/pkg/messaging/reliable"
|
|
domainmodel "github.com/tech/sendico/pkg/model"
|
|
notification "github.com/tech/sendico/pkg/model/notification"
|
|
"github.com/tech/sendico/pkg/mservice"
|
|
"go.mongodb.org/mongo-driver/v2/bson"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
func TestGatewayReliableProducerPersistsAndRetriesOnBrokerFailure(t *testing.T) {
|
|
store := newMemoryOutboxStore()
|
|
broker := &flakyDirectProducer{failuresRemaining: 1}
|
|
|
|
producer, _, err := NewReliableProducer(
|
|
zap.NewNop(),
|
|
broker,
|
|
store,
|
|
nil,
|
|
pmessagingreliable.WithBatchSize(1),
|
|
pmessagingreliable.WithMaxAttempts(3),
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("failed to create reliable producer: %v", err)
|
|
}
|
|
|
|
env := newTestEnvelope(t, []byte(`{"transferRef":"tx-1","status":"pending"}`))
|
|
if err := producer.SendWithOutbox(context.Background(), env); err != nil {
|
|
t.Fatalf("failed to enqueue envelope into outbox: %v", err)
|
|
}
|
|
|
|
eventID := env.GetMessageId().String()
|
|
persisted := store.EventByID(eventID)
|
|
if persisted == nil {
|
|
t.Fatalf("expected outbox event %s to be persisted", eventID)
|
|
}
|
|
if persisted.Status != StatusPending {
|
|
t.Fatalf("expected pending status after enqueue, got %q", persisted.Status)
|
|
}
|
|
if persisted.Attempts != 0 {
|
|
t.Fatalf("expected zero attempts after enqueue, got %d", persisted.Attempts)
|
|
}
|
|
|
|
processed, err := producer.DispatchPending(context.Background())
|
|
if err != nil {
|
|
t.Fatalf("first dispatch failed: %v", err)
|
|
}
|
|
if processed != 1 {
|
|
t.Fatalf("expected first dispatch to process 1 event, got %d", processed)
|
|
}
|
|
|
|
afterFailure := store.EventByID(eventID)
|
|
if afterFailure == nil {
|
|
t.Fatalf("expected outbox event %s to exist after broker failure", eventID)
|
|
}
|
|
if afterFailure.Status != StatusPending {
|
|
t.Fatalf("expected event to stay pending after transient broker error, got %q", afterFailure.Status)
|
|
}
|
|
if afterFailure.Attempts != 1 {
|
|
t.Fatalf("expected attempts to increment to 1 after failure, got %d", afterFailure.Attempts)
|
|
}
|
|
if afterFailure.SentAt != nil {
|
|
t.Fatalf("expected sentAt to be empty after failed publish")
|
|
}
|
|
|
|
processed, err = producer.DispatchPending(context.Background())
|
|
if err != nil {
|
|
t.Fatalf("second dispatch failed: %v", err)
|
|
}
|
|
if processed != 1 {
|
|
t.Fatalf("expected second dispatch to process 1 event, got %d", processed)
|
|
}
|
|
|
|
afterRetry := store.EventByID(eventID)
|
|
if afterRetry == nil {
|
|
t.Fatalf("expected outbox event %s to exist after retry", eventID)
|
|
}
|
|
if afterRetry.Status != StatusSent {
|
|
t.Fatalf("expected event to be sent after retry, got %q", afterRetry.Status)
|
|
}
|
|
if afterRetry.Attempts != 1 {
|
|
t.Fatalf("expected attempts to remain 1 after successful retry, got %d", afterRetry.Attempts)
|
|
}
|
|
if afterRetry.SentAt == nil {
|
|
t.Fatalf("expected sentAt to be set after successful publish")
|
|
}
|
|
|
|
if attempts := broker.Attempts(); attempts != 2 {
|
|
t.Fatalf("expected two broker attempts (fail then success), got %d", attempts)
|
|
}
|
|
}
|
|
|
|
func TestGatewayReliableProducerMarksFailedAfterMaxAttempts(t *testing.T) {
|
|
store := newMemoryOutboxStore()
|
|
broker := &flakyDirectProducer{failuresRemaining: 10}
|
|
|
|
producer, _, err := NewReliableProducer(
|
|
zap.NewNop(),
|
|
broker,
|
|
store,
|
|
nil,
|
|
pmessagingreliable.WithBatchSize(1),
|
|
pmessagingreliable.WithMaxAttempts(2),
|
|
)
|
|
if err != nil {
|
|
t.Fatalf("failed to create reliable producer: %v", err)
|
|
}
|
|
|
|
env := newTestEnvelope(t, []byte(`{"transferRef":"tx-2","status":"pending"}`))
|
|
if err := producer.SendWithOutbox(context.Background(), env); err != nil {
|
|
t.Fatalf("failed to enqueue envelope into outbox: %v", err)
|
|
}
|
|
|
|
eventID := env.GetMessageId().String()
|
|
|
|
processed, err := producer.DispatchPending(context.Background())
|
|
if err != nil {
|
|
t.Fatalf("first dispatch failed: %v", err)
|
|
}
|
|
if processed != 1 {
|
|
t.Fatalf("expected first dispatch to process 1 event, got %d", processed)
|
|
}
|
|
|
|
processed, err = producer.DispatchPending(context.Background())
|
|
if err != nil {
|
|
t.Fatalf("second dispatch failed: %v", err)
|
|
}
|
|
if processed != 1 {
|
|
t.Fatalf("expected second dispatch to process 1 event, got %d", processed)
|
|
}
|
|
|
|
processed, err = producer.DispatchPending(context.Background())
|
|
if err != nil {
|
|
t.Fatalf("third dispatch failed: %v", err)
|
|
}
|
|
if processed != 0 {
|
|
t.Fatalf("expected failed event to be excluded from pending queue, got processed=%d", processed)
|
|
}
|
|
|
|
final := store.EventByID(eventID)
|
|
if final == nil {
|
|
t.Fatalf("expected outbox event %s to exist", eventID)
|
|
}
|
|
if final.Status != StatusFailed {
|
|
t.Fatalf("expected event to be marked failed after max attempts, got %q", final.Status)
|
|
}
|
|
if final.Attempts != 2 {
|
|
t.Fatalf("expected attempts to equal max attempts (2), got %d", final.Attempts)
|
|
}
|
|
if final.SentAt != nil {
|
|
t.Fatalf("expected sentAt to remain empty for failed event")
|
|
}
|
|
}
|
|
|
|
func newTestEnvelope(t *testing.T, payload []byte) me.Envelope {
|
|
t.Helper()
|
|
|
|
env := me.CreateEnvelope("gateway.common.outbox.test", domainmodel.NewNotification(mservice.ChainGateway, notification.NAUpdated))
|
|
if _, err := env.Wrap(payload); err != nil {
|
|
t.Fatalf("failed to wrap test payload: %v", err)
|
|
}
|
|
|
|
return env
|
|
}
|
|
|
|
type memoryOutboxStore struct {
|
|
mu sync.Mutex
|
|
|
|
eventsByRef map[bson.ObjectID]*Event
|
|
refByEvent map[string]bson.ObjectID
|
|
}
|
|
|
|
func newMemoryOutboxStore() *memoryOutboxStore {
|
|
return &memoryOutboxStore{
|
|
eventsByRef: make(map[bson.ObjectID]*Event),
|
|
refByEvent: make(map[string]bson.ObjectID),
|
|
}
|
|
}
|
|
|
|
func (s *memoryOutboxStore) Create(_ context.Context, event *Event) error {
|
|
if event == nil {
|
|
return errors.New("event is nil")
|
|
}
|
|
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
eventID := strings.TrimSpace(event.EventID)
|
|
if eventID == "" {
|
|
return errors.New("event id is required")
|
|
}
|
|
if _, exists := s.refByEvent[eventID]; exists {
|
|
return errors.New("duplicate event id")
|
|
}
|
|
|
|
stored := cloneEvent(event)
|
|
stored.SetID(bson.NewObjectID())
|
|
if stored.Status == "" {
|
|
stored.Status = StatusPending
|
|
}
|
|
|
|
ref := *stored.GetID()
|
|
s.eventsByRef[ref] = stored
|
|
s.refByEvent[eventID] = ref
|
|
return nil
|
|
}
|
|
|
|
func (s *memoryOutboxStore) ListPending(_ context.Context, limit int) ([]*Event, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
pending := make([]*Event, 0, len(s.eventsByRef))
|
|
for _, event := range s.eventsByRef {
|
|
if event.Status == StatusPending {
|
|
pending = append(pending, cloneEvent(event))
|
|
}
|
|
}
|
|
sort.Slice(pending, func(i, j int) bool {
|
|
return pending[i].CreatedAt.Before(pending[j].CreatedAt)
|
|
})
|
|
|
|
if limit > 0 && len(pending) > limit {
|
|
pending = pending[:limit]
|
|
}
|
|
return pending, nil
|
|
}
|
|
|
|
func (s *memoryOutboxStore) MarkSent(_ context.Context, eventRef bson.ObjectID, sentAt time.Time) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
event, ok := s.eventsByRef[eventRef]
|
|
if !ok {
|
|
return errors.New("event not found")
|
|
}
|
|
event.Status = StatusSent
|
|
when := sentAt.UTC()
|
|
event.SentAt = &when
|
|
event.Update()
|
|
return nil
|
|
}
|
|
|
|
func (s *memoryOutboxStore) MarkFailed(_ context.Context, eventRef bson.ObjectID) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
event, ok := s.eventsByRef[eventRef]
|
|
if !ok {
|
|
return errors.New("event not found")
|
|
}
|
|
event.Status = StatusFailed
|
|
event.Update()
|
|
return nil
|
|
}
|
|
|
|
func (s *memoryOutboxStore) IncrementAttempts(_ context.Context, eventRef bson.ObjectID) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
event, ok := s.eventsByRef[eventRef]
|
|
if !ok {
|
|
return errors.New("event not found")
|
|
}
|
|
event.Attempts++
|
|
event.Update()
|
|
return nil
|
|
}
|
|
|
|
func (s *memoryOutboxStore) EventByID(eventID string) *Event {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
ref, ok := s.refByEvent[eventID]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
event, ok := s.eventsByRef[ref]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return cloneEvent(event)
|
|
}
|
|
|
|
func cloneEvent(event *Event) *Event {
|
|
if event == nil {
|
|
return nil
|
|
}
|
|
copyEvent := *event
|
|
copyEvent.Payload = append([]byte(nil), event.Payload...)
|
|
if event.SentAt != nil {
|
|
sentAt := *event.SentAt
|
|
copyEvent.SentAt = &sentAt
|
|
}
|
|
return ©Event
|
|
}
|
|
|
|
type flakyDirectProducer struct {
|
|
mu sync.Mutex
|
|
failuresRemaining int
|
|
attempts int
|
|
}
|
|
|
|
func (p *flakyDirectProducer) SendMessage(_ me.Envelope) error {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
p.attempts++
|
|
if p.failuresRemaining > 0 {
|
|
p.failuresRemaining--
|
|
return errors.New("broker unavailable")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *flakyDirectProducer) Attempts() int {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
return p.attempts
|
|
}
|