Skip to content

Commit e97eecb

Browse files
committed
respect created-timestamp-zero-ingestion feature flag
Signed-off-by: David Ashpole <[email protected]>
1 parent 572e778 commit e97eecb

File tree

3 files changed

+25
-15
lines changed

3 files changed

+25
-15
lines changed

storage/remote/otlptranslator/prometheusremotewrite/combined_appender.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ import (
3131
// NewCombinedAppender creates a combined appender that sets start times and
3232
// updates metadata for each series only once, and appends samples and
3333
// exemplars for each call.
34-
func NewCombinedAppender(app storage.Appender, logger *slog.Logger, reg prometheus.Registerer) CombinedAppender {
34+
func NewCombinedAppender(app storage.Appender, logger *slog.Logger, reg prometheus.Registerer, ingestCTZeroSample bool) CombinedAppender {
3535
return &combinedAppender{
36-
app: app,
37-
logger: logger,
38-
refs: make(map[uint64]storage.SeriesRef),
36+
app: app,
37+
logger: logger,
38+
ingestCTZeroSample: ingestCTZeroSample,
39+
refs: make(map[uint64]storage.SeriesRef),
3940
samplesAppendedWithoutMetadata: promauto.With(reg).NewCounter(prometheus.CounterOpts{
4041
Namespace: "prometheus",
4142
Subsystem: "api",
@@ -67,6 +68,7 @@ type combinedAppender struct {
6768
logger *slog.Logger
6869
samplesAppendedWithoutMetadata prometheus.Counter
6970
outOfOrderExemplars prometheus.Counter
71+
ingestCTZeroSample bool
7072
// Used to ensure we only update metadata and created timestamps once, and to share storage.SeriesRefs.
7173
refs map[uint64]storage.SeriesRef
7274
}
@@ -80,7 +82,7 @@ func (b *combinedAppender) AppendSample(ls labels.Labels, meta metadata.Metadata
8082
b.samplesAppendedWithoutMetadata.Add(1)
8183
b.logger.Debug("error while updating metadata from OTLP", "err", err)
8284
}
83-
if ct != 0 {
85+
if ct != 0 && b.ingestCTZeroSample {
8486
ref, err = b.app.AppendCTZeroSample(ref, ls, t, ct)
8587
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) {
8688
// Even for the first sample OOO is a common scenario because
@@ -114,12 +116,14 @@ func (b *combinedAppender) AppendHistogram(ls labels.Labels, meta metadata.Metad
114116
b.samplesAppendedWithoutMetadata.Add(1)
115117
b.logger.Debug("error while updating metadata from OTLP", "err", err)
116118
}
117-
ref, err = b.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, nil)
118-
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) {
119-
// Even for the first sample OOO is a common scenario because
120-
// we can't tell if a CT was already ingested in a previous request.
121-
// We ignore the error.
122-
b.logger.Debug("Error when appending Histogram CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ct, "timestamp", t)
119+
if b.ingestCTZeroSample {
120+
ref, err = b.app.AppendHistogramCTZeroSample(ref, ls, t, ct, h, nil)
121+
if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) {
122+
// Even for the first sample OOO is a common scenario because
123+
// we can't tell if a CT was already ingested in a previous request.
124+
// We ignore the error.
125+
b.logger.Debug("Error when appending Histogram CT in remote write request", "err", err, "series", ls.String(), "created_timestamp", ct, "timestamp", t)
126+
}
123127
}
124128
}
125129
ref, err = b.app.AppendHistogram(ref, ls, t, h, nil)

storage/remote/write_handler.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,9 @@ type OTLPOptions struct {
533533
// LookbackDelta is the query lookback delta.
534534
// Used to calculate the target_info sample timestamp interval.
535535
LookbackDelta time.Duration
536+
// IngestCTZeroSample enables writing zero samples based on the start time
537+
// of metrics.
538+
IngestCTZeroSample bool
536539
}
537540

538541
// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
@@ -549,6 +552,7 @@ func NewOTLPWriteHandler(logger *slog.Logger, reg prometheus.Registerer, appenda
549552
config: configFunc,
550553
allowDeltaTemporality: opts.NativeDelta,
551554
lookbackDelta: opts.LookbackDelta,
555+
ingestCTZeroSample: opts.IngestCTZeroSample,
552556
reg: reg,
553557
}
554558

@@ -588,6 +592,7 @@ type rwExporter struct {
588592
config func() config.Config
589593
allowDeltaTemporality bool
590594
lookbackDelta time.Duration
595+
ingestCTZeroSample bool
591596
reg prometheus.Registerer
592597
}
593598

@@ -597,7 +602,7 @@ func (rw *rwExporter) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) er
597602
Appender: rw.appendable.Appender(ctx),
598603
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
599604
}
600-
combinedAppender := otlptranslator.NewCombinedAppender(app, rw.logger, rw.reg)
605+
combinedAppender := otlptranslator.NewCombinedAppender(app, rw.logger, rw.reg, rw.ingestCTZeroSample)
601606
converter := otlptranslator.NewPrometheusConverter(combinedAppender)
602607
annots, err := converter.FromMetrics(ctx, md, otlptranslator.Settings{
603608
AddMetricSuffixes: otlpCfg.TranslationStrategy.ShouldAddSuffixes(),

web/api/v1/api.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -312,9 +312,10 @@ func NewAPI(
312312
}
313313
if otlpEnabled {
314314
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, registerer, ap, configFunc, remote.OTLPOptions{
315-
ConvertDelta: otlpDeltaToCumulative,
316-
NativeDelta: otlpNativeDeltaIngestion,
317-
LookbackDelta: lookbackDelta,
315+
ConvertDelta: otlpDeltaToCumulative,
316+
NativeDelta: otlpNativeDeltaIngestion,
317+
LookbackDelta: lookbackDelta,
318+
IngestCTZeroSample: ctZeroIngestionEnabled,
318319
})
319320
}
320321

0 commit comments

Comments
 (0)