package ledger

import (
	"context"
	"encoding/json"
	"errors"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/tech/sendico/ledger/storage/model"
	me "github.com/tech/sendico/pkg/messaging/envelope"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.uber.org/zap"
)

// TestOutboxPublisherDispatchSuccess dispatches a single pending outbox event
// through a producer that never fails and checks that exactly one envelope is
// sent, that the envelope carries the event's identifiers, and that the store
// records the event as sent without any failure or attempt bookkeeping.
func TestOutboxPublisherDispatchSuccess(t *testing.T) {
	logger := zap.NewNop()

	event := &model.OutboxEvent{
		EventID:  "entry-1",
		Subject:  "ledger.entry.posted",
		Payload:  []byte(`{"journalEntryRef":"abc123"}`),
		Attempts: 0,
	}
	event.SetID(primitive.NewObjectID())
	event.OrganizationRef = primitive.NewObjectID()

	store := &recordingOutboxStore{
		pending: []*model.OutboxEvent{event},
	}
	producer := &stubProducer{}
	publisher := newOutboxPublisher(logger, store, producer)

	processed, err := publisher.dispatchPending(context.Background())
	require.NoError(t, err)
	assert.Equal(t, 1, processed)

	// Exactly one envelope must have reached the producer.
	require.Len(t, producer.envelopes, 1)
	env := producer.envelopes[0]
	assert.Equal(t, outboxPublisherSender, env.GetSender())
	assert.Equal(t, "ledger_outbox_sent", env.GetSignature().ToString())

	// The envelope data must decode into the outbox message and mirror the event.
	var message ledgerOutboxMessage
	require.NoError(t, json.Unmarshal(env.GetData(), &message))
	assert.Equal(t, event.EventID, message.EventID)
	assert.Equal(t, event.Subject, message.Subject)
	assert.Equal(t, event.OrganizationRef.Hex(), message.OrganizationRef)

	// Only MarkSent may have been called on the store.
	require.Len(t, store.markedSent, 1)
	assert.Equal(t, *event.GetID(), store.markedSent[0])
	assert.Empty(t, store.markedFailed)
	assert.Empty(t, store.incremented)
}

// TestOutboxPublisherDispatchFailureMarksAttempts dispatches an event that
// starts one attempt below maxOutboxDeliveryAttempts through a producer that
// always returns an error. It expects the dispatch itself to succeed, the
// event to be counted as processed, and the store to record one attempt
// increment and one failure mark for the event, with nothing marked as sent.
func TestOutboxPublisherDispatchFailureMarksAttempts(t *testing.T) {
	logger := zap.NewNop()

	event := &model.OutboxEvent{
		EventID: "entry-2",
		Subject: "ledger.entry.posted",
		Payload: []byte(`{"journalEntryRef":"xyz789"}`),
		// NOTE(review): presumably the failed publish below takes the event to
		// the delivery limit, which is why MarkFailed is expected — confirm
		// against the publisher implementation.
		Attempts: maxOutboxDeliveryAttempts - 1,
	}
	event.SetID(primitive.NewObjectID())
	event.OrganizationRef = primitive.NewObjectID()

	store := &recordingOutboxStore{
		pending: []*model.OutboxEvent{event},
	}
	producer := &stubProducer{err: errors.New("publish failed")}
	publisher := newOutboxPublisher(logger, store, producer)

	// A producer error is not surfaced as a dispatch error.
	processed, err := publisher.dispatchPending(context.Background())
	require.NoError(t, err)
	assert.Equal(t, 1, processed)

	require.Len(t, store.incremented, 1)
	assert.Equal(t, *event.GetID(), store.incremented[0])

	require.Len(t, store.markedFailed, 1)
	assert.Equal(t, *event.GetID(), store.markedFailed[0])

	assert.Empty(t, store.markedSent)
}

// recordingOutboxStore is an in-memory test double for the outbox store. It
// hands out a preloaded batch of pending events and records the ID passed to
// each state-changing call so tests can assert on them afterwards.
type recordingOutboxStore struct {
	mu sync.Mutex // guards every field below

	pending      []*model.OutboxEvent // returned by the next ListPending call
	markedSent   []primitive.ObjectID // IDs passed to MarkSent, in call order
	markedFailed []primitive.ObjectID // IDs passed to MarkFailed, in call order
	incremented  []primitive.ObjectID // IDs passed to IncrementAttempts, in call order
}

// Create is a no-op that always returns nil.
func (s *recordingOutboxStore) Create(context.Context, *model.OutboxEvent) error {
	return nil
}

// ListPending returns the preloaded events and clears them, so every later
// call yields a nil slice. The limit argument is ignored.
func (s *recordingOutboxStore) ListPending(context.Context, int) ([]*model.OutboxEvent, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	events := s.pending
	s.pending = nil
	return events, nil
}

// MarkSent records eventRef as sent. The timestamp argument is ignored.
func (s *recordingOutboxStore) MarkSent(_ context.Context, eventRef primitive.ObjectID, _ time.Time) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.markedSent = append(s.markedSent, eventRef)
	return nil
}

// MarkFailed records eventRef as failed.
func (s *recordingOutboxStore) MarkFailed(_ context.Context, eventRef primitive.ObjectID) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.markedFailed = append(s.markedFailed, eventRef)
	return nil
}

// IncrementAttempts records that an attempt increment was requested for eventRef.
func (s *recordingOutboxStore) IncrementAttempts(_ context.Context, eventRef primitive.ObjectID) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.incremented = append(s.incremented, eventRef)
	return nil
}

// stubProducer is a test double for the message producer. It captures every
// envelope it is given and returns the configured error from each send.
type stubProducer struct {
	mu sync.Mutex // guards envelopes

	envelopes []me.Envelope // envelopes passed to SendMessage, in call order
	err       error         // returned by every SendMessage call; nil means success
}

// SendMessage records env and returns the configured error. The envelope is
// recorded even when the error is non-nil.
func (p *stubProducer) SendMessage(env me.Envelope) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	p.envelopes = append(p.envelopes, env)
	return p.err
}