Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .github/workflows/ci-tap.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,14 @@ jobs:
docker logs clickhouse-test
exit 1

- name: Install goose
run: go install github.com/pressly/goose/v3/cmd/goose@v3.27.1

- name: Initialize ClickHouse schema
run: |
docker exec clickhouse-test clickhouse-client --multiquery < docker/init/00-schema.sql
docker exec clickhouse-test clickhouse-client -q "CREATE DATABASE IF NOT EXISTS pg_stat_ch"
"$HOME/go/bin/goose" -dir schema/migrations \
clickhouse "tcp://localhost:9000?database=pg_stat_ch" up

- name: Run TAP tests
run: |
Expand Down
9 changes: 9 additions & 0 deletions schema/migrations/00000000000001_bootstrap.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Bootstrap migration: seeds the Goose version table so that subsequent
-- migrations have a baseline. The SELECT 1 statements are intentional
-- no-ops — do not delete this file.

-- +goose Up
SELECT 1;

-- +goose Down
SELECT 1;

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions src/export/clickhouse_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ class ClickHouseExporter : public StatsExporter {
}

// Semantic columns
shared_ptr<Column<string>> DbNameColumn() final { return TagString("db"); }
shared_ptr<Column<string>> DbUserColumn() final { return TagString("username"); }
shared_ptr<Column<string>> DbNameColumn() final { return TagString("db_name"); }
shared_ptr<Column<string>> DbUserColumn() final { return TagString("db_user"); }
shared_ptr<Column<uint64_t>> DbDurationColumn() final { return MetricUInt64("duration_us"); }
shared_ptr<Column<string>> DbOperationColumn() final { return TagString("cmd_type"); }
shared_ptr<Column<string_view>> DbQueryTextColumn() final { return RecordString("query"); }
shared_ptr<Column<string>> DbOperationColumn() final { return TagString("db_operation"); }
shared_ptr<Column<string_view>> DbQueryTextColumn() final { return RecordString("query_text"); }

void BeginBatch() final {
block = std::make_unique<clickhouse::Block>();
Expand Down
10 changes: 5 additions & 5 deletions src/export/exporter_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ class StatsExporter {
// instrument). Pure virtuals enforce explicit handling in every exporter.
// ===========================================================================

// Database name. CH: TagString "db"; OTel semconv: "db.name" tag.
// Database name. CH: TagString "db_name"; OTel semconv: "db.name" tag.
virtual shared_ptr<Column<string>> DbNameColumn() = 0;
// Authenticated user. CH: TagString "username"; OTel semconv: "db.user" tag.
// Authenticated user. CH: TagString "db_user"; OTel semconv: "db.user" tag.
virtual shared_ptr<Column<string>> DbUserColumn() = 0;
// Query duration. Caller appends microseconds. CH: MetricUInt64 "duration_us";
// OTel: converts to seconds, records as Histogram<double> "db.client.operation.duration".
virtual shared_ptr<Column<uint64_t>> DbDurationColumn() = 0;
// SQL command type. CH: RecordString "cmd_type"; OTel: TagString "db.operation.name"
// SQL command type. CH: TagString "db_operation"; OTel: TagString "db.operation.name"
// (used as a dimension on the duration histogram).
virtual shared_ptr<Column<string>> DbOperationColumn() = 0;
// Query text. CH: RecordString "query"; OTel semconv: "db.query.text".
// Query text. CH: RecordString "query_text"; OTel semconv: "db.query.text".
virtual shared_ptr<Column<string_view>> DbQueryTextColumn() = 0;

virtual void BeginBatch() = 0;
Expand Down Expand Up @@ -95,7 +95,7 @@ void RecordExporterFailure(const char* message);
// Expected usage:
// void ProcessBatch(StatsExporter *exporter) {
// exporter->BeginBatch(); // no op or ClickHouse column reset
// auto col_user = exporter->TagString("username");
// auto col_user = exporter->TagString("db_user");
// auto col_rows = exporter->MetricUInt64("rows");
//
// for (const auto &ev : events) {
Expand Down
9 changes: 5 additions & 4 deletions src/export/stats_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ void ExportEventStatsInternal(const std::vector<PschEvent>& events, StatsExporte

exporter->BeginBatch();

auto col_ts_start = exporter->RecordDateTime("ts_start");
auto col_ts = exporter->RecordDateTime("ts");
auto col_duration_us = exporter->DbDurationColumn();
auto col_db = exporter->DbNameColumn();
auto col_username = exporter->DbUserColumn();
Expand Down Expand Up @@ -280,7 +280,7 @@ void ExportEventStatsInternal(const std::vector<PschEvent>& events, StatsExporte
auto col_parallel_workers_planned = exporter->RecordInt16("parallel_workers_planned");
auto col_parallel_workers_launched = exporter->RecordInt16("parallel_workers_launched");

auto col_err_sqlstate = exporter->MetricFixedString(5, "err_sqlstate");
auto col_err_sqlstate = exporter->TagString("err_sqlstate");
auto col_err_elevel = exporter->RecordUInt8("err_elevel");
auto col_err_message = exporter->RecordString("err_message");

Expand All @@ -290,7 +290,7 @@ void ExportEventStatsInternal(const std::vector<PschEvent>& events, StatsExporte
for (const auto& ev : events) {
exporter->BeginRow();

col_ts_start->Append(ev.ts_start + kPostgresEpochOffsetUs);
col_ts->Append(ev.ts_start + kPostgresEpochOffsetUs);
col_duration_us->Append(ev.duration_us);
col_db->Append(std::string(ev.datname, ev.datname_len));
col_username->Append(std::string(ev.username, ev.username_len));
Expand Down Expand Up @@ -337,7 +337,8 @@ void ExportEventStatsInternal(const std::vector<PschEvent>& events, StatsExporte
col_parallel_workers_planned->Append(ev.parallel_workers_planned);
col_parallel_workers_launched->Append(ev.parallel_workers_launched);

col_err_sqlstate->Append(std::string_view(ev.err_sqlstate, 5));
col_err_sqlstate->Append(
std::string(ev.err_sqlstate, strnlen(ev.err_sqlstate, sizeof(ev.err_sqlstate))));
col_err_elevel->Append(ev.err_elevel);
auto elen = ClampFieldLen(ev.err_message_len, static_cast<uint16>(PSCH_MAX_ERR_MSG_LEN),
"err_message_len");
Expand Down
12 changes: 6 additions & 6 deletions t/010_clickhouse_export.pl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

# Verify query field is populated
my $query_check = psch_wait_for_clickhouse_query(
"SELECT count() FROM pg_stat_ch.events_raw WHERE query != ''",
"SELECT count() FROM pg_stat_ch.events_raw WHERE query_text != ''",
sub { $_[0] >= 1 },
10
);
Expand Down Expand Up @@ -134,18 +134,18 @@
cmp_ok($duration_check, '>=', 1, 'duration_us is populated');

my $db_check = psch_wait_for_clickhouse_query(
"SELECT count() FROM pg_stat_ch.events_raw WHERE db = 'postgres'",
"SELECT count() FROM pg_stat_ch.events_raw WHERE db_name = 'postgres'",
sub { $_[0] >= 1 },
10
);
cmp_ok($db_check, '>=', 1, 'db field is populated');
cmp_ok($db_check, '>=', 1, 'db_name field is populated');

my $cmd_type_check = psch_wait_for_clickhouse_query(
"SELECT count() FROM pg_stat_ch.events_raw WHERE cmd_type != ''",
my $db_operation_check = psch_wait_for_clickhouse_query(
"SELECT count() FROM pg_stat_ch.events_raw WHERE db_operation != ''",
sub { $_[0] >= 1 },
10
);
cmp_ok($cmd_type_check, '>=', 1, 'cmd_type is populated');
cmp_ok($db_operation_check, '>=', 1, 'db_operation is populated');

# Clean up
$node->safe_psql('postgres', 'DROP TABLE IF EXISTS test_fields');
Expand Down
4 changes: 2 additions & 2 deletions t/012_timing_accuracy.pl
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
# Get timing statistics
my $timing_stats = psch_query_clickhouse(
"SELECT count(), avg(duration_us), min(duration_us), max(duration_us) " .
"FROM pg_stat_ch.events_raw WHERE query LIKE '%pgbench%' OR query LIKE '%UPDATE%'"
"FROM pg_stat_ch.events_raw WHERE query_text LIKE '%pgbench%' OR query_text LIKE '%UPDATE%'"
);
diag("Timing stats: $timing_stats");

Expand Down Expand Up @@ -131,7 +131,7 @@
# Check that we captured a duration close to 100ms
my $duration = psch_query_clickhouse(
"SELECT duration_us FROM pg_stat_ch.events_raw " .
"WHERE query LIKE '%pg_sleep%' ORDER BY ts_start DESC LIMIT 1"
"WHERE query_text LIKE '%pg_sleep%' ORDER BY ts DESC LIMIT 1"
);

if ($duration) {
Expand Down
4 changes: 2 additions & 2 deletions t/021_cmd_type_counts.pl
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
batch_max => 100
);

# Helper: parse cmd_type counts from ClickHouse
# Helper: parse db_operation counts from ClickHouse
sub get_cmd_type_counts {
my $result = psch_query_clickhouse(
"SELECT cmd_type, count() FROM pg_stat_ch.events_raw GROUP BY cmd_type FORMAT TabSeparated"
"SELECT db_operation, count() FROM pg_stat_ch.events_raw GROUP BY db_operation FORMAT TabSeparated"
);
my %counts;
for my $line (split /\n/, $result) {
Expand Down
52 changes: 26 additions & 26 deletions t/027_query_normalization.pl
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ sub get_captured_query {
psch_wait_for_export($node, 1, 10);

return psch_wait_for_clickhouse_query(
"SELECT query FROM pg_stat_ch.events_raw " .
"SELECT query_text FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query NOT LIKE '%pg_stat_ch%' " .
"AND query NOT LIKE '%pg_extension%' " .
"AND query != '' " .
"ORDER BY ts_start DESC LIMIT 1",
"AND query_text NOT LIKE '%pg_stat_ch%' " .
"AND query_text NOT LIKE '%pg_extension%' " .
"AND query_text != '' " .
"ORDER BY ts DESC LIMIT 1",
sub { $_[0] ne '' },
10
);
Expand Down Expand Up @@ -193,11 +193,11 @@ sub get_captured_query {
psch_wait_for_export($node, 2, 10);

my $all_queries = psch_wait_for_clickhouse_query(
"SELECT groupArray(query) FROM pg_stat_ch.events_raw " .
"SELECT groupArray(query_text) FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query LIKE '%test_norm%' " .
"AND query NOT LIKE '%pg_stat_ch%' " .
"AND query != ''",
"AND query_text LIKE '%test_norm%' " .
"AND query_text NOT LIKE '%pg_stat_ch%' " .
"AND query_text != ''",
sub { $_[0] ne '' },
10
);
Expand Down Expand Up @@ -292,19 +292,19 @@ sub get_captured_query {
$session->quit();

my $error_query = psch_query_clickhouse(
"SELECT query FROM pg_stat_ch.events_raw " .
"SELECT query_text FROM pg_stat_ch.events_raw " .
"WHERE err_message != '' " .
"ORDER BY ts_start DESC LIMIT 1"
"ORDER BY ts DESC LIMIT 1"
);
is($error_query, '', 'Error events do not export query text');

my $q = psch_wait_for_clickhouse_query(
"SELECT query FROM pg_stat_ch.events_raw " .
"SELECT query_text FROM pg_stat_ch.events_raw " .
"WHERE err_message = '' " .
"AND query NOT LIKE '%pg_stat_ch%' " .
"AND query NOT LIKE '%pg_extension%' " .
"AND query != '' " .
"ORDER BY ts_start DESC LIMIT 1",
"AND query_text NOT LIKE '%pg_stat_ch%' " .
"AND query_text NOT LIKE '%pg_extension%' " .
"AND query_text != '' " .
"ORDER BY ts DESC LIMIT 1",
sub { $_[0] ne '' },
10
);
Expand Down Expand Up @@ -388,19 +388,19 @@ sub get_captured_query {
my $nested_count = psch_wait_for_clickhouse_query(
"SELECT count() FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query LIKE '%WHERE%' " .
"AND query LIKE '%nested_normalize_same_sql%'",
"AND query_text LIKE '%WHERE%' " .
"AND query_text LIKE '%nested_normalize_same_sql%'",
sub { $_[0] >= 3 },
10
);
cmp_ok($nested_count, '>=', 3,
'Captured recursive nested executions of the same SPI statement');

my $queries = psch_wait_for_clickhouse_query(
"SELECT groupArray(query) FROM pg_stat_ch.events_raw " .
"SELECT groupArray(query_text) FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query LIKE '%WHERE%' " .
"AND query LIKE '%nested_normalize_same_sql%'",
"AND query_text LIKE '%WHERE%' " .
"AND query_text LIKE '%nested_normalize_same_sql%'",
sub { $_[0] ne '' },
10
);
Expand Down Expand Up @@ -433,19 +433,19 @@ sub get_captured_query {
my $repeat_count = psch_wait_for_clickhouse_query(
"SELECT count() FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query LIKE '%WHERE%' " .
"AND query LIKE '%nested_normalize_same_sql%'",
"AND query_text LIKE '%WHERE%' " .
"AND query_text LIKE '%nested_normalize_same_sql%'",
sub { $_[0] >= 4 },
10
);
cmp_ok($repeat_count, '>=', 4,
'Captured repeated executions of the same SPI statement in one backend');

my $repeat_queries = psch_wait_for_clickhouse_query(
"SELECT groupArray(query) FROM pg_stat_ch.events_raw " .
"SELECT groupArray(query_text) FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query LIKE '%WHERE%' " .
"AND query LIKE '%nested_normalize_same_sql%'",
"AND query_text LIKE '%WHERE%' " .
"AND query_text LIKE '%nested_normalize_same_sql%'",
sub { $_[0] ne '' },
10
);
Expand Down
28 changes: 14 additions & 14 deletions t/031_normalize_cache.pl
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@

# Collect all exported query texts from this backend.
my $all_queries = psch_wait_for_clickhouse_query(
"SELECT groupArray(query) FROM pg_stat_ch.events_raw " .
"SELECT groupArray(query_text) FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query NOT LIKE '%pg_stat_ch%' " .
"AND query NOT LIKE '%pg_extension%' " .
"AND query != ''",
"AND query_text NOT LIKE '%pg_stat_ch%' " .
"AND query_text NOT LIKE '%pg_extension%' " .
"AND query_text != ''",
sub { $_[0] ne '' },
10
);
Expand All @@ -107,9 +107,9 @@
my $count = psch_wait_for_clickhouse_query(
"SELECT count() FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query NOT LIKE '%pg_stat_ch%' " .
"AND query NOT LIKE '%pg_extension%' " .
"AND query != ''",
"AND query_text NOT LIKE '%pg_stat_ch%' " .
"AND query_text NOT LIKE '%pg_extension%' " .
"AND query_text != ''",
sub { $_[0] >= scalar(@queries) },
10
);
Expand Down Expand Up @@ -160,10 +160,10 @@
psch_wait_for_export($node, 1, 10);

my $exported_query = psch_wait_for_clickhouse_query(
"SELECT query FROM pg_stat_ch.events_raw " .
"SELECT query_text FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query LIKE 'SELECT%pg_class%' " .
"ORDER BY ts_start DESC LIMIT 1",
"AND query_text LIKE 'SELECT%pg_class%' " .
"ORDER BY ts DESC LIMIT 1",
sub { $_[0] ne '' },
10
);
Expand Down Expand Up @@ -236,10 +236,10 @@
psch_wait_for_export($node, 1, 10);

my $first = psch_wait_for_clickhouse_query(
"SELECT query FROM pg_stat_ch.events_raw " .
"SELECT query_text FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query LIKE '%pg_class%' " .
"AND query LIKE '%\$1%' " .
"AND query_text LIKE '%pg_class%' " .
"AND query_text LIKE '%\$1%' " .
"LIMIT 1",
sub { $_[0] ne '' },
10
Expand Down Expand Up @@ -281,7 +281,7 @@

my $empty_count = psch_query_clickhouse(
"SELECT count() FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid AND query = ''"
"WHERE pid = $pid AND query_text = ''"
);
cmp_ok($empty_count, '>=', 1,
'Evicted cache entry produces empty query text on re-EXECUTE');
Expand Down
Loading