From c359e95fed0de1fce9a4967761fd1f2f9045f968 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 7 May 2026 14:33:31 -0500 Subject: [PATCH 1/9] Intern queued query text in shared HTAB to bound DSA usage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Without interning, every queued event owned a private DSA copy of the normalized query text — live DSA usage grew as `queued_events * query_len` and exhausted the bounded DSA pool well before the queue reached capacity. Repeated long normalized queries were the worst case. Add a shared, partition-locked HTAB whose entries point at refcount-managed DSA bodies, and route TryEnqueueLocked / PschDequeueEvent through it for query text. Live DSA usage drops to `distinct_live_query_texts * query_len`. Error messages stay per-event for now (separate optimization). The pattern mirrors pg_stat_statements (shared HTAB sized via hash_estimate_size + ShmemInitHash) and pgstat_shmem (refcounted DSA bodies freed only after the HTAB entry is removed). Adds t/032_query_intern.pl: 6000 EXECUTEs of a long normalized query through an 8MB DSA pool exit with dsa_oom_count == 0; the same workload without interning would push ~12MB through an 8MB pool and OOM. --- src/queue/query_intern.c | 322 +++++++++++++++++++++++++++++++++++++++ src/queue/query_intern.h | 86 +++++++++++ src/queue/shmem.c | 61 ++++++-- t/032_query_intern.pl | 106 +++++++++++++ 4 files changed, 563 insertions(+), 12 deletions(-) create mode 100644 src/queue/query_intern.c create mode 100644 src/queue/query_intern.h create mode 100644 t/032_query_intern.pl diff --git a/src/queue/query_intern.c b/src/queue/query_intern.c new file mode 100644 index 0000000..4ec7cfb --- /dev/null +++ b/src/queue/query_intern.c @@ -0,0 +1,322 @@ +// pg_stat_ch shared query-text interner. +// +// See query_intern.h for the architectural overview. 
This file implements +// the acquire/release lifecycle around a shared HTAB and the existing DSA +// area. The pattern mirrors pg_stat_statements (shared HTAB sized via +// hash_estimate_size and ShmemInitHash) and pgstat_shmem (HTAB entries point +// at variable-size DSA bodies, refcounts protect lifetime, the DSA body is +// freed only after the HTAB entry is removed). + +#include "postgres.h" + +#include <string.h> + +#include "common/hashfn.h" +#include "storage/lwlock.h" +#include "storage/shmem.h" +#include "utils/dsa.h" +#include "utils/hsearch.h" + +#include "config/guc.h" +#include "queue/psch_dsa.h" +#include "queue/query_intern.h" + +// Magic number stamped into every DSA-allocated query object. Used purely +// as a sanity check when resolving a dsa_pointer that came from a ring slot. +#define PSCH_QUERY_INTERN_MAGIC 0x51544348u // "QTCH" + +// Hash-table key. Combining (dbid, queryid, query_hash, query_len) makes +// accidental collisions exceedingly rare without requiring full text in the +// key. PostgreSQL usually assigns the same query_id to same-shaped queries +// so most lookups hit by query_id alone; the extra fields harden the key. +typedef struct PschQueryInternKey { + Oid dbid; + uint64 queryid; + uint64 query_hash; + uint16 query_len; +} PschQueryInternKey; + +// Hash-table entry stored in the shared HTAB. `key` must be first so dynahash +// can locate it from the entry pointer. The variable-size query bytes live +// in the DSA area; `object` points at a PschQueryInternObject there. +typedef struct PschQueryInternEntry { + PschQueryInternKey key; + dsa_pointer object; + uint32 refcount; +} PschQueryInternEntry; + +// DSA-resident body. Stamped with the magic + a copy of the key so the +// bgworker can recover the lock partition from a bare dsa_pointer.
+typedef struct PschQueryInternObject { + PschQueryInternKey key; + uint32 magic; + char query[FLEXIBLE_ARRAY_MEMBER]; +} PschQueryInternObject; + +// Shared HTAB handle (process-local pointer to a struct that lives in shmem). +static HTAB* psch_query_intern_htab = NULL; + +// Base of the partition LWLock array (index 0 .. PSCH_QUERY_INTERN_PARTITIONS-1). +// Lives in MainLWLockArray inside the pg_stat_ch named tranche. Entries are +// cache-line padded (LWLockPadded), so iteration must go through the padded +// type rather than `LWLock *` arithmetic. +static LWLockPadded* psch_query_intern_locks = NULL; + +static long PschQueryInternMaxEntries(void) { + // There can never be more distinct live interned objects than there are + // queue slots: every interned entry must be referenced by at least one + // queued event (refcount >= 1). Sizing the HTAB at the queue capacity + // gives a tight, predictable upper bound. + return (long)psch_queue_capacity; +} + +Size PschQueryInternShmemSize(void) { + return hash_estimate_size(PschQueryInternMaxEntries(), sizeof(PschQueryInternEntry)); +} + +int PschQueryInternLockCount(void) { + return PSCH_QUERY_INTERN_PARTITIONS; +} + +void PschQueryInternShmemInit(void* lwlock_base) { + HASHCTL info; + + psch_query_intern_locks = (LWLockPadded*)lwlock_base; + + MemSet(&info, 0, sizeof(info)); + info.keysize = sizeof(PschQueryInternKey); + info.entrysize = sizeof(PschQueryInternEntry); + + psch_query_intern_htab = + ShmemInitHash("pg_stat_ch query intern", PschQueryInternMaxEntries(), + PschQueryInternMaxEntries(), &info, HASH_ELEM | HASH_BLOBS); +} + +static void MakeKey(PschQueryInternKey* key, Oid dbid, uint64 queryid, const char* query, + uint16 query_len) { + // Zero pad bytes so HASH_BLOBS hashing is deterministic. 
+ MemSet(key, 0, sizeof(*key)); + key->dbid = dbid; + key->queryid = queryid; + key->query_hash = hash_any_extended((const unsigned char*)query, query_len, 0); + key->query_len = query_len; +} + +static LWLock* PartitionLockFor(uint64 query_hash) { + uint32 idx = (uint32)(query_hash & (PSCH_QUERY_INTERN_PARTITIONS - 1)); + return &psch_query_intern_locks[idx].lock; +} + +// Allocate a fresh DSA object holding the query text and key. +// Returns InvalidDsaPointer on DSA OOM (and bumps the OOM counter, mirroring +// PschDsaAllocString). +static dsa_pointer AllocInternObject(dsa_area* dsa, const PschQueryInternKey* key, + const char* query, uint16 query_len) { + Size alloc_size; + dsa_pointer dp; + PschQueryInternObject* obj; + + alloc_size = offsetof(PschQueryInternObject, query) + query_len + 1; + dp = dsa_allocate_extended(dsa, alloc_size, DSA_ALLOC_NO_OOM); + if (!DsaPointerIsValid(dp)) { + pg_atomic_fetch_add_u64(&psch_shared_state->dsa_oom_count, 1); + return InvalidDsaPointer; + } + + obj = (PschQueryInternObject*)dsa_get_address(dsa, dp); + obj->key = *key; + obj->magic = PSCH_QUERY_INTERN_MAGIC; + memcpy(obj->query, query, query_len); + obj->query[query_len] = '\0'; + return dp; +} + +// Compare a query text against the bytes already stored in an interned object. +// Returns true if the existing object holds exactly the same text. A false +// here on a hash hit is a (dbid, queryid, query_hash, query_len) collision — +// extremely unlikely but the caller must handle it. 
+static bool ObjectMatches(dsa_area* dsa, dsa_pointer dp, const char* query, uint16 query_len) { + PschQueryInternObject* obj = (PschQueryInternObject*)dsa_get_address(dsa, dp); + + if (obj->magic != PSCH_QUERY_INTERN_MAGIC) { + return false; + } + if (obj->key.query_len != query_len) { + return false; + } + return memcmp(obj->query, query, query_len) == 0; +} + +dsa_pointer PschQueryInternAcquire(Oid dbid, uint64 queryid, const char* query, uint16 query_len) { + PschQueryInternKey key; + LWLock* partition; + PschQueryInternEntry* entry; + dsa_area* dsa; + dsa_pointer new_dp; + bool found; + + if (query_len == 0 || psch_query_intern_htab == NULL) { + return InvalidDsaPointer; + } + + dsa = PschDsaGetArea(); + if (dsa == NULL) { + return InvalidDsaPointer; + } + + MakeKey(&key, dbid, queryid, query, query_len); + partition = PartitionLockFor(key.query_hash); + + // First lookup: fast path for a hit. + LWLockAcquire(partition, LW_EXCLUSIVE); + entry = (PschQueryInternEntry*)hash_search(psch_query_intern_htab, &key, HASH_FIND, NULL); + if (entry != NULL && ObjectMatches(dsa, entry->object, query, query_len)) { + entry->refcount++; + LWLockRelease(partition); + return entry->object; + } + if (entry != NULL) { + // Hash hit but bytes differ — collision. Treat as a miss; the safe + // fallback below will attempt to install our own entry, which can't + // happen because the slot is taken. Return InvalidDsaPointer so the + // caller exports empty query text rather than wrong SQL. + LWLockRelease(partition); + return InvalidDsaPointer; + } + LWLockRelease(partition); + + // Miss: allocate the new object outside the partition lock so we don't + // hold it across DSA work. + new_dp = AllocInternObject(dsa, &key, query, query_len); + if (!DsaPointerIsValid(new_dp)) { + return InvalidDsaPointer; + } + + // Re-lock partition and re-check. Another backend may have inserted the + // same key while we were allocating. 
+ LWLockAcquire(partition, LW_EXCLUSIVE); + entry = (PschQueryInternEntry*)hash_search(psch_query_intern_htab, &key, HASH_ENTER_NULL, &found); + if (entry == NULL) { + // Hash table is full — back out the loser allocation and report miss. + LWLockRelease(partition); + dsa_free(dsa, new_dp); + return InvalidDsaPointer; + } + + if (found) { + dsa_pointer existing = entry->object; + + if (ObjectMatches(dsa, existing, query, query_len)) { + entry->refcount++; + LWLockRelease(partition); + dsa_free(dsa, new_dp); + return existing; + } + + // Lost the race AND the winner stored different bytes (collision). + // Don't disturb the winner; back out and report miss. + LWLockRelease(partition); + dsa_free(dsa, new_dp); + return InvalidDsaPointer; + } + + entry->object = new_dp; + entry->refcount = 1; + LWLockRelease(partition); + return new_dp; +} + +// Drop one reference to `ref`. Frees the DSA object when refcount hits zero. +// Caller has already copied any data it needs out of the object. +static void ReleaseRef(dsa_pointer ref) { + dsa_area* dsa; + PschQueryInternObject* obj; + PschQueryInternKey key; + LWLock* partition; + PschQueryInternEntry* entry; + dsa_pointer freed_dp = InvalidDsaPointer; + + if (!DsaPointerIsValid(ref) || psch_query_intern_htab == NULL) { + return; + } + + dsa = PschDsaGetArea(); + if (dsa == NULL) { + return; + } + + obj = (PschQueryInternObject*)dsa_get_address(dsa, ref); + if (obj->magic != PSCH_QUERY_INTERN_MAGIC) { + // Object was never an interned body (or was already freed). Don't + // touch the HTAB; just leak the pointer — better than corrupting it. 
+ return; + } + + key = obj->key; + partition = PartitionLockFor(key.query_hash); + + LWLockAcquire(partition, LW_EXCLUSIVE); + entry = (PschQueryInternEntry*)hash_search(psch_query_intern_htab, &key, HASH_FIND, NULL); + if (entry != NULL && entry->object == ref) { + Assert(entry->refcount > 0); + entry->refcount--; + if (entry->refcount == 0) { + freed_dp = entry->object; + hash_search(psch_query_intern_htab, &key, HASH_REMOVE, NULL); + } + } + LWLockRelease(partition); + + // Free outside the partition lock so we don't hold it across DSA work. + if (DsaPointerIsValid(freed_dp)) { + dsa_free(dsa, freed_dp); + } +} + +void PschQueryInternResolveAndRelease(dsa_pointer ref, char* dst, uint16 dst_size, + uint16* out_len) { + dsa_area* dsa; + PschQueryInternObject* obj; + uint16 copy_len; + + if (!DsaPointerIsValid(ref) || dst == NULL || dst_size == 0 || out_len == NULL) { + if (dst != NULL && dst_size > 0) { + dst[0] = '\0'; + } + if (out_len != NULL) { + *out_len = 0; + } + return; + } + + dsa = PschDsaGetArea(); + if (dsa == NULL) { + dst[0] = '\0'; + *out_len = 0; + return; + } + + obj = (PschQueryInternObject*)dsa_get_address(dsa, ref); + if (obj->magic != PSCH_QUERY_INTERN_MAGIC) { + dst[0] = '\0'; + *out_len = 0; + return; + } + + // We hold a reference (the caller's slot), so the object cannot be freed + // underneath us during the copy. No partition lock is needed for the + // read itself. + copy_len = obj->key.query_len; + if (copy_len >= dst_size) { + copy_len = dst_size - 1; + } + memcpy(dst, obj->query, copy_len); + dst[copy_len] = '\0'; + *out_len = copy_len; + + ReleaseRef(ref); +} + +void PschQueryInternRelease(dsa_pointer ref) { + ReleaseRef(ref); +} diff --git a/src/queue/query_intern.h b/src/queue/query_intern.h new file mode 100644 index 0000000..f119b37 --- /dev/null +++ b/src/queue/query_intern.h @@ -0,0 +1,86 @@ +// pg_stat_ch shared query-text interner. 
+// +// Without interning, every queued event owns a private DSA copy of the +// normalized query text. For repeated long-running queries the live DSA +// footprint is `queued_events * query_len`, which exhausts the bounded DSA +// pool well before the queue fills. +// +// The interner deduplicates query text across queued events: each distinct +// (dbid, queryid, query_hash, query_len) tuple maps to a single DSA-allocated +// `PschQueryInternObject`. Queue slots store a `dsa_pointer` to that object +// and a refcount-managed shared HTAB tracks lifetime. Live DSA usage drops to +// `distinct_live_query_texts * query_len`. +// +// Layout in shared memory: +// - HTAB allocated from extension shmem (sized via hash_estimate_size). +// - Variable-length query bodies live inside the existing pg_stat_ch DSA +// area (same pool used for err_message strings). +// +// Concurrency: a small set of partitioned LWLocks (PSCH_QUERY_INTERN_PARTITIONS) +// protects the HTAB and per-entry refcount. The partition is selected from +// the query_hash so unrelated query texts contend on different locks. +#ifndef PG_STAT_CH_SRC_QUEUE_QUERY_INTERN_H_ +#define PG_STAT_CH_SRC_QUEUE_QUERY_INTERN_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "postgres.h" + +#include "utils/dsa.h" + +// Number of LWLock partitions guarding the interner HTAB. Power of two so +// the partition index can be derived from query_hash with a bitmask. +#define PSCH_QUERY_INTERN_PARTITIONS 32 + +// Returns the HTAB shmem requirement (hash_estimate_size for the interner) +// that callers must fold into their total RequestAddinShmemSpace reservation. +Size PschQueryInternShmemSize(void); + +// Returns the number of additional LWLocks required for interner HTAB. +int PschQueryInternLockCount(void); + +// Initialize the shared HTAB and bind the partition LWLocks. 
Must be called +// from PschShmemStartupHook with AddinShmemInitLock held, after the main +// PschSharedState has been initialized (so `psch_shared_state` is valid). +// +// `lwlock_base` must point at the first interner partition lock inside the +// pg_stat_ch named tranche (i.e. immediately after the main queue lock). +void PschQueryInternShmemInit(void* lwlock_base); + +// Acquire (or create) an intern reference for the given query text. +// +// On hit: increments the entry's refcount and returns the existing DSA +// pointer. On miss: allocates a new DSA object, inserts it with refcount=1, +// and returns the new DSA pointer. Returns InvalidDsaPointer if `query_len == 0`, +// the DSA handle is unavailable, DSA allocation fails, the shared HTAB is +// full, or a hash collision is detected against a different query text +// (collisions are treated as a miss with no insert — exporting empty query +// text is preferable to exporting the wrong SQL). +// +// Attaches lazily via PschDsaGetArea; returns InvalidDsaPointer if DSA is +// unavailable. +dsa_pointer PschQueryInternAcquire(Oid dbid, uint64 queryid, const char* query, uint16 query_len); + +// Resolve `ref` into the caller's buffer and drop one reference. +// +// Copies up to `dst_size - 1` bytes of the interned query into `dst` and +// null-terminates. Sets `*out_len` to the number of bytes copied. Then +// decrements the refcount; if the refcount reaches zero the entry is removed +// from the HTAB and the DSA object is freed. +// +// If `ref` is InvalidDsaPointer, sets `dst[0] = '\0'` and `*out_len = 0`. +// +// Called by the bgworker (single consumer) on every dequeued event. +void PschQueryInternResolveAndRelease(dsa_pointer ref, char* dst, uint16 dst_size, uint16* out_len); + +// Drop one reference without resolving the bytes. Used when an enqueue path +// has acquired a reference but decides not to publish the ring slot.
+void PschQueryInternRelease(dsa_pointer ref); + +#ifdef __cplusplus +} +#endif + +#endif // PG_STAT_CH_SRC_QUEUE_QUERY_INTERN_H_ diff --git a/src/queue/shmem.c b/src/queue/shmem.c index e3c1a45..93f8181 100644 --- a/src/queue/shmem.c +++ b/src/queue/shmem.c @@ -42,6 +42,7 @@ #include "hooks/hooks.h" #include "queue/local_batch.h" #include "queue/psch_dsa.h" +#include "queue/query_intern.h" #include "queue/ring_entry.h" #include "queue/shmem.h" @@ -120,9 +121,23 @@ static bool TryEnqueueLocked(const PschEvent* event, uint32 capacity) { slot->err_message_len = 0; // Lost string on OOM — numeric data preserved } - slot->query_dsa = PschDsaAllocString(event->query, event->query_len, PSCH_MAX_QUERY_LEN); - if (event->query_len > 0 && !DsaPointerIsValid(slot->query_dsa)) { - slot->query_len = 0; // Lost string on OOM — numeric data preserved + // Query text goes through the shared interner so repeated identical + // normalized queries share a single DSA-allocated body. See query_intern.h + // for the design rationale. On miss + DSA OOM, miss + hash-full, or hash + // collision we drop the query bytes (numeric data is preserved). + if (event->query_len > 0) { + // Clamp the input length to what we'd have stored anyway, so the intern + // key doesn't include trailing bytes the consumer would have truncated. + uint16 clamped_len = Min(event->query_len, (uint16)(PSCH_MAX_QUERY_LEN - 1)); + slot->query_dsa = PschQueryInternAcquire(event->dbid, event->queryid, + event->query, clamped_len); + if (!DsaPointerIsValid(slot->query_dsa)) { + slot->query_len = 0; + } else { + slot->query_len = clamped_len; + } + } else { + slot->query_dsa = InvalidDsaPointer; } // CRITICAL: Memory barrier ensures the event data is written to shared memory @@ -147,9 +162,9 @@ static void PschShmemShutdown(int code pg_attribute_unused(), } } -Size PschShmemSize(void) { - // Layout: [PschSharedState] [PschRingEntry × capacity] [DSA area] - // See psch_dsa.h for diagram. 
DSA start is MAXALIGN'd for internal alignment. +// Size of the contiguous shmem block that ShmemInitStruct("pg_stat_ch", ...) +// allocates. Layout: [PschSharedState] [PschRingEntry × capacity] [DSA area]. +static Size PschSharedBlockSize(void) { Size ring_end = add_size(sizeof(PschSharedState), mul_size(psch_queue_capacity, sizeof(PschRingEntry))); Size dsa_offset = MAXALIGN(ring_end); @@ -157,9 +172,18 @@ Size PschShmemSize(void) { return MAXALIGN(total); } +Size PschShmemSize(void) { + // Total shmem requested from the postmaster: the shared block plus the + // interner HTAB (which ShmemInitHash carves out of the same pool). + return add_size(PschSharedBlockSize(), PschQueryInternShmemSize()); +} + static void RequestSharedResources(void) { RequestAddinShmemSpace(PschShmemSize()); - RequestNamedLWLockTranche("pg_stat_ch", 1); + // 1 main queue lock + N partition locks for the query-text interner, all + // in the single "pg_stat_ch" named tranche so we read them as a contiguous + // LWLockPadded[] from GetNamedLWLockTranche(). + RequestNamedLWLockTranche("pg_stat_ch", 1 + PschQueryInternLockCount()); } #if PG_VERSION_NUM >= 150000 @@ -174,7 +198,11 @@ static void PschShmemRequestHook(void) { // Initialize shared state fields on first-time setup. // Called with AddinShmemInitLock held. static void InitializeSharedState(void) { - psch_shared_state->lock = &(GetNamedLWLockTranche("pg_stat_ch"))->lock; + // The pg_stat_ch named tranche owns 1 main queue lock at index 0 and + // PSCH_QUERY_INTERN_PARTITIONS partition locks at indices 1..N for the + // query-text interner (see PschQueryInternShmemInit). 
+ LWLockPadded* lwlocks = GetNamedLWLockTranche("pg_stat_ch"); + psch_shared_state->lock = &lwlocks[0].lock; psch_shared_state->capacity = psch_queue_capacity; pg_atomic_init_u64(&psch_shared_state->head, 0); pg_atomic_init_u64(&psch_shared_state->enqueued, 0); @@ -213,7 +241,12 @@ static void PschShmemStartupHook(void) { LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - psch_shared_state = (PschSharedState*)ShmemInitStruct("pg_stat_ch", PschShmemSize(), &found); + // ShmemInitStruct gets just the contiguous shared block (state + ring + DSA); + // the interner HTAB lives in a separately-named shmem segment allocated by + // PschQueryInternShmemInit -> ShmemInitHash, drawing from the same pool that + // RequestAddinShmemSpace(PschShmemSize()) reserved. + psch_shared_state = + (PschSharedState*)ShmemInitStruct("pg_stat_ch", PschSharedBlockSize(), &found); if (psch_shared_state == NULL) { LWLockRelease(AddinShmemInitLock); @@ -225,6 +258,10 @@ static void PschShmemStartupHook(void) { InitializeSharedState(); } + // Initialize (or attach to) the query intern HTAB. All backends get same handle. + LWLockPadded* lwlocks = GetNamedLWLockTranche("pg_stat_ch"); + PschQueryInternShmemInit(&lwlocks[1]); + LWLockRelease(AddinShmemInitLock); on_shmem_exit(PschShmemShutdown, 0); @@ -396,11 +433,11 @@ bool PschDequeueEvent(PschEvent* event) { // client_addr, lengths — everything before the variable-length data). memcpy(event, slot, kFixedPrefixSize); - // 2. Resolve err_message and query from DSA into PschEvent's inline buffers + // 2. Resolve err_message (per-event DSA) and query text (shared interner). 
PschDsaResolveString(slot->err_message_dsa, slot->err_message_len, event->err_message, PSCH_MAX_ERR_MSG_LEN, &event->err_message_len); - PschDsaResolveString(slot->query_dsa, slot->query_len, event->query, PSCH_MAX_QUERY_LEN, - &event->query_len); + PschQueryInternResolveAndRelease(slot->query_dsa, event->query, PSCH_MAX_QUERY_LEN, + &event->query_len); // CRITICAL: Write barrier ensures all reads and DSA frees complete before we // update tail. Producers cannot reuse this slot until tail advances past it. diff --git a/t/032_query_intern.pl b/t/032_query_intern.pl new file mode 100644 index 0000000..56a47f7 --- /dev/null +++ b/t/032_query_intern.pl @@ -0,0 +1,106 @@ +#!/usr/bin/env perl +# Test: Query-text interner deduplicates DSA storage across queued events. +# +# Without interning, every queued event owns a private DSA copy of the +# normalized query text — `queued_events * query_len` total live bytes. A +# tightly-bounded DSA pool exhausts well before the queue itself fills. +# +# With interning, repeated identical normalized queries share a single DSA +# body and live DSA usage collapses to `distinct_live_query_texts * query_len`. +# +# Strategy: +# 1. Configure an unreachable ClickHouse so the bgworker cannot drain the +# queue. Events accumulate. +# 2. Set `string_area_size = 8MB` (the minimum allowed) so the DSA pool is +# tight. 6000 × ~2KB unique copies would be ~12MB. +# 3. PREPARE/EXECUTE a single long normalized query that clamps near the +# 2047-byte truncation limit. +# 4. After many EXECUTEs, assert: +# - the queue actually filled with events (proves the path under test ran) +# - dsa_oom_count == 0 (proves storage was deduplicated, not duplicated) + +use strict; +use warnings; +use lib 't'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use psch; + +# --------------------------------------------------------------------------- +# Node config: tight DSA pool, large queue, unreachable ClickHouse. 
+# --------------------------------------------------------------------------- +my $node = PostgreSQL::Test::Cluster->new('query_intern'); +$node->init(); +$node->append_conf('postgresql.conf', qq{ +shared_preload_libraries = 'pg_stat_ch' +pg_stat_ch.enabled = on +pg_stat_ch.queue_capacity = 8192 +pg_stat_ch.string_area_size = 8MB +pg_stat_ch.flush_interval_ms = 60000 +pg_stat_ch.batch_max = 1000 +pg_stat_ch.clickhouse_host = '127.0.0.1' +pg_stat_ch.clickhouse_port = 1 +}); +$node->start(); +$node->safe_psql('postgres', 'CREATE EXTENSION pg_stat_ch'); + +psch_reset_stats($node); + +# --------------------------------------------------------------------------- +# Build a normalized query whose cached text clamps near 2047 bytes. +# +# Each "relname != 'table_N'" condition is roughly 18 bytes after normalization +# (the literal becomes $N). ~140 conditions take the normalized form well +# past 2KB so the cache clamps to PSCH_MAX_QUERY_LEN-1 = 2047 bytes — the +# worst case for per-event DSA cost. +# --------------------------------------------------------------------------- +my @conditions = map { "relname != 'table_$_'" } (1..140); +my $long_sql = "SELECT count(*) FROM pg_class WHERE " . + join(" AND ", @conditions); + +# Prepare the statement and execute it many times in one persistent backend. +my $exec_count = 6000; +my $session = $node->background_psql('postgres', on_error_stop => 1); +my (undef, $ret) = $session->query("PREPARE intern_test AS $long_sql"); +is($ret, 0, 'PREPARE long normalized query succeeds'); + +# Drive the EXECUTEs in chunks of multi-statement SQL. Sending all 6000 in +# one giant string is fine for psql, but chunking keeps the protocol output +# small enough to digest if anything goes wrong. Build each chunk as a single +# scalar (the `x` operator on a scalar repeats it) — passing a list to query() +# would only use the first element.
+my $chunk_size = 1000; +my $chunk_sql = "EXECUTE intern_test;\n" x $chunk_size; +my $chunks = int($exec_count / $chunk_size); +for (my $i = 0; $i < $chunks; $i++) { + (undef, $ret) = $session->query($chunk_sql); + last if $ret != 0; +} +is($ret, 0, "EXECUTE intern_test x $exec_count succeeds"); +$session->quit(); + +my $stats = psch_get_stats($node); +note(sprintf("stats: enqueued=%s dropped=%s queue_size=%s dsa_oom=%s", + $stats->{enqueued}, $stats->{dropped}, + $stats->{queue_size}, $stats->{dsa_oom})); + +# --------------------------------------------------------------------------- +# Assertions +# +# - We expect at least 5000 enqueued events, proving EXECUTE actually drove +# the producer path (rather than every call being filtered out). +# - With interning in place, dsa_oom_count must be 0: 6000 events sharing a +# single ~2KB interned body fits comfortably in 8MB. Without interning, +# this same workload would push 12MB through an 8MB pool and OOM long +# before the queue filled. 
+# --------------------------------------------------------------------------- +cmp_ok($stats->{enqueued}, '>=', 5000, + 'queue captured the bulk of repeated EXECUTEs'); +is($stats->{dsa_oom}, 0, + 'interned repeated query text avoids DSA OOM under tight 8MB pool'); + +$node->stop(); +done_testing(); From d1005c8ccc8a6a1c8f1c193507874ab8f8e1905a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= <159546+serprex@users.noreply.github.com> Date: Tue, 26 May 2026 01:20:01 +0000 Subject: [PATCH 2/9] address cursor code review --- src/queue/query_intern.c | 43 +++++++++++++++++++++++++++++----------- src/queue/query_intern.h | 10 ++++++---- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/src/queue/query_intern.c b/src/queue/query_intern.c index 4ec7cfb..0afb111 100644 --- a/src/queue/query_intern.c +++ b/src/queue/query_intern.c @@ -86,10 +86,17 @@ void PschQueryInternShmemInit(void* lwlock_base) { MemSet(&info, 0, sizeof(info)); info.keysize = sizeof(PschQueryInternKey); info.entrysize = sizeof(PschQueryInternEntry); - - psch_query_intern_htab = - ShmemInitHash("pg_stat_ch query intern", PschQueryInternMaxEntries(), - PschQueryInternMaxEntries(), &info, HASH_ELEM | HASH_BLOBS); + info.num_partitions = PSCH_QUERY_INTERN_PARTITIONS; + + // HASH_PARTITION is required when external partition locks guard concurrent + // mutation: it disables on-the-fly bucket splits and gives each partition + // its own spinlock-protected freelist. Our external partition lock for a + // given key is derived from dynahash's own hashcode (get_hash_value) so the + // external and internal partition agree, matching the LockTagHashCode / + // LockHashPartitionLock pattern in src/backend/storage/lmgr/lock.c. 
+ psch_query_intern_htab = ShmemInitHash( + "pg_stat_ch query intern", PschQueryInternMaxEntries(), PschQueryInternMaxEntries(), &info, + HASH_ELEM | HASH_BLOBS | HASH_PARTITION); } static void MakeKey(PschQueryInternKey* key, Oid dbid, uint64 queryid, const char* query, @@ -102,8 +109,13 @@ static void MakeKey(PschQueryInternKey* key, Oid dbid, uint64 queryid, const cha key->query_len = query_len; } -static LWLock* PartitionLockFor(uint64 query_hash) { - uint32 idx = (uint32)(query_hash & (PSCH_QUERY_INTERN_PARTITIONS - 1)); +// Partition lock index must come from dynahash's own hashcode (the low-order +// bits, per dynahash.c "we expect callers to use the low-order bits of a +// lookup key's hash value as a partition number"). Using a different hash +// (e.g. our PschQueryInternKey.query_hash field) would mis-align the external +// lock with dynahash's internal partition and risk freelist/bucket corruption. +static LWLock* PartitionLockFor(uint32 hashcode) { + uint32 idx = hashcode & (PSCH_QUERY_INTERN_PARTITIONS - 1); return &psch_query_intern_locks[idx].lock; } @@ -149,6 +161,7 @@ static bool ObjectMatches(dsa_area* dsa, dsa_pointer dp, const char* query, uint dsa_pointer PschQueryInternAcquire(Oid dbid, uint64 queryid, const char* query, uint16 query_len) { PschQueryInternKey key; + uint32 hashcode; LWLock* partition; PschQueryInternEntry* entry; dsa_area* dsa; @@ -165,11 +178,13 @@ dsa_pointer PschQueryInternAcquire(Oid dbid, uint64 queryid, const char* query, } MakeKey(&key, dbid, queryid, query, query_len); - partition = PartitionLockFor(key.query_hash); + hashcode = get_hash_value(psch_query_intern_htab, &key); + partition = PartitionLockFor(hashcode); // First lookup: fast path for a hit. 
LWLockAcquire(partition, LW_EXCLUSIVE); - entry = (PschQueryInternEntry*)hash_search(psch_query_intern_htab, &key, HASH_FIND, NULL); + entry = (PschQueryInternEntry*)hash_search_with_hash_value(psch_query_intern_htab, &key, hashcode, + HASH_FIND, NULL); if (entry != NULL && ObjectMatches(dsa, entry->object, query, query_len)) { entry->refcount++; LWLockRelease(partition); @@ -195,7 +210,8 @@ dsa_pointer PschQueryInternAcquire(Oid dbid, uint64 queryid, const char* query, // Re-lock partition and re-check. Another backend may have inserted the // same key while we were allocating. LWLockAcquire(partition, LW_EXCLUSIVE); - entry = (PschQueryInternEntry*)hash_search(psch_query_intern_htab, &key, HASH_ENTER_NULL, &found); + entry = (PschQueryInternEntry*)hash_search_with_hash_value(psch_query_intern_htab, &key, hashcode, + HASH_ENTER_NULL, &found); if (entry == NULL) { // Hash table is full — back out the loser allocation and report miss. LWLockRelease(partition); @@ -232,6 +248,7 @@ static void ReleaseRef(dsa_pointer ref) { dsa_area* dsa; PschQueryInternObject* obj; PschQueryInternKey key; + uint32 hashcode; LWLock* partition; PschQueryInternEntry* entry; dsa_pointer freed_dp = InvalidDsaPointer; @@ -253,16 +270,18 @@ static void ReleaseRef(dsa_pointer ref) { } key = obj->key; - partition = PartitionLockFor(key.query_hash); + hashcode = get_hash_value(psch_query_intern_htab, &key); + partition = PartitionLockFor(hashcode); LWLockAcquire(partition, LW_EXCLUSIVE); - entry = (PschQueryInternEntry*)hash_search(psch_query_intern_htab, &key, HASH_FIND, NULL); + entry = (PschQueryInternEntry*)hash_search_with_hash_value(psch_query_intern_htab, &key, hashcode, + HASH_FIND, NULL); if (entry != NULL && entry->object == ref) { Assert(entry->refcount > 0); entry->refcount--; if (entry->refcount == 0) { freed_dp = entry->object; - hash_search(psch_query_intern_htab, &key, HASH_REMOVE, NULL); + hash_search_with_hash_value(psch_query_intern_htab, &key, hashcode, HASH_REMOVE, NULL); } 
} LWLockRelease(partition); diff --git a/src/queue/query_intern.h b/src/queue/query_intern.h index f119b37..a83e60d 100644 --- a/src/queue/query_intern.h +++ b/src/queue/query_intern.h @@ -16,9 +16,11 @@ // - Variable-length query bodies live inside the existing pg_stat_ch DSA // area (same pool used for err_message strings). // -// Concurrency: a small set of partitioned LWLocks (PSCH_QUERY_INTERN_PARTITIONS) -// protects the HTAB and per-entry refcount. The partition is selected from -// the query_hash so unrelated query texts contend on different locks. +// Concurrency: HASH_PARTITION enables dynahash's partitioned mode (per-partition +// freelists, no on-the-fly bucket splits) and PSCH_QUERY_INTERN_PARTITIONS +// LWLocks guard mutation. The partition index for both the external lock and +// dynahash's internal freelist is the low-order bits of get_hash_value(), so +// they always agree (same pattern as LockHashPartitionLock in lock.c). #ifndef PG_STAT_CH_SRC_QUEUE_QUERY_INTERN_H_ #define PG_STAT_CH_SRC_QUEUE_QUERY_INTERN_H_ @@ -31,7 +33,7 @@ extern "C" { #include "utils/dsa.h" // Number of LWLock partitions guarding the interner HTAB. Power of two so -// the partition index can be derived from query_hash with a bitmask. +// partition index can be derived from dynahash's get_hash_value() with bitmask. 
#define PSCH_QUERY_INTERN_PARTITIONS 32 // Returns the HTAB shmem requirement (hash_estimate_size for the interner) From c2331c40c348c855181630fffb4f15a13d13a599 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= <159546+serprex@users.noreply.github.com> Date: Tue, 26 May 2026 01:55:32 +0000 Subject: [PATCH 3/9] remove unused function --- src/queue/query_intern.c | 4 ---- src/queue/query_intern.h | 4 ---- 2 files changed, 8 deletions(-) diff --git a/src/queue/query_intern.c b/src/queue/query_intern.c index 0afb111..13c0b15 100644 --- a/src/queue/query_intern.c +++ b/src/queue/query_intern.c @@ -335,7 +335,3 @@ void PschQueryInternResolveAndRelease(dsa_pointer ref, char* dst, uint16 dst_siz ReleaseRef(ref); } - -void PschQueryInternRelease(dsa_pointer ref) { - ReleaseRef(ref); -} diff --git a/src/queue/query_intern.h b/src/queue/query_intern.h index a83e60d..356951d 100644 --- a/src/queue/query_intern.h +++ b/src/queue/query_intern.h @@ -77,10 +77,6 @@ dsa_pointer PschQueryInternAcquire(Oid dbid, uint64 queryid, const char* query, // Called by the bgworker (single consumer) on every dequeued event. void PschQueryInternResolveAndRelease(dsa_pointer ref, char* dst, uint16 dst_size, uint16* out_len); -// Drop one reference without resolving the bytes. Used when an enqueue path -// has acquired a reference but decides not to publish the ring slot. 
-void PschQueryInternRelease(dsa_pointer ref); - #ifdef __cplusplus } #endif From 19fe985f6f81dde1700b900b3cefbb673431bfe6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= <159546+serprex@users.noreply.github.com> Date: Tue, 2 Jun 2026 17:25:56 +0000 Subject: [PATCH 4/9] copilot minces words Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- src/queue/query_intern.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/queue/query_intern.c b/src/queue/query_intern.c index 13c0b15..3d230d6 100644 --- a/src/queue/query_intern.c +++ b/src/queue/query_intern.c @@ -191,10 +191,9 @@ dsa_pointer PschQueryInternAcquire(Oid dbid, uint64 queryid, const char* query, return entry->object; } if (entry != NULL) { - // Hash hit but bytes differ — collision. Treat as a miss; the safe - // fallback below will attempt to install our own entry, which can't - // happen because the slot is taken. Return InvalidDsaPointer so the - // caller exports empty query text rather than wrong SQL. + // Hash hit but bytes differ — collision. Don't attempt to insert a second + // entry for the same key; return InvalidDsaPointer so the caller exports + // empty query text rather than wrong SQL. LWLockRelease(partition); return InvalidDsaPointer; } From fed790d8d6c13a7fd79e9490ec71e673e459d735 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= <159546+serprex@users.noreply.github.com> Date: Tue, 2 Jun 2026 17:33:45 +0000 Subject: [PATCH 5/9] StaticAssert why not Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- src/queue/query_intern.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/queue/query_intern.h b/src/queue/query_intern.h index 356951d..1d567c2 100644 --- a/src/queue/query_intern.h +++ b/src/queue/query_intern.h @@ -35,6 +35,8 @@ extern "C" { // Number of LWLock partitions guarding the interner HTAB. 
Power of two so // partition index can be derived from dynahash's get_hash_value() with bitmask. #define PSCH_QUERY_INTERN_PARTITIONS 32 +StaticAssertDecl((PSCH_QUERY_INTERN_PARTITIONS & (PSCH_QUERY_INTERN_PARTITIONS - 1)) == 0, + "PSCH_QUERY_INTERN_PARTITIONS must be a power of two"); // Returns the HTAB shmem requirement (hash_estimate_size for the interner) // that callers must fold into their total RequestAddinShmemSpace reservation. From 0df7375e50d10f47c748a06eea504fcae2d2b37a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= <159546+serprex@users.noreply.github.com> Date: Tue, 2 Jun 2026 17:37:03 +0000 Subject: [PATCH 6/9] type safety in my C? it's more likely than you think --- src/queue/query_intern.c | 4 ++-- src/queue/query_intern.h | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/queue/query_intern.c b/src/queue/query_intern.c index 3d230d6..213dd73 100644 --- a/src/queue/query_intern.c +++ b/src/queue/query_intern.c @@ -78,10 +78,10 @@ int PschQueryInternLockCount(void) { return PSCH_QUERY_INTERN_PARTITIONS; } -void PschQueryInternShmemInit(void* lwlock_base) { +void PschQueryInternShmemInit(LWLockPadded* lwlock_base) { HASHCTL info; - psch_query_intern_locks = (LWLockPadded*)lwlock_base; + psch_query_intern_locks = lwlock_base; MemSet(&info, 0, sizeof(info)); info.keysize = sizeof(PschQueryInternKey); diff --git a/src/queue/query_intern.h b/src/queue/query_intern.h index 1d567c2..e748e40 100644 --- a/src/queue/query_intern.h +++ b/src/queue/query_intern.h @@ -30,6 +30,7 @@ extern "C" { #include "postgres.h" +#include "storage/lwlock.h" #include "utils/dsa.h" // Number of LWLock partitions guarding the interner HTAB. Power of two so @@ -51,7 +52,7 @@ int PschQueryInternLockCount(void); // // `lwlock_base` must point at the first interner partition lock inside the // pg_stat_ch named tranche (i.e. immediately after the main queue lock). 
-void PschQueryInternShmemInit(void* lwlock_base); +void PschQueryInternShmemInit(LWLockPadded* lwlock_base); // Acquire (or create) an intern reference for the given query text. // From c1be19dc4b6ccd867ae861130197dbdbbf56bba1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= <159546+serprex@users.noreply.github.com> Date: Tue, 2 Jun 2026 17:41:27 +0000 Subject: [PATCH 7/9] die if copilot is ever right --- t/032_query_intern.pl | 1 + 1 file changed, 1 insertion(+) diff --git a/t/032_query_intern.pl b/t/032_query_intern.pl index 56a47f7..a1715be 100644 --- a/t/032_query_intern.pl +++ b/t/032_query_intern.pl @@ -73,6 +73,7 @@ # scalar (the `x` operator on a scalar repeats it) — passing a list to query() # would only use the first element. my $chunk_size = 1000; +die "exec_count must be a multiple of chunk_size" if $exec_count % $chunk_size; my $chunk_sql = "EXECUTE intern_test;\n" x $chunk_size; my $chunks = int($exec_count / $chunk_size); for (my $i = 0; $i < $chunks; $i++) { From 686c413eac7f7dfab9995504c96987ab6baf85b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= <159546+serprex@users.noreply.github.com> Date: Tue, 2 Jun 2026 17:42:30 +0000 Subject: [PATCH 8/9] explicit is better than implicit in your Perl? 
it's more likely than you think Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- t/032_query_intern.pl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/032_query_intern.pl b/t/032_query_intern.pl index a1715be..bb0c941 100644 --- a/t/032_query_intern.pl +++ b/t/032_query_intern.pl @@ -23,10 +23,10 @@ use warnings; use lib 't'; +use PostgreSQL::Test::BackgroundPsql; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; - use psch; # --------------------------------------------------------------------------- From ba3ee9203a07a252ba4b4f3dbfe6439b67202754 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= <159546+serprex@users.noreply.github.com> Date: Tue, 2 Jun 2026 19:12:34 +0000 Subject: [PATCH 9/9] be strict about C's unspecified padding --- src/queue/query_intern.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/queue/query_intern.c b/src/queue/query_intern.c index 213dd73..a7509cd 100644 --- a/src/queue/query_intern.c +++ b/src/queue/query_intern.c @@ -109,6 +109,14 @@ static void MakeKey(PschQueryInternKey* key, Oid dbid, uint64 queryid, const cha key->query_len = query_len; } +static void CopyKeyFields(PschQueryInternKey* dst, const PschQueryInternKey* src) { + MemSet(dst, 0, sizeof(*dst)); + dst->dbid = src->dbid; + dst->queryid = src->queryid; + dst->query_hash = src->query_hash; + dst->query_len = src->query_len; +} + // Partition lock index must come from dynahash's own hashcode (the low-order // bits, per dynahash.c "we expect callers to use the low-order bits of a // lookup key's hash value as a partition number"). 
Using a different hash @@ -136,7 +144,7 @@ static dsa_pointer AllocInternObject(dsa_area* dsa, const PschQueryInternKey* ke } obj = (PschQueryInternObject*)dsa_get_address(dsa, dp); - obj->key = *key; + CopyKeyFields(&obj->key, key); obj->magic = PSCH_QUERY_INTERN_MAGIC; memcpy(obj->query, query, query_len); obj->query[query_len] = '\0'; @@ -268,7 +276,7 @@ static void ReleaseRef(dsa_pointer ref) { return; } - key = obj->key; + CopyKeyFields(&key, &obj->key); hashcode = get_hash_value(psch_query_intern_htab, &key); partition = PartitionLockFor(hashcode);