MDEV-39495 Parallel Query: Use temporary work tables to ship results: basic test #5176

Open: mariadb-RexJohnston wants to merge 2 commits into main from 13.0-MDEV-39495


mariadb-RexJohnston commented Jun 4, 2026

Overview

Building on MDEV-39492's worker-thread scaffolding, this commit may choose one non-const table of a top-level SELECT to be scanned by N parallel worker threads instead of by the main thread. Workers ship the scanned rows to the main thread (manager) through per-worker temporary tables; the manager feeds those rows into the first join_tab and runs the rest of the join itself.

Because the parallel scan table is not yet partitioned, each of N workers scans the whole table, so the manager sees N copies of the row set - correct results currently require parallel_worker_threads=1. This is a deliberate checkpoint, not a bug.

Files: sql_parallel_workers.{h,cc} (the worker/manager machinery), sql_select.cc (eligibility, table creation, teardown), sql_explain.{cc,h} (carry use_parallel_scan into EXPLAIN), and the two test pairs.


Conditions for initiating a parallel scan

Whether a parallel scan is used is decided in four stages during optimization and execution:

  1. Per-table eligibility - get_best_combination() (sql_select.cc:13660). A join_tab is flagged use_parallel_scan= true only when its access type is JT_ALL (full scan, no usable key) and the table is:
    - a real base table - s->tmp_table == NO_TMP_TABLE
    - free of blob-backed columns - s->blob_fields == 0 (excludes BLOB/TEXT/GEOMETRY/JSON, whose payload lives off record[0])
    - not the MERGE engine - file->ht->db_type != DB_TYPE_MRG_MYISAM (a bare open_table_from_share doesn't attach MERGE children → rnd_init asserts)
    - not fulltext-searched - !table->fulltext_searched (a MATCH … AGAINST relevance is derived from handler state, not from record[0])
  2. Prune to one - JOIN::choose_parallel_scan() (called at sql_select.cc:2835, right after get_best_combination()). get_best_combination may set the flag on several full-scan tables; this keeps it on just the one with the largest stats.data_file_length and clears the rest, then records it in the optimizer trace as chosen_for_parallel_scan.
  3. Create the batch tables - create_postjoin_aggr_table() (sql_select.cc:4543) calls create_parallel_workers_tmp_tables(tab) when no parallel table is chosen yet, parallel_worker_threads > 0, there are non-const tables, and the first non-const table has use_parallel_scan && type == JT_ALL. This needs a post-join tmp table to exist, i.e. the query needs one (SQL_BUFFER_RESULT, GROUP BY, etc.).
  4. Final veto before spawning - pwt_management::init_parallel_workers() (sql_parallel_workers.cc:605). After locating the flagged scan table, it falls back to a serial scan (spawns no workers; the batch tables go unused) if that table ended up with:
    - select->quick - a range/quick access method a full scan wouldn't honour
    - filesort - a sort that drives its own read of the table
    - keep_current_rowid - the rowid is needed (DuplicateWeedout semijoin, rowid-ordered sort); injected rows carry no valid handler position
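
Stages 1 and 2 above can be sketched as a standalone predicate plus a "keep only the largest" pass. This is a minimal illustration, not the code in sql_select.cc: the TableInfo struct and both function names are hypothetical stand-ins, and each field only mirrors the server-side check named in its comment.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, simplified stand-in for the per-table state the optimizer inspects.
struct TableInfo {
  bool full_scan;                  // access type is JT_ALL
  bool is_base_table;              // s->tmp_table == NO_TMP_TABLE
  unsigned blob_fields;            // s->blob_fields
  bool is_merge_engine;            // db_type == DB_TYPE_MRG_MYISAM
  bool fulltext_searched;          // table->fulltext_searched
  std::uint64_t data_file_length;  // stats.data_file_length
  bool use_parallel_scan;          // the flag being computed
};

// Stage 1: per-table eligibility.
inline bool eligible_for_parallel_scan(const TableInfo &t) {
  return t.full_scan && t.is_base_table && t.blob_fields == 0 &&
         !t.is_merge_engine && !t.fulltext_searched;
}

// Stages 1 + 2: flag every eligible table, then keep the flag only on the
// one with the largest data file. Returns its index, or -1 if none qualifies.
inline int choose_parallel_scan_table(std::vector<TableInfo> &tables) {
  int best = -1;
  for (std::size_t i = 0; i < tables.size(); i++) {
    tables[i].use_parallel_scan = eligible_for_parallel_scan(tables[i]);
    if (tables[i].use_parallel_scan &&
        (best < 0 ||
         tables[i].data_file_length > tables[best].data_file_length))
      best = static_cast<int>(i);
  }
  for (std::size_t i = 0; i < tables.size(); i++)
    tables[i].use_parallel_scan = (static_cast<int>(i) == best);
  return best;
}
```

Stages 3 and 4 depend on the rest of the plan (post-join tmp table, quick select, filesort, rowid use) and are deliberately left out of the sketch.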

Temporary-table lifecycles

(A) The N per-worker batch tables

Created - JOIN::create_parallel_workers_tmp_tables() (sql_select.cc:4401), on the manager thread during optimization. Builds an Item_field list mirroring the source table's columns, then loops parallel_worker_threads times calling create_tmp_table(do_not_open) + instantiate_tmp_table(cross_thread=true). Crucially it temporarily overrides tmp_memory_table_size → max_heap_table_size (4435–4437) so the tables are the in-memory HEAP engine - an on-disk Aria batch table would charge its file growth to the worker's tmp_space_used (it's written by the worker but freed by the manager), tripping THD::free_connection(). The tables are stored in JOIN_TAB::parallel_tmp_tables on the post-join aggr tab (which becomes parallel_scan_join_tab).

Used - at execution, init_parallel_workers() hands each worker one table (workers[i].batch_table = parallel_tmp_tables.at(i), line 704). Then the channel runs:

  • Producer (worker_produce_chunks): owns the table, truncates it (ha_delete_all_rows), fills up to PWT_CHUNK_ROWS (64) rows - copying field-by-field from its private source scan (our_scan_table) via copy_field_with_null + ha_write_tmp_row - then handoff_batch() sets batch_full, signals the manager, and blocks until the manager has drained it; it then truncates and refills. Backpressure is intrinsic: one outstanding batch per worker.
  • Consumer (parallel_scan_read_next, installed as the first join_tab's read_first_record, line 781): drains one worker's table at a time (cur_worker) with ha_rnd_next, copying each row into the first join_tab's record[0]; on EOF it clears batch_full and signals the worker to refill, then picks the next ready worker.

So each batch table is touched by exactly one thread at a time (fill → drain → refill) - no per-row locking.
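
The fill → drain → refill handoff can be illustrated with standard C++ primitives. This is a sketch under assumptions, not the server code: BatchChannel, produce, consume and run_scan are hypothetical names, a std::vector<int> stands in for the HEAP batch table, and std::mutex / std::condition_variable replace the server's mysql_mutex_t / mysql_cond_t. Only one worker is modelled; the real consumer rotates over several.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

constexpr std::size_t CHUNK_ROWS = 64;  // mirrors PWT_CHUNK_ROWS in the description

// Hypothetical single-slot channel: at most one outstanding batch per worker.
struct BatchChannel {
  std::mutex lock;
  std::condition_variable cond;
  std::vector<int> batch;   // stands in for the per-worker batch table
  bool batch_full = false;  // set by the producer, cleared by the consumer
  bool finished = false;    // producer has no more rows
};

// Producer: truncate, fill up to CHUNK_ROWS rows, hand off, block until drained.
inline void produce(BatchChannel &ch, const std::vector<int> &source) {
  std::size_t pos = 0;
  while (pos < source.size()) {
    // batch_full is false here, so the producer owns the batch: no lock needed.
    ch.batch.clear();
    while (pos < source.size() && ch.batch.size() < CHUNK_ROWS)
      ch.batch.push_back(source[pos++]);
    std::unique_lock<std::mutex> guard(ch.lock);
    ch.batch_full = true;
    ch.cond.notify_all();
    ch.cond.wait(guard, [&] { return !ch.batch_full; });
  }
  std::lock_guard<std::mutex> guard(ch.lock);
  ch.finished = true;
  ch.cond.notify_all();
}

// Consumer: wait for a full batch, drain it, then let the producer refill.
inline std::vector<int> consume(BatchChannel &ch) {
  std::vector<int> out;
  for (;;) {
    std::unique_lock<std::mutex> guard(ch.lock);
    ch.cond.wait(guard, [&] { return ch.batch_full || ch.finished; });
    if (!ch.batch_full)
      break;  // finished and nothing pending
    out.insert(out.end(), ch.batch.begin(), ch.batch.end());
    ch.batch_full = false;
    ch.cond.notify_all();
  }
  return out;
}

// Runs one worker against one manager and returns the rows the manager saw.
inline std::vector<int> run_scan(const std::vector<int> &source) {
  BatchChannel ch;
  std::thread worker([&] { produce(ch, source); });
  std::vector<int> rows = consume(ch);
  worker.join();
  return rows;
}
```

The batch_full flag is the only shared state that needs the lock; ownership of the batch itself alternates with the flag, which is why no per-row locking is needed.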

Destroy - JOIN::free_parallel_tmp_tables() (sql_select.cc:4484): free_tmp_table each + free the Dynamic_array. Called from JOIN::cleanup() on the normal path, and from every error path (the function's own internal-failure path, and create_postjoin_aggr_table's err: label) so they're never leaked even when later setup fails. They live for the whole statement.

(B) The post-join aggregation / result table

Not parallel-specific, but the commit changes its handling. Create: create_postjoin_aggr_table() builds it in result/projection format (not source format) and instantiate_tmp_tables it early ("so the parallel worker manager can write to it"), then sets tab->table and tab->aggr. Use: the manager's nested-loop join - driven by the worker rows injected into the first join_tab - writes result rows into it (sub_select_postjoin_aggr), which are then sent to the client. Destroy: free_tmp_table in JOIN::cleanup (gated on tab->aggr); on an instantiate failure (before tab->table is set) it's freed via the err: label rather than leaked.

(Aside) The worker's private source table our_scan_table

Not a tmp table, but a per-worker TABLE: init_parallel_workers opens it with open_table_from_share from the shared TABLE_SHARE (its own handler, in_use repointed to the worker) so workers scan concurrently without a shared-scan lock; the worker thread closes it (close_worker_scan_table) before its THD is destroyed.


The added tests

main/parallel_query.test - worker lifecycle + KILL

Creates t1 from seq_1_to_2 (2 rows), then:

  • parallel_worker_threads=3 → SELECT SQL_BUFFER_RESULT seq FROM t1 returns 6 rows (3×{1,2}) and 3 worker warnings; =10 → 20 rows. This demonstrates the N-copies prototype behaviour.
  • KILL QUERY path: a debug-sync (pwt_worker_pause_before_signal) pauses the workers; the default connection inspects information_schema.processlist (sees the 3 Parallel Worker n For Thread ID … rows plus the manager in state Reading data from parallel workers), issues KILL QUERY against a worker's id, releases the workers, waits for the manager to return to Sleep, and reaps → ER_QUERY_INTERRUPTED, proving a kill aimed at a worker is propagated to the manager and aborts the statement.
  • KILL (connection) path: same pause, but a plain KILL of a worker → the killee connection drops; reap → ER_QUERY_INTERRUPTED, connection dead.
  • Wrapped in count_sessions/wait_until_count_sessions to assert no sessions leak.

main/parallel_query_oom.test - diagnostic-queue OOM

With debug_dbug="+d,pwt_error_to_queue_oom" forcing error_to_queue() to fail and parallel_worker_threads=1, it runs SELECT SQL_BUFFER_RESULT * FROM t1. The worker's would-be warning/error can't be queued to the manager; SHOW WARNINGS confirms the manager instead surfaces a single ER_OUTOFMEMORY warning, so worker diagnostics aren't silently lost.
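
The behaviour this test checks can be modelled roughly as follows. The sketch is an assumption about the shape of the fallback, not the implementation: DiagQueue, its oom_seen flag and collect_warnings are hypothetical, and only the name error_to_queue and the "single ER_OUTOFMEMORY warning" outcome come from the description above.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical model of the worker-to-manager diagnostic queue.
struct DiagQueue {
  std::vector<std::string> messages;
  bool oom_seen = false;      // assumed: remembers that queueing failed at least once
  bool simulate_oom = false;  // plays the role of the debug_dbug injection

  // Returns true on failure, false on success.
  bool error_to_queue(const std::string &msg) {
    if (simulate_oom) {
      oom_seen = true;  // the message itself is lost, but the failure is recorded
      return true;
    }
    messages.push_back(msg);
    return false;
  }
};

// Manager side: queued diagnostics, plus exactly one OOM warning if any were lost.
inline std::vector<std::string> collect_warnings(const DiagQueue &q) {
  std::vector<std::string> out = q.messages;
  if (q.oom_seen)
    out.push_back("ER_OUTOFMEMORY");
  return out;
}
```

However many worker messages fail to queue, the manager reports one out-of-memory warning, which is the property SHOW WARNINGS verifies in the test.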

mysql-test/main/parallel_query_excluded.test - examples of fully scanned tables excluded from parallel scans

An ordinary table is chosen; a table is rejected by the use_parallel_scan gate when it has a BLOB/TEXT (blob-backed) column, uses the MERGE engine, or is fulltext-searched (a MATCH ... AGAINST refers to it).

Introduces the parallel_worker_threads variable to control the number
of worker threads created for a parallel query.

Adds two new files, sql_parallel_workers.h and sql_parallel_workers.cc,
which contain structures for the creation, management and deletion of
parallel worker threads (pwt_ in the name).  The main management
class is created on the stack in JOIN::exec and is implemented for the
top-level select.

The current parallel_worker_thread_func sleeps for 10 seconds, generates
a warning, signals the main thread, sleeps 10 seconds, signals the
main thread again, sets its finished flag and cleans up its THD.

The main thread loops through the worker threads, looking for finished
threads and cleaning them up.
It then waits for a signal and processes its message queue.

Threads are registered in server_threads, so they are visible in
information_schema.processlist and the SHOW PROCESSLIST command.

We check that a kill query on a parallel worker is passed on to its
manager and the query is properly aborted, and that a kill connection
is handled properly in parallel_worker.test.

Review input 1: cleanup earlier

Do cleanup before we've finished sending the result to the client.
This way, one can see the errors (and eventually warnings) marshalled
back to the main thread and returned to the user:

MariaDB [test]> set parallel_worker_threads=10;
Query OK, 0 rows affected (0.001 sec)

MariaDB [test]> select seq from seq_1_to_10;
ERROR 4103 (HY000): Argument to the worker_busted_function() function does not belong to the range [0,1]

Assisted by Sergei Petrunia and Claude Code.

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request implements parallel query execution support (Parallel Worker Threads) under MDEV-39492, introducing the parallel_worker_threads system variable, worker thread management, row batching, and streaming mechanisms. The feedback highlights a critical memory management issue where pwt_management inherits from Sql_alloc, potentially leading to resource leaks because its destructor is not automatically called. Additionally, there are opportunities to clean up redundant synchronization variables (LOCK_pwt_manager and COND_pwt_new_message), improve macOS compilation robustness by using the standard __APPLE__ macro, and ensure consistency by updating JSON EXPLAIN to reflect parallel scan states alongside the traditional EXPLAIN format.

/*
Class to create, manage and eventually destroy a "team" of worker threads.
*/
class pwt_management : public Sql_alloc

high

Since pwt_management inherits from Sql_alloc, it is allocated on the mem_root. In MariaDB, C++ destructors of Sql_alloc objects are not automatically called when the mem_root is freed. If parallel_work_manager is not explicitly deleted via delete, its destructor ~pwt_management() will not run, which could lead to resource leaks (such as threads, mutexes, or heap-allocated memory) if finalize_parallel_workers is not called on all exit/error paths. Consider whether pwt_management needs to inherit from Sql_alloc. If it does, ensure it is explicitly deleted, or remove Sql_alloc inheritance and manage its lifetime using standard heap allocation or unique pointers.
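
The general concern can be shown with a toy arena. This is only an illustration of "bulk-freed memory does not run destructors": Arena and Manager are hypothetical types, not MariaDB's MEM_ROOT or pwt_management, and whether the issue applies here depends on how the object is actually allocated.

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <vector>

// Hypothetical toy arena: releases memory in bulk and never runs destructors.
class Arena {
  std::vector<unsigned char *> blocks_;

public:
  void *alloc(std::size_t n) {
    unsigned char *p = new unsigned char[n];
    blocks_.push_back(p);
    return p;
  }
  void free_all() {
    for (unsigned char *p : blocks_)
      delete[] p;
    blocks_.clear();
  }
  ~Arena() { free_all(); }
};

// Stand-in for an object that owns a resource released only in its destructor.
struct Manager {
  int *live_resources;
  explicit Manager(int *counter) : live_resources(counter) { ++*live_resources; }
  ~Manager() { --*live_resources; }
};

// Arena is freed without destroying the object: the resource count stays at 1.
inline int live_after_bulk_free_only() {
  int live = 0;
  Arena arena;
  new (arena.alloc(sizeof(Manager))) Manager(&live);
  arena.free_all();
  return live;
}

// The destructor is invoked explicitly before the arena is freed: count is 0.
inline int live_after_explicit_destroy() {
  int live = 0;
  Arena arena;
  Manager *m = new (arena.alloc(sizeof(Manager))) Manager(&live);
  m->~Manager();
  arena.free_all();
  return live;
}
```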

Comment thread include/my_pthread.h
Comment on lines +634 to +636
#elif defined(__apple_build_version__)
#include <sys/proc_info.h>
#define MAX_THREAD_NAME MAXTHREADNAMESIZE

medium

The macro __apple_build_version__ is specific to Apple's Clang compiler. If MariaDB is compiled on macOS using a different compiler (such as GCC or upstream Clang), this macro will not be defined, and the build will fall back to the default MAX_THREAD_NAME 16 instead of using MAXTHREADNAMESIZE (which is 64). Using the standard platform macro __APPLE__ is more robust.

#elif defined(__APPLE__)
#include <sys/proc_info.h>
#define MAX_THREAD_NAME MAXTHREADNAMESIZE

Comment thread sql/sql_parallel_workers.h Outdated
Comment on lines +122 to +123
mysql_cond_t COND_pwt_new_message;
mysql_mutex_t LOCK_pwt_manager;

medium

LOCK_pwt_manager and COND_pwt_new_message are initialized, signaled, and destroyed, but they are never actually waited on anywhere in the code. The manager thread instead coordinates and waits using LOCK_data and COND_data_avail in parallel_scan_read_next. They can be safely removed to simplify the code and avoid unnecessary synchronization overhead.

Comment thread sql/sql_parallel_workers.cc Outdated
Comment on lines +437 to +440
// signal manager there is something in the queue,
mysql_mutex_lock(&worker->manager->LOCK_pwt_manager);
mysql_cond_signal(&worker->manager->COND_pwt_new_message);
mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager);

medium

Signaling COND_pwt_new_message under LOCK_pwt_manager is redundant because nothing waits on it. This block and its comment can be safely removed.

Comment thread sql/sql_parallel_workers.cc Outdated
Comment on lines +477 to +480
// signal manager again to wake up and end this thread
mysql_mutex_lock(&worker->manager->LOCK_pwt_manager);
mysql_cond_signal(&worker->manager->COND_pwt_new_message);
mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager);

medium

Signaling COND_pwt_new_message under LOCK_pwt_manager at thread exit is redundant because nothing waits on it. This block and its comment can be safely removed.

Comment thread sql/sql_parallel_workers.cc Outdated
Comment on lines +629 to +633
mysql_mutex_init(key_mutex_pwt_LOCK_manager, &LOCK_pwt_manager,
                 MY_MUTEX_INIT_SLOW);
mysql_mutex_init(key_mutex_pwt_LOCK_thread, &LOCK_pwt_thread,
                 MY_MUTEX_INIT_SLOW);
mysql_cond_init(key_COND_pwt_new_message, &COND_pwt_new_message, NULL);

medium

Since LOCK_pwt_manager and COND_pwt_new_message are unused, their initialization here can be removed.

    mysql_mutex_init(key_mutex_pwt_LOCK_thread, &LOCK_pwt_thread,
                     MY_MUTEX_INIT_SLOW);

Comment thread sql/sql_parallel_workers.cc Outdated
Comment on lines +824 to +826
mysql_cond_destroy(&COND_pwt_new_message);
mysql_mutex_destroy(&LOCK_pwt_manager);
mysql_mutex_destroy(&LOCK_pwt_thread);

medium

Since LOCK_pwt_manager and COND_pwt_new_message are unused, their destruction here can be removed.

  mysql_mutex_destroy(&LOCK_pwt_thread);

Comment thread sql/sql_parallel_workers.cc Outdated
Comment on lines +1133 to +1135
mysql_cond_destroy(&COND_pwt_new_message);
mysql_mutex_destroy(&LOCK_pwt_manager);
mysql_mutex_destroy(&LOCK_pwt_thread);

medium

Since LOCK_pwt_manager and COND_pwt_new_message are unused, their destruction here can be removed.

  mysql_mutex_destroy(&LOCK_pwt_thread);

Comment thread sql/sql_explain.cc
Comment on lines +1564 to +1567
if (type == JT_ALL && use_parallel_scan)
  push_str(thd, &item_list, "PARALLEL");
else
  push_str(thd, &item_list, join_type_str[type]);

medium

While traditional tabular EXPLAIN is updated to print "PARALLEL" when use_parallel_scan is true, JSON EXPLAIN (e.g., in Explain_table_access::print_explain_json or similar) does not seem to be updated to reflect this parallel scan state. This creates an inconsistency between traditional and JSON EXPLAIN formats. Please ensure that JSON EXPLAIN is also updated to output a property indicating that a parallel scan is being used (e.g., "parallel_scan": true or similar).
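
One way to keep the two formats consistent is to derive both from a single predicate. The following is an illustrative sketch only: AccessInfo and the three functions are hypothetical, and the "parallel_scan" property name is simply the one suggested in this comment, not an existing EXPLAIN field.

```cpp
#include <cassert>
#include <string>

// Hypothetical, reduced view of what the explain code knows about one table.
struct AccessInfo {
  bool full_scan;          // type == JT_ALL
  bool use_parallel_scan;  // flag carried over from the optimizer
  std::string type_name;   // e.g. "ALL"
};

// Single source of truth used by both output formats.
inline bool shows_parallel(const AccessInfo &a) {
  return a.full_scan && a.use_parallel_scan;
}

// Tabular EXPLAIN: the "type" column.
inline std::string tabular_type(const AccessInfo &a) {
  return shows_parallel(a) ? "PARALLEL" : a.type_name;
}

// JSON EXPLAIN: fragment for the table object, with an extra property
// when the parallel scan is in effect.
inline std::string json_fragment(const AccessInfo &a) {
  std::string out = "\"access_type\": \"" + a.type_name + "\"";
  if (shows_parallel(a))
    out += ", \"parallel_scan\": true";
  return out;
}
```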

… basic test

Built on top of MDEV-39492, the parallel worker manager creates N
temporary tables and gives one to each parallel worker to populate.
At the conclusion of all worker threads, each row from the above tables
is added to the join_tab->table representing the query result.  To use:

MariaDB [test]> set session parallel_worker_threads=10;
Query OK, 0 rows affected (0.001 sec)

MariaDB [test]> select SQL_BUFFER_RESULT * from t1 join seq_1_to_5 on t1.a = seq;
+------+------+-----+
| a    | b    | seq |
+------+------+-----+
|    1 | one  |   1 |
|    2 | two  |   2 |
|    1 | un   |   1 |
|    1 | one  |   1 |
|    2 | two  |   2 |
|    1 | un   |   1 |
|    1 | one  |   1 |
|    2 | two  |   2 |
|    1 | un   |   1 |
|    1 | one  |   1 |
|    2 | two  |   2 |
|    1 | un   |   1 |
|    1 | one  |   1 |
|    2 | two  |   2 |
|    1 | un   |   1 |
|    1 | one  |   1 |
|    2 | two  |   2 |
|    1 | un   |   1 |
|    1 | one  |   1 |
|    2 | two  |   2 |
|    1 | un   |   1 |
|    1 | one  |   1 |
|    2 | two  |   2 |
|    1 | un   |   1 |
|    1 | one  |   1 |
|    2 | two  |   2 |
|    1 | un   |   1 |
|    1 | one  |   1 |
|    2 | two  |   2 |
|    1 | un   |   1 |
+------+------+-----+
30 rows in set (0.010 sec)

Each worker has read the first table with a JT_ALL access method and
replicated the results back to the manager thread, which executes the
rest of the join.  In the above example there are 10 worker threads, so
10 copies of the join result.