169 lines
5.1 KiB
Go
169 lines
5.1 KiB
Go
package ledger
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/tech/sendico/ledger/storage"
|
|
"github.com/tech/sendico/ledger/storage/model"
|
|
pmessaging "github.com/tech/sendico/pkg/messaging"
|
|
me "github.com/tech/sendico/pkg/messaging/envelope"
|
|
pmessagingreliable "github.com/tech/sendico/pkg/messaging/reliable"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
cfgmodel "github.com/tech/sendico/pkg/model"
|
|
domainmodel "github.com/tech/sendico/pkg/model"
|
|
notification "github.com/tech/sendico/pkg/model/notification"
|
|
"github.com/tech/sendico/pkg/mservice"
|
|
)
|
|
|
|
const (
|
|
outboxPublisherSender = "ledger.outbox.publisher"
|
|
)
|
|
|
|
type ledgerOutboxMessage struct {
|
|
EventID string `json:"eventId"`
|
|
Subject string `json:"subject"`
|
|
Payload json.RawMessage `json:"payload"`
|
|
Attempts int `json:"attempts"`
|
|
OrganizationRef string `json:"organizationRef"`
|
|
CreatedAt time.Time `json:"createdAt"`
|
|
}
|
|
|
|
type ledgerOutboxStoreAdapter struct {
|
|
store storage.OutboxStore
|
|
}
|
|
|
|
func newLedgerReliableProducer(logger mlogger.Logger, direct pmessaging.Producer, store storage.OutboxStore, messagingSettings cfgmodel.SettingsT) (*pmessagingreliable.ReliableProducer, pmessagingreliable.Settings, error) {
|
|
if store == nil {
|
|
return nil, pmessagingreliable.DefaultSettings(), nil
|
|
}
|
|
producer, settings, err := pmessagingreliable.NewReliableProducerFromConfig(logger, direct, &ledgerOutboxStoreAdapter{store: store}, messagingSettings,
|
|
pmessagingreliable.WithEnvelopeDecoder(ledgerOutboxDecoder),
|
|
)
|
|
if err != nil {
|
|
return nil, pmessagingreliable.Settings{}, err
|
|
}
|
|
return producer, settings, nil
|
|
}
|
|
|
|
func (a *ledgerOutboxStoreAdapter) Enqueue(ctx context.Context, msg pmessagingreliable.OutboxMessage) error {
|
|
if a == nil || a.store == nil {
|
|
return nil
|
|
}
|
|
event := &model.OutboxEvent{
|
|
EventID: strings.TrimSpace(msg.EventID),
|
|
Subject: strings.TrimSpace(msg.Subject),
|
|
Payload: append([]byte(nil), msg.Payload...),
|
|
Status: model.OutboxStatusPending,
|
|
Attempts: msg.Attempts,
|
|
}
|
|
if organizationRef := strings.TrimSpace(msg.OrganizationRef); organizationRef != "" {
|
|
orgRef, err := parseObjectID(organizationRef)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
event.OrganizationRef = orgRef
|
|
}
|
|
return a.store.Create(ctx, event)
|
|
}
|
|
|
|
func (a *ledgerOutboxStoreAdapter) ListPending(ctx context.Context, limit int) ([]pmessagingreliable.OutboxMessage, error) {
|
|
if a == nil || a.store == nil {
|
|
return nil, nil
|
|
}
|
|
events, err := a.store.ListPending(ctx, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result := make([]pmessagingreliable.OutboxMessage, 0, len(events))
|
|
for _, event := range events {
|
|
if event == nil {
|
|
continue
|
|
}
|
|
reference := ""
|
|
if eventRef := event.GetID(); eventRef != nil && !eventRef.IsZero() {
|
|
reference = eventRef.Hex()
|
|
}
|
|
result = append(result, pmessagingreliable.OutboxMessage{
|
|
Reference: reference,
|
|
EventID: strings.TrimSpace(event.EventID),
|
|
Subject: strings.TrimSpace(event.Subject),
|
|
Payload: append([]byte(nil), event.Payload...),
|
|
Attempts: event.Attempts,
|
|
OrganizationRef: event.OrganizationRef.Hex(),
|
|
CreatedAt: event.CreatedAt,
|
|
})
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (a *ledgerOutboxStoreAdapter) MarkSent(ctx context.Context, reference string, sentAt time.Time) error {
|
|
if a == nil || a.store == nil {
|
|
return nil
|
|
}
|
|
eventRef, err := parseObjectID(strings.TrimSpace(reference))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return a.store.MarkSent(ctx, eventRef, sentAt)
|
|
}
|
|
|
|
func (a *ledgerOutboxStoreAdapter) MarkFailed(ctx context.Context, reference string) error {
|
|
if a == nil || a.store == nil {
|
|
return nil
|
|
}
|
|
eventRef, err := parseObjectID(strings.TrimSpace(reference))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return a.store.MarkFailed(ctx, eventRef)
|
|
}
|
|
|
|
func (a *ledgerOutboxStoreAdapter) IncrementAttempts(ctx context.Context, reference string) error {
|
|
if a == nil || a.store == nil {
|
|
return nil
|
|
}
|
|
eventRef, err := parseObjectID(strings.TrimSpace(reference))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return a.store.IncrementAttempts(ctx, eventRef)
|
|
}
|
|
|
|
func ledgerOutboxDecoder(record pmessagingreliable.OutboxMessage) (me.Envelope, error) {
|
|
env, err := me.Deserialize(record.Payload)
|
|
if err == nil {
|
|
return env, nil
|
|
}
|
|
if strings.TrimSpace(record.Subject) != ledgerOutboxSubject {
|
|
return nil, err
|
|
}
|
|
return buildLedgerOutboxEnvelope(record.EventID, record.Payload, record.Attempts, record.OrganizationRef, record.CreatedAt)
|
|
}
|
|
|
|
func buildLedgerOutboxEnvelope(eventID string, payload []byte, attempts int, organizationRef string, createdAt time.Time) (me.Envelope, error) {
|
|
msg := ledgerOutboxMessage{
|
|
EventID: strings.TrimSpace(eventID),
|
|
Subject: ledgerOutboxSubject,
|
|
Payload: append([]byte(nil), payload...),
|
|
Attempts: attempts,
|
|
OrganizationRef: strings.TrimSpace(organizationRef),
|
|
CreatedAt: createdAt,
|
|
}
|
|
|
|
body, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
env := me.CreateEnvelope(outboxPublisherSender, domainmodel.NewNotification(mservice.LedgerOutbox, notification.NASent))
|
|
if _, err = env.Wrap(body); err != nil {
|
|
return nil, err
|
|
}
|
|
return env, nil
|
|
}
|