Skip to content

MQE: Count samples saved on the CSE #12169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 236 additions & 34 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3236,9 +3236,6 @@ func TestQueryStats(t *testing.T) {
expectedTotalSamples int64
expectedTotalSamplesPerStep []int64
skipCompareWithPrometheus string
// ...WithMQE expectations are optional and should be set only if a query with MQE reports different stats (eg. due to optimisations like common subexpression elimination)
expectedTotalSamplesWithMQE int64
expectedTotalSamplesPerStepWithMQE []int64
}{
"instant vector selector with point at every time step": {
expr: `dense_series{}`,
Expand Down Expand Up @@ -3281,7 +3278,6 @@ func TestQueryStats(t *testing.T) {
expectedTotalSamples: 0,
expectedTotalSamplesPerStep: []int64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
},

"raw range vector selector with single point": {
expr: `dense_series[45s]`,
isInstantQuery: true,
Expand Down Expand Up @@ -3432,14 +3428,6 @@ func TestQueryStats(t *testing.T) {
expectedTotalSamples: 270,
expectedTotalSamplesPerStep: []int64{6, 12, 18, 24, 30, 30, 30, 30, 30, 30, 30},
},
"common subexpression elimination": {
expr: `sum(dense_series) + sum(dense_series)`,
isInstantQuery: true,
expectedTotalSamples: 2,
expectedTotalSamplesPerStep: []int64{2},
expectedTotalSamplesWithMQE: 1,
expectedTotalSamplesPerStepWithMQE: []int64{1},
},
// Three tests below cover PQE bug: sample counting is incorrect when subqueries with range vector selectors are wrapped in functions.
// In MQE it's fixed, so that's why cases have a skipCompareWithPrometheus set.
// See this for details: https://github.com/prometheus/prometheus/issues/16638
Expand Down Expand Up @@ -3473,14 +3461,232 @@ func TestQueryStats(t *testing.T) {
require.Equal(t, testCase.expectedTotalSamplesPerStep, prometheusSamplesStats.TotalSamplesPerStep, "invalid test case: expected per stepsamples does not match value from Prometheus' engine")
}

mimirSamplesStatsWithPlanning := runQueryAndGetSamplesStats(t, mimirEngine, testCase.expr, testCase.isInstantQuery)
if testCase.expectedTotalSamplesWithMQE != 0 {
require.Equal(t, testCase.expectedTotalSamplesWithMQE, mimirSamplesStatsWithPlanning.TotalSamples)
require.Equal(t, testCase.expectedTotalSamplesPerStepWithMQE, mimirSamplesStatsWithPlanning.TotalSamplesPerStep)
} else {
require.Equal(t, testCase.expectedTotalSamples, mimirSamplesStatsWithPlanning.TotalSamples)
require.Equal(t, testCase.expectedTotalSamplesPerStep, mimirSamplesStatsWithPlanning.TotalSamplesPerStep)
mimirSamplesStats := runQueryAndGetSamplesStats(t, mimirEngine, testCase.expr, testCase.isInstantQuery)
require.Equal(t, testCase.expectedTotalSamples, mimirSamplesStats.TotalSamples)
require.Equal(t, testCase.expectedTotalSamplesPerStep, mimirSamplesStats.TotalSamplesPerStep)
})
}
}

func TestQueryStatsAreSameWhenCommonSubexpressionEliminationApplied(t *testing.T) {
opts := NewTestEngineOpts()
opts.CommonOpts.EnablePerStepStats = true
mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), NewQueryPlanner(opts), log.NewNopLogger())
require.NoError(t, err)
prometheusEngine := promql.NewEngine(opts.CommonOpts)

storage := promqltest.LoadedStorage(t, `
load 1m
foo 0 1 2 3 4 5 6 7 8 9 10
bar 0 1 2 3 4 5 6 7 8 9 10
baz{a="1",b="1"} 0 1 2 3 4 5 6 7 8 9 10
baz{a="2",b="2"} 0 1 2 3 4 5 6 7 8 9 10
classic_histogram_series{le="0.1"} 0+1x10
classic_histogram_series{le="1"} 0+5x10
classic_histogram_series{le="10"} 0+8x10
classic_histogram_series{le="100"} 0+12x10
classic_histogram_series{le="1000"} 0+21x10
classic_histogram_series{le="+Inf"} 0+21x10
`)

start := timestamp.Time(0)
end := start.Add(10 * time.Minute)
runQueryAndGetSamplesStats := func(t *testing.T, engine promql.QueryEngine, expr string, isInstantQuery bool) *promstats.QuerySamples {
var q promql.Query
var err error
opts := promql.NewPrometheusQueryOpts(true, 0)
if isInstantQuery {
q, err = engine.NewInstantQuery(context.Background(), storage, opts, expr, end)
} else {
q, err = engine.NewRangeQuery(context.Background(), storage, opts, expr, start, end, time.Minute)
}

require.NoError(t, err)

defer q.Close()

res := q.Exec(context.Background())
require.NoError(t, res.Err)

return q.Stats().Samples
}

testCases := map[string]struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the testCases below are for instant query. Should we have testCase for range query too? So that we can see the case where totalSamples is not equal with totalSamplesPerStep.

expr string
isInstantQuery bool
expectedTotalSamples int64
expectedTotalSamplesPerStep []int64
skipCompareWithPrometheus string
skip string
}{

"vector selector duplicated twice": {
expr: `foo + foo`,
isInstantQuery: true,
expectedTotalSamples: 2,
expectedTotalSamplesPerStep: []int64{2},
},
"vector selector with several series duplicated twice": {
expr: `baz + baz`,
isInstantQuery: true,
expectedTotalSamples: 4,
expectedTotalSamplesPerStep: []int64{4},
},

"vector selector duplicated twice with other selector": {
expr: `foo + foo + bar`,
isInstantQuery: true,
expectedTotalSamples: 3,
expectedTotalSamplesPerStep: []int64{3},
},
"vector selector duplicated three times": {
expr: `foo + foo + foo`,
isInstantQuery: true,
expectedTotalSamples: 3,
expectedTotalSamplesPerStep: []int64{3},
},
"vector selector duplicated many times": {
expr: `foo + foo + foo + bar + foo`,
isInstantQuery: true,
expectedTotalSamples: 5,
expectedTotalSamplesPerStep: []int64{5},
},
"duplicated vector selector with different aggregations": {
expr: `max(foo) - min(foo)`,
isInstantQuery: true,
expectedTotalSamples: 2,
expectedTotalSamplesPerStep: []int64{2},
},
"duplicated vector selector with same aggregations": {
expr: `max(foo) + max(foo)`,
isInstantQuery: true,
expectedTotalSamples: 2,
expectedTotalSamplesPerStep: []int64{2},
},
"multiple levels of duplication: vector selector and aggregation": {
expr: `foo + sum(foo) + sum(foo) + bar + bar`,
isInstantQuery: true,
expectedTotalSamples: 5,
expectedTotalSamplesPerStep: []int64{5},
},
"multiple levels of duplication: vector selector and binary operation": {
expr: `(foo - foo) + (foo - foo)`,
isInstantQuery: true,
expectedTotalSamples: 4,
expectedTotalSamplesPerStep: []int64{4},
},
"multiple levels of duplication: multiple vector selectors and binary operation": {
expr: `(foo - foo) + (foo - foo) + (foo * bar) + (foo * bar)`,
isInstantQuery: true,
expectedTotalSamples: 8,
expectedTotalSamplesPerStep: []int64{8},
},
" duplicated binary operation with different vector selectors": {
expr: `(foo - bar) + (foo - bar)`,
isInstantQuery: true,
expectedTotalSamples: 4,
expectedTotalSamplesPerStep: []int64{4},
},
"duplicated binary operation with different matrix selectors": {
expr: `(rate(foo[5m]) - rate(bar[5m])) + (rate(foo[5m]) - rate(bar[5m]))`,
isInstantQuery: true,
expectedTotalSamples: 20,
expectedTotalSamplesPerStep: []int64{20},
},
"duplicate matrix selectors, some with different outer function": {
expr: `rate(foo[5m]) + increase(foo[5m]) + rate(foo[5m])`,
isInstantQuery: true,
expectedTotalSamples: 15,
expectedTotalSamplesPerStep: []int64{15},
},
"duplicate matrix selectors with same outer function": {
expr: `rate(foo[5m]) + rate(foo[5m])`,
isInstantQuery: true,
expectedTotalSamples: 10,
expectedTotalSamplesPerStep: []int64{10},
},
"duplicate subqueries with different outer function": {
expr: `rate(foo[5m:]) + increase(foo[5m:])`,
isInstantQuery: true,
expectedTotalSamples: 10,
expectedTotalSamplesPerStep: []int64{10},
skip: "Bug with samples count for duplicated subqueries - MQE returns 0.",
},
"duplicate subqueries with different outer function and multiple child selectors": {
expr: `rate((foo - bar)[5m:]) + increase((foo - bar)[5m:])`,
isInstantQuery: true,
expectedTotalSamples: 44,
expectedTotalSamplesPerStep: []int64{44},
skip: "Bug with samples count for duplicated subqueries – MQE returns 0.",
skipCompareWithPrometheus: "Prometheus undercounts samples when range vector selector wrapped in function inside subquery",
},
"duplicate subqueries with same outer function": {
expr: `rate(foo[5m:]) + rate(foo[5m:])`,
isInstantQuery: true,
expectedTotalSamples: 10,
expectedTotalSamplesPerStep: []int64{10},
},
"duplicate nested subqueries": {
expr: `max_over_time(rate(foo[5m:])[10m:])`,
isInstantQuery: true,
expectedTotalSamples: 44,
expectedTotalSamplesPerStep: []int64{44},
skipCompareWithPrometheus: "Prometheus undercounts samples when range vector selector wrapped in function inside subquery",
},
"duplicate selectors, both with timestamp()": {
expr: `timestamp(foo) + timestamp(foo)`,
isInstantQuery: true,
expectedTotalSamples: 2,
expectedTotalSamplesPerStep: []int64{2},
},
"duplicate selectors, one with timestamp() over an intermediate expression": {
expr: `timestamp(abs(foo)) + foo`,
isInstantQuery: true,
expectedTotalSamples: 2,
expectedTotalSamplesPerStep: []int64{2},
},
"duplicate operation with different children": {
expr: `topk(3, foo) + topk(5, foo)`,
isInstantQuery: true,
expectedTotalSamples: 2,
expectedTotalSamplesPerStep: []int64{2},
},
"duplicate expression with multiple children": {
expr: `topk(5, foo) + topk(5, foo)`,
isInstantQuery: true,
expectedTotalSamples: 2,
expectedTotalSamplesPerStep: []int64{2},
},
"duplicate expression where 'skip histogram decoding' applies to one expression": {
expr: `histogram_count(classic_histogram_series) * histogram_quantile(0.5, classic_histogram_series)`,
isInstantQuery: true,
expectedTotalSamples: 12,
expectedTotalSamplesPerStep: []int64{12},
},
"duplicate expression where 'skip histogram decoding' applies to both expressions": {
expr: `histogram_count(classic_histogram_series) * histogram_sum(classic_histogram_series)`,
isInstantQuery: true,
expectedTotalSamples: 12,
expectedTotalSamplesPerStep: []int64{12},
},
}

for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
if testCase.skip != "" {
t.Skip(testCase.skip)
}

prometheusSamplesStats := runQueryAndGetSamplesStats(t, prometheusEngine, testCase.expr, testCase.isInstantQuery)

if testCase.skipCompareWithPrometheus == "" {
require.Equal(t, testCase.expectedTotalSamples, prometheusSamplesStats.TotalSamples, "invalid test case: expected total samples does not match value from Prometheus' engine")
require.Equal(t, testCase.expectedTotalSamplesPerStep, prometheusSamplesStats.TotalSamplesPerStep, "invalid test case: expected per stepsamples does not match value from Prometheus' engine")
}
Copy link
Contributor

@56quarters 56quarters Jul 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log the reason for skipping here?

} else {
    t.Log(testCase.skipCompareWithPrometheus)
}


mimirSamplesStats := runQueryAndGetSamplesStats(t, mimirEngine, testCase.expr, testCase.isInstantQuery)
require.Equal(t, testCase.expectedTotalSamples, mimirSamplesStats.TotalSamples)
require.Equal(t, testCase.expectedTotalSamplesPerStep, mimirSamplesStats.TotalSamplesPerStep)
})
}
}
Expand Down Expand Up @@ -3533,7 +3739,7 @@ func TestQueryStatsUpstreamTestCases(t *testing.T) {
interval time.Duration
expectedTotalSamples int64
expectedTotalSamplesPerStep []int64
// ...WithMQE expectations are optional and should be set only if a query with MQE reports different stats (eg. due to optimisations like common subexpression elimination)
// ...WithMQE expectations are optional and should be set only if a query with MQE reports different stats
expectedTotalSamplesWithMQE int64
expectedTotalSamplesPerStepWithMQE []int64
}{
Expand Down Expand Up @@ -3719,12 +3925,10 @@ func TestQueryStatsUpstreamTestCases(t *testing.T) {
expectedTotalSamplesPerStep: []int64{36},
},
{
query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s]))",
start: time.Unix(201, 0),
expectedTotalSamples: 72, // 2 * (3 sample per query * 12 queries (60/5))
expectedTotalSamplesPerStep: []int64{72},
expectedTotalSamplesWithMQE: 36, // 72/2 due to common subexpression elimination
expectedTotalSamplesPerStepWithMQE: []int64{36},
query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s]))",
start: time.Unix(201, 0),
expectedTotalSamples: 72, // 2 * (3 sample per query * 12 queries (60/5))
expectedTotalSamplesPerStep: []int64{72},
},
{
query: `metricWith3SampleEvery10Seconds{a="1"}`,
Expand Down Expand Up @@ -3833,14 +4037,12 @@ func TestQueryStatsUpstreamTestCases(t *testing.T) {
expectedTotalSamplesPerStep: []int64{12, 12, 12, 12},
},
{
query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s]))",
start: time.Unix(201, 0),
end: time.Unix(220, 0),
interval: 5 * time.Second,
expectedTotalSamples: 288, // 2 * (3 sample per query * 12 queries (60/5) * 4 steps)
expectedTotalSamplesPerStep: []int64{72, 72, 72, 72},
expectedTotalSamplesWithMQE: 144, // 288/2 due to common sub-expression elimination
expectedTotalSamplesPerStepWithMQE: []int64{36, 36, 36, 36},
query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s]))",
start: time.Unix(201, 0),
end: time.Unix(220, 0),
interval: 5 * time.Second,
expectedTotalSamples: 288, // 2 * (3 sample per query * 12 queries (60/5) * 4 steps)
expectedTotalSamplesPerStep: []int64{72, 72, 72, 72},
},
{
query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith1SampleEvery10Seconds[60s:5s]))",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
lastHistogram = h

// For consistency with Prometheus' engine, we convert each histogram point to an equivalent number of float points.
v.Stats.IncrementSamplesAtStep(stepIndex, types.EquivalentFloatSampleCount(h))
v.Stats.IncrementSamplesAtStep(stepIndex, types.EquivalentFloatSampleCount(h)*int64(v.Selector.GetSampleCountFactor()))

} else {
// Only create the slice once we know the series is a histogram or not.
Expand All @@ -174,7 +174,7 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
return types.InstantVectorSeriesData{}, err
}
}
v.Stats.IncrementSamplesAtStep(stepIndex, 1)
v.Stats.IncrementSamplesAtStep(stepIndex, int64(v.Selector.GetSampleCountFactor()))
data.Floats = append(data.Floats, promql.FPoint{T: stepT, F: f})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (m *RangeVectorSelector) NextStepSamples() (*types.RangeVectorStepData, err
m.stepData.RangeStart = rangeStart
m.stepData.RangeEnd = rangeEnd

m.Stats.IncrementSamplesAtTimestamp(m.stepData.StepT, int64(m.stepData.Floats.Count())+m.stepData.Histograms.EquivalentFloatSampleCount())
m.Stats.IncrementSamplesAtTimestamp(m.stepData.StepT, (int64(m.stepData.Floats.Count())+m.stepData.Histograms.EquivalentFloatSampleCount())*int64(m.Selector.GetSampleCountFactor()))
Copy link
Contributor

@lamida lamida Jul 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With some castings, it takes sometime to see what is being computed. What if we extract some of the operation to local variables to make it clearer?

Suggested change
m.Stats.IncrementSamplesAtTimestamp(m.stepData.StepT, (int64(m.stepData.Floats.Count())+m.stepData.Histograms.EquivalentFloatSampleCount())*int64(m.Selector.GetSampleCountFactor()))
floatCount := int64(m.stepData.Floats.Count())
histogramCount := m.stepData.Histograms.EquivalentFloatSampleCount()
sampleFactor := int64(m.Selector.GetSampleCountFactor())
totalSamples := (floatCount + histogramCount) * sampleFactor
m.Stats.IncrementSamplesAtTimestamp(m.stepData.StepT, totalSamples)```


return m.stepData, nil
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/streamingpromql/operators/selectors/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ type Selector struct {
series *seriesList

seriesIdx int

// Sample count factor for stats counting when applying CSE.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Sample count factor for stats counting when applying CSE.
// Sample count factor for stats counting at each step when applying common sub-expression elimination.

sampleCountFactor uint32
}

func (s *Selector) SetSampleCountFactor(factor uint32) {
s.sampleCountFactor = factor
}
Comment on lines +50 to +52
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned in another comment, since we have setter method here, just add logic to set only for non-zero factor. Hence we can remove non-zero check in the caller.

Suggested change
func (s *Selector) SetSampleCountFactor(factor uint32) {
s.sampleCountFactor = factor
}
func (s *Selector) SetSampleCountFactor(factor uint32) {
if factor != 0 {
s.sampleCountFactor = factor
}
}


func (s *Selector) GetSampleCountFactor() uint32 {
if s.sampleCountFactor == 0 {
return 1
}
return s.sampleCountFactor
}

func (s *Selector) Prepare(ctx context.Context, _ *types.PrepareParams) error {
Expand Down
Loading
Loading