Skip to content

Track stale series in the Head block of TSDB #16925

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions tsdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
Expand Down Expand Up @@ -68,6 +69,7 @@ var (
type Head struct {
chunkRange atomic.Int64
numSeries atomic.Uint64
numStaleSeries atomic.Uint64
minOOOTime, maxOOOTime atomic.Int64 // TODO(jesusvazquez) These should be updated after garbage collection.
minTime, maxTime atomic.Int64 // Current min and max of the samples included in the head. TODO(jesusvazquez) Ensure these are properly tracked.
minValidTime atomic.Int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
Expand Down Expand Up @@ -360,6 +362,7 @@ func (h *Head) resetWLReplayResources() {
type headMetrics struct {
activeAppenders prometheus.Gauge
series prometheus.GaugeFunc
staleSeries prometheus.GaugeFunc
seriesCreated prometheus.Counter
seriesRemoved prometheus.Counter
seriesNotFound prometheus.Counter
Expand Down Expand Up @@ -406,6 +409,12 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
}, func() float64 {
return float64(h.NumSeries())
}),
staleSeries: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_stale_series",
Help: "Total number of stale series in the head block.",
}, func() float64 {
return float64(h.NumStaleSeries())
}),
seriesCreated: prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_series_created_total",
Help: "Total number of series created in the head",
Expand Down Expand Up @@ -1607,7 +1616,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {

// Drop old chunks and remember series IDs and hashes if they can be
// deleted entirely.
deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef, &h.numStaleSeries)
seriesRemoved := len(deleted)

h.metrics.seriesRemoved.Add(float64(seriesRemoved))
Expand Down Expand Up @@ -1645,11 +1654,16 @@ func (h *Head) Tombstones() (tombstones.Reader, error) {
return h.tombstones, nil
}

// NumSeries returns the number of active series in the head.
// NumSeries returns the number of series tracked in the head.
func (h *Head) NumSeries() uint64 {
return h.numSeries.Load()
}

// NumStaleSeries returns the number of stale series in the head.
func (h *Head) NumStaleSeries() uint64 {
return h.numStaleSeries.Load()
}

var headULID = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD")

// Meta returns meta information about the head.
Expand Down Expand Up @@ -1929,7 +1943,7 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
// but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct
// and there's no easy way to cast maps.
// minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef, numStaleSeries *atomic.Uint64) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
var (
deleted = map[storage.SeriesRef]struct{}{}
affected = map[labels.Label]struct{}{}
Expand Down Expand Up @@ -1987,6 +2001,12 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
defer s.locks[refShard].Unlock()
}

if value.IsStaleNaN(series.lastValue) ||
(series.lastHistogramValue != nil && value.IsStaleNaN(series.lastHistogramValue.Sum)) ||
(series.lastFloatHistogramValue != nil && value.IsStaleNaN(series.lastFloatHistogramValue.Sum)) {
numStaleSeries.Dec()
}

deleted[storage.SeriesRef(series.ref)] = struct{}{}
series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
s.hashes[hashShard].del(hash, series.ref)
Expand Down
32 changes: 32 additions & 0 deletions tsdb/head_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,8 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
acc.floatsAppended--
}
default:
newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V)
staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V)
ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
Expand All @@ -1230,6 +1232,12 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
if s.T > acc.inOrderMaxt {
acc.inOrderMaxt = s.T
}
if newlyStale {
a.head.numStaleSeries.Inc()
}
if staleToNonStale {
a.head.numStaleSeries.Dec()
}
} else {
// The sample is an exact duplicate, and should be silently dropped.
acc.floatsAppended--
Expand Down Expand Up @@ -1310,6 +1318,12 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
acc.histogramsAppended--
}
default:
newlyStale := value.IsStaleNaN(s.H.Sum)
staleToNonStale := false
if series.lastHistogramValue != nil {
newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum)
}
ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
Expand All @@ -1318,6 +1332,12 @@ func (a *headAppender) commitHistograms(acc *appenderCommitContext) {
if s.T > acc.inOrderMaxt {
acc.inOrderMaxt = s.T
}
if newlyStale {
a.head.numStaleSeries.Inc()
}
if staleToNonStale {
a.head.numStaleSeries.Dec()
}
} else {
acc.histogramsAppended--
acc.histoOOORejected++
Expand Down Expand Up @@ -1398,6 +1418,12 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
acc.histogramsAppended--
}
default:
newlyStale := value.IsStaleNaN(s.FH.Sum)
staleToNonStale := false
if series.lastFloatHistogramValue != nil {
newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum)
staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum)
}
ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
Expand All @@ -1406,6 +1432,12 @@ func (a *headAppender) commitFloatHistograms(acc *appenderCommitContext) {
if s.T > acc.inOrderMaxt {
acc.inOrderMaxt = s.T
}
if newlyStale {
a.head.numStaleSeries.Inc()
}
if staleToNonStale {
a.head.numStaleSeries.Dec()
}
} else {
acc.histogramsAppended--
acc.histoOOORejected++
Expand Down
129 changes: 128 additions & 1 deletion tsdb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6435,7 +6435,7 @@ func TestStripeSeries_gc(t *testing.T) {
s, ms1, ms2 := stripeSeriesWithCollidingSeries(t)
hash := ms1.lset.Hash()

s.gc(0, 0)
s.gc(0, 0, nil)

// Verify that we can get neither ms1 nor ms2 after gc-ing corresponding series
got := s.getByHash(hash, ms1.lset)
Expand Down Expand Up @@ -6866,3 +6866,130 @@ func testHeadAppendHistogramAndCommitConcurrency(t *testing.T, appendFn func(sto

wg.Wait()
}

func TestHead_NumStaleSeries(t *testing.T) {
head, _ := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() {
require.NoError(t, head.Close())
})
require.NoError(t, head.Init(0))

// Initially, no series should be stale.
require.Equal(t, uint64(0), head.NumStaleSeries())

appendSample := func(lbls labels.Labels, ts int64, val float64) {
app := head.Appender(context.Background())
_, err := app.Append(0, lbls, ts, val)
require.NoError(t, err)
require.NoError(t, app.Commit())
}
appendHistogram := func(lbls labels.Labels, ts int64, val *histogram.Histogram) {
app := head.Appender(context.Background())
_, err := app.AppendHistogram(0, lbls, ts, val, nil)
require.NoError(t, err)
require.NoError(t, app.Commit())
}
appendFloatHistogram := func(lbls labels.Labels, ts int64, val *histogram.FloatHistogram) {
app := head.Appender(context.Background())
_, err := app.AppendHistogram(0, lbls, ts, nil, val)
require.NoError(t, err)
require.NoError(t, app.Commit())
}

verifySeriesCounts := func(numStaleSeries, numSeries int) {
require.Equal(t, uint64(numStaleSeries), head.NumStaleSeries())
require.Equal(t, uint64(numSeries), head.NumSeries())
}

// Create some series with normal samples.
series1 := labels.FromStrings("name", "series1", "label", "value1")
series2 := labels.FromStrings("name", "series2", "label", "value2")
series3 := labels.FromStrings("name", "series3", "label", "value3")

// Add normal samples to all series.
appendSample(series1, 100, 1)
appendSample(series2, 100, 2)
appendSample(series3, 100, 3)
// Still no stale series.
verifySeriesCounts(0, 3)

// Make series1 stale by appending a stale sample. Now we should have 1 stale series.
appendSample(series1, 200, math.Float64frombits(value.StaleNaN))
verifySeriesCounts(1, 3)

// Make series2 stale as well.
appendSample(series2, 200, math.Float64frombits(value.StaleNaN))
verifySeriesCounts(2, 3)

// Add a non-stale sample to series1. It should not be counted as stale now.
appendSample(series1, 300, 10)
verifySeriesCounts(1, 3)

// Test that series3 doesn't become stale when we add another normal sample.
appendSample(series3, 200, 10)
verifySeriesCounts(1, 3)

// Test histogram stale samples as well.
series4 := labels.FromStrings("name", "series4", "type", "histogram")
h := tsdbutil.GenerateTestHistograms(1)[0]
appendHistogram(series4, 100, h)
verifySeriesCounts(1, 4)

// Make histogram series stale.
staleHist := h.Copy()
staleHist.Sum = math.Float64frombits(value.StaleNaN)
appendHistogram(series4, 200, staleHist)
verifySeriesCounts(2, 4)

// Test float histogram stale samples.
series5 := labels.FromStrings("name", "series5", "type", "float_histogram")
fh := tsdbutil.GenerateTestFloatHistograms(1)[0]
appendFloatHistogram(series5, 100, fh)
verifySeriesCounts(2, 5)

// Make float histogram series stale.
staleFH := fh.Copy()
staleFH.Sum = math.Float64frombits(value.StaleNaN)
appendFloatHistogram(series5, 200, staleFH)
verifySeriesCounts(3, 5)

// Make histogram sample non-stale and stale back again.
appendHistogram(series4, 210, h)
verifySeriesCounts(2, 5)
appendHistogram(series4, 220, staleHist)
verifySeriesCounts(3, 5)

// Make float histogram sample non-stale and stale back again.
appendFloatHistogram(series5, 210, fh)
verifySeriesCounts(2, 5)
appendFloatHistogram(series5, 220, staleFH)
verifySeriesCounts(3, 5)

// Series 1 and 3 are not stale at this point. Add a new sample to series 1 and series 5,
// so after the GC and removing series 2, 3, 4, we should be left with 1 stale and 1 non-stale series.
appendSample(series1, 400, 10)
appendFloatHistogram(series5, 400, staleFH)
verifySeriesCounts(3, 5)

// Test garbage collection behavior - stale series should be decremented when GC'd.
// Force a garbage collection by truncating old data.
require.NoError(t, head.Truncate(300))

// After truncation, run GC to collect old chunks/series.
head.gc()

// series 1 and series 5 are left.
verifySeriesCounts(1, 2)

// Test creating a new series for each of float, histogram, float histogram that starts as stale.
// This should be counted as stale.
series6 := labels.FromStrings("name", "series6", "direct", "stale")
series7 := labels.FromStrings("name", "series7", "direct", "stale")
series8 := labels.FromStrings("name", "series8", "direct", "stale")
appendSample(series6, 400, math.Float64frombits(value.StaleNaN))
verifySeriesCounts(2, 3)
appendHistogram(series7, 400, staleHist)
verifySeriesCounts(3, 4)
appendFloatHistogram(series8, 400, staleFH)
verifySeriesCounts(4, 5)
}
Loading