Files
sendico/api/ledger/internal/service/ledger/outbox_publisher_test.go
Stephan D 2ee17b0c46
Some checks failed
ci/woodpecker/push/db Pipeline was successful
ci/woodpecker/push/fx/1 Pipeline failed
ci/woodpecker/push/fx/2 Pipeline failed
ci/woodpecker/push/nats Pipeline was successful
fx build fix
2025-11-07 23:50:48 +01:00

143 lines
3.8 KiB
Go

package ledger
import (
"context"
"encoding/json"
"errors"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tech/sendico/ledger/storage/model"
me "github.com/tech/sendico/pkg/messaging/envelope"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.uber.org/zap"
)
func TestOutboxPublisherDispatchSuccess(t *testing.T) {
logger := zap.NewNop()
event := &model.OutboxEvent{
EventID: "entry-1",
Subject: "ledger.entry.posted",
Payload: []byte(`{"journalEntryRef":"abc123"}`),
Attempts: 0,
}
event.SetID(primitive.NewObjectID())
event.OrganizationRef = primitive.NewObjectID()
store := &recordingOutboxStore{
pending: []*model.OutboxEvent{event},
}
producer := &stubProducer{}
publisher := newOutboxPublisher(logger, store, producer)
processed, err := publisher.dispatchPending(context.Background())
require.NoError(t, err)
assert.Equal(t, 1, processed)
require.Len(t, producer.envelopes, 1)
env := producer.envelopes[0]
assert.Equal(t, outboxPublisherSender, env.GetSender())
assert.Equal(t, "ledger_outbox_sent", env.GetSignature().ToString())
var message ledgerOutboxMessage
require.NoError(t, json.Unmarshal(env.GetData(), &message))
assert.Equal(t, event.EventID, message.EventID)
assert.Equal(t, event.Subject, message.Subject)
assert.Equal(t, event.OrganizationRef.Hex(), message.OrganizationRef)
require.Len(t, store.markedSent, 1)
assert.Equal(t, *event.GetID(), store.markedSent[0])
assert.Empty(t, store.markedFailed)
assert.Empty(t, store.incremented)
}
func TestOutboxPublisherDispatchFailureMarksAttempts(t *testing.T) {
logger := zap.NewNop()
event := &model.OutboxEvent{
EventID: "entry-2",
Subject: "ledger.entry.posted",
Payload: []byte(`{"journalEntryRef":"xyz789"}`),
Attempts: maxOutboxDeliveryAttempts - 1,
}
event.SetID(primitive.NewObjectID())
event.OrganizationRef = primitive.NewObjectID()
store := &recordingOutboxStore{
pending: []*model.OutboxEvent{event},
}
producer := &stubProducer{err: errors.New("publish failed")}
publisher := newOutboxPublisher(logger, store, producer)
processed, err := publisher.dispatchPending(context.Background())
require.NoError(t, err)
assert.Equal(t, 1, processed)
require.Len(t, store.incremented, 1)
assert.Equal(t, *event.GetID(), store.incremented[0])
require.Len(t, store.markedFailed, 1)
assert.Equal(t, *event.GetID(), store.markedFailed[0])
assert.Empty(t, store.markedSent)
}
// recordingOutboxStore is an in-memory outbox store test double. It serves a
// preloaded batch of pending events and records the event references passed
// to MarkSent, MarkFailed and IncrementAttempts so tests can assert on them.
type recordingOutboxStore struct {
	// mu is taken by each method that touches the slices below.
	mu sync.Mutex

	// pending is the batch returned, and then cleared, by ListPending.
	pending []*model.OutboxEvent

	// Event references in the order the matching method received them.
	markedSent, markedFailed, incremented []primitive.ObjectID
}
// Create is a no-op: it ignores both arguments, records nothing and always
// returns nil.
func (s *recordingOutboxStore) Create(_ context.Context, _ *model.OutboxEvent) error {
	return nil
}
// ListPending returns the preloaded pending events and clears them, so a
// second call yields a nil slice. The context and the int argument are
// ignored, and the error is always nil.
func (s *recordingOutboxStore) ListPending(_ context.Context, _ int) ([]*model.OutboxEvent, error) {
	s.mu.Lock()
	drained := s.pending
	s.pending = nil
	s.mu.Unlock()

	return drained, nil
}
// MarkSent appends eventRef to markedSent under the mutex and returns nil.
// The context and the sent-at timestamp are intentionally ignored.
func (s *recordingOutboxStore) MarkSent(_ context.Context, eventRef primitive.ObjectID, _ time.Time) error {
	s.mu.Lock()
	s.markedSent = append(s.markedSent, eventRef)
	s.mu.Unlock()

	return nil
}
// MarkFailed appends the given event reference to markedFailed under the
// mutex and returns nil. The context is ignored.
func (s *recordingOutboxStore) MarkFailed(_ context.Context, ref primitive.ObjectID) error {
	s.mu.Lock()
	s.markedFailed = append(s.markedFailed, ref)
	s.mu.Unlock()

	return nil
}
// IncrementAttempts appends the given event reference to incremented under
// the mutex and returns nil. It records the call only; no counter on the
// event itself is changed here. The context is ignored.
func (s *recordingOutboxStore) IncrementAttempts(_ context.Context, ref primitive.ObjectID) error {
	s.mu.Lock()
	s.incremented = append(s.incremented, ref)
	s.mu.Unlock()

	return nil
}
// stubProducer is a message producer test double: SendMessage stores every
// envelope it is given and returns the preconfigured err.
type stubProducer struct {
// mu is taken by SendMessage while it appends to envelopes and reads err.
mu sync.Mutex
// envelopes holds every envelope passed to SendMessage, in call order.
envelopes []me.Envelope
// err is returned from every SendMessage call; nil means the send succeeds.
err error
}
// SendMessage records the envelope and returns the stub's configured error.
// The envelope is recorded even when that error is non-nil.
func (p *stubProducer) SendMessage(envelope me.Envelope) error {
	p.mu.Lock()
	p.envelopes = append(p.envelopes, envelope)
	outcome := p.err // read while still holding the lock, as the original does
	p.mu.Unlock()

	return outcome
}