diff --git a/.github/workflows/ci-tap.yml b/.github/workflows/ci-tap.yml index 718888b..8e60aca 100644 --- a/.github/workflows/ci-tap.yml +++ b/.github/workflows/ci-tap.yml @@ -42,6 +42,13 @@ jobs: - uses: ./.github/actions/setup-vcpkg + - name: Install uv + # uv is used by Arrow-IPC TAP tests (t/026, t/029) to run inline + # pyarrow scripts that decode the debug_arrow_dump_dir output. + uses: astral-sh/setup-uv@v6 + with: + version: "latest" + - name: Cache PostgreSQL build id: cache-pg uses: actions/cache@v4 diff --git a/docker/init/00-schema.sql b/docker/init/00-schema.sql index 4b15ef2..542a5f2 100644 --- a/docker/init/00-schema.sql +++ b/docker/init/00-schema.sql @@ -53,6 +53,8 @@ CREATE TABLE pg_stat_ch.events_raw query_id Int64 COMMENT '64-bit hash identifying normalized queries. Queries differing only in constants share the same query_id. Use for aggregating statistics across similar queries.', + parent_query_id Int64 COMMENT 'query_id of the calling query (e.g. the plpgsql function that issued this SPI statement). 0 for top-level queries. Use WHERE parent_query_id = 0 to restrict aggregations to top-level queries and avoid double-counting CPU and duration. Signedness matches query_id so the two columns compare/join without explicit casts.', + cmd_type LowCardinality(String) COMMENT 'Command type: SELECT, INSERT, UPDATE, DELETE, MERGE, UTILITY, or UNKNOWN. Use for workload characterization (read-heavy vs write-heavy).', rows UInt64 COMMENT 'Rows returned (SELECT) or affected (INSERT/UPDATE/DELETE). HIGH: large result sets or bulk operations. LOW: point queries. Watch for unexpected HIGH values indicating missing WHERE clauses.', diff --git a/migrations/001_add_parent_query_id.sql b/migrations/001_add_parent_query_id.sql new file mode 100644 index 0000000..6ebb46e --- /dev/null +++ b/migrations/001_add_parent_query_id.sql @@ -0,0 +1,12 @@ +-- Migration: add parent_query_id column +-- +-- Introduced in pg_stat_ch 0.4.x. 
Each event now carries the query_id of its +-- calling query (e.g. the plpgsql function that issued an SPI statement). +-- Top-level queries emit 0. Use WHERE parent_query_id = 0 in aggregations to +-- avoid double-counting CPU and duration across nested calls. +-- +-- Run against your ClickHouse instance before upgrading the extension: +-- clickhouse-client < migrations/001_add_parent_query_id.sql + +ALTER TABLE pg_stat_ch.events_raw + ADD COLUMN IF NOT EXISTS parent_query_id Int64 DEFAULT 0; diff --git a/src/export/arrow_batch.cc b/src/export/arrow_batch.cc index 75d4ebf..acb611c 100644 --- a/src/export/arrow_batch.cc +++ b/src/export/arrow_batch.cc @@ -149,6 +149,7 @@ struct ArrowBatchBuilder::Impl { arrow::StringBuilder trace_id_builder; arrow::StringBuilder span_id_builder; DictBuilder query_id_builder; + DictBuilder parent_query_id_builder; DictBuilder db_name_builder; DictBuilder db_user_builder; DictBuilder db_operation_builder; @@ -216,6 +217,7 @@ struct ArrowBatchBuilder::Impl { arrow::field("trace_id", arrow::utf8()), arrow::field("span_id", arrow::utf8()), arrow::field("query_id", DictionaryUtf8Type()), + arrow::field("parent_query_id", DictionaryUtf8Type()), arrow::field("db_name", DictionaryUtf8Type()), arrow::field("db_user", DictionaryUtf8Type()), arrow::field("db_operation", DictionaryUtf8Type()), @@ -309,10 +311,15 @@ struct ArrowBatchBuilder::Impl { char queryid_buf[24]; snprintf(queryid_buf, sizeof(queryid_buf), "%" PRIu64, static_cast(event.queryid)); + char parent_query_id_buf[24]; + snprintf(parent_query_id_buf, sizeof(parent_query_id_buf), "%" PRIu64, + static_cast(event.parent_query_id)); char pid_buf[12]; snprintf(pid_buf, sizeof(pid_buf), "%d", event.pid); if (!AppendString(&query_id_builder, queryid_buf, "Arrow query_id append") || + !AppendString(&parent_query_id_builder, parent_query_id_buf, + "Arrow parent_query_id append") || !AppendString(&db_name_builder, db_name, "Arrow db_name append") || !AppendString(&db_user_builder, db_user, 
"Arrow db_user append") || !AppendString(&db_operation_builder, PschCmdTypeToString(event.cmd_type), @@ -475,6 +482,7 @@ struct ArrowBatchBuilder::Impl { !add_array(&trace_id_builder, "Arrow trace_id finish") || !add_array(&span_id_builder, "Arrow span_id finish") || !add_dict_array(&query_id_builder, "Arrow query_id finish") || + !add_dict_array(&parent_query_id_builder, "Arrow parent_query_id finish") || !add_dict_array(&db_name_builder, "Arrow db_name finish") || !add_dict_array(&db_user_builder, "Arrow db_user finish") || !add_dict_array(&db_operation_builder, "Arrow db_operation finish") || @@ -581,6 +589,7 @@ struct ArrowBatchBuilder::Impl { trace_id_builder.Reset(); span_id_builder.Reset(); query_id_builder.ResetFull(); + parent_query_id_builder.ResetFull(); db_name_builder.ResetFull(); db_user_builder.ResetFull(); db_operation_builder.ResetFull(); diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc index ee19661..9d99a3a 100644 --- a/src/export/stats_exporter.cc +++ b/src/export/stats_exporter.cc @@ -241,6 +241,7 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte auto col_username = exporter->DbUserColumn(); auto col_pid = exporter->RecordInt32("pid"); auto col_query_id = exporter->RecordInt64("query_id"); + auto col_parent_query_id = exporter->RecordInt64("parent_query_id"); auto col_cmd_type = exporter->DbOperationColumn(); auto col_rows = exporter->MetricUInt64("rows"); auto col_query = exporter->DbQueryTextColumn(); @@ -296,6 +297,7 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte col_username->Append(std::string(ev.username, ev.username_len)); col_pid->Append(ev.pid); col_query_id->Append(static_cast(ev.queryid)); + col_parent_query_id->Append(static_cast(ev.parent_query_id)); col_cmd_type->Append(PschCmdTypeToString(ev.cmd_type)); col_rows->Append(ev.rows); diff --git a/src/hooks/hooks.c b/src/hooks/hooks.c index 6d99d3a..8619625 100644 --- a/src/hooks/hooks.c +++ b/src/hooks/hooks.c 
@@ -48,21 +48,41 @@ static ExecutorEnd_hook_type prev_executor_end = NULL; static ProcessUtility_hook_type prev_process_utility = NULL; static emit_log_hook_type prev_emit_log_hook = NULL; -// Track nesting level to identify top-level queries -static int nesting_level = 0; - -// CPU time tracking via getrusage -static struct rusage rusage_start; +// nesting_level is the slot index of the currently-active frame: it moves +// only where frames move (PschExecutorStart++ / PschExecutorEnd--, and the +// same pair inside PschProcessUtility around its body). Run/Finish leave it +// alone on success and only pop in PG_CATCH when the body errors out. Tying +// push to the increment means every reader knows exactly what each slot +// represents — no offset reinterpretation by phase. +// +// nesting_level == -1 is the resting state (no query active). Push +// increments before writing; pop reads before decrementing. A negative +// nesting_level doubles as the "no active query" check used by +// TopQueryFrame/PopQueryFrame. +// +// parent_query_id is captured into the frame at push time, so any reader +// fetches frame->parent_query_id directly regardless of whether it's at an +// emit hook or inside the body via CaptureLogEvent. The top of the stack +// is uniformly slot[nesting_level]. +// +// PSCH_MAX_NESTING_DEPTH is a small fixed cap. Frames at greater depth are +// not written; the corresponding emit at that depth falls back to zero CPU +// and parent_query_id 0. max_stack_depth (2MB by default) permits nesting +// far deeper than 16 (e.g. recursive PL/pgSQL), so frames past the cap are +// a reachable case that degrades telemetry, not an impossible one. 
+#define PSCH_MAX_NESTING_DEPTH 16 +typedef struct PschQueryFrame { + uint64 queryid; + uint64 parent_query_id; // captured from the previous top at push time + struct rusage rusage_start; + TimestampTz query_start_ts; +} PschQueryFrame; +static PschQueryFrame query_stack[PSCH_MAX_NESTING_DEPTH]; +static int nesting_level = -1; // Deadlock prevention for emit_log_hook static bool disable_error_capture = false; -// Track whether the current query started at top level -static bool current_query_is_top_level = false; - -// Track query start time for duration calculation -static TimestampTz query_start_ts = 0; - // System initialization flag - set after hooks are installed and shmem is ready static bool system_init = false; @@ -311,18 +331,68 @@ static void InitEventPartial(PschEvent* event) { event->query[0] = '\0'; } -static void InitBaseEvent(PschEvent* event, TimestampTz ts_start, bool top_level, +static void InitBaseEvent(PschEvent* event, TimestampTz ts_start, uint64 parent_query_id, PschCmdType cmd_type) { InitEventPartial(event); event->ts_start = ts_start; event->dbid = MyDatabaseId; event->userid = GetUserId(); event->pid = MyProcPid; - event->top_level = top_level; + event->parent_query_id = parent_query_id; event->cmd_type = cmd_type; ResolveNames(event); } +// Push a frame for the about-to-execute query, returning the frame pointer +// (NULL if depth has exceeded the cap — we still bump nesting_level so the +// matching pop balances). parent_query_id is captured here from the +// previous top, so every later reader fetches it directly from the frame +// without reinterpreting slot offsets. +static PschQueryFrame* PushQueryFrame(uint64 queryid) { + nesting_level++; + if (nesting_level >= PSCH_MAX_NESTING_DEPTH) { + // Runaway nesting — log once per overflowing push so it's visible if it + // happens in practice, but stay non-fatal so the query itself still + // runs. 
The pop on this push is still balanced (nesting_level keeps + // incrementing past the cap). + elog(WARNING, + "pg_stat_ch: query nesting depth %d exceeds cap %d; CPU and " + "parent_query_id telemetry will be missing for this frame", + nesting_level, PSCH_MAX_NESTING_DEPTH); + return NULL; + } + PschQueryFrame* frame = &query_stack[nesting_level]; + frame->queryid = queryid; + frame->parent_query_id = (nesting_level > 0) ? query_stack[nesting_level - 1].queryid : 0; + frame->query_start_ts = GetCurrentTimestamp(); + getrusage(RUSAGE_SELF, &frame->rusage_start); + return frame; +} + +// Return a pointer to the top frame (the currently-active query) without +// changing depth. NULL if the stack is empty or depth is past the cap. +static const PschQueryFrame* TopQueryFrame(void) { + if (nesting_level < 0 || nesting_level >= PSCH_MAX_NESTING_DEPTH) { + return NULL; + } + return &query_stack[nesting_level]; +} + +// Pop the top frame and return its still-valid data (slots are never zeroed +// — next push at the same depth overwrites). NULL if the stack was empty +// or its matching push had been past the cap. 
+static const PschQueryFrame* PopQueryFrame(void) { + if (nesting_level < 0) { + return NULL; + } + int top = nesting_level; + nesting_level--; + if (top >= PSCH_MAX_NESTING_DEPTH) { + return NULL; + } + return &query_stack[top]; +} + static void CopyClientContext(PschEvent* event) { event->application_name_len = (uint8)( GetApplicationName(event->application_name, sizeof(event->application_name))); @@ -412,10 +482,9 @@ static void CopyParallelWorkerInfo(PschEvent* event pg_attribute_unused(), #endif } -static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, int64 cpu_user_us, - int64 cpu_sys_us) { - InitBaseEvent(event, query_start_ts, current_query_is_top_level, - ConvertCmdType(query_desc->operation)); +static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, TimestampTz start_ts, + uint64 parent_query_id, int64 cpu_user_us, int64 cpu_sys_us) { + InitBaseEvent(event, start_ts, parent_query_id, ConvertCmdType(query_desc->operation)); event->queryid = query_desc->plannedstmt->queryId; event->rows = query_desc->estate->es_processed; event->cpu_user_time_us = cpu_user_us; @@ -432,7 +501,7 @@ static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, int CopyIoTiming(event, &query_desc->totaltime->bufusage); CopyWalUsage(event, &query_desc->totaltime->walusage); } else { - event->duration_us = (uint64)(GetCurrentTimestamp() - query_start_ts); + event->duration_us = (uint64)(GetCurrentTimestamp() - start_ts); } CopyJitInstrumentation(event, query_desc); @@ -498,37 +567,44 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { return; } - // Record if this is a top-level query (before nesting_level changes in Run) - if (nesting_level == 0) { - current_query_is_top_level = true; - query_start_ts = GetCurrentTimestamp(); - // Capture CPU time baseline for top-level queries - if (psch_enabled) { - getrusage(RUSAGE_SELF, &rusage_start); - } - } else { - current_query_is_top_level = false; - } + // 
Push our frame. The matching pop is in PschExecutorEnd. The PG_TRY + // wraps everything between push and function-return so any longjmp + // before End fires — including from InstrAlloc OOM — pops the frame on + // the unwind. Run/Finish use the same shape for the body. + PushQueryFrame(query_desc->plannedstmt->queryId); - if (prev_executor_start != NULL) { - prev_executor_start(query_desc, eflags); - } else { - standard_ExecutorStart(query_desc, eflags); - } + PG_TRY(); + { + if (prev_executor_start != NULL) { + prev_executor_start(query_desc, eflags); + } else { + standard_ExecutorStart(query_desc, eflags); + } - if (psch_enabled && query_desc->plannedstmt->queryId != UINT64CONST(0)) { - if (query_desc->totaltime == NULL) { - MemoryContext oldcxt = MemoryContextSwitchTo(query_desc->estate->es_query_cxt); + if (psch_enabled && query_desc->plannedstmt->queryId != UINT64CONST(0)) { + if (query_desc->totaltime == NULL) { + MemoryContext oldcxt = MemoryContextSwitchTo(query_desc->estate->es_query_cxt); #if PG_VERSION_NUM < 140000 - query_desc->totaltime = InstrAlloc(1, INSTRUMENT_ALL); + query_desc->totaltime = InstrAlloc(1, INSTRUMENT_ALL); #else - query_desc->totaltime = InstrAlloc(1, INSTRUMENT_ALL, false); + query_desc->totaltime = InstrAlloc(1, INSTRUMENT_ALL, false); #endif - MemoryContextSwitchTo(oldcxt); + MemoryContextSwitchTo(oldcxt); + } } } + PG_CATCH(); + { + PopQueryFrame(); + PG_RE_THROW(); + } + PG_END_TRY(); } +// Run does no nesting_level bookkeeping of its own — the frame was pushed +// in Start and will be popped in End on success. We only wrap the chain +// in PG_TRY/PG_CATCH so that if the body longjmps (the common error path), +// we pop nesting_level on the unwind and stay balanced. 
#if PG_VERSION_NUM >= 180000 static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint64 count) { #else @@ -536,6 +612,7 @@ static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint bool execute_once) { #endif if (IsParallelWorker()) { + // Parallel workers never pushed, so they don't pop and don't need PG_CATCH. #if PG_VERSION_NUM >= 180000 if (prev_executor_run != NULL) { prev_executor_run(query_desc, direction, count); @@ -552,7 +629,6 @@ static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint return; } - nesting_level++; PG_TRY(); { #if PG_VERSION_NUM >= 180000 @@ -569,8 +645,11 @@ static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint } #endif } - PG_FINALLY(); - { nesting_level--; } + PG_CATCH(); + { + PopQueryFrame(); + PG_RE_THROW(); + } PG_END_TRY(); } @@ -584,7 +663,8 @@ static void PschExecutorFinish(QueryDesc* query_desc) { return; } - nesting_level++; + // Finish does no nesting_level bookkeeping of its own; only PG_CATCH-pop + // on error so the unwind stays balanced. PG_TRY(); { if (prev_executor_finish != NULL) { @@ -593,13 +673,32 @@ static void PschExecutorFinish(QueryDesc* query_desc) { standard_ExecutorFinish(query_desc); } } - PG_FINALLY(); - { nesting_level--; } + PG_CATCH(); + { + PopQueryFrame(); + PG_RE_THROW(); + } PG_END_TRY(); } static void PschExecutorEnd(QueryDesc* query_desc) { - if (!psch_enabled || IsParallelWorker() || query_desc->plannedstmt->queryId == UINT64CONST(0)) { + if (IsParallelWorker()) { + // Parallel workers never pushed in Start, so don't pop here. + if (prev_executor_end != NULL) { + prev_executor_end(query_desc); + } else { + standard_ExecutorEnd(query_desc); + } + return; + } + + // Pop the frame Start pushed. Slot data remains valid through the + // returned pointer (slots are never zeroed), so we keep reading from it + // through the rest of this function. Null only if the matching push + // had been past the cap. 
+ const PschQueryFrame* frame = PopQueryFrame(); + + if (!psch_enabled || query_desc->plannedstmt->queryId == UINT64CONST(0)) { if (prev_executor_end != NULL) { prev_executor_end(query_desc); } else { @@ -612,7 +711,14 @@ static void PschExecutorEnd(QueryDesc* query_desc) { InstrEndLoop(query_desc->totaltime); } - // Compute duration early for sampling filter + // start_ts falls back to "now" so the duration computation below yields + // ~0us if the frame is null, rather than ~25 years from subtracting the + // PG epoch. query_desc->totaltime almost always supplies a real + // duration from instrumentation; the fallback subtraction is only used + // when totaltime wasn't allocated. + TimestampTz start_ts = frame ? frame->query_start_ts : GetCurrentTimestamp(); + uint64 parent_query_id = frame ? frame->parent_query_id : 0; + uint64 duration_us; if (query_desc->totaltime != NULL) { #if PG_VERSION_NUM >= 190000 @@ -621,7 +727,7 @@ static void PschExecutorEnd(QueryDesc* query_desc) { duration_us = (uint64)(query_desc->totaltime->total * 1000000.0); #endif } else { - duration_us = (uint64)(GetCurrentTimestamp() - query_start_ts); + duration_us = (uint64)(GetCurrentTimestamp() - start_ts); } if (!ShouldSampleEvent(duration_us)) { @@ -633,17 +739,16 @@ static void PschExecutorEnd(QueryDesc* query_desc) { return; } - // Compute CPU time delta from getrusage int64 cpu_user_us = 0; int64 cpu_sys_us = 0; struct rusage rusage_end; - if (getrusage(RUSAGE_SELF, &rusage_end) == 0) { - cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, rusage_start.ru_utime); - cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, rusage_start.ru_stime); + if (frame != NULL && getrusage(RUSAGE_SELF, &rusage_end) == 0) { + cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, frame->rusage_start.ru_utime); + cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, frame->rusage_start.ru_stime); } PschEvent event; - BuildEventFromQueryDesc(query_desc, &event, cpu_user_us, cpu_sys_us); + 
BuildEventFromQueryDesc(query_desc, &event, start_ts, parent_query_id, cpu_user_us, cpu_sys_us); PschEnqueueEvent(&event); if (prev_executor_end != NULL) { @@ -655,10 +760,10 @@ static void PschExecutorEnd(QueryDesc* query_desc) { // Build a PschEvent for utility statements (no QueryDesc available) static void BuildEventForUtility(PschEvent* event, uint64 query_id, TimestampTz start_ts, - uint64 duration_us, bool is_top_level, uint64 rows, + uint64 duration_us, uint64 parent_query_id, uint64 rows, BufferUsage* bufusage, WalUsage* walusage, int64 cpu_user_us, int64 cpu_sys_us) { - InitBaseEvent(event, start_ts, is_top_level, PSCH_CMD_UTILITY); + InitBaseEvent(event, start_ts, parent_query_id, PSCH_CMD_UTILITY); event->queryid = query_id; event->duration_us = duration_us; event->rows = rows; @@ -728,21 +833,6 @@ static uint64 GetUtilityRowCount(QueryCompletion* qc) { } } -static void ExecuteUtilityWithNesting(PlannedStmt* pstmt, const char* queryString, -#if PG_VERSION_NUM >= 140000 - bool readOnlyTree, -#endif - ProcessUtilityContext context, ParamListInfo params, - QueryEnvironment* queryEnv, DestReceiver* dest, - QueryCompletion* qc) { - nesting_level++; - PG_TRY(); - { CALL_PROCESS_UTILITY(); } - PG_FINALLY(); - { nesting_level--; } - PG_END_TRY(); -} - // ProcessUtility hook - captures DDL and utility statements #if PG_VERSION_NUM >= 140000 static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, bool readOnlyTree, @@ -760,22 +850,24 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, return; } - // Capture state before execution - bool is_top_level = (nesting_level == 0); - TimestampTz start_ts = GetCurrentTimestamp(); + // Push our frame so any executor hooks fired from within this utility + // (e.g. the SELECT inside CREATE TABLE AS) read us as their parent. + // PushQueryFrame captures the rusage baseline and start timestamp into + // the frame; we use those for the emit below. 
The PG_TRY/PG_FINALLY + // around the body guarantees the pop on both success and longjmp. + const PschQueryFrame* frame = PushQueryFrame(pstmt->queryId); + uint64 query_id = pstmt->queryId; BufferUsage bufusage_start = pgBufferUsage; WalUsage walusage_start = pgWalUsage; - struct rusage rusage_util_start; - getrusage(RUSAGE_SELF, &rusage_util_start); instr_time start_time; INSTR_TIME_SET_CURRENT(start_time); -#if PG_VERSION_NUM >= 140000 - ExecuteUtilityWithNesting(pstmt, queryString, readOnlyTree, context, params, queryEnv, dest, qc); -#else - ExecuteUtilityWithNesting(pstmt, queryString, context, params, queryEnv, dest, qc); -#endif + PG_TRY(); + { CALL_PROCESS_UTILITY(); } + PG_FINALLY(); + { PopQueryFrame(); } + PG_END_TRY(); instr_time duration; INSTR_TIME_SET_CURRENT(duration); @@ -795,14 +887,20 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, int64 cpu_user_us = 0; int64 cpu_sys_us = 0; - struct rusage rusage_util_end; - if (getrusage(RUSAGE_SELF, &rusage_util_end) == 0) { - cpu_user_us = TimeDiffMicrosec(rusage_util_end.ru_utime, rusage_util_start.ru_utime); - cpu_sys_us = TimeDiffMicrosec(rusage_util_end.ru_stime, rusage_util_start.ru_stime); + struct rusage rusage_end; + if (frame != NULL && getrusage(RUSAGE_SELF, &rusage_end) == 0) { + cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, frame->rusage_start.ru_utime); + cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, frame->rusage_start.ru_stime); } + // start_ts falls back to "now" so ts_start in the emitted event is at + // least approximately correct rather than the PG epoch. Matches the + // same fallback in PschExecutorEnd. + TimestampTz start_ts = frame ? frame->query_start_ts : GetCurrentTimestamp(); + uint64 parent_query_id = frame ? 
frame->parent_query_id : 0; + PschEvent event; - BuildEventForUtility(&event, query_id, start_ts, duration_us, is_top_level, + BuildEventForUtility(&event, query_id, start_ts, duration_us, parent_query_id, GetUtilityRowCount(qc), &bufusage_delta, &walusage_delta, cpu_user_us, cpu_sys_us); PschEnqueueEvent(&event); @@ -850,7 +948,18 @@ static bool ShouldCaptureLog(ErrorData* edata) { // message, SQLSTATE, and client/session metadata. static void CaptureLogEvent(ErrorData* edata) { PschEvent event; - InitBaseEvent(&event, GetCurrentTimestamp(), (nesting_level == 0), PSCH_CMD_UNKNOWN); + + // Top of the stack is whatever query is currently executing — Start + // pushed it, End/ProcessUtility haven't popped yet because we're still + // inside the body that fired this elog. Attribute this log event to the + // running query (queryid) and inherit its parent_query_id, captured at + // push time. + const PschQueryFrame* top = TopQueryFrame(); + uint64 running_query_id = top ? top->queryid : 0; + uint64 parent_query_id = top ? 
top->parent_query_id : 0; + + InitBaseEvent(&event, GetCurrentTimestamp(), parent_query_id, PSCH_CMD_UNKNOWN); + event.queryid = running_query_id; UnpackSqlState(edata->sqlerrcode, event.err_sqlstate); event.err_elevel = (uint8)(edata->elevel); diff --git a/src/queue/event.h b/src/queue/event.h index a617915..830fb9f 100644 --- a/src/queue/event.h +++ b/src/queue/event.h @@ -96,16 +96,16 @@ typedef struct PschEvent { uint64 duration_us; // Execution duration in microseconds // Identity - Oid dbid; // Database OID - Oid userid; // User OID - char datname[64]; // Database name (NAMEDATALEN=64, resolved at capture) - uint8 datname_len; // Actual length - char username[64]; // User name (NAMEDATALEN=64, resolved at capture) - uint8 username_len; // Actual length - int32 pid; // Backend process ID - uint64 queryid; // Query ID (from pg_stat_statements) - bool top_level; // True if this is a top-level query - PschCmdType cmd_type; // Command type (SELECT, UPDATE, etc.) + Oid dbid; // Database OID + Oid userid; // User OID + char datname[64]; // Database name (NAMEDATALEN=64, resolved at capture) + uint8 datname_len; // Actual length + char username[64]; // User name (NAMEDATALEN=64, resolved at capture) + uint8 username_len; // Actual length + int32 pid; // Backend process ID + uint64 queryid; // Query ID (from pg_stat_statements) + uint64 parent_query_id; // queryid of the calling query (0 if top-level) + PschCmdType cmd_type; // Command type (SELECT, UPDATE, etc.) 
// Results uint64 rows; // Number of rows affected/returned diff --git a/src/queue/ring_entry.h b/src/queue/ring_entry.h index faf1c4c..c3ce6dd 100644 --- a/src/queue/ring_entry.h +++ b/src/queue/ring_entry.h @@ -39,7 +39,7 @@ typedef struct PschRingEntry { uint8 username_len; int32 pid; uint64 queryid; - bool top_level; + uint64 parent_query_id; PschCmdType cmd_type; // === Results === diff --git a/t/028_parent_query_id.pl b/t/028_parent_query_id.pl new file mode 100644 index 0000000..ca4d9f5 --- /dev/null +++ b/t/028_parent_query_id.pl @@ -0,0 +1,204 @@ +#!/usr/bin/env perl +# Parent query id linkage: +# - top-level queries report parent_query_id = 0 +# - nested SPI queries report parent_query_id = outer's query_id +# - log events captured while a nested SPI query is on the stack report +# queryid = the running (inner) statement and parent_query_id = its +# outer caller — catches the CaptureLogEvent off-by-one. +# NOTE: we use RAISE WARNING for this rather than a caught ERROR. +# emit_log_hook does not fire from errfinish for ERROR-level events +# (errfinish PG_RE_THROWs without calling EmitErrorReport; emit_log_hook +# only fires from PostgresMain's top-level catch, after all frames have +# been popped — or never, for caught-in-EXCEPTION errors). WARNING +# goes through EmitErrorReport directly so the frame stack is intact +# when our hook runs. +# +# Filters key off distinctive table/function names — these survive query +# normalization, where string/numeric literals get replaced with $N +# placeholders and would not. + +use strict; +use warnings; +use lib 't'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use psch; + +if (!psch_clickhouse_available()) { + plan skip_all => 'Docker not available, skipping ClickHouse tests'; +} + +my $ch_check = `curl -s 'http://localhost:18123/' --data 'SELECT 1' 2>/dev/null`; +if ($ch_check !~ /^1/) { + plan skip_all => 'ClickHouse container not running. 
Start with: docker compose -f docker/docker-compose.test.yml up -d'; +} + +psch_query_clickhouse("TRUNCATE TABLE IF EXISTS pg_stat_ch.events_raw"); + +my $node = psch_init_node_with_clickhouse('parent_qid', + flush_interval_ms => 100, + batch_max => 100, +); + +# A dedicated table whose name appears verbatim in normalized query text. +$node->safe_psql('postgres', 'CREATE TABLE pqid_top_marker(x int)'); + +# Test 1: Top-level queries report parent_query_id = 0 +subtest 'top-level parent_query_id is 0' => sub { + psch_query_clickhouse("TRUNCATE TABLE pg_stat_ch.events_raw"); + psch_reset_stats($node); + + $node->safe_psql('postgres', 'SELECT * FROM pqid_top_marker'); + $node->safe_psql('postgres', 'SELECT count(*) FROM pqid_top_marker'); + $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()'); + + my $matches = psch_wait_for_clickhouse_query( + "SELECT count() FROM pg_stat_ch.events_raw WHERE query LIKE '%pqid_top_marker%'", + sub { $_[0] >= 2 }, + 10, + ); + cmp_ok($matches, '>=', 2, 'top-level marker rows landed'); + + my $nonzero_parents = psch_query_clickhouse( + "SELECT count() FROM pg_stat_ch.events_raw " + . 
"WHERE query LIKE '%pqid_top_marker%' AND parent_query_id != 0" + ); + is($nonzero_parents, '0', 'top-level queries report parent_query_id = 0'); +}; + +# Test 2: Nested SPI queries report parent_query_id matching the outer's queryid +subtest 'nested SPI parent_query_id links to outer' => sub { + psch_query_clickhouse("TRUNCATE TABLE pg_stat_ch.events_raw"); + psch_reset_stats($node); + + $node->safe_psql('postgres', q{ + CREATE TABLE pqid_inner_marker(x int); + INSERT INTO pqid_inner_marker VALUES (1); + CREATE FUNCTION pqid_outer_caller() RETURNS int + LANGUAGE plpgsql AS $$ + DECLARE v int; + BEGIN + SELECT x INTO v FROM pqid_inner_marker; + RETURN v; + END$$; + }); + + psch_query_clickhouse("TRUNCATE TABLE pg_stat_ch.events_raw"); + psch_reset_stats($node); + + $node->safe_psql('postgres', 'SELECT pqid_outer_caller()'); + $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()'); + + # Wait for both rows. + psch_wait_for_clickhouse_query( + "SELECT count() FROM pg_stat_ch.events_raw " + . "WHERE query LIKE '%pqid_outer_caller%' OR query LIKE '%pqid_inner_marker%'", + sub { $_[0] >= 2 }, + 10, + ); + + # Self-join: inner row's parent_query_id must equal outer row's query_id. + my $linked = psch_query_clickhouse(q{ + SELECT count() FROM pg_stat_ch.events_raw inner_q + JOIN pg_stat_ch.events_raw outer_q + ON inner_q.parent_query_id = outer_q.query_id + WHERE inner_q.query LIKE '%pqid_inner_marker%' + AND outer_q.query LIKE '%pqid_outer_caller%' + }); + cmp_ok($linked, '>=', 1, 'nested SPI parent_query_id matches outer query_id'); + + my $orphan = psch_query_clickhouse( + "SELECT count() FROM pg_stat_ch.events_raw " + . "WHERE query LIKE '%pqid_inner_marker%' AND parent_query_id = 0" + ); + is($orphan, '0', 'nested SPI query is not reported as top-level'); + + my $outer_self = psch_query_clickhouse( + "SELECT count() FROM pg_stat_ch.events_raw " + . 
"WHERE query LIKE '%pqid_outer_caller%' AND parent_query_id != 0" + ); + is($outer_self, '0', 'outer call still reports parent_query_id = 0'); +}; + +# Test 3: Log event captured inside a nested SPI query. +# queryid = the running (nested) query's id +# parent_query_id = the outer caller's id +# Catches the CaptureLogEvent off-by-one — the old code read the wrong +# slot and would have attributed the warning to the outer caller (or +# emitted query_id = 0) instead of the inner SPI statement. +subtest 'log event inside nested SPI links queryid -> outer' => sub { + $node->safe_psql('postgres', q{ + CREATE TABLE pqid_warn_tbl(x int); + INSERT INTO pqid_warn_tbl VALUES (1); + CREATE FUNCTION pqid_emit_warn(x int) RETURNS int + LANGUAGE plpgsql AS $$ + BEGIN + RAISE WARNING 'pqid_warn_marker' USING ERRCODE = '01001'; + RETURN x; + END$$; + CREATE FUNCTION pqid_warn_outer() RETURNS int + LANGUAGE plpgsql AS $$ + DECLARE v int; + BEGIN + SELECT pqid_emit_warn(x) INTO v FROM pqid_warn_tbl; + RETURN v; + END$$; + }); + + psch_query_clickhouse("TRUNCATE TABLE pg_stat_ch.events_raw"); + psch_reset_stats($node); + + $node->safe_psql('postgres', 'SELECT pqid_warn_outer()'); + $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()'); + + # The log event itself carries no query text (CaptureLogEvent leaves + # query empty) — identify it via cmd_type=UNKNOWN + err_sqlstate=01001 + # (our RAISE WARNING's custom code). + psch_wait_for_clickhouse_query( + "SELECT count() FROM pg_stat_ch.events_raw " + . "WHERE cmd_type = 'UNKNOWN' AND err_sqlstate = '01001'", + sub { $_[0] >= 1 }, + 10, + ); + + # parent_query_id of the warning must equal the outer caller's query_id. 
+ my $warn_to_outer = psch_query_clickhouse(q{ + SELECT count() FROM pg_stat_ch.events_raw warn_q + JOIN pg_stat_ch.events_raw outer_q + ON warn_q.parent_query_id = outer_q.query_id + WHERE warn_q.cmd_type = 'UNKNOWN' + AND warn_q.err_sqlstate = '01001' + AND outer_q.query LIKE '%pqid_warn_outer%' + }); + cmp_ok($warn_to_outer, '>=', 1, + 'warning log event: parent_query_id = outer caller'); + + # query_id of the warning must equal the inner SPI statement's query_id + # (the running query), NOT the outer caller's query_id. + my $warn_to_inner = psch_query_clickhouse(q{ + SELECT count() FROM pg_stat_ch.events_raw warn_q + JOIN pg_stat_ch.events_raw inner_q + ON warn_q.query_id = inner_q.query_id + WHERE warn_q.cmd_type = 'UNKNOWN' + AND warn_q.err_sqlstate = '01001' + AND inner_q.query LIKE '%pqid_emit_warn%' + }); + cmp_ok($warn_to_inner, '>=', 1, + 'warning log event: query_id = inner SPI statement (the running query)'); + + # And those two must NOT be equal — running query is the child, parent + # is its caller. + my $self_parent = psch_query_clickhouse( + "SELECT count() FROM pg_stat_ch.events_raw " + . "WHERE cmd_type = 'UNKNOWN' AND err_sqlstate = '01001' " + . " AND query_id = parent_query_id AND query_id != 0" + ); + is($self_parent, '0', + 'log event query_id and parent_query_id are distinct (no self-parent)'); +}; + +$node->stop(); +done_testing(); diff --git a/t/029_parent_query_id_arrow.pl b/t/029_parent_query_id_arrow.pl new file mode 100644 index 0000000..9e765d1 --- /dev/null +++ b/t/029_parent_query_id_arrow.pl @@ -0,0 +1,240 @@ +#!/usr/bin/env perl +# Parent query id linkage, verified through the OTel/Arrow export path +# (the production export pathway). No ClickHouse or OTel collector is +# needed: pg_stat_ch.debug_arrow_dump_dir captures each Arrow IPC batch +# to disk before the gRPC send (which we deliberately point at a +# non-existent collector so it fails harmlessly). See t/026_arrow_dump.pl +# for the same trick. 
+# +# What this test guards: +# 1. The Arrow IPC schema actually contains parent_query_id. An earlier +# version of #95 added the field to PschEvent and the +# ClickHouse-native exporter but missed arrow_batch.cc. That gap +# escaped because the only existing parent_query_id test +# (t/028, ClickHouse-native) didn't exercise the Arrow path. +# 2. Top-level queries report parent_query_id = 0. +# 3. Nested SPI queries report parent_query_id matching the outer's +# query_id. +# 4. Log events captured by emit_log_hook while a nested SPI query is on +# the stack carry query_id of the running (inner) statement and +# parent_query_id of the outer caller — catches the CaptureLogEvent +# off-by-one (would otherwise read the wrong slot). We use RAISE +# WARNING for this: emit_log_hook does not fire from errfinish for +# ERROR-level events (errfinish PG_RE_THROWs without calling +# EmitErrorReport; emit_log_hook only fires later in PostgresMain's +# top-level catch, after all frames have been popped — or never, for +# caught-in-EXCEPTION errors). WARNING goes through EmitErrorReport +# directly so the frame stack is intact when our hook runs. 
+
+use strict;
+use warnings;
+use lib 't';
+use File::Temp qw(tempdir);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use psch;
+
+# The pyarrow validator below is launched through uv; without it there is
+# nothing this script can verify, so skip the whole file.
+if (system("uv --version >/dev/null 2>&1") != 0) {
+    plan skip_all => 'uv not installed (needed for inline pyarrow validation)';
+}
+
+# Scratch directory for the Arrow IPC dumps and the validator script;
+# removed automatically when the test exits (CLEANUP => 1).
+my $dump_dir = tempdir('psch_pqid_arrow_XXXX', TMPDIR => 1, CLEANUP => 1);
+
+my $node = PostgreSQL::Test::Cluster->new('pqid_arrow');
+$node->init();
+$node->append_conf('postgresql.conf', qq{
+shared_preload_libraries = 'pg_stat_ch'
+pg_stat_ch.enabled = on
+pg_stat_ch.queue_capacity = 65536
+pg_stat_ch.flush_interval_ms = 100
+pg_stat_ch.batch_max = 100
+pg_stat_ch.use_otel = on
+pg_stat_ch.otel_endpoint = 'localhost:14317'
+pg_stat_ch.otel_arrow_passthrough = on
+pg_stat_ch.debug_arrow_dump_dir = '$dump_dir'
+pg_stat_ch.hostname = 'test-pqid-arrow-host'
+});
+$node->start();
+$node->safe_psql('postgres', 'CREATE EXTENSION pg_stat_ch');
+
+# Fixtures. Distinctive table/function names so we can filter on them
+# in the Arrow dumps — names survive query normalization where literals
+# do not.
+$node->safe_psql('postgres', q{
+    CREATE TABLE pqid_top_marker(x int);
+
+    CREATE TABLE pqid_inner_marker(x int);
+    INSERT INTO pqid_inner_marker VALUES (1);
+    CREATE FUNCTION pqid_outer_caller() RETURNS int
+    LANGUAGE plpgsql AS $$
+    DECLARE v int;
+    BEGIN
+        SELECT x INTO v FROM pqid_inner_marker;
+        RETURN v;
+    END$$;
+
+    CREATE TABLE pqid_warn_tbl(x int);
+    INSERT INTO pqid_warn_tbl VALUES (1);
+    CREATE FUNCTION pqid_emit_warn(x int) RETURNS int
+    LANGUAGE plpgsql AS $$
+    BEGIN
+        RAISE WARNING 'pqid_warn_marker' USING ERRCODE = '01001';
+        RETURN x;
+    END$$;
+    CREATE FUNCTION pqid_warn_outer() RETURNS int
+    LANGUAGE plpgsql AS $$
+    DECLARE v int;
+    BEGIN
+        SELECT pqid_emit_warn(x) INTO v FROM pqid_warn_tbl;
+        RETURN v;
+    END$$;
+});
+
+# Wait briefly for setup events to flush, then clear so the test queries
+# we drive next are the only ones in the dump set.
+$node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()');
+select(undef, undef, undef, 0.5);
+unlink glob("$dump_dir/*.ipc");
+psch_reset_stats($node);
+
+# Drive the test queries.
+$node->safe_psql('postgres', q{
+    SELECT * FROM pqid_top_marker;
+    SELECT count(*) FROM pqid_top_marker;
+    SELECT pqid_outer_caller();
+    SELECT pqid_warn_outer();
+    SELECT pg_stat_ch_flush();
+});
+
+# Wait for dump files. Seeing the first .ipc file is not enough: it may
+# still be growing, and the driven queries may be split across more than
+# one batch. (Whether the extension writes dumps atomically is not visible
+# from this test, so do not rely on it.) Keep polling until every file is
+# non-empty and the set of names and sizes is unchanged between two
+# consecutive polls, or until the deadline passes.
+my @ipc_files;
+my $prev_snapshot = '';
+my $deadline = time() + 10;
+while (time() < $deadline) {
+    @ipc_files = sort glob("$dump_dir/*.ipc");
+    my $snapshot = join(';', map { "$_=" . ((-s $_) || 0) } @ipc_files);
+    my $all_nonempty = !grep { !-s $_ } @ipc_files;
+    last if @ipc_files > 0 && $all_nonempty && $snapshot eq $prev_snapshot;
+    $prev_snapshot = $snapshot;
+    select(undef, undef, undef, 0.2);
+}
+cmp_ok(scalar @ipc_files, '>=', 1, 'arrow IPC dumps were produced')
+  or BAIL_OUT('No Arrow IPC dumps; cannot verify parent_query_id');
+
+# Inline pyarrow validator: parses all .ipc files, computes the
+# assertions, and emits one KEY=VALUE line per result. Keeping the
+# logic in one script avoids parsing Arrow IPC from Perl directly.
+my $py = <<'PYEOF';
+# /// script
+# requires-python = ">=3.10"
+# dependencies = ["pyarrow"]
+# ///
+import glob, os, sys
+import pyarrow.ipc as ipc
+
+dump_dir = sys.argv[1]
+rows = []
+schema_has_pqid = False
+for p in sorted(glob.glob(os.path.join(dump_dir, "*.ipc"))):
+    with open(p, "rb") as f:
+        reader = ipc.open_stream(f)
+        if reader.schema.get_field_index("parent_query_id") != -1:
+            schema_has_pqid = True
+        rows.extend(reader.read_all().to_pylist())
+
+print(f"schema_has_parent_query_id={'1' if schema_has_pqid else '0'}")
+
+def has(text):
+    return [r for r in rows if text in (r.get("query_text") or "")]
+
+# Arrow dict-encodes query_id and parent_query_id as decimal *strings*;
+# "0" means top-level / no parent. Compare against strings, not ints.
+def is_zero(v):
+    return v is None or v == "" or v == "0"
+
+# Test 1: top-level
+top = has("pqid_top_marker")
+print(f"top_rows={len(top)}")
+print(f"top_nonzero_parent={sum(1 for r in top if not is_zero(r.get('parent_query_id')))}")
+
+# Test 2: nested SPI. Filter down to the SELECT op so that setup CREATE
+# TABLE / INSERT / DROP utility statements (which also mention the table)
+# don't pollute the "no orphans" check.
+outer = [r for r in has("pqid_outer_caller") if r.get("db_operation") == "SELECT"]
+inner = [r for r in has("pqid_inner_marker") if r.get("db_operation") == "SELECT"]
+outer_qids = {o.get('query_id') for o in outer if not is_zero(o.get('query_id'))}
+inner_linked = sum(1 for r in inner if r.get('parent_query_id') in outer_qids)
+print(f"outer_rows={len(outer)}")
+print(f"inner_rows={len(inner)}")
+print(f"inner_linked_to_outer={inner_linked}")
+print(f"inner_zero_parent={sum(1 for r in inner if is_zero(r.get('parent_query_id')))}")
+print(f"outer_nonzero_parent={sum(1 for r in outer if not is_zero(r.get('parent_query_id')))}")
+
+# Test 3: log event captured inside nested SPI. The warning event itself
+# carries no query_text (CaptureLogEvent leaves it empty), so we identify
+# it via err_sqlstate '01001' (our RAISE WARNING's custom code).
+warn_outer = [r for r in has("pqid_warn_outer") if r.get("db_operation") == "SELECT"]
+warn_inner = [r for r in has("pqid_emit_warn") if r.get("db_operation") == "SELECT"]
+warn_outer_qids = {o.get('query_id') for o in warn_outer if not is_zero(o.get('query_id'))}
+warn_inner_qids = {i.get('query_id') for i in warn_inner if not is_zero(i.get('query_id'))}
+warn = [r for r in rows
+        if r.get("err_sqlstate") == "01001" and r.get("db_operation") in (None, "", "UNKNOWN")]
+warn_linked_to_outer = sum(1 for r in warn
+                           if r.get('parent_query_id') in warn_outer_qids)
+warn_qid_is_inner = sum(1 for r in warn if r.get('query_id') in warn_inner_qids)
+warn_qid_is_outer = sum(1 for r in warn if r.get('query_id') in warn_outer_qids)
+warn_self_parent = sum(1 for r in warn
+                       if not is_zero(r.get('query_id'))
+                       and r.get('query_id') == r.get('parent_query_id'))
+print(f"warn_rows={len(warn)}")
+print(f"warn_linked_to_outer={warn_linked_to_outer}")
+print(f"warn_qid_is_inner={warn_qid_is_inner}")
+print(f"warn_qid_is_outer={warn_qid_is_outer}")
+print(f"warn_self_parent={warn_self_parent}")
+PYEOF
+
+my $script_path = "$dump_dir/_validate.py";
+open(my $fh, '>', $script_path) or die "Cannot write $script_path: $!";
+print $fh $py;
+# A failed close means the script on disk may be truncated; stop here
+# rather than run a partial validator.
+close($fh) or die "Cannot close $script_path: $!";
+
+# stderr is folded into the capture so that a Python traceback or a uv
+# resolution error is available for the diagnostics below.
+my $raw = `uv run --quiet '$script_path' '$dump_dir' 2>&1`;
+my $uv_status = $?;
+$raw = '' unless defined $raw;
+diag("pyarrow stdout:\n$raw") if $ENV{TEST_VERBOSE};
+
+# If the validator itself failed (uv could not provide pyarrow, a dump was
+# unreadable, ...) none of the KEY=VALUE results exist. Report that once,
+# with the captured output, instead of letting every assertion below fail
+# on an undefined value with no hint of the real cause.
+if ($uv_status != 0) {
+    diag("pyarrow validator output:\n$raw");
+    fail("pyarrow validator exited cleanly (wait status $uv_status)");
+    $node->stop();
+    done_testing();
+    exit;
+}
+
+# Collect the KEY=VALUE lines; anything else in the output is ignored.
+my %r;
+for my $line (split /\n/, $raw) {
+    $r{$1} = $2 if $line =~ /^(\w+)=(.*)$/;
+}
+
+# Regression check: the schema itself must include parent_query_id.
+is($r{schema_has_parent_query_id}, '1',
+    'arrow_batch.cc schema includes parent_query_id column');
+
+subtest 'top-level parent_query_id is 0' => sub {
+    cmp_ok($r{top_rows}, '>=', 1, 'top-level marker rows landed');
+    is($r{top_nonzero_parent}, '0', 'top-level rows report parent_query_id = 0');
+};
+
+subtest 'nested SPI parent_query_id links to outer' => sub {
+    cmp_ok($r{outer_rows}, '>=', 1, 'outer rows landed');
+    cmp_ok($r{inner_rows}, '>=', 1, 'inner rows landed');
+    cmp_ok($r{inner_linked_to_outer}, '>=', 1,
+        'inner row joins outer via parent_query_id');
+    is($r{inner_zero_parent}, '0',
+        'nested SPI is not reported as top-level');
+    is($r{outer_nonzero_parent}, '0',
+        'outer call still reports parent_query_id = 0');
+};
+
+subtest 'log event inside nested SPI links queryid -> outer' => sub {
+    cmp_ok($r{warn_rows}, '>=', 1, 'RAISE WARNING log event landed');
+    cmp_ok($r{warn_linked_to_outer}, '>=', 1,
+        "log event parent_query_id = outer caller's query_id");
+    cmp_ok($r{warn_qid_is_inner}, '>=', 1,
+        'log event query_id = inner SPI statement (the running query)');
+    is($r{warn_qid_is_outer}, '0',
+        'log event query_id is NOT the outer caller (off-by-one regression)');
+    is($r{warn_self_parent}, '0',
+        'log event query_id != parent_query_id (no self-parent)');
+};
+
+$node->stop();
+done_testing();