Files
sendico/api/pkg/db/internal/mongo/repositoryimp/builderimp/pipeline.go
Stephan D 62a6631b9a
All checks were successful
ci/woodpecker/push/db Pipeline was successful
ci/woodpecker/push/nats Pipeline was successful
service backend
2025-11-07 18:35:26 +01:00

132 lines
3.8 KiB
Go

package builderimp
import (
	"sort"

	"github.com/tech/sendico/pkg/db/repository/builder"
	"github.com/tech/sendico/pkg/mservice"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)
// unwindOpts is a package-local alias for builder.UnwindOpts, the option set
// consumed by Unwind (its PreserveNullAndEmptyArrays and IncludeArrayIndex
// fields are read there).
type unwindOpts = builder.UnwindOpts
// UnwindOption is an alias for builder.UnwindOption, so values created against
// the builder package can be passed here unchanged. NewUnwindOpts invokes each
// option with a pointer to the unwindOpts being configured.
type UnwindOption = builder.UnwindOption
// NewUnwindOpts applies every UnwindOption, in the order given, to a fresh
// zero-valued unwindOpts and returns it.
//
// Nil options are skipped, so callers may pass conditionally-populated option
// variables without triggering a nil function call panic.
func NewUnwindOpts(opts ...UnwindOption) *unwindOpts {
	cfg := &unwindOpts{}
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		opt(cfg)
	}
	return cfg
}
// PipelineImp is the builder.Pipeline implementation returned by
// NewPipelineImp. Each stage method appends one stage to the pipeline and
// returns the receiver, so calls can be chained; Build returns the result.
type PipelineImp struct {
// pipeline holds the stages appended so far, in the order they were added.
pipeline mongo.Pipeline
}
// Match appends the stage produced by filter.BuildPipeline() to the pipeline
// and returns the receiver for chaining.
func (b *PipelineImp) Match(filter builder.Query) builder.Pipeline {
	stage := filter.BuildPipeline()
	b.pipeline = append(b.pipeline, stage)
	return b
}
// Lookup appends a builder.Lookup stage to the pipeline and returns the
// receiver for chaining.
//
// The stage document carries, in this order: the source collection (from),
// the built localField, the built foreignField, and the built output field
// (as), keyed by builder.MKFrom, builder.MKLocalField, builder.MKForeignField
// and builder.MKAs respectively.
func (b *PipelineImp) Lookup(from mservice.Type, localField, foreignField, as builder.Field) builder.Pipeline {
	spec := bson.D{
		bson.E{Key: string(builder.MKFrom), Value: from},
		bson.E{Key: string(builder.MKLocalField), Value: localField.Build()},
		bson.E{Key: string(builder.MKForeignField), Value: foreignField.Build()},
		bson.E{Key: string(builder.MKAs), Value: as.Build()},
	}
	stage := bson.D{bson.E{Key: string(builder.Lookup), Value: spec}}
	b.pipeline = append(b.pipeline, stage)
	return b
}
// LookupWithPipeline appends a builder.Lookup stage whose body is driven by a
// nested pipeline, and returns the receiver for chaining.
//
// The stage document contains the source collection (from), the built nested
// pipeline and the built output field (as). let is optional: when it is nil or
// points to an empty map, the builder.MKLet key is omitted entirely. Otherwise
// each map entry becomes one variable in the let document, keyed by the map
// key with the built field as its value.
//
// Variables are emitted in sorted name order. Go randomizes map iteration
// order, so ranging over the map directly would produce a differently ordered
// stage on every call, which makes generated pipelines hard to compare in
// tests and logs.
func (b *PipelineImp) LookupWithPipeline(
	from mservice.Type,
	nested builder.Pipeline,
	as builder.Field,
	let *map[string]builder.Field,
) builder.Pipeline {
	lookupStage := bson.D{
		{Key: string(builder.MKFrom), Value: from},
		{Key: string(builder.MKPipeline), Value: nested.Build()},
		{Key: string(builder.MKAs), Value: as.Build()},
	}

	// Only add "let" if provided and not empty.
	if let != nil && len(*let) > 0 {
		vars := *let

		names := make([]string, 0, len(vars))
		for name := range vars {
			names = append(names, name)
		}
		sort.Strings(names)

		letDoc := make(bson.D, 0, len(names))
		for _, name := range names {
			letDoc = append(letDoc, bson.E{Key: name, Value: vars[name].Build()})
		}
		lookupStage = append(lookupStage, bson.E{Key: string(builder.MKLet), Value: letDoc})
	}

	b.pipeline = append(b.pipeline, bson.D{{Key: string(builder.Lookup), Value: lookupStage}})
	return b
}
// Unwind appends a builder.Unwind stage for path and returns the receiver for
// chaining.
//
// With no effective options (PreserveNullAndEmptyArrays is false and
// IncludeArrayIndex is empty) the stage value is just the built path.
// Otherwise the stage value is a document holding the path under
// builder.MKPath, followed by whichever of the two options were set.
func (b *PipelineImp) Unwind(path builder.Field, opts ...UnwindOption) builder.Pipeline {
	settings := NewUnwindOpts(opts...)
	keepEmpty := settings.PreserveNullAndEmptyArrays
	indexField := settings.IncludeArrayIndex

	fieldPath := path.Build()

	// Default to the shorthand form; upgrade to the document form only when
	// at least one option actually needs to be expressed.
	var spec interface{} = fieldPath
	if keepEmpty || indexField != "" {
		doc := bson.D{{Key: string(builder.MKPath), Value: fieldPath}}
		if keepEmpty {
			doc = append(doc, bson.E{Key: string(builder.MKPreserveNullAndEmptyArrays), Value: true})
		}
		if indexField != "" {
			doc = append(doc, bson.E{Key: string(builder.MKIncludeArrayIndex), Value: indexField})
		}
		spec = doc
	}

	stage := bson.D{{Key: string(builder.Unwind), Value: spec}}
	b.pipeline = append(b.pipeline, stage)
	return b
}
// Count appends a builder.Count stage whose value is the built field, and
// returns the receiver for chaining.
func (b *PipelineImp) Count(field builder.Field) builder.Pipeline {
	stage := bson.D{bson.E{Key: string(builder.Count), Value: field.Build()}}
	b.pipeline = append(b.pipeline, stage)
	return b
}
// Group appends a builder.Group stage and returns the receiver for chaining.
//
// The stage document starts with the elements produced by groupBy.Build() and
// is extended with the elements of each accumulator's Build(), in argument
// order.
func (b *PipelineImp) Group(groupBy builder.Alias, accumulators ...builder.GroupAccumulator) builder.Pipeline {
	spec := groupBy.Build()
	for i := range accumulators {
		spec = append(spec, accumulators[i].Build()...)
	}

	stage := bson.D{bson.E{Key: string(builder.Group), Value: spec}}
	b.pipeline = append(b.pipeline, stage)
	return b
}
// Project appends a builder.Project stage and returns the receiver for
// chaining.
//
// The stage document is the concatenation of every projection's Build()
// output, in argument order. With no projections the document is empty but
// non-nil.
func (b *PipelineImp) Project(projections ...builder.Projection) builder.Pipeline {
	fields := bson.D{}
	for i := range projections {
		fields = append(fields, projections[i].Build()...)
	}

	stage := bson.D{bson.E{Key: string(builder.Project), Value: fields}}
	b.pipeline = append(b.pipeline, stage)
	return b
}
// ReplaceRoot appends a builder.ReplaceRoot stage whose document holds the
// built newRoot expression under builder.MKNewRoot, and returns the receiver
// for chaining.
func (b *PipelineImp) ReplaceRoot(newRoot builder.Expression) builder.Pipeline {
	rootSpec := bson.D{bson.E{Key: string(builder.MKNewRoot), Value: newRoot.Build()}}
	stage := bson.D{bson.E{Key: string(builder.ReplaceRoot), Value: rootSpec}}
	b.pipeline = append(b.pipeline, stage)
	return b
}
// Build returns the stages accumulated so far, in the order they were added.
// The returned slice is the builder's own storage, not a copy.
func (b *PipelineImp) Build() mongo.Pipeline {
	return b.pipeline
}
// NewPipelineImp returns a builder.Pipeline backed by a PipelineImp that
// starts with an empty, non-nil stage list.
func NewPipelineImp() builder.Pipeline {
	impl := &PipelineImp{pipeline: make(mongo.Pipeline, 0)}
	return impl
}