package outbox import ( "context" "strings" "time" pmessaging "github.com/tech/sendico/pkg/messaging" pmessagingreliable "github.com/tech/sendico/pkg/messaging/reliable" "github.com/tech/sendico/pkg/mlogger" cfgmodel "github.com/tech/sendico/pkg/model" "go.mongodb.org/mongo-driver/v2/bson" ) type reliableStoreAdapter struct { store Store } func NewReliableProducer(logger mlogger.Logger, direct pmessaging.Producer, store Store, messagingSettings cfgmodel.SettingsT, opts ...pmessagingreliable.Option) (*pmessagingreliable.ReliableProducer, pmessagingreliable.Settings, error) { if store == nil { return nil, pmessagingreliable.DefaultSettings(), nil } producer, settings, err := pmessagingreliable.NewReliableProducerFromConfig(logger, direct, &reliableStoreAdapter{store: store}, messagingSettings, opts...) if err != nil { return nil, pmessagingreliable.Settings{}, err } return producer, settings, nil } func (a *reliableStoreAdapter) Enqueue(ctx context.Context, msg pmessagingreliable.OutboxMessage) error { if a == nil || a.store == nil { return nil } return a.store.Create(ctx, &Event{ EventID: strings.TrimSpace(msg.EventID), Subject: strings.TrimSpace(msg.Subject), Payload: append([]byte(nil), msg.Payload...), Status: StatusPending, Attempts: msg.Attempts, }) } func (a *reliableStoreAdapter) ListPending(ctx context.Context, limit int) ([]pmessagingreliable.OutboxMessage, error) { if a == nil || a.store == nil { return nil, nil } events, err := a.store.ListPending(ctx, limit) if err != nil { return nil, err } result := make([]pmessagingreliable.OutboxMessage, 0, len(events)) for _, event := range events { if event == nil { continue } reference := "" if eventRef := event.GetID(); eventRef != nil && !eventRef.IsZero() { reference = eventRef.Hex() } result = append(result, pmessagingreliable.OutboxMessage{ Reference: reference, EventID: strings.TrimSpace(event.EventID), Subject: strings.TrimSpace(event.Subject), Payload: append([]byte(nil), event.Payload...), Attempts: event.Attempts, CreatedAt: event.CreatedAt, }) } return result, nil } func (a *reliableStoreAdapter) MarkSent(ctx context.Context, reference string, sentAt time.Time) error { if a == nil || a.store == nil { return nil } eventRef, err := parseObjectID(strings.TrimSpace(reference)) if err != nil { return err } return a.store.MarkSent(ctx, eventRef, sentAt) } func (a *reliableStoreAdapter) MarkFailed(ctx context.Context, reference string) error { if a == nil || a.store == nil { return nil } eventRef, err := parseObjectID(strings.TrimSpace(reference)) if err != nil { return err } return a.store.MarkFailed(ctx, eventRef) } func (a *reliableStoreAdapter) IncrementAttempts(ctx context.Context, reference string) error { if a == nil || a.store == nil { return nil } eventRef, err := parseObjectID(strings.TrimSpace(reference)) if err != nil { return err } return a.store.IncrementAttempts(ctx, eventRef) } func parseObjectID(raw string) (bson.ObjectID, error) { return bson.ObjectIDFromHex(raw) }