From 9c39ecd3bb9957553dc69728a69ed362a38e129c Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Wed, 20 May 2026 11:56:39 -0400 Subject: [PATCH 1/3] schema: unify events_raw column names and types with prod Arrow path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In-place patch of docker/init/00-schema.sql, the CH-native exporter, and the TAP tests so the docker quickstart schema aligns with what prod actually writes to (datagres_otel.query_logs_arrow in clickgres-platform). This is the pre-cutover unification: pg_stat_ch's CH-native path was previously isolated from prod, and the two schemas had drifted apart on both column naming and types. Column renames (prod-side naming wins; closer to OTel semantic conventions and minimizes downstream churn): ts_start -> ts db -> db_name username -> db_user cmd_type -> db_operation query -> query_text Type fix: err_sqlstate FixedString(5) -> LowCardinality(String) FixedString does not round-trip through Arrow IPC cleanly, and ~270 SQLSTATE codes are dictionary-friendly. The CH-native exporter is updated to write the column via TagString (clickhouse-cpp's ColumnString -> CH LowCardinality(String) is fine on the wire). Envelope columns added (with DEFAULT '' so the CH-native exporter, which does not yet emit these, continues to insert successfully): instance_ubid, server_ubid, server_role, region, cell, service_version, host_id, pod_name Engine/partitioning aligned with prod: ORDER BY ts -> ORDER BY (instance_ubid, ts) (tenant locality) TTL added: toDate(ts) + INTERVAL 180 DAY SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1 Materialized views (events_recent_1h, query_stats_5m, db_app_user_1m, errors_recent) updated to reference the new column names and to include instance_ubid in their ORDER BY / GROUP BY / SELECT projections so they remain consistent with the events_raw partitioning strategy. Test fixtures updated to query the new column names: t/010_clickhouse_export.pl, t/012_timing_accuracy.pl, t/021_cmd_type_counts.pl, t/027_query_normalization.pl, t/031_normalize_cache.pl parent_query_id is intentionally NOT included here — it's the subject of PR #95 (parent-query-id-surgical) and lands as its own follow-up migration after this PR. Validated end-to-end: docker/init/00-schema.sql applies cleanly on clickhouse/clickhouse-server:26.1 (the version pinned in docker/docker-compose.test.yml); INSERTs that omit the envelope columns fill them via DEFAULT ''; all 4 MVs build. CI will run the TAP suite. Co-Authored-By: Claude Opus 4.7 (1M context) --- docker/init/00-schema.sql | 127 ++++++++++++++++++------------ src/export/clickhouse_exporter.cc | 8 +- src/export/exporter_interface.h | 10 +-- src/export/stats_exporter.cc | 9 ++- t/010_clickhouse_export.pl | 12 +-- t/012_timing_accuracy.pl | 4 +- t/021_cmd_type_counts.pl | 4 +- t/027_query_normalization.pl | 52 ++++++------ t/031_normalize_cache.pl | 28 +++---- 9 files changed, 140 insertions(+), 114 deletions(-) diff --git a/docker/init/00-schema.sql b/docker/init/00-schema.sql index 4b15ef2..eee768a 100644 --- a/docker/init/00-schema.sql +++ b/docker/init/00-schema.sql @@ -32,7 +32,8 @@ DROP TABLE IF EXISTS pg_stat_ch.events_raw; -- exported in batches by the pg_stat_ch background worker. -- -- Partitioned by date for efficient time-range queries and data retention. --- Ordered by ts_start for efficient global time-range scans. +-- Ordered by (instance_ubid, ts) to keep tenant data colocated and bound +-- per-tenant time-range scans. -- -- ============================================================================ @@ -41,23 +42,23 @@ CREATE TABLE pg_stat_ch.events_raw -- ======================================================================== -- Core identity and timing -- ======================================================================== - ts_start DateTime64(6, 'UTC') COMMENT 'Query execution start timestamp (UTC). Used for time-range filtering and partitioning.', + ts DateTime64(6, 'UTC') COMMENT 'Query execution start timestamp (UTC). Used for time-range filtering and partitioning.', duration_us UInt64 COMMENT 'Total query execution time in microseconds. HIGH: slow query, investigate EXPLAIN. LOW: fast query. Compare with p95/p99 from query_stats_5m to identify outliers.', - db LowCardinality(String) COMMENT 'PostgreSQL database name. Use for multi-tenant filtering and per-database load analysis.', + db_name LowCardinality(String) COMMENT 'PostgreSQL database name. Use for multi-tenant filtering and per-database load analysis.', - username LowCardinality(String) COMMENT 'PostgreSQL user/role name. Useful for auditing and per-user resource tracking.', + db_user LowCardinality(String) COMMENT 'PostgreSQL user/role name. Useful for auditing and per-user resource tracking.', pid Int32 COMMENT 'PostgreSQL backend process ID. Correlate with pg_stat_activity for session debugging.', query_id Int64 COMMENT '64-bit hash identifying normalized queries. Queries differing only in constants share the same query_id. Use for aggregating statistics across similar queries.', - cmd_type LowCardinality(String) COMMENT 'Command type: SELECT, INSERT, UPDATE, DELETE, MERGE, UTILITY, or UNKNOWN. Use for workload characterization (read-heavy vs write-heavy).', + db_operation LowCardinality(String) COMMENT 'Command type: SELECT, INSERT, UPDATE, DELETE, MERGE, UTILITY, or UNKNOWN. Use for workload characterization (read-heavy vs write-heavy).', rows UInt64 COMMENT 'Rows returned (SELECT) or affected (INSERT/UPDATE/DELETE). HIGH: large result sets or bulk operations. LOW: point queries. Watch for unexpected HIGH values indicating missing WHERE clauses.', - query String COMMENT 'Full SQL query text (may be truncated). Used for debugging and query analysis.', + query_text String COMMENT 'Full SQL query text (may be truncated). Used for debugging and query analysis.', -- ======================================================================== -- Shared buffer metrics (main buffer cache) @@ -176,7 +177,7 @@ CREATE TABLE pg_stat_ch.events_raw -- Captured via emit_log_hook when a query produces an error or warning. -- Useful for error tracking, debugging, and monitoring error rates. -- ======================================================================== - err_sqlstate FixedString(5) COMMENT 'SQL standard 5-character error code. Examples: 42P01=undefined_table, 23505=unique_violation, 42601=syntax_error, 57014=query_canceled. See PostgreSQL error codes appendix.', + err_sqlstate LowCardinality(String) COMMENT 'SQL standard 5-character error code. Examples: 42P01=undefined_table, 23505=unique_violation, 42601=syntax_error, 57014=query_canceled. See PostgreSQL error codes appendix.', err_elevel UInt8 COMMENT 'Error severity level. 0=none (success), 19=WARNING, 21=ERROR, 22=FATAL, 23=PANIC. Filter err_elevel>=21 for actual errors. WARNING (19) indicates potential issues.', @@ -189,11 +190,30 @@ CREATE TABLE pg_stat_ch.events_raw -- ======================================================================== app LowCardinality(String) COMMENT 'Client application_name. Set via connection string or SET application_name. Use for identifying load sources: "pgAdmin", "myapp-api", "pg_dump", etc.', - client_addr String COMMENT 'Client IP address. Useful for geographic analysis, debugging connection issues, and identifying load sources by host.' + client_addr String COMMENT 'Client IP address. Useful for geographic analysis, debugging connection issues, and identifying load sources by host.', + + -- ======================================================================== + -- OTel resource attributes (envelope) + -- ======================================================================== + -- Populated from psch_extra_attributes (pg_stat_ch GUC). One row carries + -- the resource context for the emitting Postgres instance. Default to '' + -- so the CH-native exporter (which does not yet emit these) inserts + -- successfully against this schema. + -- ======================================================================== + instance_ubid String DEFAULT '' COMMENT 'Ubicloud ID of the emitting Postgres instance. String (opaque, not LC) — cardinality scales with active customer count.', + server_ubid String DEFAULT '' COMMENT 'Ubicloud ID of the underlying server. String (opaque).', + server_role LowCardinality(String) DEFAULT '' COMMENT 'Server role within the HA pair. ~2 values: "primary", "standby".', + region LowCardinality(String) DEFAULT '' COMMENT 'Cloud region (e.g. "us-east-1"). ~tens of values.', + cell LowCardinality(String) DEFAULT '' COMMENT 'Cell (sharded deployment unit) within the region. ~hundreds of values.', + service_version LowCardinality(String) DEFAULT '' COMMENT 'pg_stat_ch extension version.', + host_id String DEFAULT '' COMMENT 'Physical host identifier. String (opaque).', + pod_name String DEFAULT '' COMMENT 'Kubernetes pod name. String (opaque).' ) ENGINE = MergeTree -PARTITION BY toDate(ts_start) -ORDER BY ts_start; +PARTITION BY toDate(ts) +ORDER BY (instance_ubid, ts) +TTL toDate(ts) + INTERVAL 180 DAY +SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; -- ============================================================================ @@ -213,10 +233,10 @@ ORDER BY ts_start; -- - events_raw can have longer retention (days/weeks) -- -- EXAMPLE QUERY: --- SELECT ts_start, db, duration_us/1000 AS ms, substring(query, 1, 100) +-- SELECT ts, db_name, duration_us/1000 AS ms, substring(query_text, 1, 100) -- FROM pg_stat_ch.events_recent_1h --- WHERE ts_start > now() - INTERVAL 5 MINUTE --- ORDER BY ts_start DESC +-- WHERE ts > now() - INTERVAL 5 MINUTE +-- ORDER BY ts DESC -- LIMIT 50; -- -- ============================================================================ @@ -225,9 +245,9 @@ DROP TABLE IF EXISTS pg_stat_ch.events_recent_1h; CREATE MATERIALIZED VIEW pg_stat_ch.events_recent_1h ENGINE = MergeTree -PARTITION BY toDate(ts_start) -ORDER BY ts_start -TTL toDateTime(ts_start) + INTERVAL 1 HOUR DELETE +PARTITION BY toDate(ts) +ORDER BY (instance_ubid, ts) +TTL toDateTime(ts) + INTERVAL 1 HOUR DELETE AS SELECT * FROM pg_stat_ch.events_raw; @@ -260,14 +280,14 @@ FROM pg_stat_ch.events_raw; -- -- SELECT -- query_id, --- cmd_type, +-- db_operation, -- countMerge(calls_state) AS calls, -- round(sumMerge(duration_sum_state) / countMerge(calls_state) / 1000, 2) AS avg_ms, -- round(quantilesTDigestMerge(0.95, 0.99)(duration_q_state)[1] / 1000, 2) AS p95_ms, -- round(quantilesTDigestMerge(0.95, 0.99)(duration_q_state)[2] / 1000, 2) AS p99_ms -- FROM pg_stat_ch.query_stats_5m -- WHERE bucket >= now() - INTERVAL 1 HOUR --- GROUP BY query_id, cmd_type +-- GROUP BY query_id, db_operation -- ORDER BY p99_ms DESC -- LIMIT 10; -- @@ -278,9 +298,10 @@ DROP TABLE IF EXISTS pg_stat_ch.query_stats_5m; CREATE MATERIALIZED VIEW pg_stat_ch.query_stats_5m ( bucket DateTime COMMENT '5-minute time bucket start', - db LowCardinality(String) COMMENT 'Database name', + instance_ubid String COMMENT 'Emitting instance ID — first ORDER BY key to keep tenant data colocated.', + db_name LowCardinality(String) COMMENT 'Database name', query_id Int64 COMMENT 'Normalized query identifier', - cmd_type LowCardinality(String) COMMENT 'Command type (SELECT, INSERT, etc.)', + db_operation LowCardinality(String) COMMENT 'Command type (SELECT, INSERT, etc.)', calls_state AggregateFunction(count) COMMENT 'Call count state. Finalize with countMerge().', duration_sum_state AggregateFunction(sum, UInt64) COMMENT 'Total duration state. Finalize with sumMerge().', @@ -294,13 +315,14 @@ CREATE MATERIALIZED VIEW pg_stat_ch.query_stats_5m ) ENGINE = AggregatingMergeTree PARTITION BY toYYYYMMDD(bucket) -ORDER BY (bucket, db, query_id, cmd_type) +ORDER BY (instance_ubid, bucket, db_name, query_id, db_operation) AS SELECT - toStartOfInterval(toDateTime(ts_start), INTERVAL 5 MINUTE) AS bucket, - db, + toStartOfInterval(toDateTime(ts), INTERVAL 5 MINUTE) AS bucket, + instance_ubid, + db_name, query_id, - cmd_type, + db_operation, countState() AS calls_state, sumState(duration_us) AS duration_sum_state, @@ -312,7 +334,7 @@ SELECT sumState(shared_blks_hit) AS shared_hit_sum_state, sumState(shared_blks_read) AS shared_read_sum_state FROM pg_stat_ch.events_raw -GROUP BY bucket, db, query_id, cmd_type; +GROUP BY bucket, instance_ubid, db_name, query_id, db_operation; -- ============================================================================ @@ -343,14 +365,14 @@ GROUP BY bucket, db, query_id, cmd_type; -- EXAMPLE: Error rate by database and user -- -- SELECT --- db, --- username, +-- db_name, +-- db_user, -- countMerge(calls_state) AS queries, -- sumMerge(errors_sum_state) AS errors, -- round(100 * sumMerge(errors_sum_state) / countMerge(calls_state), 2) AS error_pct -- FROM pg_stat_ch.db_app_user_1m -- WHERE bucket >= now() - INTERVAL 1 HOUR --- GROUP BY db, username +-- GROUP BY db_name, db_user -- HAVING errors > 0 -- ORDER BY error_pct DESC; -- @@ -361,10 +383,11 @@ DROP TABLE IF EXISTS pg_stat_ch.db_app_user_1m; CREATE MATERIALIZED VIEW pg_stat_ch.db_app_user_1m ( bucket DateTime COMMENT '1-minute time bucket start', - db LowCardinality(String) COMMENT 'Database name', + instance_ubid String COMMENT 'Emitting instance ID — first ORDER BY key for tenant locality.', + db_name LowCardinality(String) COMMENT 'Database name', app LowCardinality(String) COMMENT 'Application name', - username LowCardinality(String) COMMENT 'PostgreSQL username', - cmd_type LowCardinality(String) COMMENT 'Command type', + db_user LowCardinality(String) COMMENT 'PostgreSQL user/role name', + db_operation LowCardinality(String) COMMENT 'Command type', calls_state AggregateFunction(count) COMMENT 'Query count state. Finalize with countMerge().', duration_sum_state AggregateFunction(sum, UInt64) COMMENT 'Total duration state (μs). Finalize with sumMerge().', @@ -373,21 +396,22 @@ CREATE MATERIALIZED VIEW pg_stat_ch.db_app_user_1m ) ENGINE = AggregatingMergeTree PARTITION BY toYYYYMMDD(bucket) -ORDER BY (bucket, db, app, username, cmd_type) +ORDER BY (instance_ubid, bucket, db_name, app, db_user, db_operation) AS SELECT - toStartOfMinute(toDateTime(ts_start)) AS bucket, - db, + toStartOfMinute(toDateTime(ts)) AS bucket, + instance_ubid, + db_name, app, - username, - cmd_type, + db_user, + db_operation, countState() AS calls_state, sumState(duration_us) AS duration_sum_state, quantilesTDigestState(0.95, 0.99)(duration_us) AS duration_q_state, sumState(toUInt64(err_elevel > 0)) AS errors_sum_state FROM pg_stat_ch.events_raw -GROUP BY bucket, db, app, username, cmd_type; +GROUP BY bucket, instance_ubid, db_name, app, db_user, db_operation; -- ============================================================================ @@ -409,16 +433,16 @@ GROUP BY bucket, db, app, username, cmd_type; -- EXAMPLE: Recent errors with query context -- -- SELECT --- ts_start, --- db, --- username, +-- ts, +-- db_name, +-- db_user, -- app, -- err_sqlstate, -- err_message, --- substring(query, 1, 200) AS query_preview +-- substring(query_text, 1, 200) AS query_preview -- FROM pg_stat_ch.errors_recent --- WHERE ts_start > now() - INTERVAL 1 HOUR --- ORDER BY ts_start DESC +-- WHERE ts > now() - INTERVAL 1 HOUR +-- ORDER BY ts DESC -- LIMIT 100; -- -- EXAMPLE: Error breakdown by SQLSTATE @@ -429,7 +453,7 @@ GROUP BY bucket, db, app, username, cmd_type; -- uniq(query_id) AS unique_queries, -- any(err_message) AS sample_message -- FROM pg_stat_ch.errors_recent --- WHERE ts_start > now() - INTERVAL 24 HOUR +-- WHERE ts > now() - INTERVAL 24 HOUR -- GROUP BY err_sqlstate -- ORDER BY occurrences DESC; -- @@ -450,14 +474,15 @@ DROP TABLE IF EXISTS pg_stat_ch.errors_recent; CREATE MATERIALIZED VIEW pg_stat_ch.errors_recent ENGINE = MergeTree -PARTITION BY toDate(ts_start) -ORDER BY ts_start -TTL toDateTime(ts_start) + INTERVAL 7 DAY DELETE +PARTITION BY toDate(ts) +ORDER BY (instance_ubid, ts) +TTL toDateTime(ts) + INTERVAL 7 DAY DELETE AS SELECT - ts_start, - db, - username, + ts, + instance_ubid, + db_name, + db_user, app, client_addr, pid, @@ -465,6 +490,6 @@ SELECT err_sqlstate, err_elevel, err_message, - query + query_text FROM pg_stat_ch.events_raw WHERE err_elevel > 0; diff --git a/src/export/clickhouse_exporter.cc b/src/export/clickhouse_exporter.cc index 743c537..57aea0f 100644 --- a/src/export/clickhouse_exporter.cc +++ b/src/export/clickhouse_exporter.cc @@ -69,11 +69,11 @@ class ClickHouseExporter : public StatsExporter { } // Semantic columns - shared_ptr> DbNameColumn() final { return TagString("db"); } - shared_ptr> DbUserColumn() final { return TagString("username"); } + shared_ptr> DbNameColumn() final { return TagString("db_name"); } + shared_ptr> DbUserColumn() final { return TagString("db_user"); } shared_ptr> DbDurationColumn() final { return MetricUInt64("duration_us"); } - shared_ptr> DbOperationColumn() final { return TagString("cmd_type"); } - shared_ptr> DbQueryTextColumn() final { return RecordString("query"); } + shared_ptr> DbOperationColumn() final { return TagString("db_operation"); } + shared_ptr> DbQueryTextColumn() final { return RecordString("query_text"); } void BeginBatch() final { block = std::make_unique(); diff --git a/src/export/exporter_interface.h b/src/export/exporter_interface.h index 018762d..a4a638d 100644 --- a/src/export/exporter_interface.h +++ b/src/export/exporter_interface.h @@ -54,17 +54,17 @@ class StatsExporter { // instrument). Pure virtuals enforce explicit handling in every exporter. // =========================================================================== - // Database name. CH: TagString "db"; OTel semconv: "db.name" tag. + // Database name. CH: TagString "db_name"; OTel semconv: "db.name" tag. virtual shared_ptr> DbNameColumn() = 0; - // Authenticated user. CH: TagString "username"; OTel semconv: "db.user" tag. + // Authenticated user. CH: TagString "db_user"; OTel semconv: "db.user" tag. virtual shared_ptr> DbUserColumn() = 0; // Query duration. Caller appends microseconds. CH: MetricUInt64 "duration_us"; // OTel: converts to seconds, records as Histogram "db.client.operation.duration". virtual shared_ptr> DbDurationColumn() = 0; - // SQL command type. CH: RecordString "cmd_type"; OTel: TagString "db.operation.name" + // SQL command type. CH: TagString "db_operation"; OTel: TagString "db.operation.name" // (used as a dimension on the duration histogram). virtual shared_ptr> DbOperationColumn() = 0; - // Query text. CH: RecordString "query"; OTel semconv: "db.query.text". + // Query text. CH: RecordString "query_text"; OTel semconv: "db.query.text". virtual shared_ptr> DbQueryTextColumn() = 0; virtual void BeginBatch() = 0; @@ -95,7 +95,7 @@ void RecordExporterFailure(const char* message); // Expected usage: // void ProcessBatch(StatsExporter *exporter) { // exporter->BeginBatch(); // no op or ClickHouse column reset -// auto col_user = exporter->TagString("username"); +// auto col_user = exporter->TagString("db_user"); // auto col_rows = exporter->MetricUInt64("rows"); // // for (const auto &ev : events) { diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc index ee19661..6dfa985 100644 --- a/src/export/stats_exporter.cc +++ b/src/export/stats_exporter.cc @@ -235,7 +235,7 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte exporter->BeginBatch(); - auto col_ts_start = exporter->RecordDateTime("ts_start"); + auto col_ts = exporter->RecordDateTime("ts"); auto col_duration_us = exporter->DbDurationColumn(); auto col_db = exporter->DbNameColumn(); auto col_username = exporter->DbUserColumn(); @@ -280,7 +280,7 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte auto col_parallel_workers_planned = exporter->RecordInt16("parallel_workers_planned"); auto col_parallel_workers_launched = exporter->RecordInt16("parallel_workers_launched"); - auto col_err_sqlstate = exporter->MetricFixedString(5, "err_sqlstate"); + auto col_err_sqlstate = exporter->TagString("err_sqlstate"); auto col_err_elevel = exporter->RecordUInt8("err_elevel"); auto col_err_message = exporter->RecordString("err_message"); @@ -290,7 +290,7 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte for (const auto& ev : events) { exporter->BeginRow(); - col_ts_start->Append(ev.ts_start + kPostgresEpochOffsetUs); + col_ts->Append(ev.ts_start + kPostgresEpochOffsetUs); col_duration_us->Append(ev.duration_us); col_db->Append(std::string(ev.datname, ev.datname_len)); col_username->Append(std::string(ev.username, ev.username_len)); @@ -337,7 +337,8 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte col_parallel_workers_planned->Append(ev.parallel_workers_planned); col_parallel_workers_launched->Append(ev.parallel_workers_launched); - col_err_sqlstate->Append(std::string_view(ev.err_sqlstate, 5)); + col_err_sqlstate->Append( + std::string(ev.err_sqlstate, strnlen(ev.err_sqlstate, sizeof(ev.err_sqlstate)))); col_err_elevel->Append(ev.err_elevel); auto elen = ClampFieldLen(ev.err_message_len, static_cast(PSCH_MAX_ERR_MSG_LEN), "err_message_len"); diff --git a/t/010_clickhouse_export.pl b/t/010_clickhouse_export.pl index fd2768b..64a4bd4 100644 --- a/t/010_clickhouse_export.pl +++ b/t/010_clickhouse_export.pl @@ -58,7 +58,7 @@ # Verify query field is populated my $query_check = psch_wait_for_clickhouse_query( - "SELECT count() FROM pg_stat_ch.events_raw WHERE query != ''", + "SELECT count() FROM pg_stat_ch.events_raw WHERE query_text != ''", sub { $_[0] >= 1 }, 10 ); @@ -134,18 +134,18 @@ cmp_ok($duration_check, '>=', 1, 'duration_us is populated'); my $db_check = psch_wait_for_clickhouse_query( - "SELECT count() FROM pg_stat_ch.events_raw WHERE db = 'postgres'", + "SELECT count() FROM pg_stat_ch.events_raw WHERE db_name = 'postgres'", sub { $_[0] >= 1 }, 10 ); - cmp_ok($db_check, '>=', 1, 'db field is populated'); + cmp_ok($db_check, '>=', 1, 'db_name field is populated'); - my $cmd_type_check = psch_wait_for_clickhouse_query( - "SELECT count() FROM pg_stat_ch.events_raw WHERE cmd_type != ''", + my $db_operation_check = psch_wait_for_clickhouse_query( + "SELECT count() FROM pg_stat_ch.events_raw WHERE db_operation != ''", sub { $_[0] >= 1 }, 10 ); - cmp_ok($cmd_type_check, '>=', 1, 'cmd_type is populated'); + cmp_ok($db_operation_check, '>=', 1, 'db_operation is populated'); # Clean up $node->safe_psql('postgres', 'DROP TABLE IF EXISTS test_fields'); diff --git a/t/012_timing_accuracy.pl b/t/012_timing_accuracy.pl index d615f09..d306caa 100644 --- a/t/012_timing_accuracy.pl +++ b/t/012_timing_accuracy.pl @@ -101,7 +101,7 @@ # Get timing statistics my $timing_stats = psch_query_clickhouse( "SELECT count(), avg(duration_us), min(duration_us), max(duration_us) " . - "FROM pg_stat_ch.events_raw WHERE query LIKE '%pgbench%' OR query LIKE '%UPDATE%'" + "FROM pg_stat_ch.events_raw WHERE query_text LIKE '%pgbench%' OR query_text LIKE '%UPDATE%'" ); diag("Timing stats: $timing_stats"); @@ -131,7 +131,7 @@ # Check that we captured a duration close to 100ms my $duration = psch_query_clickhouse( "SELECT duration_us FROM pg_stat_ch.events_raw " . - "WHERE query LIKE '%pg_sleep%' ORDER BY ts_start DESC LIMIT 1" + "WHERE query_text LIKE '%pg_sleep%' ORDER BY ts DESC LIMIT 1" ); if ($duration) { diff --git a/t/021_cmd_type_counts.pl b/t/021_cmd_type_counts.pl index aa475dd..cccf9ac 100755 --- a/t/021_cmd_type_counts.pl +++ b/t/021_cmd_type_counts.pl @@ -28,10 +28,10 @@ batch_max => 100 ); -# Helper: parse cmd_type counts from ClickHouse +# Helper: parse db_operation counts from ClickHouse sub get_cmd_type_counts { my $result = psch_query_clickhouse( - "SELECT cmd_type, count() FROM pg_stat_ch.events_raw GROUP BY cmd_type FORMAT TabSeparated" + "SELECT db_operation, count() FROM pg_stat_ch.events_raw GROUP BY db_operation FORMAT TabSeparated" ); my %counts; for my $line (split /\n/, $result) { diff --git a/t/027_query_normalization.pl b/t/027_query_normalization.pl index 2c4bac6..a2702f2 100644 --- a/t/027_query_normalization.pl +++ b/t/027_query_normalization.pl @@ -53,12 +53,12 @@ sub get_captured_query { psch_wait_for_export($node, 1, 10); return psch_wait_for_clickhouse_query( - "SELECT query FROM pg_stat_ch.events_raw " . + "SELECT query_text FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query NOT LIKE '%pg_stat_ch%' " . - "AND query NOT LIKE '%pg_extension%' " . - "AND query != '' " . - "ORDER BY ts_start DESC LIMIT 1", + "AND query_text NOT LIKE '%pg_stat_ch%' " . + "AND query_text NOT LIKE '%pg_extension%' " . + "AND query_text != '' " . + "ORDER BY ts DESC LIMIT 1", sub { $_[0] ne '' }, 10 ); @@ -193,11 +193,11 @@ sub get_captured_query { psch_wait_for_export($node, 2, 10); my $all_queries = psch_wait_for_clickhouse_query( - "SELECT groupArray(query) FROM pg_stat_ch.events_raw " . + "SELECT groupArray(query_text) FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query LIKE '%test_norm%' " . - "AND query NOT LIKE '%pg_stat_ch%' " . - "AND query != ''", + "AND query_text LIKE '%test_norm%' " . + "AND query_text NOT LIKE '%pg_stat_ch%' " . + "AND query_text != ''", sub { $_[0] ne '' }, 10 ); @@ -292,19 +292,19 @@ sub get_captured_query { $session->quit(); my $error_query = psch_query_clickhouse( - "SELECT query FROM pg_stat_ch.events_raw " . + "SELECT query_text FROM pg_stat_ch.events_raw " . "WHERE err_message != '' " . - "ORDER BY ts_start DESC LIMIT 1" + "ORDER BY ts DESC LIMIT 1" ); is($error_query, '', 'Error events do not export query text'); my $q = psch_wait_for_clickhouse_query( - "SELECT query FROM pg_stat_ch.events_raw " . + "SELECT query_text FROM pg_stat_ch.events_raw " . "WHERE err_message = '' " . - "AND query NOT LIKE '%pg_stat_ch%' " . - "AND query NOT LIKE '%pg_extension%' " . - "AND query != '' " . - "ORDER BY ts_start DESC LIMIT 1", + "AND query_text NOT LIKE '%pg_stat_ch%' " . + "AND query_text NOT LIKE '%pg_extension%' " . + "AND query_text != '' " . + "ORDER BY ts DESC LIMIT 1", sub { $_[0] ne '' }, 10 ); @@ -388,8 +388,8 @@ sub get_captured_query { my $nested_count = psch_wait_for_clickhouse_query( "SELECT count() FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query LIKE '%WHERE%' " . - "AND query LIKE '%nested_normalize_same_sql%'", + "AND query_text LIKE '%WHERE%' " . + "AND query_text LIKE '%nested_normalize_same_sql%'", sub { $_[0] >= 3 }, 10 ); @@ -397,10 +397,10 @@ sub get_captured_query { 'Captured recursive nested executions of the same SPI statement'); my $queries = psch_wait_for_clickhouse_query( - "SELECT groupArray(query) FROM pg_stat_ch.events_raw " . + "SELECT groupArray(query_text) FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query LIKE '%WHERE%' " . - "AND query LIKE '%nested_normalize_same_sql%'", + "AND query_text LIKE '%WHERE%' " . + "AND query_text LIKE '%nested_normalize_same_sql%'", sub { $_[0] ne '' }, 10 ); @@ -433,8 +433,8 @@ sub get_captured_query { my $repeat_count = psch_wait_for_clickhouse_query( "SELECT count() FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query LIKE '%WHERE%' " . - "AND query LIKE '%nested_normalize_same_sql%'", + "AND query_text LIKE '%WHERE%' " . + "AND query_text LIKE '%nested_normalize_same_sql%'", sub { $_[0] >= 4 }, 10 ); @@ -442,10 +442,10 @@ sub get_captured_query { 'Captured repeated executions of the same SPI statement in one backend'); my $repeat_queries = psch_wait_for_clickhouse_query( - "SELECT groupArray(query) FROM pg_stat_ch.events_raw " . + "SELECT groupArray(query_text) FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query LIKE '%WHERE%' " . - "AND query LIKE '%nested_normalize_same_sql%'", + "AND query_text LIKE '%WHERE%' " . + "AND query_text LIKE '%nested_normalize_same_sql%'", sub { $_[0] ne '' }, 10 ); diff --git a/t/031_normalize_cache.pl b/t/031_normalize_cache.pl index 68842b1..ea9bd11 100644 --- a/t/031_normalize_cache.pl +++ b/t/031_normalize_cache.pl @@ -85,11 +85,11 @@ # Collect all exported query texts from this backend. my $all_queries = psch_wait_for_clickhouse_query( - "SELECT groupArray(query) FROM pg_stat_ch.events_raw " . + "SELECT groupArray(query_text) FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query NOT LIKE '%pg_stat_ch%' " . - "AND query NOT LIKE '%pg_extension%' " . - "AND query != ''", + "AND query_text NOT LIKE '%pg_stat_ch%' " . + "AND query_text NOT LIKE '%pg_extension%' " . + "AND query_text != ''", sub { $_[0] ne '' }, 10 ); @@ -107,9 +107,9 @@ my $count = psch_wait_for_clickhouse_query( "SELECT count() FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query NOT LIKE '%pg_stat_ch%' " . - "AND query NOT LIKE '%pg_extension%' " . - "AND query != ''", + "AND query_text NOT LIKE '%pg_stat_ch%' " . + "AND query_text NOT LIKE '%pg_extension%' " . + "AND query_text != ''", sub { $_[0] >= scalar(@queries) }, 10 ); @@ -160,10 +160,10 @@ psch_wait_for_export($node, 1, 10); my $exported_query = psch_wait_for_clickhouse_query( - "SELECT query FROM pg_stat_ch.events_raw " . + "SELECT query_text FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query LIKE 'SELECT%pg_class%' " . - "ORDER BY ts_start DESC LIMIT 1", + "AND query_text LIKE 'SELECT%pg_class%' " . + "ORDER BY ts DESC LIMIT 1", sub { $_[0] ne '' }, 10 ); @@ -236,10 +236,10 @@ psch_wait_for_export($node, 1, 10); my $first = psch_wait_for_clickhouse_query( - "SELECT query FROM pg_stat_ch.events_raw " . + "SELECT query_text FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query LIKE '%pg_class%' " . - "AND query LIKE '%\$1%' " . + "AND query_text LIKE '%pg_class%' " . + "AND query_text LIKE '%\$1%' " . "LIMIT 1", sub { $_[0] ne '' }, 10 @@ -281,7 +281,7 @@ my $empty_count = psch_query_clickhouse( "SELECT count() FROM pg_stat_ch.events_raw " . - "WHERE pid = $pid AND query = ''" + "WHERE pid = $pid AND query_text = ''" ); cmp_ok($empty_count, '>=', 1, 'Evicted cache entry produces empty query text on re-EXECUTE'); From 299115b11c2abc4e95c4d8e05072d4378c65de91 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Wed, 20 May 2026 12:52:13 -0400 Subject: [PATCH 2/3] schema: move docker init schema into Goose migrations directory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mechanical move + goose annotations. The pg_stat_ch ClickHouse schema that was previously the docker quickstart init script becomes the first real Goose migration under schema/migrations/, matching clickgres-platform's runner layout (pressly/goose v3, DialectClickHouse, embed.FS). Changes to the content of the moved file: * Header banner rewritten from "CANONICAL SCHEMA REFERENCE / single source of truth / dual role as docker init" to "initial migration" framing. * Added -- +goose Up / -- +goose Down section markers. * Each CREATE DATABASE / CREATE TABLE / CREATE MATERIALIZED VIEW wrapped in -- +goose StatementBegin / StatementEnd so goose's parser handles the multi-statement bodies correctly. * Removed the pre-CREATE "DROP TABLE IF EXISTS X" idioms — those existed to make the docker init script idempotent on container restart, but goose tracks state via goose_db_version. Drops now live exclusively in the -- +goose Down section in reverse dependency order. The schema content itself (column names, types, MV definitions, ORDER BY / TTL / SETTINGS) is unchanged from the previous commit. Git rename detection should follow docker/init/00-schema.sql -> schema/migrations/20260519000001_create_initial_schema.sql. Also adds schema/migrations/00000000000001_bootstrap.sql, a no-op SELECT 1 migration required by goose to seed the goose_db_version table (copied verbatim from clickgres-platform's bootstrap). Validated end-to-end against clickhouse/clickhouse-server:26.1: pressly goose v3.27.1 `up` and `reset` round-trip cleanly. All 51 columns and 4 MVs land with the expected types. Note: this leaves docker/init/ empty. The docker-compose mounts will need updating in a follow-on PR to point at schema/migrations/ (which requires a small shim to invoke goose-up at container start, since clickhouse-server's docker entrypoint cannot parse goose's StatementBegin/End directives directly). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../migrations/00000000000001_bootstrap.sql | 9 +++ .../20260519000001_create_initial_schema.sql | 62 ++++++++++++------- 2 files changed, 49 insertions(+), 22 deletions(-) create mode 100644 schema/migrations/00000000000001_bootstrap.sql rename docker/init/00-schema.sql => schema/migrations/20260519000001_create_initial_schema.sql (96%) diff --git a/schema/migrations/00000000000001_bootstrap.sql b/schema/migrations/00000000000001_bootstrap.sql new file mode 100644 index 0000000..1b3cdc5 --- /dev/null +++ b/schema/migrations/00000000000001_bootstrap.sql @@ -0,0 +1,9 @@ +-- Bootstrap migration: seeds the Goose version table so that subsequent +-- migrations have a baseline. The SELECT 1 statements are intentional +-- no-ops — do not delete this file. + +-- +goose Up +SELECT 1; + +-- +goose Down +SELECT 1; diff --git a/docker/init/00-schema.sql b/schema/migrations/20260519000001_create_initial_schema.sql similarity index 96% rename from docker/init/00-schema.sql rename to schema/migrations/20260519000001_create_initial_schema.sql index eee768a..a56e7d7 100644 --- a/docker/init/00-schema.sql +++ b/schema/migrations/20260519000001_create_initial_schema.sql @@ -1,18 +1,10 @@ -- ============================================================================ --- ClickHouse schema for pg_stat_ch events +-- Initial pg_stat_ch ClickHouse schema: events_raw + 4 aggregation MVs. -- ============================================================================ -- --- CANONICAL SCHEMA REFERENCE --- --- This file is the single source of truth for the pg_stat_ch ClickHouse schema. --- It serves a dual role: --- 1. Docker init script (applied automatically by docker-compose) --- 2. Schema documentation (column comments, MV explanations) --- --- For production deployments: clickhouse-client < docker/init/00-schema.sql --- For documentation: see docs/clickhouse.md --- --- ============================================================================ +-- This is the first real migration on top of bootstrap; everything in here is +-- authored together pre-GA, applied together, and not subject to historical +-- evolution. Subsequent schema changes get their own timestamped files. -- -- This schema is designed for the pg_stat_ch PostgreSQL extension which exports -- raw query execution telemetry to ClickHouse. All aggregation (p50/p95/p99, @@ -20,9 +12,11 @@ -- -- ============================================================================ -CREATE DATABASE IF NOT EXISTS pg_stat_ch; +-- +goose Up -DROP TABLE IF EXISTS pg_stat_ch.events_raw; +-- +goose StatementBegin +CREATE DATABASE IF NOT EXISTS pg_stat_ch; +-- +goose StatementEnd -- ============================================================================ -- events_raw: Raw query execution events @@ -37,6 +31,7 @@ DROP TABLE IF EXISTS pg_stat_ch.events_raw; -- -- ============================================================================ +-- +goose StatementBegin CREATE TABLE pg_stat_ch.events_raw ( -- ======================================================================== @@ -214,6 +209,7 @@ PARTITION BY toDate(ts) ORDER BY (instance_ubid, ts) TTL toDate(ts) + INTERVAL 180 DAY SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; +-- +goose StatementEnd -- ============================================================================ @@ -241,8 +237,7 @@ SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; -- -- ============================================================================ -DROP TABLE IF EXISTS pg_stat_ch.events_recent_1h; - +-- +goose StatementBegin CREATE MATERIALIZED VIEW pg_stat_ch.events_recent_1h ENGINE = MergeTree PARTITION BY toDate(ts) @@ -251,6 +246,7 @@ TTL toDateTime(ts) + INTERVAL 1 HOUR DELETE AS SELECT * FROM pg_stat_ch.events_raw; +-- +goose StatementEnd -- ============================================================================ @@ -293,8 +289,7 @@ FROM pg_stat_ch.events_raw; -- -- ============================================================================ -DROP TABLE IF EXISTS pg_stat_ch.query_stats_5m; - +-- +goose StatementBegin CREATE MATERIALIZED VIEW pg_stat_ch.query_stats_5m ( bucket DateTime COMMENT '5-minute time bucket start', @@ -335,6 +330,7 @@ SELECT sumState(shared_blks_read) AS shared_read_sum_state FROM pg_stat_ch.events_raw GROUP BY bucket, instance_ubid, db_name, query_id, db_operation; +-- +goose StatementEnd -- ============================================================================ @@ -378,8 +374,7 @@ GROUP BY bucket, instance_ubid, db_name, query_id, db_operation; -- -- ============================================================================ -DROP TABLE IF EXISTS pg_stat_ch.db_app_user_1m; - +-- +goose StatementBegin CREATE MATERIALIZED VIEW pg_stat_ch.db_app_user_1m ( bucket DateTime COMMENT '1-minute time bucket start', @@ -412,6 +407,7 @@ SELECT sumState(toUInt64(err_elevel > 0)) AS errors_sum_state FROM pg_stat_ch.events_raw GROUP BY bucket, instance_ubid, db_name, app, db_user, db_operation; +-- +goose StatementEnd -- ============================================================================ @@ -470,8 +466,7 @@ GROUP BY bucket, instance_ubid, db_name, app, db_user, db_operation; -- -- ============================================================================ -DROP TABLE IF EXISTS pg_stat_ch.errors_recent; - +-- +goose StatementBegin CREATE MATERIALIZED VIEW pg_stat_ch.errors_recent ENGINE = MergeTree PARTITION BY toDate(ts) @@ -493,3 +488,26 @@ SELECT query_text FROM pg_stat_ch.events_raw WHERE err_elevel > 0; +-- +goose StatementEnd + +-- +goose Down + +-- +goose StatementBegin +DROP TABLE IF EXISTS pg_stat_ch.errors_recent; +-- +goose StatementEnd + +-- +goose StatementBegin +DROP TABLE IF EXISTS pg_stat_ch.db_app_user_1m; +-- +goose StatementEnd + +-- +goose StatementBegin +DROP TABLE IF EXISTS pg_stat_ch.query_stats_5m; +-- +goose StatementEnd + +-- +goose StatementBegin +DROP TABLE IF EXISTS pg_stat_ch.events_recent_1h; +-- +goose StatementEnd + +-- +goose StatementBegin +DROP TABLE IF EXISTS pg_stat_ch.events_raw; +-- +goose StatementEnd From 002c4ce93316c9a44418df4b0df46fdf291ed4cd Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Wed, 20 May 2026 13:42:18 -0400 Subject: [PATCH 3/3] ci(tap): apply schema/migrations via goose instead of raw clickhouse-client The previous "Initialize ClickHouse schema" step ran `clickhouse-client --multiquery < docker/init/00-schema.sql`. That file moved in the previous commit; pointing the step at the new location without further changes would not work because clickhouse-client cannot parse goose -- +goose Up/Down/StatementBegin/End directives, and would execute the Down section's DROP statements right after the Up section's CREATEs. Switch the step to install pressly/goose v3.27.1 (~5 sec on Ubuntu CI runners which have Go preinstalled) and apply the migrations from schema/migrations/ via `goose ... up` against the running CH container. Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/ci-tap.yml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci-tap.yml b/.github/workflows/ci-tap.yml index 718888b..db0f918 100644 --- a/.github/workflows/ci-tap.yml +++ b/.github/workflows/ci-tap.yml @@ -97,9 +97,14 @@ jobs: docker logs clickhouse-test exit 1 + - name: Install goose + run: go install github.com/pressly/goose/v3/cmd/goose@v3.27.1 + - name: Initialize ClickHouse schema run: | - docker exec clickhouse-test clickhouse-client --multiquery < docker/init/00-schema.sql + docker exec clickhouse-test clickhouse-client -q "CREATE DATABASE IF NOT EXISTS pg_stat_ch" + "$HOME/go/bin/goose" -dir schema/migrations \ + clickhouse "tcp://localhost:9000?database=pg_stat_ch" up - name: Run TAP tests run: |