package reliable import ( "context" "errors" "sync" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" me "github.com/tech/sendico/pkg/messaging/envelope" domainmodel "github.com/tech/sendico/pkg/model" notification "github.com/tech/sendico/pkg/model/notification" "github.com/tech/sendico/pkg/mservice" "go.uber.org/zap" ) func TestReliableProducerSendWithOutbox(t *testing.T) { store := &recordingStore{} producer := NewReliableProducer(zap.NewNop(), nil, store) env := me.CreateEnvelope("test-sender", domainmodel.NewNotification(mservice.Payments, notification.NACreated)) _, err := env.Wrap([]byte(`{"ok":true}`)) require.NoError(t, err) err = producer.SendWithOutbox(context.Background(), env) require.NoError(t, err) require.Len(t, store.enqueued, 1) record := store.enqueued[0] assert.Equal(t, env.GetMessageId().String(), record.EventID) assert.Equal(t, "payments_created", record.Subject) decoded, err := me.Deserialize(record.Payload) require.NoError(t, err) assert.Equal(t, env.GetMessageId(), decoded.GetMessageId()) assert.Equal(t, env.GetSignature().ToString(), decoded.GetSignature().ToString()) } func TestReliableProducerDispatchPendingSuccess(t *testing.T) { env := me.CreateEnvelope("test-sender", domainmodel.NewNotification(mservice.Payments, notification.NAUpdated)) _, err := env.Wrap([]byte(`{"event":"updated"}`)) require.NoError(t, err) payload, err := env.Serialize() require.NoError(t, err) store := &recordingStore{ pending: []OutboxMessage{ { Reference: "ref-1", EventID: env.GetMessageId().String(), Subject: env.GetSignature().ToString(), Payload: payload, Attempts: 0, }, }, } direct := &recordingDirectProducer{} producer := NewReliableProducer(zap.NewNop(), direct, store) processed, err := producer.DispatchPending(context.Background()) require.NoError(t, err) assert.Equal(t, 1, processed) require.Len(t, direct.sent, 1) assert.Equal(t, env.GetMessageId(), direct.sent[0].GetMessageId()) require.Len(t, store.markedSent, 1) assert.Equal(t, "ref-1", store.markedSent[0]) assert.Empty(t, store.incremented) assert.Empty(t, store.markedFailed) } func TestReliableProducerDispatchPendingFailureMarksFailed(t *testing.T) { env := me.CreateEnvelope("test-sender", domainmodel.NewNotification(mservice.Payments, notification.NAUpdated)) _, err := env.Wrap([]byte(`{"event":"updated"}`)) require.NoError(t, err) payload, err := env.Serialize() require.NoError(t, err) store := &recordingStore{ pending: []OutboxMessage{ { Reference: "ref-2", EventID: env.GetMessageId().String(), Subject: env.GetSignature().ToString(), Payload: payload, Attempts: 4, }, }, } direct := &recordingDirectProducer{err: errors.New("publish failed")} producer := NewReliableProducer(zap.NewNop(), direct, store, WithMaxAttempts(5)) processed, err := producer.DispatchPending(context.Background()) require.NoError(t, err) assert.Equal(t, 1, processed) require.Len(t, store.incremented, 1) assert.Equal(t, "ref-2", store.incremented[0]) require.Len(t, store.markedFailed, 1) assert.Equal(t, "ref-2", store.markedFailed[0]) assert.Empty(t, store.markedSent) } type recordingStore struct { mu sync.Mutex enqueued []OutboxMessage pending []OutboxMessage markedSent []string markedFailed []string incremented []string } func (s *recordingStore) Enqueue(_ context.Context, msg OutboxMessage) error { s.mu.Lock() defer s.mu.Unlock() s.enqueued = append(s.enqueued, msg) return nil } func (s *recordingStore) ListPending(_ context.Context, _ int) ([]OutboxMessage, error) { s.mu.Lock() defer s.mu.Unlock() events := append([]OutboxMessage(nil), s.pending...) s.pending = nil return events, nil } func (s *recordingStore) MarkSent(_ context.Context, reference string, _ time.Time) error { s.mu.Lock() defer s.mu.Unlock() s.markedSent = append(s.markedSent, reference) return nil } func (s *recordingStore) MarkFailed(_ context.Context, reference string) error { s.mu.Lock() defer s.mu.Unlock() s.markedFailed = append(s.markedFailed, reference) return nil } func (s *recordingStore) IncrementAttempts(_ context.Context, reference string) error { s.mu.Lock() defer s.mu.Unlock() s.incremented = append(s.incremented, reference) return nil } type recordingDirectProducer struct { mu sync.Mutex sent []me.Envelope err error } func (p *recordingDirectProducer) SendMessage(env me.Envelope) error { p.mu.Lock() defer p.mu.Unlock() p.sent = append(p.sent, env) return p.err }