MDEV-39495 Parallel Query: Use temporary work tables to ship results: basic test#5176
MDEV-39495 Parallel Query: Use temporary work tables to ship results: basic test#5176mariadb-RexJohnston wants to merge 2 commits into
Conversation
Introduces parallel_worker_threads variable to control the number of worker threads created by a parallel execution query. 2 new files, sql_parallel_workers.h sql_parallel_workers.cc which contain structures for the creation, management and deletion of parallel worker threads (pwt_ in the name). Main management class created in the stack in JOIN::exec, implemented for the top level select. Current parallel_worker_thread_func sleeps for 10 seconds, generates a warning, signals the main thread, sleeps 10 seconds, signals the main thread again, sets it's finished flag and cleans it's THD. The main thread loops through worker threads, looking for finished thread and cleans them up if they have finished. It then waits for a signal, then processes it's message queue. Threads are registed in server_threads, so are visible in information_schema.processlist and the show processlist command. We check that a kill query on a parallel worker is passed onto it's manager and the query is properly aborted, and that a kill connection is handled properly in parallel_worker.test. Review input 1: cleanup earlier Do cleanup before we've finished sending the result to the client. This way, one can see the errors (and eventually warnings) marshalled back to the main thread and returned to the user: MariaDB [test]> set parallel_worker_threads=10; Query OK, 0 rows affected (0.001 sec) MariaDB [test]> select seq from seq_1_to_10; ERROR 4103 (HY000): Argument to the worker_busted_function() function does not belong to the range [0,1] Assisted by Sergei Petrunia and Claude Code.
There was a problem hiding this comment.
Code Review
This pull request implements parallel query execution support (Parallel Worker Threads) under MDEV-39492, introducing the parallel_worker_threads system variable, worker thread management, row batching, and streaming mechanisms. The feedback highlights a critical memory management issue where pwt_management inherits from Sql_alloc, potentially leading to resource leaks because its destructor is not automatically called. Additionally, there are opportunities to clean up redundant synchronization variables (LOCK_pwt_manager and COND_pwt_new_message), improve macOS compilation robustness by using the standard __APPLE__ macro, and ensure consistency by updating JSON EXPLAIN to reflect parallel scan states alongside the traditional EXPLAIN format.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| /* | ||
| Class to create, manage and eventually destroy a "team" of worker threads. | ||
| */ | ||
| class pwt_management : public Sql_alloc |
There was a problem hiding this comment.
Since pwt_management inherits from Sql_alloc, it is allocated on the mem_root. In MariaDB, C++ destructors of Sql_alloc objects are not automatically called when the mem_root is freed. If parallel_work_manager is not explicitly deleted via delete, its destructor ~pwt_management() will not run, which could lead to resource leaks (such as threads, mutexes, or heap-allocated memory) if finalize_parallel_workers is not called on all exit/error paths. Consider whether pwt_management needs to inherit from Sql_alloc. If it does, ensure it is explicitly deleted, or remove Sql_alloc inheritance and manage its lifetime using standard heap allocation or unique pointers.
| #elif defined(__apple_build_version__) | ||
| #include <sys/proc_info.h> | ||
| #define MAX_THREAD_NAME MAXTHREADNAMESIZE |
There was a problem hiding this comment.
The macro __apple_build_version__ is specific to Apple's Clang compiler. If MariaDB is compiled on macOS using a different compiler (such as GCC or upstream Clang), this macro will not be defined, and the build will fall back to the default MAX_THREAD_NAME 16 instead of using MAXTHREADNAMESIZE (which is 64). Using the standard platform macro __APPLE__ is more robust.
#elif defined(__APPLE__)
#include <sys/proc_info.h>
#define MAX_THREAD_NAME MAXTHREADNAMESIZE| mysql_cond_t COND_pwt_new_message; | ||
| mysql_mutex_t LOCK_pwt_manager; |
There was a problem hiding this comment.
LOCK_pwt_manager and COND_pwt_new_message are initialized, signaled, and destroyed, but they are never actually waited on anywhere in the code. The manager thread instead coordinates and waits using LOCK_data and COND_data_avail in parallel_scan_read_next. They can be safely removed to simplify the code and avoid unnecessary synchronization overhead.
| // signal manager there is something in the queue, | ||
| mysql_mutex_lock(&worker->manager->LOCK_pwt_manager); | ||
| mysql_cond_signal(&worker->manager->COND_pwt_new_message); | ||
| mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager); |
| // signal manager again to wake up and end this thread | ||
| mysql_mutex_lock(&worker->manager->LOCK_pwt_manager); | ||
| mysql_cond_signal(&worker->manager->COND_pwt_new_message); | ||
| mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager); |
| mysql_mutex_init(key_mutex_pwt_LOCK_manager, &LOCK_pwt_manager, | ||
| MY_MUTEX_INIT_SLOW); | ||
| mysql_mutex_init(key_mutex_pwt_LOCK_thread, &LOCK_pwt_thread, | ||
| MY_MUTEX_INIT_SLOW); | ||
| mysql_cond_init(key_COND_pwt_new_message, &COND_pwt_new_message, NULL); |
| mysql_cond_destroy(&COND_pwt_new_message); | ||
| mysql_mutex_destroy(&LOCK_pwt_manager); | ||
| mysql_mutex_destroy(&LOCK_pwt_thread); |
| mysql_cond_destroy(&COND_pwt_new_message); | ||
| mysql_mutex_destroy(&LOCK_pwt_manager); | ||
| mysql_mutex_destroy(&LOCK_pwt_thread); |
| if (type == JT_ALL && use_parallel_scan) | ||
| push_str(thd, &item_list, "PARALLEL"); | ||
| else | ||
| push_str(thd, &item_list, join_type_str[type]); |
There was a problem hiding this comment.
While traditional tabular EXPLAIN is updated to print "PARALLEL" when use_parallel_scan is true, JSON EXPLAIN (e.g., in Explain_table_access::print_explain_json or similar) does not seem to be updated to reflect this parallel scan state. This creates an inconsistency between traditional and JSON EXPLAIN formats. Please ensure that JSON EXPLAIN is also updated to output a property indicating that a parallel scan is being used (e.g., "parallel_scan": true or similar).
… basic test Built on top of MDEV-39492, the parallel worker manager creates N temporary tables and gives them to each parallel worker to populate. At the conclusion of all worker threads, each row from the above tables is added to the join_tab->table representing the query result. To use MariaDB [test]> set session parallel_worker_threads=10; Query OK, 0 rows affected (0.001 sec) MariaDB [test]> select SQL_BUFFER_RESULT * from t1 join seq_1_to_5 on t1.a = seq; +------+------+-----+ | a | b | seq | +------+------+-----+ | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | +------+------+-----+ 30 rows in set (0.010 sec) Each worker has read the first table with a JT_ALL access method and replicated the results back to the manager thread, which executes the rest of the join. In the above example there are 10 worker threads, so 10 copies of the join result.
4b746d0 to
2482c68
Compare
Overview
Building on MDEV-39492's worker-thread scaffolding, this commit potentially chooses a non-const table of a top-level SELECT to get scanned by N parallel worker threads instead of by the main thread. Workers ship the scanned rows to the main thread (manager) through per-worker temporary tables; the manager feeds those rows into the first join_tab and runs the rest of the join itself.
Because the parallel scan table is not yet partitioned, each of N workers scans the whole table, so the manager sees N copies of the row set - correct results currently require parallel_worker_threads=1. This is a deliberate checkpoint, not a bug.
Files: sql_parallel_workers.{h,cc} (the worker/manager machinery), sql_select.cc (eligibility, table creation, teardown), sql_explain.{cc,h} (carry use_parallel_scan into EXPLAIN), and the two test pairs.
Conditions for initiating a parallel scan
It's decided in four stages during optimization/execution:
- a real base table - s->tmp_table == NO_TMP_TABLE
- free of blob-backed columns - s->blob_fields == 0 (excludes BLOB/TEXT/GEOMETRY/JSON, whose payload lives off record[0])
- not the MERGE engine - file->ht->db_type != DB_TYPE_MRG_MYISAM (a bare open_table_from_share doesn't attach MERGE children → rnd_init asserts)
- not fulltext-searched - !table->fulltext_searched (a MATCH … AGAINST relevance is derived from handler state, not from record[0])
- select->quick - a range/quick access method a full scan wouldn't honour
- filesort - a sort that drives its own read of the table
- keep_current_rowid - the rowid is needed (DuplicateWeedout semijoin, rowid-ordered sort); injected rows carry no valid handler position
Temporary-table lifecycles
(A) The N per-worker batch tables
Created - JOIN::create_parallel_workers_tmp_tables() (sql_select.cc:4401), on the manager thread during optimization. Builds an Item_field list mirroring the source table's columns, then loops parallel_worker_threads times calling create_tmp_table(do_not_open) + instantiate_tmp_table(cross_thread=true). Crucially it temporarily overrides tmp_memory_table_size → max_heap_table_size (4435–4437) so the tables are the in-memory HEAP engine - an on-disk Aria batch table would charge its file growth to the worker's tmp_space_used (it's written by the worker but freed by the manager), tripping THD::free_connection(). The tables are stored in JOIN_TAB::parallel_tmp_tables on the post-join aggr tab (which becomes parallel_scan_join_tab).
Used - at execution, init_parallel_workers() hands each worker one table (workers[i].batch_table = parallel_tmp_tables.at(i), line 704). Then the channel runs:
So each batch table is touched by exactly one thread at a time (fill → drain → refill) - no per-row locking.
Destroy - JOIN::free_parallel_tmp_tables() (sql_select.cc:4484): free_tmp_table each + free the Dynamic_array. Called from JOIN::cleanup() on the normal path, and from every error path (the function's own internal-failure path, and create_postjoin_aggr_table's err: label) so they're never leaked even when later setup fails. They live for the whole statement.
(B) The post-join aggregation / result table
Not parallel-specific, but the commit changes its handling. Create: create_postjoin_aggr_table() builds it in result/projection format (not source format) and instantiate_tmp_tables it early ("so the parallel worker manager can write to it"), then sets tab->table and tab->aggr. Use: the manager's nested-loop join - driven by the worker rows injected into the first join_tab - writes result rows into it (sub_select_postjoin_aggr), which are then sent to the client. Destroy: free_tmp_table in
JOIN::cleanup (gated on tab->aggr); on an instantiate failure (before tab->table is set) it's freed via the err: label rather than leaked.
(Aside) The worker's private source table our_scan_table
Not a tmp table, but a per-worker TABLE: init_parallel_workers opens it with open_table_from_share from the shared TABLE_SHARE (its own handler, in_use repointed to the worker) so workers scan concurrently without a shared-scan lock; the worker thread closes it (close_worker_scan_table) before its THD is destroyed.
The added tests
main/parallel_query.test - worker lifecycle + KILL
Creates t1 from seq_1_to_2 (2 rows), then:
main/parallel_query_oom.test - diagnostic-queue OOM
With debug_dbug="+d,pwt_error_to_queue_oom" forcing error_to_queue() to fail and parallel_worker_threads=1, it runs SELECT SQL_BUFFER_RESULT * FROM t1. The worker's would-be warning/error can't be queued to the manager; SHOW WARNINGS confirms the manager instead surfaces a single ER_OUTOFMEMORY warning, so worker diagnostics aren't silently lost.
mysql-test/main/parallel_query_excluded.test - show examples of fully scanned tables execluded from parallel queries
An ordinary table is chosen; a table is rejected by the use_parallel_scan gate when it has a BLOB/TEXT (blob-backed) column, uses the MERGE engine, or is fulltext-searched (a MATCH ... AGAINST refers to it).