208 lines
5.9 KiB
Go
208 lines
5.9 KiB
Go
package ledger
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"time"
|
|
|
|
"github.com/tech/sendico/ledger/storage"
|
|
ledgerModel "github.com/tech/sendico/ledger/storage/model"
|
|
"github.com/tech/sendico/pkg/merrors"
|
|
pmessaging "github.com/tech/sendico/pkg/messaging"
|
|
me "github.com/tech/sendico/pkg/messaging/envelope"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
domainmodel "github.com/tech/sendico/pkg/model"
|
|
notification "github.com/tech/sendico/pkg/model/notification"
|
|
"github.com/tech/sendico/pkg/mservice"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Defaults and identifiers used by the ledger outbox publisher.
const (
	// outboxPublisherSender is the sender name given to me.CreateEnvelope
	// for every envelope built by publishEvent.
	outboxPublisherSender = "ledger.outbox.publisher"

	// defaultOutboxBatchSize is the batch size newOutboxPublisher assigns;
	// it is the limit passed to OutboxStore.ListPending on each poll.
	defaultOutboxBatchSize = 100

	// defaultOutboxPollInterval is the poll interval newOutboxPublisher
	// assigns; run waits this long after a poll that handled no events.
	defaultOutboxPollInterval = time.Second

	// maxOutboxDeliveryAttempts is the attempt count at which handleFailure
	// asks the store to mark an event as failed.
	maxOutboxDeliveryAttempts = 5
)
|
|
|
|
type outboxPublisher struct {
|
|
logger mlogger.Logger
|
|
store storage.OutboxStore
|
|
producer pmessaging.Producer
|
|
|
|
batchSize int
|
|
pollInterval time.Duration
|
|
}
|
|
|
|
func newOutboxPublisher(logger mlogger.Logger, store storage.OutboxStore, producer pmessaging.Producer) *outboxPublisher {
|
|
return &outboxPublisher{
|
|
logger: logger.Named("outbox.publisher"),
|
|
store: store,
|
|
producer: producer,
|
|
batchSize: defaultOutboxBatchSize,
|
|
pollInterval: defaultOutboxPollInterval,
|
|
}
|
|
}
|
|
|
|
func (p *outboxPublisher) run(ctx context.Context) {
|
|
p.logger.Info("started")
|
|
defer p.logger.Info("stopped")
|
|
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
processed, err := p.dispatchPending(ctx)
|
|
if err != nil && !errors.Is(err, context.Canceled) {
|
|
p.logger.Warn("failed to dispatch ledger outbox events", zap.Error(err))
|
|
}
|
|
if processed > 0 {
|
|
p.logger.Debug("dispatched ledger outbox events",
|
|
zap.Int("count", processed),
|
|
zap.Int("batch_size", p.batchSize))
|
|
}
|
|
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
if processed == 0 {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(p.pollInterval):
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *outboxPublisher) dispatchPending(ctx context.Context) (int, error) {
|
|
if p.store == nil || p.producer == nil {
|
|
return 0, nil
|
|
}
|
|
|
|
events, err := p.store.ListPending(ctx, p.batchSize)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
for _, event := range events {
|
|
if ctx.Err() != nil {
|
|
return len(events), ctx.Err()
|
|
}
|
|
if err := p.publishEvent(ctx, event); err != nil {
|
|
if errors.Is(err, context.Canceled) {
|
|
return len(events), err
|
|
}
|
|
p.logger.Warn("failed to publish outbox event",
|
|
zap.Error(err),
|
|
zap.String("eventId", event.EventID),
|
|
zap.String("subject", event.Subject),
|
|
zap.String("organizationRef", event.OrganizationRef.Hex()),
|
|
zap.Int("attempts", event.Attempts))
|
|
p.handleFailure(ctx, event)
|
|
continue
|
|
}
|
|
if err := p.markSent(ctx, event); err != nil {
|
|
if errors.Is(err, context.Canceled) {
|
|
return len(events), err
|
|
}
|
|
p.logger.Warn("failed to mark outbox event as sent",
|
|
zap.Error(err),
|
|
zap.String("eventId", event.EventID),
|
|
zap.String("subject", event.Subject),
|
|
zap.String("organizationRef", event.OrganizationRef.Hex()))
|
|
} else {
|
|
p.logger.Debug("outbox event marked sent",
|
|
zap.String("eventId", event.EventID),
|
|
zap.String("subject", event.Subject),
|
|
zap.String("organizationRef", event.OrganizationRef.Hex()))
|
|
}
|
|
}
|
|
|
|
return len(events), nil
|
|
}
|
|
|
|
func (p *outboxPublisher) publishEvent(_ context.Context, event *ledgerModel.OutboxEvent) error {
|
|
docID := event.GetID()
|
|
if docID == nil || docID.IsZero() {
|
|
return merrors.InvalidArgument("outbox event missing identifier")
|
|
}
|
|
|
|
payload, err := p.wrapPayload(event)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
env := me.CreateEnvelope(outboxPublisherSender, domainmodel.NewNotification(mservice.LedgerOutbox, notification.NASent))
|
|
if _, err = env.Wrap(payload); err != nil {
|
|
return err
|
|
}
|
|
|
|
return p.producer.SendMessage(env)
|
|
}
|
|
|
|
func (p *outboxPublisher) wrapPayload(event *ledgerModel.OutboxEvent) ([]byte, error) {
|
|
message := ledgerOutboxMessage{
|
|
EventID: event.EventID,
|
|
Subject: event.Subject,
|
|
Payload: json.RawMessage(event.Payload),
|
|
Attempts: event.Attempts,
|
|
OrganizationRef: event.OrganizationRef.Hex(),
|
|
CreatedAt: event.CreatedAt,
|
|
}
|
|
return json.Marshal(message)
|
|
}
|
|
|
|
func (p *outboxPublisher) markSent(ctx context.Context, event *ledgerModel.OutboxEvent) error {
|
|
eventRef := event.GetID()
|
|
if eventRef == nil || eventRef.IsZero() {
|
|
return merrors.InvalidArgument("outbox event missing identifier")
|
|
}
|
|
|
|
return p.store.MarkSent(ctx, *eventRef, time.Now().UTC())
|
|
}
|
|
|
|
func (p *outboxPublisher) handleFailure(ctx context.Context, event *ledgerModel.OutboxEvent) {
|
|
eventRef := event.GetID()
|
|
if eventRef == nil || eventRef.IsZero() {
|
|
p.logger.Warn("cannot record outbox failure: missing identifier", zap.String("eventId", event.EventID))
|
|
return
|
|
}
|
|
|
|
if err := p.store.IncrementAttempts(ctx, *eventRef); err != nil && !errors.Is(err, context.Canceled) {
|
|
p.logger.Warn("failed to increment outbox attempts",
|
|
zap.Error(err),
|
|
zap.String("eventId", event.EventID),
|
|
zap.String("subject", event.Subject),
|
|
zap.String("organizationRef", event.OrganizationRef.Hex()))
|
|
}
|
|
|
|
if event.Attempts+1 >= maxOutboxDeliveryAttempts {
|
|
if err := p.store.MarkFailed(ctx, *eventRef); err != nil && !errors.Is(err, context.Canceled) {
|
|
p.logger.Warn("failed to mark outbox event failed",
|
|
zap.Error(err),
|
|
zap.String("eventId", event.EventID),
|
|
zap.String("subject", event.Subject),
|
|
zap.String("organizationRef", event.OrganizationRef.Hex()),
|
|
zap.Int("attempts", event.Attempts+1))
|
|
} else {
|
|
p.logger.Warn("ledger outbox event marked as failed",
|
|
zap.String("eventId", event.EventID),
|
|
zap.String("subject", event.Subject),
|
|
zap.String("organizationRef", event.OrganizationRef.Hex()),
|
|
zap.Int("attempts", event.Attempts+1))
|
|
}
|
|
}
|
|
}
|
|
|
|
// ledgerOutboxMessage is the JSON document wrapPayload builds from an outbox
// event before it is placed in an envelope.
type ledgerOutboxMessage struct {
	// EventID is copied from the outbox event's EventID.
	EventID string `json:"eventId"`
	// Subject is copied from the outbox event's Subject.
	Subject string `json:"subject"`
	// Payload is the event payload, emitted as raw JSON rather than re-encoded.
	Payload json.RawMessage `json:"payload"`
	// Attempts is the event's attempt count at the time of publishing.
	Attempts int `json:"attempts"`
	// OrganizationRef is the Hex() form of the event's organization reference.
	OrganizationRef string `json:"organizationRef"`
	// CreatedAt is copied from the outbox event's CreatedAt.
	CreatedAt time.Time `json:"createdAt"`
}
|