package storage import ( "context" "errors" "strings" "time" "github.com/tech/sendico/pkg/db" "github.com/tech/sendico/pkg/db/repository" "github.com/tech/sendico/pkg/db/repository/builder" ri "github.com/tech/sendico/pkg/db/repository/index" "github.com/tech/sendico/pkg/db/storable" "github.com/tech/sendico/pkg/merrors" "github.com/tech/sendico/pkg/mlogger" "github.com/tech/sendico/pkg/mservice" mutil "github.com/tech/sendico/pkg/mutil/db" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.uber.org/zap" ) const ( inboxCollection string = "inbox" tasksCollection string = "tasks" endpointsCollection string = mservice.Callbacks ) type mongoRepository struct { logger mlogger.Logger inboxRepo repository.Repository tasksRepo repository.Repository endpointsRepo repository.Repository inbox InboxRepo endpoints EndpointRepo tasks TaskRepo } type inboxDoc struct { storable.Base `bson:",inline"` EventID string `bson:"event_id"` ClientID string `bson:"client_id"` EventType string `bson:"event_type"` } func (d *inboxDoc) Collection() string { return inboxCollection } type delayConfig struct { MinDelayMS int `bson:"min_ms"` MaxDelayMS int `bson:"max_ms"` } type deliveryPolicy struct { delayConfig `bson:",inline"` SigningMode string `bson:"signing_mode"` SecretRef string `bson:"secret_ref"` Headers map[string]string `bson:"headers"` MaxAttempts int `bson:"max_attempts"` RequestTimeoutMS int `bson:"request_timeout_ms"` } type endpointDoc struct { storable.Base `bson:",inline"` deliveryPolicy `bson:"retry_policy"` ClientID string `bson:"client_id"` Status string `bson:"status"` URL string `bson:"url"` EventTypes []string `bson:"event_types"` } func (d *endpointDoc) Collection() string { return endpointsCollection } type taskDoc struct { storable.Base `bson:",inline"` deliveryPolicy `bson:"retry_policy"` EventID string `bson:"event_id"` EndpointID bson.ObjectID `bson:"endpoint_id"` EndpointURL string `bson:"endpoint_url"` Payload []byte `bson:"payload"` Status TaskStatus `bson:"status"` Attempt int `bson:"attempt"` LastError string `bson:"last_error,omitempty"` LastHTTPCode int `bson:"last_http_code,omitempty"` NextAttemptAt time.Time `bson:"next_attempt_at"` LockedUntil *time.Time `bson:"locked_until,omitempty"` WorkerID string `bson:"worker_id,omitempty"` DeliveredAt *time.Time `bson:"delivered_at,omitempty"` } func (d *taskDoc) Collection() string { return tasksCollection } func newMongoRepository(logger mlogger.Logger, conn *db.MongoConnection) (Repository, error) { if logger == nil { logger = zap.NewNop() } if conn == nil { return nil, merrors.InvalidArgument("callbacks storage: mongo connection is required", "conn") } repo := &mongoRepository{ logger: logger.Named("storage"), inboxRepo: repository.CreateMongoRepository(conn.Database(), inboxCollection), tasksRepo: repository.CreateMongoRepository(conn.Database(), tasksCollection), endpointsRepo: repository.CreateMongoRepository(conn.Database(), endpointsCollection), } if err := repo.ensureIndexes(); err != nil { return nil, err } repo.inbox = &inboxStore{logger: repo.logger.Named(repo.inboxRepo.Collection()), repo: repo.inboxRepo} repo.endpoints = &endpointStore{logger: repo.logger.Named(repo.endpointsRepo.Collection()), repo: repo.endpointsRepo} repo.tasks = &taskStore{logger: repo.logger.Named(repo.tasksRepo.Collection()), repo: repo.tasksRepo} return repo, nil } func (m *mongoRepository) Inbox() InboxRepo { return m.inbox } func (m *mongoRepository) Endpoints() EndpointRepo { return m.endpoints } func (m *mongoRepository) Tasks() TaskRepo { return m.tasks } func (m *mongoRepository) ensureIndexes() error { if err := m.inboxRepo.CreateIndex(&ri.Definition{ Name: "uq_event_id", Unique: true, Keys: []ri.Key{ {Field: "event_id", Sort: ri.Asc}, }, }); err != nil { return merrors.InternalWrap(err, "callbacks storage: failed to create inbox indexes") } for _, def := range []*ri.Definition{ { Name: "uq_event_endpoint", Unique: true, Keys: []ri.Key{ {Field: "event_id", Sort: ri.Asc}, {Field: "endpoint_id", Sort: ri.Asc}, }, }, { Name: "idx_dispatch_scan", Keys: []ri.Key{ {Field: "status", Sort: ri.Asc}, {Field: "next_attempt_at", Sort: ri.Asc}, {Field: "locked_until", Sort: ri.Asc}, }, }, } { if err := m.tasksRepo.CreateIndex(def); err != nil { return merrors.InternalWrap(err, "callbacks storage: failed to create tasks indexes") } } if err := m.endpointsRepo.CreateIndex(&ri.Definition{ Name: "idx_client_event", Keys: []ri.Key{ {Field: "client_id", Sort: ri.Asc}, {Field: "status", Sort: ri.Asc}, {Field: "event_types", Sort: ri.Asc}, }, }); err != nil { return merrors.InternalWrap(err, "callbacks storage: failed to create endpoint indexes") } return nil } type inboxStore struct { logger mlogger.Logger repo repository.Repository } func (r *inboxStore) TryInsert(ctx context.Context, eventID, clientID, eventType string, at time.Time) (bool, error) { doc := &inboxDoc{ EventID: strings.TrimSpace(eventID), ClientID: strings.TrimSpace(clientID), EventType: strings.TrimSpace(eventType), } filter := repository.Filter("event_id", doc.EventID) if err := r.repo.Insert(ctx, doc, filter); err != nil { if errors.Is(err, merrors.ErrDataConflict) { return false, nil } r.logger.Warn("Failed to insert inbox dedupe marker", zap.String("event_id", eventID), zap.Error(err)) return false, merrors.InternalWrap(err, "callbacks inbox insert failed") } return true, nil } type endpointStore struct { logger mlogger.Logger repo repository.Repository } func (r *endpointStore) FindActiveByClientAndType(ctx context.Context, clientID, eventType string) ([]Endpoint, error) { clientID = strings.TrimSpace(clientID) eventType = strings.TrimSpace(eventType) if clientID == "" { return nil, merrors.InvalidArgument("client_id is required", "client_id") } if eventType == "" { return nil, merrors.InvalidArgument("event type is required", "event_type") } query := repository.Query(). Filter(repository.Field("client_id"), clientID). In(repository.Field("status"), "active", "enabled") out := make([]Endpoint, 0) err := r.repo.FindManyByFilter(ctx, query, func(cur *mongo.Cursor) error { doc := &endpointDoc{} if err := cur.Decode(doc); err != nil { return err } if strings.TrimSpace(doc.URL) == "" { return nil } if !supportsEventType(doc.EventTypes, eventType) { return nil } out = append(out, Endpoint{ ID: doc.ID, ClientID: doc.ClientID, URL: strings.TrimSpace(doc.URL), SigningMode: strings.TrimSpace(doc.SigningMode), SecretRef: strings.TrimSpace(doc.SecretRef), Headers: cloneHeaders(doc.Headers), MaxAttempts: doc.MaxAttempts, MinDelay: time.Duration(doc.MinDelayMS) * time.Millisecond, MaxDelay: time.Duration(doc.MaxDelayMS) * time.Millisecond, RequestTimeout: time.Duration(doc.RequestTimeoutMS) * time.Millisecond, }) return nil }) if err != nil && !errors.Is(err, merrors.ErrNoData) { return nil, merrors.InternalWrap(err, "callbacks endpoint lookup failed") } return out, nil } func supportsEventType(eventTypes []string, eventType string) bool { if len(eventTypes) == 0 { return true } eventType = strings.TrimSpace(eventType) for _, t := range eventTypes { current := strings.TrimSpace(t) if current == "" { continue } if current == "*" || current == eventType { return true } } return false } type taskStore struct { logger mlogger.Logger repo repository.Repository } func (r *taskStore) UpsertTasks(ctx context.Context, eventID string, endpoints []Endpoint, payload []byte, defaults TaskDefaults, at time.Time) error { eventID = strings.TrimSpace(eventID) if eventID == "" { return merrors.InvalidArgument("event id is required", "event_id") } if len(endpoints) == 0 { return nil } now := at.UTC() for _, endpoint := range endpoints { if endpoint.ID == bson.NilObjectID { continue } maxAttempts := endpoint.MaxAttempts if maxAttempts <= 0 { maxAttempts = defaults.MaxAttempts } if maxAttempts <= 0 { maxAttempts = 1 } minDelay := endpoint.MinDelay if minDelay <= 0 { minDelay = defaults.MinDelay } if minDelay <= 0 { minDelay = time.Second } maxDelay := endpoint.MaxDelay if maxDelay <= 0 { maxDelay = defaults.MaxDelay } if maxDelay < minDelay { maxDelay = minDelay } requestTimeout := endpoint.RequestTimeout if requestTimeout <= 0 { requestTimeout = defaults.RequestTimeout } doc := &taskDoc{} doc.EventID = eventID doc.EndpointID = endpoint.ID doc.EndpointURL = strings.TrimSpace(endpoint.URL) doc.SigningMode = strings.TrimSpace(endpoint.SigningMode) doc.SecretRef = strings.TrimSpace(endpoint.SecretRef) doc.Headers = cloneHeaders(endpoint.Headers) doc.Payload = append([]byte(nil), payload...) doc.Status = TaskStatusPending doc.Attempt = 0 doc.MaxAttempts = maxAttempts doc.MinDelayMS = int(minDelay / time.Millisecond) doc.MaxDelayMS = int(maxDelay / time.Millisecond) doc.RequestTimeoutMS = int(requestTimeout / time.Millisecond) doc.NextAttemptAt = now filter := repository.Filter("event_id", eventID).And(repository.Filter("endpoint_id", endpoint.ID)) if err := r.repo.Insert(ctx, doc, filter); err != nil { if errors.Is(err, merrors.ErrDataConflict) { continue } return merrors.InternalWrap(err, "callbacks task upsert failed") } } return nil } func (r *taskStore) LockNextTask(ctx context.Context, now time.Time, workerID string, lockTTL time.Duration) (*Task, error) { workerID = strings.TrimSpace(workerID) if workerID == "" { return nil, merrors.InvalidArgument("worker id is required", "worker_id") } now = now.UTC() limit := int64(32) lockFilter := repository.Query().Or( repository.Query().Comparison(repository.Field("locked_until"), builder.Exists, false), repository.Query().Filter(repository.Field("locked_until"), nil), repository.Query().Comparison(repository.Field("locked_until"), builder.Lte, now), ) query := repository.Query(). In(repository.Field("status"), string(TaskStatusPending), string(TaskStatusRetry)). Comparison(repository.Field("next_attempt_at"), builder.Lte, now). And(lockFilter). Sort(repository.Field("next_attempt_at"), true). Sort(repository.Field("created_at"), true). Limit(&limit) candidates, err := mutil.GetObjects[taskDoc](ctx, r.logger, query, nil, r.repo) if err != nil { if errors.Is(err, merrors.ErrNoData) { return nil, nil } return nil, merrors.InternalWrap(err, "callbacks task query failed") } lockedUntil := now.Add(lockTTL) for _, candidate := range candidates { patch := repository.Patch(). Set(repository.Field("locked_until"), lockedUntil). Set(repository.Field("worker_id"), workerID) conditional := repository.IDFilter(candidate.ID).And( repository.Query().In(repository.Field("status"), string(TaskStatusPending), string(TaskStatusRetry)), repository.Query().Comparison(repository.Field("next_attempt_at"), builder.Lte, now), lockFilter, ) updated, err := r.repo.PatchMany(ctx, conditional, patch) if err != nil { return nil, merrors.InternalWrap(err, "callbacks task lock update failed") } if updated == 0 { continue } locked := &taskDoc{} if err := r.repo.Get(ctx, candidate.ID, locked); err != nil { if errors.Is(err, merrors.ErrNoData) { continue } return nil, merrors.InternalWrap(err, "callbacks task lock reload failed") } if strings.TrimSpace(locked.WorkerID) != workerID { continue } return mapTaskDoc(locked), nil } return nil, nil } func (r *taskStore) MarkDelivered(ctx context.Context, taskID bson.ObjectID, httpCode int, latency time.Duration, at time.Time) error { _ = latency if taskID == bson.NilObjectID { return merrors.InvalidArgument("task id is required", "task_id") } patch := repository.Patch(). Set(repository.Field("status"), TaskStatusDelivered). Set(repository.Field("last_http_code"), httpCode). Set(repository.Field("delivered_at"), time.Now()). Set(repository.Field("locked_until"), nil). Set(repository.Field("worker_id"), ""). Set(repository.Field("last_error"), "") if err := r.repo.Patch(ctx, taskID, patch); err != nil { return merrors.InternalWrap(err, "callbacks task mark delivered failed") } return nil } func (r *taskStore) MarkRetry(ctx context.Context, taskID bson.ObjectID, attempt int, nextAttemptAt time.Time, lastError string, httpCode int, at time.Time) error { if taskID == bson.NilObjectID { return merrors.InvalidArgument("task id is required", "task_id") } patch := repository.Patch(). Set(repository.Field("status"), TaskStatusRetry). Set(repository.Field("attempt"), attempt). Set(repository.Field("next_attempt_at"), nextAttemptAt.UTC()). Set(repository.Field("last_error"), strings.TrimSpace(lastError)). Set(repository.Field("last_http_code"), httpCode). Set(repository.Field("locked_until"), nil). Set(repository.Field("worker_id"), "") if err := r.repo.Patch(ctx, taskID, patch); err != nil { return merrors.InternalWrap(err, "callbacks task mark retry failed") } return nil } func (r *taskStore) MarkFailed(ctx context.Context, taskID bson.ObjectID, attempt int, lastError string, httpCode int, at time.Time) error { if taskID == bson.NilObjectID { return merrors.InvalidArgument("task id is required", "task_id") } patch := repository.Patch(). Set(repository.Field("status"), TaskStatusFailed). Set(repository.Field("attempt"), attempt). Set(repository.Field("last_error"), strings.TrimSpace(lastError)). Set(repository.Field("last_http_code"), httpCode). Set(repository.Field("locked_until"), nil). Set(repository.Field("worker_id"), "") if err := r.repo.Patch(ctx, taskID, patch); err != nil { return merrors.InternalWrap(err, "callbacks task mark failed failed") } return nil } func mapTaskDoc(doc *taskDoc) *Task { if doc == nil { return nil } return &Task{ ID: doc.ID, EventID: doc.EventID, EndpointID: doc.EndpointID, EndpointURL: doc.EndpointURL, SigningMode: doc.SigningMode, SecretRef: doc.SecretRef, Headers: cloneHeaders(doc.Headers), Payload: append([]byte(nil), doc.Payload...), Attempt: doc.Attempt, MaxAttempts: doc.MaxAttempts, MinDelay: time.Duration(doc.MinDelayMS) * time.Millisecond, MaxDelay: time.Duration(doc.MaxDelayMS) * time.Millisecond, RequestTimeout: time.Duration(doc.RequestTimeoutMS) * time.Millisecond, Status: doc.Status, NextAttemptAt: doc.NextAttemptAt, } } func cloneHeaders(in map[string]string) map[string]string { if len(in) == 0 { return map[string]string{} } out := make(map[string]string, len(in)) for key, val := range in { out[key] = val } return out }