From 20b4d738b77f521970936d8fddc44f61b94b7be8 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Tue, 24 Mar 2026 17:51:36 +0900 Subject: [PATCH 1/7] Support Start TimeStamp on PRW2 Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + integration/remote_write_v2_test.go | 277 +++++++++++++++++++++++++++ pkg/cortexpb/cortex.pb.go | 286 ++++++++++++++++++---------- pkg/cortexpb/cortex.proto | 3 + pkg/ingester/ingester.go | 14 ++ pkg/ingester/ingester_test.go | 89 +++++++++ pkg/util/push/push.go | 16 +- pkg/util/push/push_test.go | 88 +++++++++ 8 files changed, 671 insertions(+), 103 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 52e44b86fd..12f2fbf976 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## master / unreleased +* [FEATURE] Distributor: Support start timestamp and created timestamps ingestion on Prometheus Remote Write 2.0. #7371 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 * [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357 * [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363 diff --git a/integration/remote_write_v2_test.go b/integration/remote_write_v2_test.go index 5d8cdbc72c..1a806e3d39 100644 --- a/integration/remote_write_v2_test.go +++ b/integration/remote_write_v2_test.go @@ -3,14 +3,19 @@ package integration import ( + "bytes" + "fmt" "math/rand" + "net/http" "path" "sync" "testing" "time" + "github.com/golang/snappy" remoteapi "github.com/prometheus/client_golang/exp/api/remote" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" @@ -21,6 +26,7 @@ import ( "github.com/cortexproject/cortex/integration/e2e" e2edb "github.com/cortexproject/cortex/integration/e2e/db" "github.com/cortexproject/cortex/integration/e2ecortex" + "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/storage/tsdb" ) @@ -393,6 +399,254 @@ func TestIngest(t *testing.T) { } } +func TestIngest_StartTimestamp(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + consul := e2edb.NewConsulWithName("consul") + require.NoError(t, s.StartAndWaitReady(consul)) + + flags := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-blocks-storage.tsdb.enable-native-histograms": "true", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + "-distributor.remote-writev2-enabled": "true", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + path := path.Join(s.SharedDir(), "cortex-1") + flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path}) + + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + sampleTs := time.Now().Truncate(time.Second) + startTs := sampleTs.Add(-2 * time.Second) + step := sampleTs.Sub(startTs) + + sampleSymbols := []string{"", "__name__", "test_start_timestamp_sample"} + sampleSeries := []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2}, + Samples: []writev2.Sample{{ + Value: 42, + Timestamp: e2e.TimeToMilliseconds(sampleTs), + StartTimestamp: e2e.TimeToMilliseconds(startTs), + }}, + }, + } + + writeStats, err := c.PushV2(sampleSymbols, sampleSeries) + require.NoError(t, err) + testPushHeader(t, writeStats, 1, 0, 0) + + sampleResult, err := c.QueryRange("test_start_timestamp_sample", startTs, sampleTs, step) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, sampleResult.Type()) + + sampleMatrix := sampleResult.(model.Matrix) + require.Len(t, sampleMatrix, 1) + require.Len(t, sampleMatrix[0].Values, 2) + require.Empty(t, sampleMatrix[0].Histograms) + assert.Equal(t, model.Time(e2e.TimeToMilliseconds(startTs)), sampleMatrix[0].Values[0].Timestamp) + assert.Equal(t, model.SampleValue(0), sampleMatrix[0].Values[0].Value) + assert.Equal(t, model.Time(e2e.TimeToMilliseconds(sampleTs)), sampleMatrix[0].Values[1].Timestamp) + assert.Equal(t, model.SampleValue(42), sampleMatrix[0].Values[1].Value) + + histogramCases := []struct { + metricName string + isFloat bool + isCustom bool + idx uint32 + }{ + {metricName: "test_start_timestamp_histogram", isFloat: false, isCustom: false, idx: rand.Uint32()}, + {metricName: "test_start_timestamp_histogram_float", isFloat: true, isCustom: false, idx: rand.Uint32()}, + {metricName: "test_start_timestamp_histogram_custom", isFloat: false, isCustom: true, idx: rand.Uint32()}, + {metricName: "test_start_timestamp_histogram_float_custom", isFloat: true, isCustom: true, idx: rand.Uint32()}, + } + + for _, tc := range histogramCases { + symbols, series := e2e.GenerateHistogramSeriesV2(tc.metricName, sampleTs, tc.idx, tc.isCustom, tc.isFloat) + series[0].Histograms[0].StartTimestamp = e2e.TimeToMilliseconds(startTs) + + writeStats, err = c.PushV2(symbols, series) + require.NoError(t, err) + testPushHeader(t, writeStats, 0, 1, 0) + + result, err := c.QueryRange(tc.metricName, startTs, sampleTs, step) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, result.Type()) + + matrix := result.(model.Matrix) + require.Len(t, matrix, 1) + require.Empty(t, matrix[0].Values) + require.Len(t, matrix[0].Histograms, 2) + require.NotNil(t, matrix[0].Histograms[0].Histogram) + require.NotNil(t, matrix[0].Histograms[1].Histogram) + assert.Equal(t, model.Time(e2e.TimeToMilliseconds(startTs)), matrix[0].Histograms[0].Timestamp) + assert.Equal(t, model.FloatString(0), matrix[0].Histograms[0].Histogram.Count) + assert.Equal(t, model.FloatString(0), matrix[0].Histograms[0].Histogram.Sum) + + var expectedCount, expectedSum model.FloatString + if tc.isFloat { + var expected *histogram.FloatHistogram + if tc.isCustom { + expected = tsdbutil.GenerateTestCustomBucketsFloatHistogram(int64(tc.idx)) + } else { + expected = tsdbutil.GenerateTestFloatHistogram(int64(tc.idx)) + } + expectedCount = model.FloatString(expected.Count) + expectedSum = model.FloatString(expected.Sum) + } else { + var expected *histogram.Histogram + if tc.isCustom { + expected = tsdbutil.GenerateTestCustomBucketsHistogram(int64(tc.idx)) + } else { + expected = tsdbutil.GenerateTestHistogram(int64(tc.idx)) + } + expectedCount = model.FloatString(expected.Count) + expectedSum = model.FloatString(expected.Sum) + } + + assert.Equal(t, model.Time(e2e.TimeToMilliseconds(sampleTs)), matrix[0].Histograms[1].Timestamp) + assert.Equal(t, expectedCount, matrix[0].Histograms[1].Histogram.Count) + assert.Equal(t, expectedSum, matrix[0].Histograms[1].Histogram.Sum) + } +} + +func TestIngest_CreatedTimestampFallback(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + consul := e2edb.NewConsulWithName("consul") + require.NoError(t, s.StartAndWaitReady(consul)) + + flags := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-blocks-storage.tsdb.enable-native-histograms": "true", + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + "-distributor.replication-factor": "1", + "-distributor.remote-writev2-enabled": "true", + "-store-gateway.sharding-enabled": "false", + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + path := path.Join(s.SharedDir(), "cortex-1") + flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path}) + + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + sampleTs := time.Now().Truncate(time.Second) + startTs := sampleTs.Add(-2 * time.Second) + step := sampleTs.Sub(startTs) + + // Send a PRW2 request encoded with Cortex proto carrying only created_timestamp. + sampleReq := &cortexpb.WriteRequestV2{ + Symbols: []string{"", "__name__", "test_created_timestamp_sample"}, + Timeseries: []cortexpb.PreallocTimeseriesV2{ + { + TimeSeriesV2: &cortexpb.TimeSeriesV2{ + LabelsRefs: []uint32{1, 2}, + CreatedTimestamp: e2e.TimeToMilliseconds(startTs), + Samples: []cortexpb.Sample{{Value: 7, TimestampMs: e2e.TimeToMilliseconds(sampleTs)}}, + }, + }, + }, + } + pushCortexV2Request(t, cortex.HTTPEndpoint(), "user-1", sampleReq) + + sampleResult, err := c.QueryRange("test_created_timestamp_sample", startTs, sampleTs, step) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, sampleResult.Type()) + + sampleMatrix := sampleResult.(model.Matrix) + require.Len(t, sampleMatrix, 1) + require.Len(t, sampleMatrix[0].Values, 2) + require.Empty(t, sampleMatrix[0].Histograms) + assert.Equal(t, model.Time(e2e.TimeToMilliseconds(startTs)), sampleMatrix[0].Values[0].Timestamp) + assert.Equal(t, model.SampleValue(0), sampleMatrix[0].Values[0].Value) + assert.Equal(t, model.Time(e2e.TimeToMilliseconds(sampleTs)), sampleMatrix[0].Values[1].Timestamp) + assert.Equal(t, model.SampleValue(7), sampleMatrix[0].Values[1].Value) + + h := cortexpb.HistogramToHistogramProto(e2e.TimeToMilliseconds(sampleTs), tsdbutil.GenerateTestHistogram(3)) + histReq := &cortexpb.WriteRequestV2{ + Symbols: []string{"", "__name__", "test_created_timestamp_histogram"}, + Timeseries: []cortexpb.PreallocTimeseriesV2{ + { + TimeSeriesV2: &cortexpb.TimeSeriesV2{ + LabelsRefs: []uint32{1, 2}, + CreatedTimestamp: e2e.TimeToMilliseconds(startTs), + Histograms: []cortexpb.Histogram{h}, + }, + }, + }, + } + pushCortexV2Request(t, cortex.HTTPEndpoint(), "user-1", histReq) + + histResult, err := c.QueryRange("test_created_timestamp_histogram", startTs, sampleTs, step) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, histResult.Type()) + + histMatrix := histResult.(model.Matrix) + require.Len(t, histMatrix, 1) + require.Empty(t, histMatrix[0].Values) + require.Len(t, histMatrix[0].Histograms, 2) + assert.Equal(t, model.Time(e2e.TimeToMilliseconds(startTs)), histMatrix[0].Histograms[0].Timestamp) + assert.Equal(t, model.FloatString(0), histMatrix[0].Histograms[0].Histogram.Count) + assert.Equal(t, model.FloatString(0), histMatrix[0].Histograms[0].Histogram.Sum) + + expectedHist := tsdbutil.GenerateTestHistogram(3) + assert.Equal(t, model.Time(e2e.TimeToMilliseconds(sampleTs)), histMatrix[0].Histograms[1].Timestamp) + assert.Equal(t, model.FloatString(expectedHist.Count), histMatrix[0].Histograms[1].Histogram.Count) + assert.Equal(t, model.FloatString(expectedHist.Sum), histMatrix[0].Histograms[1].Histogram.Sum) +} + func TestExemplar(t *testing.T) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) @@ -685,3 +939,26 @@ func testPushHeader(t *testing.T, stats remoteapi.WriteResponseStats, expectedSa require.Equal(t, expectedHistogram, stats.Histograms) require.Equal(t, expectedExemplars, stats.Exemplars) } + +func pushCortexV2Request(t *testing.T, distributorAddr, orgID string, req *cortexpb.WriteRequestV2) { + t.Helper() + + data, err := req.Marshal() + require.NoError(t, err) + + compressed := snappy.Encode(nil, data) + httpReq, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", distributorAddr), bytes.NewReader(compressed)) + require.NoError(t, err) + + httpReq.Header.Add("Content-Encoding", "snappy") + httpReq.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request") + httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0") + httpReq.Header.Set("X-Scope-OrgID", orgID) + + httpClient := &http.Client{Timeout: 30 * time.Second} + res, err := httpClient.Do(httpReq) + require.NoError(t, err) + defer res.Body.Close() //nolint:errcheck + + require.Equal(t, http.StatusNoContent, res.StatusCode) +} diff --git a/pkg/cortexpb/cortex.pb.go b/pkg/cortexpb/cortex.pb.go index cb87faedba..6037803b26 100644 --- a/pkg/cortexpb/cortex.pb.go +++ b/pkg/cortexpb/cortex.pb.go @@ -806,8 +806,9 @@ func (m *LabelPair) GetValue() []byte { } type Sample struct { - Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"` - TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs,proto3" json:"timestamp_ms,omitempty"` + Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"` + TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs,proto3" json:"timestamp_ms,omitempty"` + StartTimestampMs int64 `protobuf:"varint,3,opt,name=start_timestamp_ms,json=startTimestampMs,proto3" json:"start_timestamp_ms,omitempty"` } func (m *Sample) Reset() { *m = Sample{} } @@ -856,6 +857,13 @@ func (m *Sample) GetTimestampMs() int64 { return 0 } +func (m *Sample) GetStartTimestampMs() int64 { + if m != nil { + return m.StartTimestampMs + } + return 0 +} + type MetricMetadata struct { Type MetricMetadata_MetricType `protobuf:"varint,1,opt,name=type,proto3,enum=cortexpb.MetricMetadata_MetricType" json:"type,omitempty"` MetricFamilyName string `protobuf:"bytes,2,opt,name=metric_family_name,json=metricFamilyName,proto3" json:"metric_family_name,omitempty"` @@ -1084,7 +1092,8 @@ type Histogram struct { // // The last element is not only the upper inclusive bound of the last regular // bucket, but implicitly the lower exclusive bound of the +Inf bucket. - CustomValues []float64 `protobuf:"fixed64,16,rep,packed,name=custom_values,json=customValues,proto3" json:"custom_values,omitempty"` + CustomValues []float64 `protobuf:"fixed64,16,rep,packed,name=custom_values,json=customValues,proto3" json:"custom_values,omitempty"` + StartTimestampMs int64 `protobuf:"varint,17,opt,name=start_timestamp_ms,json=startTimestampMs,proto3" json:"start_timestamp_ms,omitempty"` } func (m *Histogram) Reset() { *m = Histogram{} } @@ -1275,6 +1284,13 @@ func (m *Histogram) GetCustomValues() []float64 { return nil } +func (m *Histogram) GetStartTimestampMs() int64 { + if m != nil { + return m.StartTimestampMs + } + return 0 +} + // XXX_OneofWrappers is for the internal use of the proto package. func (*Histogram) XXX_OneofWrappers() []interface{} { return []interface{}{ @@ -1367,102 +1383,104 @@ func init() { func init() { proto.RegisterFile("cortex.proto", fileDescriptor_893a47d0a749d749) } var fileDescriptor_893a47d0a749d749 = []byte{ - // 1517 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xcd, 0x6f, 0x13, 0xc7, - 0x1b, 0xf6, 0xfa, 0xdb, 0xaf, 0x3f, 0xd8, 0x0c, 0x06, 0x36, 0x01, 0xd6, 0xc1, 0xe8, 0xf7, 0x6b, - 0x44, 0x51, 0x8a, 0x82, 0x4a, 0x5b, 0x84, 0x2a, 0xd9, 0xc1, 0x21, 0x16, 0xd8, 0x8e, 0xc6, 0x4e, - 0x10, 0xbd, 0xac, 0x36, 0xf6, 0x38, 0x5e, 0xe1, 0xdd, 0x75, 0x77, 0xc6, 0x88, 0xf4, 0xd4, 0x53, - 0xd5, 0xde, 0x7a, 0xe9, 0xa5, 0xb7, 0xaa, 0x97, 0x5e, 0x7b, 0xee, 0x3f, 0xc0, 0x31, 0xb7, 0x22, - 0xa4, 0x46, 0x25, 0x5c, 0x68, 0x4f, 0x1c, 0xda, 0x7b, 0x35, 0xb3, 0x9f, 0x8e, 0x83, 0x68, 0x2b, - 0x0e, 0xbd, 0xcd, 0x3c, 0xef, 0x3b, 0x33, 0xcf, 0xcc, 0xfb, 0xbc, 0xcf, 0xda, 0x50, 0xe8, 0xdb, - 0x0e, 0x23, 0x8f, 0x57, 0x27, 0x8e, 0xcd, 0x6c, 0x94, 0x75, 0x67, 0x93, 0xdd, 0xa5, 0xf2, 0x9e, - 0xbd, 0x67, 0x0b, 0xf0, 0x3d, 0x3e, 0x72, 0xe3, 0xd5, 0x45, 0x58, 0x68, 0x11, 0x4a, 0xf5, 0x3d, - 0x72, 0xdf, 0x60, 0xa3, 0xfa, 0x74, 0x88, 0xc9, 0xf0, 0x66, 0xf2, 0xd5, 0x77, 0x95, 0x58, 0xf5, - 0xab, 0x04, 0x14, 0xee, 0x3b, 0x06, 0x23, 0x98, 0x7c, 0x3a, 0x25, 0x94, 0xa1, 0x2d, 0x00, 0x66, - 0x98, 0x84, 0x12, 0xc7, 0x20, 0x54, 0x91, 0x96, 0x13, 0x2b, 0xf9, 0xb5, 0xf2, 0xaa, 0x7f, 0xc0, - 0x6a, 0xcf, 0x30, 0x49, 0x57, 0xc4, 0xea, 0x4b, 0x4f, 0x0e, 0x2b, 0xb1, 0x67, 0x87, 0x15, 0xb4, - 0xe5, 0x10, 0x7d, 0x3c, 0xb6, 0xfb, 0xbd, 0x60, 0x1d, 0x8e, 0xec, 0x81, 0xae, 0x42, 0xba, 0x6b, - 0x4f, 0x9d, 0x3e, 0x51, 0xe2, 0xcb, 0xd2, 0x4a, 0x29, 0xba, 0x9b, 0x8b, 0x37, 0xac, 0xa9, 0x89, - 0xbd, 0x1c, 0x74, 0x13, 0xb2, 0x26, 0x61, 0xfa, 0x40, 0x67, 0xba, 0x92, 0x10, 0xa7, 0x2b, 0x61, - 0x7e, 0x8b, 0x30, 0xc7, 0xe8, 0xb7, 0xbc, 0x78, 0x3d, 0xf9, 0xe4, 0xb0, 0x22, 0xe1, 0x20, 0x1f, - 0xdd, 0x82, 0x25, 0xfa, 0xd0, 0x98, 0x68, 0x63, 0x7d, 0x97, 0x8c, 0x35, 0x4b, 0x37, 0x89, 0xf6, - 0x48, 0x1f, 0x1b, 0x03, 0x9d, 0x19, 0xb6, 0xa5, 0xbc, 0xcc, 0x2c, 0x4b, 0x2b, 0x59, 0x7c, 0x8e, - 0xa7, 0xdc, 0xe3, 0x19, 0x6d, 0xdd, 0x24, 0x3b, 0x41, 0x1c, 0xb5, 0x20, 0x81, 0xc9, 0x50, 0xf9, - 0x8d, 0xa7, 0xe5, 0xd7, 0xce, 0x47, 0x4f, 0x3d, 0xf6, 0x76, 0xf5, 0x8b, 0xfc, 0xea, 0x07, 0x87, - 0x15, 0xe9, 0xd9, 0x61, 0x65, 0xfe, 0x69, 0x31, 0xdf, 0x07, 0x5d, 0x83, 0xf2, 0xc0, 0xa0, 0x7d, - 0xdd, 0x19, 0x68, 0xf6, 0x94, 0x69, 0xf6, 0x50, 0xb3, 0x9d, 0x01, 0x71, 0x94, 0xdf, 0x5d, 0x1a, - 0x0b, 0x5e, 0xb0, 0x33, 0x65, 0x9d, 0x61, 0x87, 0x47, 0xaa, 0x3f, 0xc5, 0xa1, 0x14, 0xad, 0xc5, - 0xce, 0x1a, 0x52, 0x20, 0x43, 0xf7, 0xcd, 0x5d, 0x7b, 0x4c, 0x95, 0xe4, 0x72, 0x62, 0x25, 0x87, - 0xfd, 0x29, 0xea, 0xcd, 0xd4, 0x29, 0x25, 0x5e, 0xea, 0xec, 0x49, 0x75, 0xda, 0x59, 0xab, 0x5f, - 0xf0, 0x2a, 0x55, 0x9e, 0xaf, 0xd4, 0xce, 0xda, 0x6b, 0x6a, 0x95, 0xfe, 0x1b, 0xb5, 0xfa, 0x2f, - 0xbd, 0x37, 0x7f, 0xbd, 0x42, 0xf4, 0xd6, 0xa8, 0x02, 0x79, 0x41, 0x8c, 0x6a, 0x0e, 0x19, 0xba, - 0x52, 0x2e, 0x62, 0x70, 0x21, 0x4c, 0x86, 0x14, 0x5d, 0x83, 0x0c, 0xd5, 0xcd, 0xc9, 0x98, 0x50, - 0x25, 0x2e, 0xde, 0x4f, 0x8e, 0xdc, 0x56, 0x04, 0x84, 0xc2, 0x62, 0xd8, 0x4f, 0x43, 0x1f, 0x01, - 0x8c, 0x0c, 0xca, 0xec, 0x3d, 0x47, 0x37, 0xa9, 0x27, 0xcf, 0xd3, 0xe1, 0xa2, 0x4d, 0x3f, 0xe6, - 0xad, 0x8b, 0x24, 0xa3, 0x0f, 0x21, 0x47, 0x1e, 0x13, 0x73, 0x32, 0xd6, 0x1d, 0xb7, 0x96, 0x33, - 0x6d, 0xd5, 0xf0, 0x42, 0x3b, 0x6b, 0xde, 0xd2, 0x30, 0x19, 0xdd, 0x88, 0x74, 0x44, 0x4a, 0xbc, - 0x55, 0x79, 0xa6, 0x23, 0x44, 0x24, 0x58, 0x18, 0x76, 0xc3, 0xbb, 0xb0, 0xd0, 0x77, 0x88, 0xce, - 0xc8, 0x40, 0x13, 0x15, 0x66, 0xba, 0x39, 0x11, 0x65, 0x4d, 0x60, 0xd9, 0x0b, 0xf4, 0x7c, 0xbc, - 0xaa, 0x03, 0x84, 0x1c, 0xde, 0xfc, 0x74, 0x65, 0x48, 0x3d, 0xd2, 0xc7, 0x53, 0xb7, 0xa5, 0x25, - 0xec, 0x4e, 0xd0, 0x05, 0xc8, 0x85, 0x27, 0x25, 0xc4, 0x49, 0x21, 0x50, 0xfd, 0x39, 0x0e, 0x10, - 0xd2, 0x45, 0xd7, 0x21, 0xc9, 0xf6, 0x27, 0x44, 0x91, 0x84, 0xd0, 0x2a, 0x27, 0x5d, 0xc9, 0xeb, - 0xf7, 0xde, 0xfe, 0x84, 0x60, 0x91, 0x8c, 0x16, 0x21, 0x3b, 0x22, 0xe3, 0x09, 0xa7, 0x25, 0x0e, - 0x28, 0xe2, 0x0c, 0x9f, 0xf3, 0x7e, 0x5b, 0x84, 0xec, 0xd4, 0x32, 0x98, 0x08, 0x25, 0xdd, 0x10, - 0x9f, 0x73, 0x69, 0xfc, 0x22, 0x89, 0x93, 0xbd, 0xad, 0xd0, 0x79, 0x38, 0xd7, 0x6a, 0xf4, 0x70, - 0x73, 0x5d, 0xeb, 0x3d, 0xd8, 0x6a, 0x68, 0xdb, 0xed, 0xee, 0x56, 0x63, 0xbd, 0xb9, 0xd1, 0x6c, - 0xdc, 0x96, 0x63, 0xe8, 0x1c, 0x9c, 0x8e, 0x06, 0xd7, 0x3b, 0xdb, 0xed, 0x5e, 0x03, 0xcb, 0x12, - 0x3a, 0x03, 0x0b, 0xd1, 0xc0, 0x9d, 0xda, 0xf6, 0x9d, 0x86, 0x1c, 0x47, 0x8b, 0x70, 0x26, 0x0a, - 0x6f, 0x36, 0xbb, 0xbd, 0xce, 0x1d, 0x5c, 0x6b, 0xc9, 0x09, 0xa4, 0xc2, 0xd2, 0xdc, 0x8a, 0x30, - 0x9e, 0x3c, 0x7e, 0x54, 0x77, 0xbb, 0xd5, 0xaa, 0xe1, 0x07, 0x72, 0x0a, 0x95, 0x41, 0x8e, 0x06, - 0x9a, 0xed, 0x8d, 0x8e, 0x9c, 0x46, 0x0a, 0x94, 0x67, 0xd2, 0x7b, 0xb5, 0x5e, 0xa3, 0xdb, 0xe8, - 0xc9, 0x99, 0xea, 0x8f, 0x12, 0xa0, 0x2e, 0x73, 0x88, 0x6e, 0xce, 0x58, 0xf9, 0x12, 0x64, 0x7b, - 0xc4, 0xd2, 0x2d, 0xd6, 0xbc, 0x2d, 0x5e, 0x39, 0x87, 0x83, 0x39, 0xd7, 0xbe, 0x97, 0x26, 0x4a, - 0x38, 0xe3, 0x1d, 0xd1, 0x4d, 0xb0, 0x9f, 0xe6, 0xb7, 0xeb, 0xcb, 0xb7, 0xd4, 0xae, 0xdf, 0x48, - 0x50, 0xf4, 0x0e, 0xa2, 0x13, 0xdb, 0xa2, 0x04, 0x21, 0x48, 0xf6, 0xed, 0x81, 0x2b, 0x88, 0x14, - 0x16, 0x63, 0xee, 0x7f, 0xa6, 0xbb, 0x5e, 0xd0, 0xcc, 0x61, 0x7f, 0xca, 0x23, 0x5d, 0xaf, 0x79, - 0x5d, 0xa5, 0xf9, 0x53, 0xa4, 0x02, 0x6c, 0x86, 0x4d, 0x9a, 0x14, 0xc1, 0x08, 0xc2, 0x55, 0xda, - 0x08, 0x3a, 0x31, 0xe5, 0xaa, 0x34, 0x00, 0xaa, 0x7f, 0x48, 0x00, 0xa1, 0x8d, 0xa0, 0x1a, 0xa4, - 0x5d, 0xd9, 0x7b, 0x9f, 0xc2, 0x48, 0xb7, 0x0b, 0x4f, 0xdb, 0xd2, 0x0d, 0xa7, 0x5e, 0xf6, 0xfc, - 0xb5, 0x20, 0xa0, 0xda, 0x40, 0x9f, 0x30, 0xe2, 0x60, 0x6f, 0xe1, 0xbf, 0xb0, 0x99, 0x1b, 0x51, - 0xaf, 0x70, 0x5d, 0x06, 0xcd, 0x7b, 0xc5, 0xbc, 0x53, 0xcc, 0xda, 0x53, 0xf2, 0x1f, 0xd8, 0x53, - 0xf5, 0x7d, 0xc8, 0x05, 0xf7, 0xe1, 0x95, 0xe0, 0x66, 0x2e, 0x2a, 0x51, 0xc0, 0x62, 0x3c, 0xdb, - 0xf1, 0x05, 0xaf, 0xe3, 0xab, 0x35, 0x48, 0xbb, 0x57, 0x08, 0xe3, 0x52, 0xd4, 0x11, 0x2e, 0x41, - 0x21, 0x30, 0x00, 0xcd, 0xa4, 0x62, 0x71, 0x02, 0xe7, 0x03, 0xac, 0x45, 0xab, 0xdf, 0xc6, 0xa1, - 0x34, 0xfb, 0x5d, 0x47, 0x1f, 0xcc, 0x58, 0xc3, 0xe5, 0xd7, 0x7d, 0xff, 0xe7, 0xed, 0xe1, 0x2a, - 0x20, 0x53, 0x60, 0xda, 0x50, 0x37, 0x8d, 0xf1, 0xbe, 0xf8, 0x26, 0x79, 0xca, 0x91, 0xdd, 0xc8, - 0x86, 0x08, 0xf0, 0x4f, 0x11, 0xbf, 0x26, 0x37, 0x0f, 0x21, 0x91, 0x1c, 0x16, 0x63, 0x8e, 0x71, - 0xd7, 0x10, 0xba, 0xc8, 0x61, 0x31, 0xae, 0xee, 0xcf, 0xb8, 0x47, 0x1e, 0x32, 0xdb, 0xed, 0xbb, - 0xed, 0xce, 0xfd, 0xb6, 0x1c, 0xe3, 0x93, 0xd0, 0x21, 0x72, 0x90, 0xf2, 0x5d, 0xa1, 0x08, 0xb9, - 0xa8, 0x13, 0x20, 0x28, 0xcd, 0x75, 0x7f, 0x1e, 0x32, 0x61, 0xc7, 0x67, 0x21, 0xe9, 0x75, 0x79, - 0x01, 0xb2, 0x91, 0xce, 0xbe, 0x0b, 0x69, 0xf7, 0xe8, 0xb7, 0x20, 0xc4, 0xea, 0x17, 0x12, 0x64, - 0x7d, 0xf1, 0xbc, 0x0d, 0x61, 0x9f, 0xfc, 0x11, 0x38, 0x5e, 0xf2, 0xc4, 0x7c, 0xc9, 0xff, 0x4c, - 0x41, 0x2e, 0x10, 0x23, 0xba, 0x08, 0xb9, 0xbe, 0x3d, 0xb5, 0x98, 0x66, 0x58, 0x4c, 0x94, 0x3c, - 0xb9, 0x19, 0xc3, 0x59, 0x01, 0x35, 0x2d, 0x86, 0x2e, 0x41, 0xde, 0x0d, 0x0f, 0xc7, 0xb6, 0xee, - 0xba, 0x95, 0xb4, 0x19, 0xc3, 0x20, 0xc0, 0x0d, 0x8e, 0x21, 0x19, 0x12, 0x74, 0x6a, 0x8a, 0x93, - 0x24, 0xcc, 0x87, 0xe8, 0x2c, 0xa4, 0x69, 0x7f, 0x44, 0x4c, 0x5d, 0x14, 0x77, 0x01, 0x7b, 0x33, - 0xf4, 0x3f, 0x28, 0x7d, 0x46, 0x1c, 0x5b, 0x63, 0x23, 0x87, 0xd0, 0x91, 0x3d, 0x1e, 0x88, 0x42, - 0x4b, 0xb8, 0xc8, 0xd1, 0x9e, 0x0f, 0xa2, 0xff, 0x7b, 0x69, 0x21, 0xaf, 0xb4, 0xe0, 0x25, 0xe1, - 0x02, 0xc7, 0xd7, 0x7d, 0x6e, 0x57, 0x40, 0x8e, 0xe4, 0xb9, 0x04, 0x33, 0x82, 0xa0, 0x84, 0x4b, - 0x41, 0xa6, 0x4b, 0xb2, 0x06, 0x25, 0x8b, 0xec, 0xe9, 0xcc, 0x78, 0x44, 0x34, 0x3a, 0xd1, 0x2d, - 0xaa, 0x64, 0x8f, 0xff, 0x0a, 0xa8, 0x4f, 0xfb, 0x0f, 0x09, 0xeb, 0x4e, 0x74, 0xcb, 0xeb, 0xd0, - 0xa2, 0xbf, 0x82, 0x63, 0x14, 0xbd, 0x03, 0xa7, 0x82, 0x2d, 0x06, 0x64, 0xcc, 0x74, 0xaa, 0xe4, - 0x96, 0x13, 0x2b, 0x08, 0x07, 0x3b, 0xdf, 0x16, 0xe8, 0x4c, 0xa2, 0xe0, 0x46, 0x15, 0x58, 0x4e, - 0xac, 0x48, 0x61, 0xa2, 0x20, 0xc6, 0xed, 0xad, 0x34, 0xb1, 0xa9, 0x11, 0x21, 0x95, 0x7f, 0x33, - 0x29, 0x7f, 0x45, 0x40, 0x2a, 0xd8, 0xc2, 0x23, 0x55, 0x70, 0x49, 0xf9, 0x70, 0x48, 0x2a, 0x48, - 0xf4, 0x48, 0x15, 0x5d, 0x52, 0x3e, 0xec, 0x91, 0xba, 0x05, 0xe0, 0x10, 0x4a, 0x98, 0x36, 0xe2, - 0x2f, 0x5f, 0x12, 0x26, 0x70, 0xf1, 0x04, 0x1b, 0x5b, 0xc5, 0x3c, 0x6b, 0xd3, 0xb0, 0x18, 0xce, - 0x39, 0xfe, 0x70, 0x4e, 0x7f, 0xa7, 0xe6, 0xf4, 0x87, 0x2e, 0x43, 0xb1, 0x3f, 0xa5, 0xcc, 0x36, - 0x35, 0x21, 0x59, 0xaa, 0xc8, 0x82, 0x47, 0xc1, 0x05, 0x77, 0x04, 0x56, 0xbd, 0x09, 0xb9, 0x60, - 0xff, 0xd9, 0xa6, 0xcf, 0x40, 0xe2, 0x41, 0xa3, 0x2b, 0x4b, 0x28, 0x0d, 0xf1, 0x76, 0x47, 0x8e, - 0x87, 0x8d, 0x9f, 0x58, 0x4a, 0x7e, 0xf9, 0xbd, 0x2a, 0xd5, 0x33, 0x90, 0x12, 0x37, 0xac, 0x17, - 0x00, 0x42, 0x81, 0x54, 0x6f, 0x01, 0x84, 0xaf, 0xc9, 0x35, 0x6a, 0x0f, 0x87, 0x94, 0xb8, 0xa2, - 0x5f, 0xc0, 0xde, 0x8c, 0xe3, 0x63, 0x62, 0xed, 0xb1, 0x91, 0xd0, 0x7a, 0x11, 0x7b, 0xb3, 0x2b, - 0x15, 0x80, 0xf0, 0x37, 0x38, 0x27, 0x51, 0xdb, 0x6a, 0xca, 0x31, 0x6e, 0x1d, 0x78, 0xfb, 0x5e, - 0x43, 0x96, 0xea, 0x1f, 0x1f, 0x3c, 0x57, 0x63, 0x4f, 0x9f, 0xab, 0xb1, 0x57, 0xcf, 0x55, 0xe9, - 0xf3, 0x23, 0x55, 0xfa, 0xe1, 0x48, 0x95, 0x9e, 0x1c, 0xa9, 0xd2, 0xc1, 0x91, 0x2a, 0xfd, 0x7a, - 0xa4, 0x4a, 0x2f, 0x8f, 0xd4, 0xd8, 0xab, 0x23, 0x55, 0xfa, 0xfa, 0x85, 0x1a, 0x3b, 0x78, 0xa1, - 0xc6, 0x9e, 0xbe, 0x50, 0x63, 0x9f, 0x04, 0x7f, 0x1e, 0x77, 0xd3, 0xe2, 0xdf, 0xe2, 0xf5, 0xbf, - 0x02, 0x00, 0x00, 0xff, 0xff, 0xb1, 0x8c, 0x07, 0xad, 0x5d, 0x0e, 0x00, 0x00, + // 1540 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0x3b, 0x6f, 0x1b, 0xc7, + 0x16, 0xe6, 0xf2, 0xcd, 0xc3, 0x87, 0x57, 0x63, 0xda, 0x5e, 0xc9, 0xf6, 0x52, 0xa6, 0x71, 0xef, + 0x15, 0x7c, 0x0d, 0x5d, 0x43, 0xc6, 0x75, 0x12, 0xc3, 0x08, 0x40, 0xca, 0x94, 0x45, 0xd8, 0x24, + 0x85, 0x21, 0x25, 0xc3, 0x69, 0x16, 0x2b, 0x72, 0x28, 0x2e, 0xcc, 0xdd, 0x65, 0x76, 0x86, 0x86, + 0x95, 0x2a, 0x55, 0x90, 0x74, 0x69, 0xd2, 0xa4, 0x0b, 0xdc, 0xa4, 0x4d, 0x9d, 0x3f, 0xe0, 0x52, + 0x5d, 0x0c, 0x03, 0x11, 0x62, 0xb9, 0x71, 0x52, 0xb9, 0xc8, 0x0f, 0x08, 0x66, 0xf6, 0x49, 0x51, + 0x82, 0x93, 0xc0, 0x45, 0xba, 0x39, 0xdf, 0x39, 0x33, 0xf3, 0xcd, 0x79, 0x7c, 0x4b, 0x42, 0xa1, + 0x6f, 0x3b, 0x8c, 0x3c, 0x5d, 0x9d, 0x38, 0x36, 0xb3, 0x51, 0xd6, 0xb5, 0x26, 0xbb, 0x4b, 0xe5, + 0x3d, 0x7b, 0xcf, 0x16, 0xe0, 0xff, 0xf8, 0xca, 0xf5, 0x57, 0x17, 0x61, 0xa1, 0x45, 0x28, 0xd5, + 0xf7, 0xc8, 0x43, 0x83, 0x8d, 0xea, 0xd3, 0x21, 0x26, 0xc3, 0xdb, 0xc9, 0xb7, 0xdf, 0x55, 0x62, + 0xd5, 0xaf, 0x12, 0x50, 0x78, 0xe8, 0x18, 0x8c, 0x60, 0xf2, 0xe9, 0x94, 0x50, 0x86, 0xb6, 0x00, + 0x98, 0x61, 0x12, 0x4a, 0x1c, 0x83, 0x50, 0x45, 0x5a, 0x4e, 0xac, 0xe4, 0xd7, 0xca, 0xab, 0xfe, + 0x05, 0xab, 0x3d, 0xc3, 0x24, 0x5d, 0xe1, 0xab, 0x2f, 0x3d, 0x3f, 0xac, 0xc4, 0x5e, 0x1e, 0x56, + 0xd0, 0x96, 0x43, 0xf4, 0xf1, 0xd8, 0xee, 0xf7, 0x82, 0x7d, 0x38, 0x72, 0x06, 0xba, 0x0e, 0xe9, + 0xae, 0x3d, 0x75, 0xfa, 0x44, 0x89, 0x2f, 0x4b, 0x2b, 0xa5, 0xe8, 0x69, 0x2e, 0xde, 0xb0, 0xa6, + 0x26, 0xf6, 0x62, 0xd0, 0x6d, 0xc8, 0x9a, 0x84, 0xe9, 0x03, 0x9d, 0xe9, 0x4a, 0x42, 0xdc, 0xae, + 0x84, 0xf1, 0x2d, 0xc2, 0x1c, 0xa3, 0xdf, 0xf2, 0xfc, 0xf5, 0xe4, 0xf3, 0xc3, 0x8a, 0x84, 0x83, + 0x78, 0x74, 0x07, 0x96, 0xe8, 0x63, 0x63, 0xa2, 0x8d, 0xf5, 0x5d, 0x32, 0xd6, 0x2c, 0xdd, 0x24, + 0xda, 0x13, 0x7d, 0x6c, 0x0c, 0x74, 0x66, 0xd8, 0x96, 0xf2, 0x26, 0xb3, 0x2c, 0xad, 0x64, 0xf1, + 0x05, 0x1e, 0xf2, 0x80, 0x47, 0xb4, 0x75, 0x93, 0xec, 0x04, 0x7e, 0xd4, 0x82, 0x04, 0x26, 0x43, + 0xe5, 0x57, 0x1e, 0x96, 0x5f, 0xbb, 0x18, 0xbd, 0xf5, 0x58, 0xee, 0xea, 0x97, 0xf9, 0xd3, 0x0f, + 0x0e, 0x2b, 0xd2, 0xcb, 0xc3, 0xca, 0x7c, 0x6a, 0x31, 0x3f, 0x07, 0xdd, 0x80, 0xf2, 0xc0, 0xa0, + 0x7d, 0xdd, 0x19, 0x68, 0xf6, 0x94, 0x69, 0xf6, 0x50, 0xb3, 0x9d, 0x01, 0x71, 0x94, 0xdf, 0x5c, + 0x1a, 0x0b, 0x9e, 0xb3, 0x33, 0x65, 0x9d, 0x61, 0x87, 0x7b, 0xaa, 0x3f, 0xc6, 0xa1, 0x14, 0xad, + 0xc5, 0xce, 0x1a, 0x52, 0x20, 0x43, 0xf7, 0xcd, 0x5d, 0x7b, 0x4c, 0x95, 0xe4, 0x72, 0x62, 0x25, + 0x87, 0x7d, 0x13, 0xf5, 0x66, 0xea, 0x94, 0x12, 0x99, 0x3a, 0x7f, 0x52, 0x9d, 0x76, 0xd6, 0xea, + 0x97, 0xbc, 0x4a, 0x95, 0xe7, 0x2b, 0xb5, 0xb3, 0x76, 0x4a, 0xad, 0xd2, 0x7f, 0xa2, 0x56, 0xff, + 0xa4, 0x7c, 0xf3, 0xec, 0x15, 0xa2, 0xaf, 0x46, 0x15, 0xc8, 0x0b, 0x62, 0x54, 0x73, 0xc8, 0xd0, + 0x6d, 0xe5, 0x22, 0x06, 0x17, 0xc2, 0x64, 0x48, 0xd1, 0x0d, 0xc8, 0x50, 0xdd, 0x9c, 0x8c, 0x09, + 0x55, 0xe2, 0x22, 0x7f, 0x72, 0xe4, 0xb5, 0xc2, 0x21, 0x3a, 0x2c, 0x86, 0xfd, 0x30, 0xf4, 0x11, + 0xc0, 0xc8, 0xa0, 0xcc, 0xde, 0x73, 0x74, 0x93, 0x7a, 0xed, 0x79, 0x36, 0xdc, 0xb4, 0xe9, 0xfb, + 0xbc, 0x7d, 0x91, 0x60, 0xf4, 0x21, 0xe4, 0xc8, 0x53, 0x62, 0x4e, 0xc6, 0xba, 0xe3, 0xd6, 0x72, + 0x66, 0xac, 0x1a, 0x9e, 0x6b, 0x67, 0xcd, 0xdb, 0x1a, 0x06, 0xa3, 0x5b, 0x91, 0x89, 0x48, 0x89, + 0x5c, 0x95, 0x67, 0x26, 0x42, 0x78, 0x82, 0x8d, 0xe1, 0x34, 0xfc, 0x17, 0x16, 0xfa, 0x0e, 0xd1, + 0x19, 0x19, 0x68, 0xa2, 0xc2, 0x4c, 0x37, 0x27, 0xa2, 0xac, 0x09, 0x2c, 0x7b, 0x8e, 0x9e, 0x8f, + 0x57, 0x75, 0x80, 0x90, 0xc3, 0xbb, 0x53, 0x57, 0x86, 0xd4, 0x13, 0x7d, 0x3c, 0x75, 0x47, 0x5a, + 0xc2, 0xae, 0x81, 0x2e, 0x41, 0x2e, 0xbc, 0x29, 0x21, 0x6e, 0x0a, 0x81, 0xea, 0x4f, 0x71, 0x80, + 0x90, 0x2e, 0xba, 0x09, 0x49, 0xb6, 0x3f, 0x21, 0x8a, 0x24, 0x1a, 0xad, 0x72, 0xd2, 0x93, 0xbc, + 0x79, 0xef, 0xed, 0x4f, 0x08, 0x16, 0xc1, 0x68, 0x11, 0xb2, 0x23, 0x32, 0x9e, 0x70, 0x5a, 0xe2, + 0x82, 0x22, 0xce, 0x70, 0x9b, 0xcf, 0xdb, 0x22, 0x64, 0xa7, 0x96, 0xc1, 0x84, 0x2b, 0xe9, 0xba, + 0xb8, 0xcd, 0x5b, 0xe3, 0x67, 0x49, 0xdc, 0xec, 0x1d, 0x85, 0x2e, 0xc2, 0x85, 0x56, 0xa3, 0x87, + 0x9b, 0xeb, 0x5a, 0xef, 0xd1, 0x56, 0x43, 0xdb, 0x6e, 0x77, 0xb7, 0x1a, 0xeb, 0xcd, 0x8d, 0x66, + 0xe3, 0xae, 0x1c, 0x43, 0x17, 0xe0, 0x6c, 0xd4, 0xb9, 0xde, 0xd9, 0x6e, 0xf7, 0x1a, 0x58, 0x96, + 0xd0, 0x39, 0x58, 0x88, 0x3a, 0xee, 0xd5, 0xb6, 0xef, 0x35, 0xe4, 0x38, 0x5a, 0x84, 0x73, 0x51, + 0x78, 0xb3, 0xd9, 0xed, 0x75, 0xee, 0xe1, 0x5a, 0x4b, 0x4e, 0x20, 0x15, 0x96, 0xe6, 0x76, 0x84, + 0xfe, 0xe4, 0xf1, 0xab, 0xba, 0xdb, 0xad, 0x56, 0x0d, 0x3f, 0x92, 0x53, 0xa8, 0x0c, 0x72, 0xd4, + 0xd1, 0x6c, 0x6f, 0x74, 0xe4, 0x34, 0x52, 0xa0, 0x3c, 0x13, 0xde, 0xab, 0xf5, 0x1a, 0xdd, 0x46, + 0x4f, 0xce, 0x54, 0x7f, 0x90, 0x00, 0x75, 0x99, 0x43, 0x74, 0x73, 0x46, 0xca, 0x97, 0x20, 0xdb, + 0x23, 0x96, 0x6e, 0xb1, 0xe6, 0x5d, 0x91, 0xe5, 0x1c, 0x0e, 0x6c, 0xde, 0xfb, 0x5e, 0x98, 0x28, + 0xe1, 0x8c, 0x76, 0x44, 0x0f, 0xc1, 0x7e, 0x98, 0x3f, 0xae, 0x6f, 0xde, 0xd3, 0xb8, 0x7e, 0x23, + 0x41, 0xd1, 0xbb, 0x88, 0x4e, 0x6c, 0x8b, 0x12, 0x84, 0x20, 0xd9, 0xb7, 0x07, 0x6e, 0x43, 0xa4, + 0xb0, 0x58, 0x73, 0xfd, 0x33, 0xdd, 0xfd, 0x82, 0x66, 0x0e, 0xfb, 0x26, 0xf7, 0x74, 0xbd, 0xe1, + 0x75, 0x3b, 0xcd, 0x37, 0x91, 0x0a, 0xb0, 0x19, 0x0e, 0x69, 0x52, 0x38, 0x23, 0x08, 0xef, 0xd2, + 0x46, 0x30, 0x89, 0x29, 0xb7, 0x4b, 0x03, 0xa0, 0xfa, 0xbb, 0x04, 0x10, 0xca, 0x08, 0xaa, 0x41, + 0xda, 0x6d, 0x7b, 0xef, 0x53, 0x18, 0x99, 0x76, 0xa1, 0x69, 0x5b, 0xba, 0xe1, 0xd4, 0xcb, 0x9e, + 0xbe, 0x16, 0x04, 0x54, 0x1b, 0xe8, 0x13, 0x46, 0x1c, 0xec, 0x6d, 0xfc, 0x1b, 0x32, 0x73, 0x2b, + 0xaa, 0x15, 0xae, 0xca, 0xa0, 0x79, 0xad, 0x98, 0x57, 0x8a, 0x59, 0x79, 0x4a, 0xfe, 0x05, 0x79, + 0xaa, 0xfe, 0x1f, 0x72, 0xc1, 0x7b, 0x78, 0x25, 0xb8, 0x98, 0x8b, 0x4a, 0x14, 0xb0, 0x58, 0xcf, + 0x4e, 0x7c, 0xc1, 0x9b, 0xf8, 0xaa, 0x0d, 0x69, 0xf7, 0x09, 0xa1, 0x5f, 0x8a, 0x2a, 0xc2, 0x15, + 0x28, 0x04, 0x02, 0xa0, 0x99, 0x54, 0x6c, 0x4e, 0xe0, 0x7c, 0x80, 0xb5, 0xf8, 0x27, 0x07, 0x51, + 0xa6, 0x3b, 0x4c, 0x9b, 0x09, 0x74, 0x6b, 0x2a, 0x0b, 0x4f, 0x2f, 0x8c, 0xae, 0x7e, 0x1b, 0x87, + 0xd2, 0xec, 0xaf, 0x00, 0xf4, 0xc1, 0x8c, 0x90, 0x5c, 0x3d, 0xed, 0xd7, 0xc2, 0xbc, 0x98, 0x5c, + 0x07, 0x64, 0x0a, 0x4c, 0x1b, 0xea, 0xa6, 0x31, 0xde, 0x17, 0x5f, 0x30, 0xaf, 0xcf, 0x64, 0xd7, + 0xb3, 0x21, 0x1c, 0xfc, 0xc3, 0xc5, 0x93, 0xc2, 0xa5, 0x46, 0x34, 0x54, 0x0e, 0x8b, 0x35, 0xc7, + 0xb8, 0xc6, 0x88, 0x2e, 0xca, 0x61, 0xb1, 0xae, 0xee, 0xcf, 0x68, 0x4d, 0x1e, 0x32, 0xdb, 0xed, + 0xfb, 0xed, 0xce, 0xc3, 0xb6, 0x1c, 0xe3, 0x46, 0xa8, 0x27, 0x39, 0x48, 0xf9, 0x1a, 0x52, 0x84, + 0x5c, 0x54, 0x37, 0x10, 0x94, 0xe6, 0xb4, 0x22, 0x0f, 0x99, 0x50, 0x1f, 0xb2, 0x90, 0xf4, 0x34, + 0xa1, 0x00, 0xd9, 0x88, 0x0e, 0xdc, 0x87, 0xb4, 0x7b, 0xf5, 0x7b, 0x68, 0xdb, 0xea, 0x17, 0x12, + 0x64, 0xfd, 0x56, 0x7b, 0x1f, 0x63, 0x70, 0xf2, 0x27, 0xe3, 0x78, 0x83, 0x24, 0xe6, 0x1a, 0xa4, + 0xfa, 0x2c, 0x0d, 0xb9, 0xa0, 0x75, 0xd1, 0x65, 0xc8, 0xf5, 0xed, 0xa9, 0xc5, 0x34, 0xc3, 0x62, + 0xa2, 0xe4, 0xc9, 0xcd, 0x18, 0xce, 0x0a, 0xa8, 0x69, 0x31, 0x74, 0x05, 0xf2, 0xae, 0x7b, 0x38, + 0xb6, 0x75, 0x57, 0xdb, 0xa4, 0xcd, 0x18, 0x06, 0x01, 0x6e, 0x70, 0x0c, 0xc9, 0x90, 0xa0, 0x53, + 0x53, 0xdc, 0x24, 0x61, 0xbe, 0x44, 0xe7, 0x21, 0x4d, 0xfb, 0x23, 0x62, 0xea, 0xa2, 0xb8, 0x0b, + 0xd8, 0xb3, 0xd0, 0xbf, 0xa0, 0xf4, 0x19, 0x71, 0x6c, 0x8d, 0x8d, 0x1c, 0x42, 0x47, 0xf6, 0x78, + 0x20, 0x0a, 0x2d, 0xe1, 0x22, 0x47, 0x7b, 0x3e, 0x88, 0xfe, 0xed, 0x85, 0x85, 0xbc, 0xd2, 0x82, + 0x97, 0x84, 0x0b, 0x1c, 0x5f, 0xf7, 0xb9, 0x5d, 0x03, 0x39, 0x12, 0xe7, 0x12, 0xcc, 0x08, 0x82, + 0x12, 0x2e, 0x05, 0x91, 0x2e, 0xc9, 0x1a, 0x94, 0x2c, 0xb2, 0xa7, 0x33, 0xe3, 0x09, 0xd1, 0xe8, + 0x44, 0xb7, 0xa8, 0x92, 0x3d, 0xfe, 0x9b, 0xa1, 0x3e, 0xed, 0x3f, 0x26, 0xac, 0x3b, 0xd1, 0x2d, + 0x6f, 0x9e, 0x8b, 0xfe, 0x0e, 0x8e, 0x51, 0xf4, 0x1f, 0x38, 0x13, 0x1c, 0x31, 0x20, 0x63, 0xa6, + 0x53, 0x25, 0xb7, 0x9c, 0x58, 0x41, 0x38, 0x38, 0xf9, 0xae, 0x40, 0x67, 0x02, 0x05, 0x37, 0xaa, + 0xc0, 0x72, 0x62, 0x45, 0x0a, 0x03, 0x05, 0x31, 0x2e, 0x86, 0xa5, 0x89, 0x4d, 0x8d, 0x08, 0xa9, + 0xfc, 0xbb, 0x49, 0xf9, 0x3b, 0x02, 0x52, 0xc1, 0x11, 0x1e, 0xa9, 0x82, 0x4b, 0xca, 0x87, 0x43, + 0x52, 0x41, 0xa0, 0x47, 0xaa, 0xe8, 0x92, 0xf2, 0x61, 0x8f, 0xd4, 0x1d, 0x00, 0x87, 0x50, 0xc2, + 0xb4, 0x11, 0xcf, 0x7c, 0x49, 0x88, 0xc0, 0xe5, 0x13, 0x44, 0x6f, 0x15, 0xf3, 0xa8, 0x4d, 0xc3, + 0x62, 0x38, 0xe7, 0xf8, 0xcb, 0xb9, 0xfe, 0x3b, 0x33, 0x2f, 0x50, 0x57, 0xa1, 0xd8, 0x9f, 0x52, + 0x66, 0x9b, 0x9a, 0x68, 0x59, 0xaa, 0xc8, 0x82, 0x47, 0xc1, 0x05, 0x77, 0x04, 0x76, 0x8a, 0x8a, + 0x2d, 0x9c, 0xa2, 0x62, 0xb7, 0x21, 0x17, 0xb0, 0x99, 0x95, 0x88, 0x0c, 0x24, 0x1e, 0x35, 0xba, + 0xb2, 0x84, 0xd2, 0x10, 0x6f, 0x77, 0xe4, 0x78, 0x28, 0x13, 0x89, 0xa5, 0xe4, 0x97, 0xcf, 0x54, + 0xa9, 0x9e, 0x81, 0x94, 0xc8, 0x47, 0xbd, 0x00, 0x10, 0xb6, 0x53, 0xf5, 0x0e, 0x40, 0x98, 0x7b, + 0xde, 0xd1, 0xf6, 0x70, 0x48, 0x89, 0x3b, 0x22, 0x0b, 0xd8, 0xb3, 0x38, 0x3e, 0x26, 0xd6, 0x1e, + 0x1b, 0x89, 0xc9, 0x28, 0x62, 0xcf, 0xba, 0x56, 0x01, 0x08, 0x7f, 0xdf, 0x73, 0x12, 0xb5, 0xad, + 0xa6, 0x1c, 0xe3, 0x42, 0x83, 0xb7, 0x1f, 0x34, 0x64, 0xa9, 0xfe, 0xf1, 0xc1, 0x2b, 0x35, 0xf6, + 0xe2, 0x95, 0x1a, 0x7b, 0xfb, 0x4a, 0x95, 0x3e, 0x3f, 0x52, 0xa5, 0xef, 0x8f, 0x54, 0xe9, 0xf9, + 0x91, 0x2a, 0x1d, 0x1c, 0xa9, 0xd2, 0x2f, 0x47, 0xaa, 0xf4, 0xe6, 0x48, 0x8d, 0xbd, 0x3d, 0x52, + 0xa5, 0xaf, 0x5f, 0xab, 0xb1, 0x83, 0xd7, 0x6a, 0xec, 0xc5, 0x6b, 0x35, 0xf6, 0x49, 0xf0, 0xc7, + 0x74, 0x37, 0x2d, 0xfe, 0x89, 0xde, 0xfc, 0x23, 0x00, 0x00, 0xff, 0xff, 0x75, 0x0f, 0x93, 0x48, + 0xb9, 0x0e, 0x00, 0x00, } func (x SourceEnum) String() string { @@ -1904,6 +1922,9 @@ func (this *Sample) Equal(that interface{}) bool { if this.TimestampMs != that1.TimestampMs { return false } + if this.StartTimestampMs != that1.StartTimestampMs { + return false + } return true } func (this *MetricMetadata) Equal(that interface{}) bool { @@ -2111,6 +2132,9 @@ func (this *Histogram) Equal(that interface{}) bool { return false } } + if this.StartTimestampMs != that1.StartTimestampMs { + return false + } return true } func (this *Histogram_CountInt) Equal(that interface{}) bool { @@ -2407,10 +2431,11 @@ func (this *Sample) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 6) + s := make([]string, 0, 7) s = append(s, "&cortexpb.Sample{") s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") s = append(s, "TimestampMs: "+fmt.Sprintf("%#v", this.TimestampMs)+",\n") + s = append(s, "StartTimestampMs: "+fmt.Sprintf("%#v", this.StartTimestampMs)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -2453,7 +2478,7 @@ func (this *Histogram) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 20) + s := make([]string, 0, 21) s = append(s, "&cortexpb.Histogram{") if this.Count != nil { s = append(s, "Count: "+fmt.Sprintf("%#v", this.Count)+",\n") @@ -2485,6 +2510,7 @@ func (this *Histogram) GoString() string { s = append(s, "ResetHint: "+fmt.Sprintf("%#v", this.ResetHint)+",\n") s = append(s, "TimestampMs: "+fmt.Sprintf("%#v", this.TimestampMs)+",\n") s = append(s, "CustomValues: "+fmt.Sprintf("%#v", this.CustomValues)+",\n") + s = append(s, "StartTimestampMs: "+fmt.Sprintf("%#v", this.StartTimestampMs)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -3157,6 +3183,11 @@ func (m *Sample) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.StartTimestampMs != 0 { + i = encodeVarintCortex(dAtA, i, uint64(m.StartTimestampMs)) + i-- + dAtA[i] = 0x18 + } if m.TimestampMs != 0 { i = encodeVarintCortex(dAtA, i, uint64(m.TimestampMs)) i-- @@ -3325,6 +3356,13 @@ func (m *Histogram) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.StartTimestampMs != 0 { + i = encodeVarintCortex(dAtA, i, uint64(m.StartTimestampMs)) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0x88 + } if len(m.CustomValues) > 0 { for iNdEx := len(m.CustomValues) - 1; iNdEx >= 0; iNdEx-- { f10 := math.Float64bits(float64(m.CustomValues[iNdEx])) @@ -3816,6 +3854,9 @@ func (m *Sample) Size() (n int) { if m.TimestampMs != 0 { n += 1 + sovCortex(uint64(m.TimestampMs)) } + if m.StartTimestampMs != 0 { + n += 1 + sovCortex(uint64(m.StartTimestampMs)) + } return n } @@ -3941,6 +3982,9 @@ func (m *Histogram) Size() (n int) { if len(m.CustomValues) > 0 { n += 2 + sovCortex(uint64(len(m.CustomValues)*8)) + len(m.CustomValues)*8 } + if m.StartTimestampMs != 0 { + n += 2 + sovCortex(uint64(m.StartTimestampMs)) + } return n } @@ -4170,6 +4214,7 @@ func (this *Sample) String() string { s := strings.Join([]string{`&Sample{`, `Value:` + fmt.Sprintf("%v", this.Value) + `,`, `TimestampMs:` + fmt.Sprintf("%v", this.TimestampMs) + `,`, + `StartTimestampMs:` + fmt.Sprintf("%v", this.StartTimestampMs) + `,`, `}`, }, "") return s @@ -4238,6 +4283,7 @@ func (this *Histogram) String() string { `ResetHint:` + fmt.Sprintf("%v", this.ResetHint) + `,`, `TimestampMs:` + fmt.Sprintf("%v", this.TimestampMs) + `,`, `CustomValues:` + fmt.Sprintf("%v", this.CustomValues) + `,`, + `StartTimestampMs:` + fmt.Sprintf("%v", this.StartTimestampMs) + `,`, `}`, }, "") return s @@ -5994,6 +6040,25 @@ func (m *Sample) Unmarshal(dAtA []byte) error { break } } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StartTimestampMs", wireType) + } + m.StartTimestampMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StartTimestampMs |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipCortex(dAtA[iNdEx:]) @@ -6948,6 +7013,25 @@ func (m *Histogram) Unmarshal(dAtA []byte) error { } else { return fmt.Errorf("proto: wrong wireType = %d for field CustomValues", wireType) } + case 17: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StartTimestampMs", wireType) + } + m.StartTimestampMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCortex + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StartTimestampMs |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipCortex(dAtA[iNdEx:]) diff --git a/pkg/cortexpb/cortex.proto b/pkg/cortexpb/cortex.proto index 6288511695..a0eecd9fb3 100644 --- a/pkg/cortexpb/cortex.proto +++ b/pkg/cortexpb/cortex.proto @@ -151,6 +151,7 @@ message LabelPair { message Sample { double value = 1; int64 timestamp_ms = 2; + int64 start_timestamp_ms = 3; } message MetricMetadata { @@ -265,6 +266,8 @@ message Histogram { // The last element is not only the upper inclusive bound of the last regular // bucket, but implicitly the lower exclusive bound of the +Inf bucket. repeated double custom_values = 16; + + int64 start_timestamp_ms = 17; } // A BucketSpan defines a number of consecutive buckets with their diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 59f2abd09e..b843026bed 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1460,6 +1460,13 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte for _, s := range ts.Samples { var err error + if s.StartTimestampMs != 0 && s.TimestampMs != 0 { + // TODO(SungJin1212): Change to AppendSTZeroSample after update the Prometheus v3.9.0+ + if _, err = app.AppendCTZeroSample(ref, copiedLabels, s.TimestampMs, s.StartTimestampMs); err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { + level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to append start timestamp for sample", "user", userID, "series", copiedLabels.String(), "timestamp", s.TimestampMs, "start_timestamp", s.StartTimestampMs, "err", err) + } + } + // If the cached reference exists, we try to use it. if ref != 0 { if _, err = app.Append(ref, copiedLabels, s.TimestampMs, s.Value); err == nil { @@ -1506,6 +1513,13 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte h = cortexpb.HistogramProtoToHistogram(hp) } + if hp.StartTimestampMs != 0 && hp.TimestampMs != 0 { + // TODO(SungJin1212): Change to AppendHistogramSTZeroSample after update the Prometheus v3.9.0+ + if _, err = app.AppendHistogramCTZeroSample(ref, copiedLabels, hp.TimestampMs, hp.StartTimestampMs, h, fh); err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { + level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to append start timestamp for histogram", "user", userID, "series", copiedLabels.String(), "timestamp", hp.TimestampMs, "start_timestamp", hp.StartTimestampMs, "err", err) + } + } + if ref != 0 { if _, err = app.AppendHistogram(ref, copiedLabels, hp.TimestampMs, h, fh); err == nil { succeededHistogramsCount++ diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 8f09aab987..0fbcfc44fa 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -2127,6 +2127,95 @@ func TestIngester_Push(t *testing.T) { } } +func TestIngester_Push_StartTimestamp(t *testing.T) { + tests := []struct { + name string + metricName string + req *cortexpb.WriteRequest + assertFn func(t *testing.T, ts cortexpb.TimeSeries) + }{ + { + name: "sample start timestamp appends zero sample", + metricName: "test_start_timestamp_sample", + req: cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(labels.MetricName, "test_start_timestamp_sample")}, + []cortexpb.Sample{{Value: 42, TimestampMs: 200, StartTimestampMs: 100}}, + nil, + nil, + cortexpb.API, + ), + assertFn: func(t *testing.T, ts cortexpb.TimeSeries) { + require.Len(t, ts.Samples, 2) + assert.Equal(t, int64(100), ts.Samples[0].TimestampMs) + assert.Equal(t, float64(0), ts.Samples[0].Value) + assert.Equal(t, int64(200), ts.Samples[1].TimestampMs) + assert.Equal(t, float64(42), ts.Samples[1].Value) + }, + }, + { + name: "histogram start timestamp appends zero histogram", + metricName: "test_start_timestamp_histogram", + req: func() *cortexpb.WriteRequest { + h := cortexpb.HistogramToHistogramProto(200, tsdbutil.GenerateTestHistogram(1)) + h.StartTimestampMs = 100 + return cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(labels.MetricName, "test_start_timestamp_histogram")}, + nil, + nil, + []cortexpb.Histogram{h}, + cortexpb.API, + ) + }(), + assertFn: func(t *testing.T, ts cortexpb.TimeSeries) { + require.Len(t, ts.Histograms, 2) + assert.Equal(t, int64(100), ts.Histograms[0].TimestampMs) + assert.Equal(t, int64(200), ts.Histograms[1].TimestampMs) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.LifecyclerConfig.JoinAfter = 0 + + limits := defaultLimitsTestConfig() + limits.EnableNativeHistograms = true + + ing, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, nil, "", prometheus.NewRegistry()) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) + defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any { + return ing.lifecycler.GetState() + }) + + ctx := user.InjectOrgID(context.Background(), "test") + _, err = ing.Push(ctx, tc.req) + require.NoError(t, err) + + s := &mockQueryStreamServer{ctx: ctx} + err = ing.QueryStream(&client.QueryRequest{ + StartTimestampMs: math.MinInt64, + EndTimestampMs: math.MaxInt64, + Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: tc.metricName}}, + }, s) + require.NoError(t, err) + + set, err := seriesSetFromResponseStream(s) + require.NoError(t, err) + + resp, err := client.SeriesSetToQueryResponse(set) + require.NoError(t, err) + require.Len(t, resp.Timeseries, 1) + + ts := resp.Timeseries[0] + tc.assertFn(t, ts) + }) + } +} + // Referred from https://github.com/prometheus/prometheus/blob/v3.9.1/model/histogram/histogram_test.go#L1384. func TestIngester_PushNativeHistogramErrors(t *testing.T) { metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}} diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index bbd5a3b793..3a7a904d65 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -253,9 +253,21 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUni ts := cortexpb.TimeseriesFromPool() ts.Labels = cortexpb.FromLabelsToLabelAdapters(lbs) - ts.Samples = append(ts.Samples, v2Ts.Samples...) + for _, sample := range v2Ts.Samples { + // Backward compatibility: use created_timestamp as a fallback (before the Prometheus v3.8.0 model) + if sample.StartTimestampMs == 0 { + sample.StartTimestampMs = v2Ts.CreatedTimestamp + } + ts.Samples = append(ts.Samples, sample) + } ts.Exemplars = exemplars - ts.Histograms = append(ts.Histograms, v2Ts.Histograms...) + for _, histogram := range v2Ts.Histograms { + // Backward compatibility: use series-level created_timestamp as a fallback (before the Prometheus v3.8.0 model) + if histogram.StartTimestampMs == 0 { + histogram.StartTimestampMs = v2Ts.CreatedTimestamp + } + ts.Histograms = append(ts.Histograms, histogram) + } v1Timeseries = append(v1Timeseries, cortexpb.PreallocTimeseries{ TimeSeries: ts, diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index c842fd4d48..198c16133b 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -1224,3 +1224,91 @@ func Test_convertV2RequestToV1_DeepCopy(t *testing.T) { require.True(t, len(v1Ts.Histograms) > 0 && len(v2Ts.Histograms) > 0) require.NotSame(t, &v1Ts.Histograms[0], &v2Ts.Histograms[0], "Histograms array must not share the same memory address") } + +func Test_convertV2RequestToV1_PreservesStartTimestamp(t *testing.T) { + v2Req := &cortexpb.PreallocWriteRequestV2{ + WriteRequestV2: cortexpb.WriteRequestV2{ + Symbols: []string{"", "__name__", "test_metric"}, + Timeseries: []cortexpb.PreallocTimeseriesV2{ + { + TimeSeriesV2: &cortexpb.TimeSeriesV2{ + LabelsRefs: []uint32{1, 2}, + Samples: []cortexpb.Sample{ + {Value: 1, TimestampMs: 1000, StartTimestampMs: 100}, + }, + Histograms: []cortexpb.Histogram{ + {TimestampMs: 2000, StartTimestampMs: 200}, + }, + }, + }, + }, + }, + } + + v1Req, err := convertV2RequestToV1(v2Req, false) + require.NoError(t, err) + require.Len(t, v1Req.Timeseries, 1) + require.Len(t, v1Req.Timeseries[0].Samples, 1) + require.Len(t, v1Req.Timeseries[0].Histograms, 1) + + assert.Equal(t, int64(100), v1Req.Timeseries[0].Samples[0].StartTimestampMs) + assert.Equal(t, int64(200), v1Req.Timeseries[0].Histograms[0].StartTimestampMs) +} + +func Test_convertV2RequestToV1_UsesCreatedTimestampAsFallback(t *testing.T) { + v2Req := &cortexpb.PreallocWriteRequestV2{ + WriteRequestV2: cortexpb.WriteRequestV2{ + Symbols: []string{"", "__name__", "test_metric"}, + Timeseries: []cortexpb.PreallocTimeseriesV2{ + { + TimeSeriesV2: &cortexpb.TimeSeriesV2{ + LabelsRefs: []uint32{1, 2}, + CreatedTimestamp: 777, + Samples: []cortexpb.Sample{{Value: 1, TimestampMs: 1000}}, + Histograms: []cortexpb.Histogram{{TimestampMs: 2000}}, + }, + }, + }, + }, + } + + v1Req, err := convertV2RequestToV1(v2Req, false) + require.NoError(t, err) + require.Len(t, v1Req.Timeseries, 1) + require.Len(t, v1Req.Timeseries[0].Samples, 1) + require.Len(t, v1Req.Timeseries[0].Histograms, 1) + + assert.Equal(t, int64(777), v1Req.Timeseries[0].Samples[0].StartTimestampMs) + assert.Equal(t, int64(777), v1Req.Timeseries[0].Histograms[0].StartTimestampMs) +} + +func Test_convertV2RequestToV1_ExplicitStartTimestampTakesPrecedence(t *testing.T) { + v2Req := &cortexpb.PreallocWriteRequestV2{ + WriteRequestV2: cortexpb.WriteRequestV2{ + Symbols: []string{"", "__name__", "test_metric"}, + Timeseries: []cortexpb.PreallocTimeseriesV2{ + { + TimeSeriesV2: &cortexpb.TimeSeriesV2{ + LabelsRefs: []uint32{1, 2}, + CreatedTimestamp: 777, + Samples: []cortexpb.Sample{ + {Value: 1, TimestampMs: 1000, StartTimestampMs: 100}, + }, + Histograms: []cortexpb.Histogram{ + {TimestampMs: 2000, StartTimestampMs: 200}, + }, + }, + }, + }, + }, + } + + v1Req, err := convertV2RequestToV1(v2Req, false) + require.NoError(t, err) + require.Len(t, v1Req.Timeseries, 1) + require.Len(t, v1Req.Timeseries[0].Samples, 1) + require.Len(t, v1Req.Timeseries[0].Histograms, 1) + + assert.Equal(t, int64(100), v1Req.Timeseries[0].Samples[0].StartTimestampMs) + assert.Equal(t, int64(200), v1Req.Timeseries[0].Histograms[0].StartTimestampMs) +} From dd9eefd111d1b6faef38d0c78c5e422145a87eba Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Tue, 24 Mar 2026 18:26:42 +0900 Subject: [PATCH 2/7] Clear CT Signed-off-by: SungJin1212 --- pkg/cortexpb/timeseriesv2.go | 2 ++ pkg/cortexpb/timeseriesv2_test.go | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/pkg/cortexpb/timeseriesv2.go b/pkg/cortexpb/timeseriesv2.go index 291ac32789..7f01b6f90d 100644 --- a/pkg/cortexpb/timeseriesv2.go +++ b/pkg/cortexpb/timeseriesv2.go @@ -120,6 +120,8 @@ func ReuseTimeseriesV2(ts *TimeSeriesV2) { ts.Metadata.Type = 0 ts.Metadata.UnitRef = 0 ts.Metadata.HelpRef = 0 + // Clear CT + ts.CreatedTimestamp = 0 // clear exemplar label refs for i := range ts.Exemplars { diff --git a/pkg/cortexpb/timeseriesv2_test.go b/pkg/cortexpb/timeseriesv2_test.go index 270a859711..0d272ebb5e 100644 --- a/pkg/cortexpb/timeseriesv2_test.go +++ b/pkg/cortexpb/timeseriesv2_test.go @@ -42,6 +42,8 @@ func TestTimeseriesV2FromPool(t *testing.T) { ts.Samples = []Sample{{Value: 1, TimestampMs: 2}} ts.Exemplars = []ExemplarV2{{LabelsRefs: []uint32{1, 2}, Value: 1, Timestamp: 2}} ts.Histograms = []Histogram{{}} + ts.CreatedTimestamp = 12345 + ts.Metadata = MetadataV2{Type: 1, HelpRef: 2, UnitRef: 3} ReuseTimeseriesV2(ts) reused := TimeseriesV2FromPool() @@ -49,6 +51,10 @@ func TestTimeseriesV2FromPool(t *testing.T) { assert.Len(t, reused.Samples, 0) assert.Len(t, reused.Exemplars, 0) assert.Len(t, reused.Histograms, 0) + assert.Zero(t, reused.CreatedTimestamp) + assert.Zero(t, reused.Metadata.Type) + assert.Zero(t, reused.Metadata.HelpRef) + assert.Zero(t, reused.Metadata.UnitRef) }) } From 72f6eff1c290f81b9ccb9d5186db807440009535 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Tue, 24 Mar 2026 20:33:51 +0900 Subject: [PATCH 3/7] Add feature flag Signed-off-by: SungJin1212 --- CHANGELOG.md | 2 +- docs/configuration/config-file-reference.md | 6 + docs/configuration/v1-guarantees.md | 1 + integration/remote_write_v2_test.go | 154 ++++++++++++++++++++ pkg/util/push/push.go | 24 ++- pkg/util/push/push_test.go | 81 ++++++---- pkg/util/validation/limits.go | 6 + schemas/cortex-config-schema.json | 6 + 8 files changed, 244 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 12f2fbf976..3bc586441b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ # Changelog ## master / unreleased -* [FEATURE] Distributor: Support start timestamp and created timestamps ingestion on Prometheus Remote Write 2.0. #7371 +* [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359 * [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357 * [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 7c1cd7265d..fcf337d534 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4111,6 +4111,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -distributor.enable-type-and-unit-labels [enable_type_and_unit_labels: | default = false] +# EXPERIMENTAL: If true, StartTimestampMs (ST) is handled for remote write v2 +# samples and histograms. CreatedTimestamp (CT) is used as a fallback when ST is +# not set. +# CLI flag: -distributor.enable-start-timestamp +[enable_start_timestamp: | default = false] + # The maximum number of active series per user, per ingester. 0 to disable. # CLI flag: -ingester.max-series-per-user [max_series_per_user: | default = 5000000] diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 4bb632023e..be3ee78ce5 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -115,6 +115,7 @@ Currently experimental features are: - Distributor/Ingester: Stream push connection - Enable stream push connection between distributor and ingester by setting `-distributor.use-stream-push=true` on Distributor. - Add `__type__` and `__unit__` labels to OTLP and remote write v2 requests (`-distributor.enable-type-and-unit-labels`) + - Handle StartTimestampMs (ST) for remote write v2 samples and histograms, using CreatedTimestamp (CT) as a fallback when ST is not set (`-distributor.enable-start-timestamp`) - Ingester: Series Queried Metric - Enable on Ingester via `-ingester.active-queried-series-metrics-enabled=true` - Set the time window to expose via metrics using `-ingester.active-queried-series-metrics-windows=2h`. At least 1 time window is required to expose the metric. diff --git a/integration/remote_write_v2_test.go b/integration/remote_write_v2_test.go index 1a806e3d39..c61cc1aa6d 100644 --- a/integration/remote_write_v2_test.go +++ b/integration/remote_write_v2_test.go @@ -305,6 +305,7 @@ func TestIngest(t *testing.T) { // Distributor. "-distributor.replication-factor": "1", "-distributor.remote-writev2-enabled": "true", + "-distributor.enable-start-timestamp": "true", // Store-gateway. "-store-gateway.sharding-enabled": "false", // alert manager @@ -427,6 +428,7 @@ func TestIngest_StartTimestamp(t *testing.T) { // Distributor. "-distributor.replication-factor": "1", "-distributor.remote-writev2-enabled": "true", + "-distributor.enable-start-timestamp": "true", // Store-gateway. "-store-gateway.sharding-enabled": "false", // alert manager @@ -566,6 +568,7 @@ func TestIngest_CreatedTimestampFallback(t *testing.T) { "-consul.hostname": consul.NetworkHTTPEndpoint(), "-distributor.replication-factor": "1", "-distributor.remote-writev2-enabled": "true", + "-distributor.enable-start-timestamp": "true", "-store-gateway.sharding-enabled": "false", "-alertmanager.web.external-url": "http://localhost/alertmanager", }, @@ -647,6 +650,157 @@ func TestIngest_CreatedTimestampFallback(t *testing.T) { assert.Equal(t, model.FloatString(expectedHist.Sum), histMatrix[0].Histograms[1].Histogram.Sum) } +func TestIngest_StartAndCreatedTimestampIgnoredWhenDisabled(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + consul := e2edb.NewConsulWithName("consul") + require.NoError(t, s.StartAndWaitReady(consul)) + + flags := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-blocks-storage.tsdb.enable-native-histograms": "true", + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + "-distributor.replication-factor": "1", + "-distributor.remote-writev2-enabled": "true", + "-distributor.enable-start-timestamp": "false", + "-store-gateway.sharding-enabled": "false", + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + path := path.Join(s.SharedDir(), "cortex-1") + flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path}) + + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + sampleTs := time.Now().Truncate(time.Second) + startTs := sampleTs.Add(-2 * time.Second) + step := sampleTs.Sub(startTs) + + t.Run("ST is ignored", func(t *testing.T) { + sampleSymbols := []string{"", "__name__", "test_start_timestamp_ignored_sample"} + sampleSeries := []writev2.TimeSeries{{ + LabelsRefs: []uint32{1, 2}, + Samples: []writev2.Sample{{ + Value: 42, + Timestamp: e2e.TimeToMilliseconds(sampleTs), + StartTimestamp: e2e.TimeToMilliseconds(startTs), + }}, + }} + + writeStats, err := c.PushV2(sampleSymbols, sampleSeries) + require.NoError(t, err) + testPushHeader(t, writeStats, 1, 0, 0) + + sampleResult, err := c.QueryRange("test_start_timestamp_ignored_sample", startTs, sampleTs, step) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, sampleResult.Type()) + + sampleMatrix := sampleResult.(model.Matrix) + require.Len(t, sampleMatrix, 1) + require.Len(t, sampleMatrix[0].Values, 1) + require.Empty(t, sampleMatrix[0].Histograms) + assert.Equal(t, model.Time(e2e.TimeToMilliseconds(sampleTs)), sampleMatrix[0].Values[0].Timestamp) + assert.Equal(t, model.SampleValue(42), sampleMatrix[0].Values[0].Value) + + histogramIdx := rand.Uint32() + symbols, series := e2e.GenerateHistogramSeriesV2("test_start_timestamp_ignored_histogram", sampleTs, histogramIdx, false, false) + series[0].Histograms[0].StartTimestamp = e2e.TimeToMilliseconds(startTs) + + writeStats, err = c.PushV2(symbols, series) + require.NoError(t, err) + testPushHeader(t, writeStats, 0, 1, 0) + + histResult, err := c.QueryRange("test_start_timestamp_ignored_histogram", startTs, sampleTs, step) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, histResult.Type()) + + histMatrix := histResult.(model.Matrix) + require.Len(t, histMatrix, 1) + require.Empty(t, histMatrix[0].Values) + require.Len(t, histMatrix[0].Histograms, 1) + require.NotNil(t, histMatrix[0].Histograms[0].Histogram) + + expectedHist := tsdbutil.GenerateTestHistogram(int64(histogramIdx)) + assert.Equal(t, model.Time(e2e.TimeToMilliseconds(sampleTs)), histMatrix[0].Histograms[0].Timestamp) + assert.Equal(t, model.FloatString(expectedHist.Count), histMatrix[0].Histograms[0].Histogram.Count) + assert.Equal(t, model.FloatString(expectedHist.Sum), histMatrix[0].Histograms[0].Histogram.Sum) + }) + + t.Run("CT fallback is ignored", func(t *testing.T) { + sampleReq := &cortexpb.WriteRequestV2{ + Symbols: []string{"", "__name__", "test_created_timestamp_ignored_sample"}, + Timeseries: []cortexpb.PreallocTimeseriesV2{{ + TimeSeriesV2: &cortexpb.TimeSeriesV2{ + LabelsRefs: []uint32{1, 2}, + CreatedTimestamp: e2e.TimeToMilliseconds(startTs), + Samples: []cortexpb.Sample{{Value: 7, TimestampMs: e2e.TimeToMilliseconds(sampleTs)}}, + }, + }}, + } + pushCortexV2Request(t, cortex.HTTPEndpoint(), "user-1", sampleReq) + + sampleResult, err := c.QueryRange("test_created_timestamp_ignored_sample", startTs, sampleTs, step) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, sampleResult.Type()) + + sampleMatrix := sampleResult.(model.Matrix) + require.Len(t, sampleMatrix, 1) + require.Len(t, sampleMatrix[0].Values, 1) + require.Empty(t, sampleMatrix[0].Histograms) + assert.Equal(t, model.Time(e2e.TimeToMilliseconds(sampleTs)), sampleMatrix[0].Values[0].Timestamp) + assert.Equal(t, model.SampleValue(7), sampleMatrix[0].Values[0].Value) + + h := cortexpb.HistogramToHistogramProto(e2e.TimeToMilliseconds(sampleTs), tsdbutil.GenerateTestHistogram(3)) + histReq := &cortexpb.WriteRequestV2{ + Symbols: []string{"", "__name__", "test_created_timestamp_ignored_histogram"}, + Timeseries: []cortexpb.PreallocTimeseriesV2{{ + TimeSeriesV2: &cortexpb.TimeSeriesV2{ + LabelsRefs: []uint32{1, 2}, + CreatedTimestamp: e2e.TimeToMilliseconds(startTs), + Histograms: []cortexpb.Histogram{h}, + }, + }}, + } + pushCortexV2Request(t, cortex.HTTPEndpoint(), "user-1", histReq) + + histResult, err := c.QueryRange("test_created_timestamp_ignored_histogram", startTs, sampleTs, step) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, histResult.Type()) + + histMatrix := histResult.(model.Matrix) + require.Len(t, histMatrix, 1) + require.Empty(t, histMatrix[0].Values) + require.Len(t, histMatrix[0].Histograms, 1) + require.NotNil(t, histMatrix[0].Histograms[0].Histogram) + + expectedHist := tsdbutil.GenerateTestHistogram(3) + assert.Equal(t, model.Time(e2e.TimeToMilliseconds(sampleTs)), histMatrix[0].Histograms[0].Timestamp) + assert.Equal(t, model.FloatString(expectedHist.Count), histMatrix[0].Histograms[0].Histogram.Count) + assert.Equal(t, model.FloatString(expectedHist.Sum), histMatrix[0].Histograms[0].Histogram.Sum) + }) +} + func TestExemplar(t *testing.T) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index 3a7a904d65..cfe51d0fc6 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -112,7 +112,7 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool, req.Source = cortexpb.API } - v1Req, err := convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID)) + v1Req, err := convertV2RequestToV1(&req, overrides.EnableTypeAndUnitLabels(userID), overrides.EnableStartTimestamp(userID)) if err != nil { level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) @@ -208,7 +208,7 @@ func setPRW2RespHeader(w http.ResponseWriter, samples, histograms, exemplars int w.Header().Set(rw20WrittenExemplarsHeader, strconv.FormatInt(exemplars, 10)) } -func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUnitLabels bool) (cortexpb.PreallocWriteRequest, error) { +func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUnitLabels bool, enableStartTimestamp bool) (cortexpb.PreallocWriteRequest, error) { var v1Req cortexpb.PreallocWriteRequest v1Timeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries)) var v1Metadata []*cortexpb.MetricMetadata @@ -254,17 +254,25 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUni ts := cortexpb.TimeseriesFromPool() ts.Labels = cortexpb.FromLabelsToLabelAdapters(lbs) for _, sample := range v2Ts.Samples { - // Backward compatibility: use created_timestamp as a fallback (before the Prometheus v3.8.0 model) - if sample.StartTimestampMs == 0 { - sample.StartTimestampMs = v2Ts.CreatedTimestamp + if enableStartTimestamp { + // Use created_timestamp as a fallback for start_timestamp_ms when not set. + if sample.StartTimestampMs == 0 { + sample.StartTimestampMs = v2Ts.CreatedTimestamp + } + } else { + sample.StartTimestampMs = 0 } ts.Samples = append(ts.Samples, sample) } ts.Exemplars = exemplars for _, histogram := range v2Ts.Histograms { - // Backward compatibility: use series-level created_timestamp as a fallback (before the Prometheus v3.8.0 model) - if histogram.StartTimestampMs == 0 { - histogram.StartTimestampMs = v2Ts.CreatedTimestamp + if enableStartTimestamp { + // Use created_timestamp as a fallback for start_timestamp_ms when not set. + if histogram.StartTimestampMs == 0 { + histogram.StartTimestampMs = v2Ts.CreatedTimestamp + } + } else { + histogram.StartTimestampMs = 0 } ts.Histograms = append(ts.Histograms, histogram) } diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index 198c16133b..fe13dda0f8 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -75,7 +75,7 @@ func makeV2ReqWithSeries(num int) *cortexpb.PreallocWriteRequestV2 { func createPRW1HTTPRequest(seriesNum int) (*http.Request, error) { series := makeV2ReqWithSeries(seriesNum) - v1Req, err := convertV2RequestToV1(series, false) + v1Req, err := convertV2RequestToV1(series, false, false) if err != nil { return nil, err } @@ -168,7 +168,7 @@ func Benchmark_convertV2RequestToV1(b *testing.B) { b.ReportAllocs() for b.Loop() { - _, err := convertV2RequestToV1(series, false) + _, err := convertV2RequestToV1(series, false, false) require.NoError(b, err) } }) @@ -377,7 +377,7 @@ func Test_convertV2RequestToV1_WithEnableTypeAndUnitLabels(t *testing.T) { for _, test := range tests { t.Run(test.desc, func(t *testing.T) { - v1Req, err := convertV2RequestToV1(test.v2Req, test.enableTypeAndUnitLabels) + v1Req, err := convertV2RequestToV1(test.v2Req, test.enableTypeAndUnitLabels, false) for i := range v1Req.Timeseries { if len(v1Req.Timeseries[i].Samples) == 0 { @@ -441,7 +441,7 @@ func Test_convertV2RequestToV1(t *testing.T) { v2Req.Symbols = symbols v2Req.Timeseries = timeseries - v1Req, err := convertV2RequestToV1(&v2Req, false) + v1Req, err := convertV2RequestToV1(&v2Req, false, false) assert.NoError(t, err) expectedSamples := 3 expectedExemplars := 2 @@ -541,7 +541,7 @@ func Test_convertV2RequestToV1_InvalidSymbolRefs(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, err := convertV2RequestToV1(tt.v2Req, false) + _, err := convertV2RequestToV1(tt.v2Req, false, false) if tt.expectedError == "" { assert.NoError(t, err) } else { @@ -1208,7 +1208,7 @@ func Test_convertV2RequestToV1_DeepCopy(t *testing.T) { }, } - v1Req, err := convertV2RequestToV1(v2Req, false) + v1Req, err := convertV2RequestToV1(v2Req, false, false) require.NoError(t, err) require.Len(t, v1Req.Timeseries, 1) @@ -1245,14 +1245,23 @@ func Test_convertV2RequestToV1_PreservesStartTimestamp(t *testing.T) { }, } - v1Req, err := convertV2RequestToV1(v2Req, false) - require.NoError(t, err) - require.Len(t, v1Req.Timeseries, 1) - require.Len(t, v1Req.Timeseries[0].Samples, 1) - require.Len(t, v1Req.Timeseries[0].Histograms, 1) + t.Run("enableStartTimestamp=true preserves ST", func(t *testing.T) { + v1Req, err := convertV2RequestToV1(v2Req, false, true) + require.NoError(t, err) + require.Len(t, v1Req.Timeseries[0].Samples, 1) + require.Len(t, v1Req.Timeseries[0].Histograms, 1) + assert.Equal(t, int64(100), v1Req.Timeseries[0].Samples[0].StartTimestampMs) + assert.Equal(t, int64(200), v1Req.Timeseries[0].Histograms[0].StartTimestampMs) + }) - assert.Equal(t, int64(100), v1Req.Timeseries[0].Samples[0].StartTimestampMs) - assert.Equal(t, int64(200), v1Req.Timeseries[0].Histograms[0].StartTimestampMs) + t.Run("enableStartTimestamp=false clears ST", func(t *testing.T) { + v1Req, err := convertV2RequestToV1(v2Req, false, false) + require.NoError(t, err) + require.Len(t, v1Req.Timeseries[0].Samples, 1) + require.Len(t, v1Req.Timeseries[0].Histograms, 1) + assert.Equal(t, int64(0), v1Req.Timeseries[0].Samples[0].StartTimestampMs) + assert.Equal(t, int64(0), v1Req.Timeseries[0].Histograms[0].StartTimestampMs) + }) } func Test_convertV2RequestToV1_UsesCreatedTimestampAsFallback(t *testing.T) { @@ -1272,14 +1281,23 @@ func Test_convertV2RequestToV1_UsesCreatedTimestampAsFallback(t *testing.T) { }, } - v1Req, err := convertV2RequestToV1(v2Req, false) - require.NoError(t, err) - require.Len(t, v1Req.Timeseries, 1) - require.Len(t, v1Req.Timeseries[0].Samples, 1) - require.Len(t, v1Req.Timeseries[0].Histograms, 1) + t.Run("enableStartTimestamp=true uses CT as fallback for ST", func(t *testing.T) { + v1Req, err := convertV2RequestToV1(v2Req, false, true) + require.NoError(t, err) + require.Len(t, v1Req.Timeseries[0].Samples, 1) + require.Len(t, v1Req.Timeseries[0].Histograms, 1) + assert.Equal(t, int64(777), v1Req.Timeseries[0].Samples[0].StartTimestampMs) + assert.Equal(t, int64(777), v1Req.Timeseries[0].Histograms[0].StartTimestampMs) + }) - assert.Equal(t, int64(777), v1Req.Timeseries[0].Samples[0].StartTimestampMs) - assert.Equal(t, int64(777), v1Req.Timeseries[0].Histograms[0].StartTimestampMs) + t.Run("enableStartTimestamp=false ignores CT", func(t *testing.T) { + v1Req, err := convertV2RequestToV1(v2Req, false, false) + require.NoError(t, err) + require.Len(t, v1Req.Timeseries[0].Samples, 1) + require.Len(t, v1Req.Timeseries[0].Histograms, 1) + assert.Equal(t, int64(0), v1Req.Timeseries[0].Samples[0].StartTimestampMs) + assert.Equal(t, int64(0), v1Req.Timeseries[0].Histograms[0].StartTimestampMs) + }) } func Test_convertV2RequestToV1_ExplicitStartTimestampTakesPrecedence(t *testing.T) { @@ -1303,12 +1321,21 @@ func Test_convertV2RequestToV1_ExplicitStartTimestampTakesPrecedence(t *testing. }, } - v1Req, err := convertV2RequestToV1(v2Req, false) - require.NoError(t, err) - require.Len(t, v1Req.Timeseries, 1) - require.Len(t, v1Req.Timeseries[0].Samples, 1) - require.Len(t, v1Req.Timeseries[0].Histograms, 1) + t.Run("enableStartTimestamp=true: explicit ST takes precedence over CT", func(t *testing.T) { + v1Req, err := convertV2RequestToV1(v2Req, false, true) + require.NoError(t, err) + require.Len(t, v1Req.Timeseries[0].Samples, 1) + require.Len(t, v1Req.Timeseries[0].Histograms, 1) + assert.Equal(t, int64(100), v1Req.Timeseries[0].Samples[0].StartTimestampMs) + assert.Equal(t, int64(200), v1Req.Timeseries[0].Histograms[0].StartTimestampMs) + }) - assert.Equal(t, int64(100), v1Req.Timeseries[0].Samples[0].StartTimestampMs) - assert.Equal(t, int64(200), v1Req.Timeseries[0].Histograms[0].StartTimestampMs) + t.Run("enableStartTimestamp=false: ST and CT are both ignored", func(t *testing.T) { + v1Req, err := convertV2RequestToV1(v2Req, false, false) + require.NoError(t, err) + require.Len(t, v1Req.Timeseries[0].Samples, 1) + require.Len(t, v1Req.Timeseries[0].Histograms, 1) + assert.Equal(t, int64(0), v1Req.Timeseries[0].Samples[0].StartTimestampMs) + assert.Equal(t, int64(0), v1Req.Timeseries[0].Histograms[0].StartTimestampMs) + }) } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 73f09fe340..87a12a6db4 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -154,6 +154,7 @@ type Limits struct { MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"` PromoteResourceAttributes []string `yaml:"promote_resource_attributes" json:"promote_resource_attributes"` EnableTypeAndUnitLabels bool `yaml:"enable_type_and_unit_labels" json:"enable_type_and_unit_labels"` + EnableStartTimestamp bool `yaml:"enable_start_timestamp" json:"enable_start_timestamp"` // Ingester enforced limits. // Series @@ -274,6 +275,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Var((*flagext.StringSliceCSV)(&l.PromoteResourceAttributes), "distributor.promote-resource-attributes", "Comma separated list of resource attributes that should be converted to labels.") f.Var(&l.DropLabels, "distributor.drop-label", "This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.") f.BoolVar(&l.EnableTypeAndUnitLabels, "distributor.enable-type-and-unit-labels", false, "EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. This applies to remote write v2 and OTLP requests.") + f.BoolVar(&l.EnableStartTimestamp, "distributor.enable-start-timestamp", false, "EXPERIMENTAL: If true, StartTimestampMs (ST) is handled for remote write v2 samples and histograms. CreatedTimestamp (CT) is used as a fallback when ST is not set.") f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names") f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name") f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.") @@ -1137,6 +1139,10 @@ func (o *Overrides) EnableTypeAndUnitLabels(userID string) bool { return o.GetOverridesForUser(userID).EnableTypeAndUnitLabels } +func (o *Overrides) EnableStartTimestamp(userID string) bool { + return o.GetOverridesForUser(userID).EnableStartTimestamp +} + func (o *Overrides) DisabledRuleGroups(userID string) DisabledRuleGroups { if o.tenantLimits != nil { l := o.tenantLimits.ByUserID(userID) diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 20cf970c35..20043252c6 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -5077,6 +5077,12 @@ "type": "boolean", "x-cli-flag": "blocks-storage.tsdb.enable-native-histograms" }, + "enable_start_timestamp": { + "default": false, + "description": "EXPERIMENTAL: If true, StartTimestampMs (ST) is handled for remote write v2 samples and histograms. CreatedTimestamp (CT) is used as a fallback when ST is not set.", + "type": "boolean", + "x-cli-flag": "distributor.enable-start-timestamp" + }, "enable_type_and_unit_labels": { "default": false, "description": "EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. This applies to remote write v2 and OTLP requests.", From 98d86e648044d8911aff6bc642133f6e5176748b Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Tue, 24 Mar 2026 20:46:32 +0900 Subject: [PATCH 4/7] pre-allocate v2 samples/histograms Signed-off-by: SungJin1212 --- pkg/util/push/push.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index cfe51d0fc6..64e169e58d 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -253,6 +253,7 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUni ts := cortexpb.TimeseriesFromPool() ts.Labels = cortexpb.FromLabelsToLabelAdapters(lbs) + ts.Samples = make([]cortexpb.Sample, 0, len(v2Ts.Samples)) for _, sample := range v2Ts.Samples { if enableStartTimestamp { // Use created_timestamp as a fallback for start_timestamp_ms when not set. @@ -265,6 +266,7 @@ func convertV2RequestToV1(req *cortexpb.PreallocWriteRequestV2, enableTypeAndUni ts.Samples = append(ts.Samples, sample) } ts.Exemplars = exemplars + ts.Histograms = make([]cortexpb.Histogram, 0, len(v2Ts.Histograms)) for _, histogram := range v2Ts.Histograms { if enableStartTimestamp { // Use created_timestamp as a fallback for start_timestamp_ms when not set. From bf2d4f2d8dc9a63a9fe34277438ff86f892ac3d3 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Tue, 24 Mar 2026 20:58:19 +0900 Subject: [PATCH 5/7] fix test Signed-off-by: SungJin1212 --- pkg/util/validation/exporter_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/util/validation/exporter_test.go b/pkg/util/validation/exporter_test.go index 0b1ef21ce8..8ee72f7b6a 100644 --- a/pkg/util/validation/exporter_test.go +++ b/pkg/util/validation/exporter_test.go @@ -54,6 +54,7 @@ func TestOverridesExporter_withConfig(t *testing.T) { cortex_overrides{limit_name="compactor_tenant_shard_size",user="tenant-a"} 0 cortex_overrides{limit_name="creation_grace_period",user="tenant-a"} 600 cortex_overrides{limit_name="enable_native_histograms",user="tenant-a"} 0 + cortex_overrides{limit_name="enable_start_timestamp",user="tenant-a"} 0 cortex_overrides{limit_name="enable_type_and_unit_labels",user="tenant-a"} 0 cortex_overrides{limit_name="enforce_metadata_metric_name",user="tenant-a"} 1 cortex_overrides{limit_name="enforce_metric_name",user="tenant-a"} 1 From aca82c01e41b1eb525a80bc7bcf4bb6fb16c90ff Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Wed, 25 Mar 2026 18:21:46 +0900 Subject: [PATCH 6/7] Add metric Signed-off-by: SungJin1212 --- pkg/ingester/ingester.go | 6 ++-- pkg/ingester/ingester_test.go | 65 +++++++++++++++++++++++++++++++++++ pkg/ingester/metrics.go | 8 +++++ pkg/ingester/metrics_test.go | 4 +++ 4 files changed, 81 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index b843026bed..9d1b740310 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1463,7 +1463,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if s.StartTimestampMs != 0 && s.TimestampMs != 0 { // TODO(SungJin1212): Change to AppendSTZeroSample after update the Prometheus v3.9.0+ if _, err = app.AppendCTZeroSample(ref, copiedLabels, s.TimestampMs, s.StartTimestampMs); err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { - level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to append start timestamp for sample", "user", userID, "series", copiedLabels.String(), "timestamp", s.TimestampMs, "start_timestamp", s.StartTimestampMs, "err", err) + level.Debug(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to append start timestamp for sample", "user", userID, "series", copiedLabels.String(), "timestamp", s.TimestampMs, "start_timestamp", s.StartTimestampMs, "err", err) + i.metrics.startTimestampFail.WithLabelValues(sampleMetricTypeFloat).Inc() } } @@ -1516,7 +1517,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if hp.StartTimestampMs != 0 && hp.TimestampMs != 0 { // TODO(SungJin1212): Change to AppendHistogramSTZeroSample after update the Prometheus v3.9.0+ if _, err = app.AppendHistogramCTZeroSample(ref, copiedLabels, hp.TimestampMs, hp.StartTimestampMs, h, fh); err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { - level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to append start timestamp for histogram", "user", userID, "series", copiedLabels.String(), "timestamp", hp.TimestampMs, "start_timestamp", hp.StartTimestampMs, "err", err) + i.metrics.startTimestampFail.WithLabelValues(sampleMetricTypeHistogram).Inc() + level.Debug(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to append start timestamp for histogram", "user", userID, "series", copiedLabels.String(), "timestamp", hp.TimestampMs, "start_timestamp", hp.StartTimestampMs, "err", err) } } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 0fbcfc44fa..64843e32ba 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -2216,6 +2216,71 @@ func TestIngester_Push_StartTimestamp(t *testing.T) { } } +func TestIngester_Push_StartTimestampAppendFailureMetrics(t *testing.T) { + tests := []struct { + name string + req *cortexpb.WriteRequest + expectedType string + unexpectedType string + }{ + { + name: "sample start timestamp append failure increments float metric", + req: cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(labels.MetricName, "test_start_timestamp_failure_sample")}, + []cortexpb.Sample{{Value: 42, TimestampMs: 200, StartTimestampMs: math.MinInt64}}, + nil, + nil, + cortexpb.API, + ), + expectedType: sampleMetricTypeFloat, + unexpectedType: sampleMetricTypeHistogram, + }, + { + name: "histogram start timestamp append failure increments histogram metric", + req: func() *cortexpb.WriteRequest { + h := cortexpb.HistogramToHistogramProto(200, tsdbutil.GenerateTestHistogram(1)) + h.StartTimestampMs = math.MinInt64 + return cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(labels.MetricName, "test_start_timestamp_failure_histogram")}, + nil, + nil, + []cortexpb.Histogram{h}, + cortexpb.API, + ) + }(), + expectedType: sampleMetricTypeHistogram, + unexpectedType: sampleMetricTypeFloat, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.LifecyclerConfig.JoinAfter = 0 + + limits := defaultLimitsTestConfig() + limits.EnableNativeHistograms = true + + registry := prometheus.NewRegistry() + ing, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, nil, "", registry) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) + defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any { + return ing.lifecycler.GetState() + }) + + ctx := user.InjectOrgID(context.Background(), "test") + _, err = ing.Push(ctx, tc.req) + require.NoError(t, err) + + require.Equal(t, float64(1), testutil.ToFloat64(ing.metrics.startTimestampFail.WithLabelValues(tc.expectedType))) + require.Equal(t, float64(0), testutil.ToFloat64(ing.metrics.startTimestampFail.WithLabelValues(tc.unexpectedType))) + }) + } +} + // Referred from https://github.com/prometheus/prometheus/blob/v3.9.1/model/histogram/histogram_test.go#L1384. func TestIngester_PushNativeHistogramErrors(t *testing.T) { metricLabelAdapters := []cortexpb.LabelAdapter{{Name: labels.MetricName, Value: "test"}} diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 773646cb12..f0fbddd334 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -36,6 +36,7 @@ type ingesterMetrics struct { ingestedMetadata prometheus.Counter ingestedSamplesFail prometheus.Counter ingestedHistogramsFail prometheus.Counter + startTimestampFail *prometheus.CounterVec ingestedExemplarsFail prometheus.Counter ingestedMetadataFail prometheus.Counter ingestedHistogramBuckets *prometheus.HistogramVec @@ -123,6 +124,10 @@ func newIngesterMetrics(r prometheus.Registerer, Name: "cortex_ingester_ingested_native_histograms_failures_total", Help: "The total number of native histograms that errored on ingestion.", }), + startTimestampFail: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_start_timestamp_append_failures_total", + Help: "Total number of failed appends for samples and histograms with a start timestamp.", + }, []string{"type"}), ingestedExemplarsFail: promauto.With(r).NewCounter(prometheus.CounterOpts{ Name: "cortex_ingester_ingested_exemplars_failures_total", Help: "The total number of exemplars that errored on ingestion.", @@ -355,6 +360,9 @@ func newIngesterMetrics(r prometheus.Registerer, }, []string{"user"}) } + m.startTimestampFail.WithLabelValues(sampleMetricTypeFloat) + m.startTimestampFail.WithLabelValues(sampleMetricTypeHistogram) + return m } diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go index 011e530c89..cc1c85929c 100644 --- a/pkg/ingester/metrics_test.go +++ b/pkg/ingester/metrics_test.go @@ -151,6 +151,10 @@ func TestIngesterMetrics(t *testing.T) { # HELP cortex_ingester_ingested_samples_total The total number of samples ingested. # TYPE cortex_ingester_ingested_samples_total counter cortex_ingester_ingested_samples_total 0 + # HELP cortex_ingester_start_timestamp_append_failures_total Total number of failed appends for samples and histograms with a start timestamp. + # TYPE cortex_ingester_start_timestamp_append_failures_total counter + cortex_ingester_start_timestamp_append_failures_total{type="float"} 0 + cortex_ingester_start_timestamp_append_failures_total{type="histogram"} 0 # HELP cortex_ingester_ingested_native_histograms_total The total number of native histograms ingested. # TYPE cortex_ingester_ingested_native_histograms_total counter cortex_ingester_ingested_native_histograms_total 0 From 9a24b47638ed2bce184075f403a1c183de78fd01 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Thu, 26 Mar 2026 10:28:26 +0900 Subject: [PATCH 7/7] move debug log to push level Signed-off-by: SungJin1212 --- pkg/ingester/ingester.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 9d1b740310..8411dbe63e 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1336,6 +1336,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte failedHistogramsCount = 0 succeededExemplarsCount = 0 failedExemplarsCount = 0 + startTimestampSampleAppendFailCount = 0 + startTimestampHistogramAppendFailCount = 0 startAppend = time.Now() sampleOutOfBoundsCount = 0 sampleOutOfOrderCount = 0 @@ -1463,7 +1465,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if s.StartTimestampMs != 0 && s.TimestampMs != 0 { // TODO(SungJin1212): Change to AppendSTZeroSample after update the Prometheus v3.9.0+ if _, err = app.AppendCTZeroSample(ref, copiedLabels, s.TimestampMs, s.StartTimestampMs); err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { - level.Debug(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to append start timestamp for sample", "user", userID, "series", copiedLabels.String(), "timestamp", s.TimestampMs, "start_timestamp", s.StartTimestampMs, "err", err) + startTimestampSampleAppendFailCount++ i.metrics.startTimestampFail.WithLabelValues(sampleMetricTypeFloat).Inc() } } @@ -1517,8 +1519,8 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte if hp.StartTimestampMs != 0 && hp.TimestampMs != 0 { // TODO(SungJin1212): Change to AppendHistogramSTZeroSample after update the Prometheus v3.9.0+ if _, err = app.AppendHistogramCTZeroSample(ref, copiedLabels, hp.TimestampMs, hp.StartTimestampMs, h, fh); err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { + startTimestampHistogramAppendFailCount++ i.metrics.startTimestampFail.WithLabelValues(sampleMetricTypeHistogram).Inc() - level.Debug(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to append start timestamp for histogram", "user", userID, "series", copiedLabels.String(), "timestamp", hp.TimestampMs, "start_timestamp", hp.StartTimestampMs, "err", err) } } @@ -1603,6 +1605,15 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // At this point all samples have been added to the appender, so we can track the time it took. i.TSDBState.appenderAddDuration.Observe(time.Since(startAppend).Seconds()) + if startTimestampSampleAppendFailCount > 0 || startTimestampHistogramAppendFailCount > 0 { + level.Debug(logutil.WithContext(ctx, i.logger)).Log( + "msg", "failed to append start timestamp in push", + "user", userID, + "sample_failures", startTimestampSampleAppendFailCount, + "histogram_failures", startTimestampHistogramAppendFailCount, + ) + } + startCommit := time.Now() if err := app.Commit(); err != nil { return nil, wrapWithUser(err, userID)