-
Notifications
You must be signed in to change notification settings - Fork 621
MQE: Count samples saved on the CSE #12169
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
3bab689
ed3cbe6
c8da97a
de2cfc6
3afbb54
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3236,9 +3236,6 @@ func TestQueryStats(t *testing.T) { | |
expectedTotalSamples int64 | ||
expectedTotalSamplesPerStep []int64 | ||
skipCompareWithPrometheus string | ||
// ...WithMQE expectations are optional and should be set only if a query with MQE reports different stats (eg. due to optimisations like common subexpression elimination) | ||
expectedTotalSamplesWithMQE int64 | ||
expectedTotalSamplesPerStepWithMQE []int64 | ||
}{ | ||
"instant vector selector with point at every time step": { | ||
expr: `dense_series{}`, | ||
|
@@ -3281,7 +3278,6 @@ func TestQueryStats(t *testing.T) { | |
expectedTotalSamples: 0, | ||
expectedTotalSamplesPerStep: []int64{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, | ||
}, | ||
|
||
"raw range vector selector with single point": { | ||
expr: `dense_series[45s]`, | ||
isInstantQuery: true, | ||
|
@@ -3432,14 +3428,6 @@ func TestQueryStats(t *testing.T) { | |
expectedTotalSamples: 270, | ||
expectedTotalSamplesPerStep: []int64{6, 12, 18, 24, 30, 30, 30, 30, 30, 30, 30}, | ||
}, | ||
"common subexpression elimination": { | ||
expr: `sum(dense_series) + sum(dense_series)`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 2, | ||
expectedTotalSamplesPerStep: []int64{2}, | ||
expectedTotalSamplesWithMQE: 1, | ||
expectedTotalSamplesPerStepWithMQE: []int64{1}, | ||
}, | ||
// Three tests below cover PQE bug: sample counting is incorrect when subqueries with range vector selectors are wrapped in functions. | ||
// In MQE it's fixed, so that's why cases have a skipCompareWithPrometheus set. | ||
// See this for details: https://github.com/prometheus/prometheus/issues/16638 | ||
|
@@ -3473,14 +3461,232 @@ func TestQueryStats(t *testing.T) { | |
require.Equal(t, testCase.expectedTotalSamplesPerStep, prometheusSamplesStats.TotalSamplesPerStep, "invalid test case: expected per stepsamples does not match value from Prometheus' engine") | ||
} | ||
|
||
mimirSamplesStatsWithPlanning := runQueryAndGetSamplesStats(t, mimirEngine, testCase.expr, testCase.isInstantQuery) | ||
if testCase.expectedTotalSamplesWithMQE != 0 { | ||
require.Equal(t, testCase.expectedTotalSamplesWithMQE, mimirSamplesStatsWithPlanning.TotalSamples) | ||
require.Equal(t, testCase.expectedTotalSamplesPerStepWithMQE, mimirSamplesStatsWithPlanning.TotalSamplesPerStep) | ||
} else { | ||
require.Equal(t, testCase.expectedTotalSamples, mimirSamplesStatsWithPlanning.TotalSamples) | ||
require.Equal(t, testCase.expectedTotalSamplesPerStep, mimirSamplesStatsWithPlanning.TotalSamplesPerStep) | ||
mimirSamplesStats := runQueryAndGetSamplesStats(t, mimirEngine, testCase.expr, testCase.isInstantQuery) | ||
require.Equal(t, testCase.expectedTotalSamples, mimirSamplesStats.TotalSamples) | ||
require.Equal(t, testCase.expectedTotalSamplesPerStep, mimirSamplesStats.TotalSamplesPerStep) | ||
}) | ||
} | ||
} | ||
|
||
func TestQueryStatsAreSameWhenCommonSubexpressionEliminationApplied(t *testing.T) { | ||
opts := NewTestEngineOpts() | ||
opts.CommonOpts.EnablePerStepStats = true | ||
mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), NewQueryPlanner(opts), log.NewNopLogger()) | ||
require.NoError(t, err) | ||
prometheusEngine := promql.NewEngine(opts.CommonOpts) | ||
|
||
storage := promqltest.LoadedStorage(t, ` | ||
load 1m | ||
foo 0 1 2 3 4 5 6 7 8 9 10 | ||
bar 0 1 2 3 4 5 6 7 8 9 10 | ||
baz{a="1",b="1"} 0 1 2 3 4 5 6 7 8 9 10 | ||
baz{a="2",b="2"} 0 1 2 3 4 5 6 7 8 9 10 | ||
classic_histogram_series{le="0.1"} 0+1x10 | ||
classic_histogram_series{le="1"} 0+5x10 | ||
classic_histogram_series{le="10"} 0+8x10 | ||
classic_histogram_series{le="100"} 0+12x10 | ||
classic_histogram_series{le="1000"} 0+21x10 | ||
classic_histogram_series{le="+Inf"} 0+21x10 | ||
`) | ||
|
||
start := timestamp.Time(0) | ||
end := start.Add(10 * time.Minute) | ||
runQueryAndGetSamplesStats := func(t *testing.T, engine promql.QueryEngine, expr string, isInstantQuery bool) *promstats.QuerySamples { | ||
var q promql.Query | ||
var err error | ||
opts := promql.NewPrometheusQueryOpts(true, 0) | ||
if isInstantQuery { | ||
q, err = engine.NewInstantQuery(context.Background(), storage, opts, expr, end) | ||
} else { | ||
q, err = engine.NewRangeQuery(context.Background(), storage, opts, expr, start, end, time.Minute) | ||
} | ||
|
||
require.NoError(t, err) | ||
|
||
defer q.Close() | ||
|
||
res := q.Exec(context.Background()) | ||
require.NoError(t, res.Err) | ||
|
||
return q.Stats().Samples | ||
} | ||
|
||
testCases := map[string]struct { | ||
expr string | ||
isInstantQuery bool | ||
expectedTotalSamples int64 | ||
expectedTotalSamplesPerStep []int64 | ||
skipCompareWithPrometheus string | ||
skip string | ||
}{ | ||
|
||
"vector selector duplicated twice": { | ||
expr: `foo + foo`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 2, | ||
expectedTotalSamplesPerStep: []int64{2}, | ||
}, | ||
"vector selector with several series duplicated twice": { | ||
expr: `baz + baz`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 4, | ||
expectedTotalSamplesPerStep: []int64{4}, | ||
}, | ||
|
||
"vector selector duplicated twice with other selector": { | ||
expr: `foo + foo + bar`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 3, | ||
expectedTotalSamplesPerStep: []int64{3}, | ||
}, | ||
"vector selector duplicated three times": { | ||
expr: `foo + foo + foo`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 3, | ||
expectedTotalSamplesPerStep: []int64{3}, | ||
}, | ||
"vector selector duplicated many times": { | ||
expr: `foo + foo + foo + bar + foo`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 5, | ||
expectedTotalSamplesPerStep: []int64{5}, | ||
}, | ||
"duplicated vector selector with different aggregations": { | ||
expr: `max(foo) - min(foo)`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 2, | ||
expectedTotalSamplesPerStep: []int64{2}, | ||
}, | ||
"duplicated vector selector with same aggregations": { | ||
expr: `max(foo) + max(foo)`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 2, | ||
expectedTotalSamplesPerStep: []int64{2}, | ||
}, | ||
"multiple levels of duplication: vector selector and aggregation": { | ||
expr: `foo + sum(foo) + sum(foo) + bar + bar`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 5, | ||
expectedTotalSamplesPerStep: []int64{5}, | ||
}, | ||
"multiple levels of duplication: vector selector and binary operation": { | ||
expr: `(foo - foo) + (foo - foo)`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 4, | ||
expectedTotalSamplesPerStep: []int64{4}, | ||
}, | ||
"multiple levels of duplication: multiple vector selectors and binary operation": { | ||
expr: `(foo - foo) + (foo - foo) + (foo * bar) + (foo * bar)`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 8, | ||
expectedTotalSamplesPerStep: []int64{8}, | ||
}, | ||
" duplicated binary operation with different vector selectors": { | ||
expr: `(foo - bar) + (foo - bar)`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 4, | ||
expectedTotalSamplesPerStep: []int64{4}, | ||
}, | ||
"duplicated binary operation with different matrix selectors": { | ||
expr: `(rate(foo[5m]) - rate(bar[5m])) + (rate(foo[5m]) - rate(bar[5m]))`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 20, | ||
expectedTotalSamplesPerStep: []int64{20}, | ||
}, | ||
"duplicate matrix selectors, some with different outer function": { | ||
expr: `rate(foo[5m]) + increase(foo[5m]) + rate(foo[5m])`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 15, | ||
expectedTotalSamplesPerStep: []int64{15}, | ||
}, | ||
"duplicate matrix selectors with same outer function": { | ||
expr: `rate(foo[5m]) + rate(foo[5m])`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 10, | ||
expectedTotalSamplesPerStep: []int64{10}, | ||
}, | ||
"duplicate subqueries with different outer function": { | ||
expr: `rate(foo[5m:]) + increase(foo[5m:])`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 10, | ||
expectedTotalSamplesPerStep: []int64{10}, | ||
skip: "Bug with samples count for duplicated subqueries - MQE returns 0.", | ||
}, | ||
"duplicate subqueries with different outer function and multiple child selectors": { | ||
expr: `rate((foo - bar)[5m:]) + increase((foo - bar)[5m:])`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 44, | ||
expectedTotalSamplesPerStep: []int64{44}, | ||
skip: "Bug with samples count for duplicated subqueries – MQE returns 0.", | ||
skipCompareWithPrometheus: "Prometheus undercounts samples when range vector selector wrapped in function inside subquery", | ||
}, | ||
"duplicate subqueries with same outer function": { | ||
expr: `rate(foo[5m:]) + rate(foo[5m:])`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 10, | ||
expectedTotalSamplesPerStep: []int64{10}, | ||
}, | ||
"duplicate nested subqueries": { | ||
expr: `max_over_time(rate(foo[5m:])[10m:])`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 44, | ||
expectedTotalSamplesPerStep: []int64{44}, | ||
skipCompareWithPrometheus: "Prometheus undercounts samples when range vector selector wrapped in function inside subquery", | ||
}, | ||
"duplicate selectors, both with timestamp()": { | ||
expr: `timestamp(foo) + timestamp(foo)`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 2, | ||
expectedTotalSamplesPerStep: []int64{2}, | ||
}, | ||
"duplicate selectors, one with timestamp() over an intermediate expression": { | ||
expr: `timestamp(abs(foo)) + foo`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 2, | ||
expectedTotalSamplesPerStep: []int64{2}, | ||
}, | ||
"duplicate operation with different children": { | ||
expr: `topk(3, foo) + topk(5, foo)`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 2, | ||
expectedTotalSamplesPerStep: []int64{2}, | ||
}, | ||
"duplicate expression with multiple children": { | ||
expr: `topk(5, foo) + topk(5, foo)`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 2, | ||
expectedTotalSamplesPerStep: []int64{2}, | ||
}, | ||
"duplicate expression where 'skip histogram decoding' applies to one expression": { | ||
expr: `histogram_count(classic_histogram_series) * histogram_quantile(0.5, classic_histogram_series)`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 12, | ||
expectedTotalSamplesPerStep: []int64{12}, | ||
}, | ||
"duplicate expression where 'skip histogram decoding' applies to both expressions": { | ||
expr: `histogram_count(classic_histogram_series) * histogram_sum(classic_histogram_series)`, | ||
isInstantQuery: true, | ||
expectedTotalSamples: 12, | ||
expectedTotalSamplesPerStep: []int64{12}, | ||
}, | ||
} | ||
|
||
for name, testCase := range testCases { | ||
t.Run(name, func(t *testing.T) { | ||
if testCase.skip != "" { | ||
t.Skip(testCase.skip) | ||
} | ||
|
||
prometheusSamplesStats := runQueryAndGetSamplesStats(t, prometheusEngine, testCase.expr, testCase.isInstantQuery) | ||
|
||
if testCase.skipCompareWithPrometheus == "" { | ||
require.Equal(t, testCase.expectedTotalSamples, prometheusSamplesStats.TotalSamples, "invalid test case: expected total samples does not match value from Prometheus' engine") | ||
require.Equal(t, testCase.expectedTotalSamplesPerStep, prometheusSamplesStats.TotalSamplesPerStep, "invalid test case: expected per stepsamples does not match value from Prometheus' engine") | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we log the reason for skipping here? } else {
t.Log(testCase.skipCompareWithPrometheus)
} |
||
|
||
mimirSamplesStats := runQueryAndGetSamplesStats(t, mimirEngine, testCase.expr, testCase.isInstantQuery) | ||
require.Equal(t, testCase.expectedTotalSamples, mimirSamplesStats.TotalSamples) | ||
require.Equal(t, testCase.expectedTotalSamplesPerStep, mimirSamplesStats.TotalSamplesPerStep) | ||
}) | ||
} | ||
} | ||
|
@@ -3533,7 +3739,7 @@ func TestQueryStatsUpstreamTestCases(t *testing.T) { | |
interval time.Duration | ||
expectedTotalSamples int64 | ||
expectedTotalSamplesPerStep []int64 | ||
// ...WithMQE expectations are optional and should be set only if a query with MQE reports different stats (eg. due to optimisations like common subexpression elimination) | ||
// ...WithMQE expectations are optional and should be set only if a query with MQE reports different stats | ||
expectedTotalSamplesWithMQE int64 | ||
expectedTotalSamplesPerStepWithMQE []int64 | ||
}{ | ||
|
@@ -3719,12 +3925,10 @@ func TestQueryStatsUpstreamTestCases(t *testing.T) { | |
expectedTotalSamplesPerStep: []int64{36}, | ||
}, | ||
{ | ||
query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s]))", | ||
start: time.Unix(201, 0), | ||
expectedTotalSamples: 72, // 2 * (3 sample per query * 12 queries (60/5)) | ||
expectedTotalSamplesPerStep: []int64{72}, | ||
expectedTotalSamplesWithMQE: 36, // 72/2 due to common subexpression elimination | ||
expectedTotalSamplesPerStepWithMQE: []int64{36}, | ||
query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s]))", | ||
start: time.Unix(201, 0), | ||
expectedTotalSamples: 72, // 2 * (3 sample per query * 12 queries (60/5)) | ||
expectedTotalSamplesPerStep: []int64{72}, | ||
}, | ||
{ | ||
query: `metricWith3SampleEvery10Seconds{a="1"}`, | ||
|
@@ -3833,14 +4037,12 @@ func TestQueryStatsUpstreamTestCases(t *testing.T) { | |
expectedTotalSamplesPerStep: []int64{12, 12, 12, 12}, | ||
}, | ||
{ | ||
query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s]))", | ||
start: time.Unix(201, 0), | ||
end: time.Unix(220, 0), | ||
interval: 5 * time.Second, | ||
expectedTotalSamples: 288, // 2 * (3 sample per query * 12 queries (60/5) * 4 steps) | ||
expectedTotalSamplesPerStep: []int64{72, 72, 72, 72}, | ||
expectedTotalSamplesWithMQE: 144, // 288/2 due to common sub-expression elimination | ||
expectedTotalSamplesPerStepWithMQE: []int64{36, 36, 36, 36}, | ||
query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s]))", | ||
start: time.Unix(201, 0), | ||
end: time.Unix(220, 0), | ||
interval: 5 * time.Second, | ||
expectedTotalSamples: 288, // 2 * (3 sample per query * 12 queries (60/5) * 4 steps) | ||
expectedTotalSamplesPerStep: []int64{72, 72, 72, 72}, | ||
}, | ||
{ | ||
query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s:5s])) + sum(max_over_time(metricWith1SampleEvery10Seconds[60s:5s]))", | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -103,7 +103,7 @@ func (m *RangeVectorSelector) NextStepSamples() (*types.RangeVectorStepData, err | |||||||||||||||
m.stepData.RangeStart = rangeStart | ||||||||||||||||
m.stepData.RangeEnd = rangeEnd | ||||||||||||||||
|
||||||||||||||||
m.Stats.IncrementSamplesAtTimestamp(m.stepData.StepT, int64(m.stepData.Floats.Count())+m.stepData.Histograms.EquivalentFloatSampleCount()) | ||||||||||||||||
m.Stats.IncrementSamplesAtTimestamp(m.stepData.StepT, (int64(m.stepData.Floats.Count())+m.stepData.Histograms.EquivalentFloatSampleCount())*int64(m.Selector.GetSampleCountFactor())) | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. With the casts, it takes some time to see what is being computed. What if we extract parts of the operation into local variables to make it clearer?
Suggested change
|
||||||||||||||||
|
||||||||||||||||
return m.stepData, nil | ||||||||||||||||
} | ||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -42,6 +42,20 @@ type Selector struct { | |||||||||||||||||
series *seriesList | ||||||||||||||||||
|
||||||||||||||||||
seriesIdx int | ||||||||||||||||||
|
||||||||||||||||||
// Sample count factor for stats counting when applying CSE. | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
sampleCountFactor uint32 | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// SetSampleCountFactor stores factor as the selector's sample count factor.
// The value is stored as given, including zero; GetSampleCountFactor reports
// a stored zero as 1.
func (s *Selector) SetSampleCountFactor(factor uint32) { | ||||||||||||||||||
s.sampleCountFactor = factor | ||||||||||||||||||
} | ||||||||||||||||||
Comment on lines
+50
to
+52
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As I mentioned in another comment, since we have a setter method here, we can add logic to it so that the factor is only set when it is non-zero. Then we can remove the non-zero check in the caller.
Suggested change
|
||||||||||||||||||
|
||||||||||||||||||
func (s *Selector) GetSampleCountFactor() uint32 { | ||||||||||||||||||
if s.sampleCountFactor == 0 { | ||||||||||||||||||
return 1 | ||||||||||||||||||
} | ||||||||||||||||||
return s.sampleCountFactor | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
func (s *Selector) Prepare(ctx context.Context, _ *types.PrepareParams) error { | ||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the test cases below are for instant queries. Should we have a test case for a range query too, so that we can see the case where totalSamples is not equal to totalSamplesPerStep?