109 lines
3.1 KiB
Go
109 lines
3.1 KiB
Go
package outbox
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"time"
|
|
|
|
pmessaging "github.com/tech/sendico/pkg/messaging"
|
|
pmessagingreliable "github.com/tech/sendico/pkg/messaging/reliable"
|
|
"github.com/tech/sendico/pkg/mlogger"
|
|
cfgmodel "github.com/tech/sendico/pkg/model"
|
|
"go.mongodb.org/mongo-driver/v2/bson"
|
|
)
|
|
|
|
type reliableStoreAdapter struct {
|
|
store Store
|
|
}
|
|
|
|
func NewReliableProducer(logger mlogger.Logger, direct pmessaging.Producer, store Store, messagingSettings cfgmodel.SettingsT, opts ...pmessagingreliable.Option) (*pmessagingreliable.ReliableProducer, pmessagingreliable.Settings, error) {
|
|
if store == nil {
|
|
return nil, pmessagingreliable.DefaultSettings(), nil
|
|
}
|
|
producer, settings, err := pmessagingreliable.NewReliableProducerFromConfig(logger, direct, &reliableStoreAdapter{store: store}, messagingSettings, opts...)
|
|
if err != nil {
|
|
return nil, pmessagingreliable.Settings{}, err
|
|
}
|
|
return producer, settings, nil
|
|
}
|
|
|
|
func (a *reliableStoreAdapter) Enqueue(ctx context.Context, msg pmessagingreliable.OutboxMessage) error {
|
|
if a == nil || a.store == nil {
|
|
return nil
|
|
}
|
|
return a.store.Create(ctx, &Event{
|
|
EventID: strings.TrimSpace(msg.EventID),
|
|
Subject: strings.TrimSpace(msg.Subject),
|
|
Payload: append([]byte(nil), msg.Payload...),
|
|
Status: StatusPending,
|
|
Attempts: msg.Attempts,
|
|
})
|
|
}
|
|
|
|
func (a *reliableStoreAdapter) ListPending(ctx context.Context, limit int) ([]pmessagingreliable.OutboxMessage, error) {
|
|
if a == nil || a.store == nil {
|
|
return nil, nil
|
|
}
|
|
events, err := a.store.ListPending(ctx, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result := make([]pmessagingreliable.OutboxMessage, 0, len(events))
|
|
for _, event := range events {
|
|
if event == nil {
|
|
continue
|
|
}
|
|
reference := ""
|
|
if eventRef := event.GetID(); eventRef != nil && !eventRef.IsZero() {
|
|
reference = eventRef.Hex()
|
|
}
|
|
result = append(result, pmessagingreliable.OutboxMessage{
|
|
Reference: reference,
|
|
EventID: strings.TrimSpace(event.EventID),
|
|
Subject: strings.TrimSpace(event.Subject),
|
|
Payload: append([]byte(nil), event.Payload...),
|
|
Attempts: event.Attempts,
|
|
CreatedAt: event.CreatedAt,
|
|
})
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (a *reliableStoreAdapter) MarkSent(ctx context.Context, reference string, sentAt time.Time) error {
|
|
if a == nil || a.store == nil {
|
|
return nil
|
|
}
|
|
eventRef, err := parseObjectID(strings.TrimSpace(reference))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return a.store.MarkSent(ctx, eventRef, sentAt)
|
|
}
|
|
|
|
func (a *reliableStoreAdapter) MarkFailed(ctx context.Context, reference string) error {
|
|
if a == nil || a.store == nil {
|
|
return nil
|
|
}
|
|
eventRef, err := parseObjectID(strings.TrimSpace(reference))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return a.store.MarkFailed(ctx, eventRef)
|
|
}
|
|
|
|
func (a *reliableStoreAdapter) IncrementAttempts(ctx context.Context, reference string) error {
|
|
if a == nil || a.store == nil {
|
|
return nil
|
|
}
|
|
eventRef, err := parseObjectID(strings.TrimSpace(reference))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return a.store.IncrementAttempts(ctx, eventRef)
|
|
}
|
|
|
|
func parseObjectID(raw string) (bson.ObjectID, error) {
|
|
return bson.ObjectIDFromHex(raw)
|
|
}
|