From cdd860782bcb68614de9465983f7d902aab1a2a3 Mon Sep 17 00:00:00 2001 From: Rex Johnston Date: Wed, 6 May 2026 11:18:47 +1200 Subject: [PATCH 1/2] MDEV-39492 Parallel Query: Study how to create worker threads Introduces parallel_worker_threads variable to control the number of worker threads created by a parallel execution query. 2 new files, sql_parallel_workers.h sql_parallel_workers.cc which contain structures for the creation, management and deletion of parallel worker threads (pwt_ in the name). Main management class created in the stack in JOIN::exec, implemented for the top level select. Current parallel_worker_thread_func sleeps for 10 seconds, generates a warning, signals the main thread, sleeps 10 seconds, signals the main thread again, sets it's finished flag and cleans it's THD. The main thread loops through worker threads, looking for finished thread and cleans them up if they have finished. It then waits for a signal, then processes it's message queue. Threads are registed in server_threads, so are visible in information_schema.processlist and the show processlist command. We check that a kill query on a parallel worker is passed onto it's manager and the query is properly aborted, and that a kill connection is handled properly in parallel_worker.test. Review input 1: cleanup earlier Do cleanup before we've finished sending the result to the client. This way, one can see the errors (and eventually warnings) marshalled back to the main thread and returned to the user: MariaDB [test]> set parallel_worker_threads=10; Query OK, 0 rows affected (0.001 sec) MariaDB [test]> select seq from seq_1_to_10; ERROR 4103 (HY000): Argument to the worker_busted_function() function does not belong to the range [0,1] Assisted by Sergei Petrunia and Claude Code. --- include/my_pthread.h | 14 + libmysqld/CMakeLists.txt | 1 + mysql-test/main/mysqld--help.result | 4 + mysql-test/main/parallel_query.result | 63 ++ mysql-test/main/parallel_query.test | 107 +++ mysql-test/main/parallel_query_oom.result | 21 + mysql-test/main/parallel_query_oom.test | 33 + .../r/sysvars_server_notembedded.result | 10 + mysys/my_thread_name.cc | 5 - sql/CMakeLists.txt | 1 + sql/mysqld.cc | 12 +- sql/mysqld.h | 5 + sql/privilege.h | 2 + sql/sql_class.cc | 5 +- sql/sql_class.h | 5 +- sql/sql_parallel_workers.cc | 619 ++++++++++++++++++ sql/sql_parallel_workers.h | 117 ++++ sql/sql_select.cc | 27 + sql/sql_select.h | 3 + sql/sys_vars.cc | 10 + 20 files changed, 1054 insertions(+), 10 deletions(-) create mode 100644 mysql-test/main/parallel_query.result create mode 100644 mysql-test/main/parallel_query.test create mode 100644 mysql-test/main/parallel_query_oom.result create mode 100644 mysql-test/main/parallel_query_oom.test create mode 100644 sql/sql_parallel_workers.cc create mode 100644 sql/sql_parallel_workers.h diff --git a/include/my_pthread.h b/include/my_pthread.h index 6fe76c40cba54..0d64090701a05 100644 --- a/include/my_pthread.h +++ b/include/my_pthread.h @@ -624,6 +624,20 @@ typedef uint64 my_thread_id; */ #define MY_THREAD_ID_MAX UINT32_MAX +#ifdef _WIN32 +#define MAX_THREAD_NAME 256 +#elif defined(__linux__) +#define MAX_THREAD_NAME 16 +#elif defined(__FreeBSD__) || defined(__OpenBSD__) +#define MAX_THREAD_NAME 19 +#include +#elif defined(__apple_build_version__) +#include +#define MAX_THREAD_NAME MAXTHREADNAMESIZE +#else +#define MAX_THREAD_NAME 16 +#endif + extern void my_threadattr_global_init(void); extern my_bool my_thread_global_init(void); extern void my_thread_set_name(const char *); diff --git a/libmysqld/CMakeLists.txt b/libmysqld/CMakeLists.txt index f59354625b345..14f7a12381db9 100644 --- a/libmysqld/CMakeLists.txt +++ b/libmysqld/CMakeLists.txt @@ -72,6 +72,7 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc ../sql/mf_iocache.cc ../sql/my_decimal.cc ../sql/net_serv.cc ../sql/opt_range.cc ../sql/opt_group_by_cardinality.cc + ../sql/sql_parallel_workers.cc ../sql/opt_rewrite_date_cmp.cc ../sql/opt_rewrite_remove_casefold.cc ../sql/opt_sargable_left.cc diff --git a/mysql-test/main/mysqld--help.result b/mysql-test/main/mysqld--help.result index 17458f8cf56ba..c5314ead01525 100644 --- a/mysql-test/main/mysqld--help.result +++ b/mysql-test/main/mysqld--help.result @@ -1012,6 +1012,9 @@ The following specify which files/extra groups are read (specified before remain Cost of checking the row against the WHERE clause. Increasing this will have the optimizer to prefer plans with less row combinations + --parallel-worker-threads=# + Number of worker threads available for parallel query + execution. 0 means parallel execution is disabled --path=name Comma-separated list of schema names that defines the search order for stored routines --performance-schema @@ -2004,6 +2007,7 @@ optimizer-trace optimizer-trace-max-mem-size 1048576 optimizer-use-condition-selectivity 4 optimizer-where-cost 0.032 +parallel-worker-threads 0 path CURRENT_SCHEMA performance-schema FALSE performance-schema-accounts-size -1 diff --git a/mysql-test/main/parallel_query.result b/mysql-test/main/parallel_query.result new file mode 100644 index 0000000000000..3224f1fc85439 --- /dev/null +++ b/mysql-test/main/parallel_query.result @@ -0,0 +1,63 @@ +SET DEBUG_SYNC='RESET'; +# +# MDEV-39492 Parallel Query: Study how to create worker threads +# +set session parallel_worker_threads=3; +# we should currently see 3 warnings; +select seq from seq_1_to_2; +seq +1 +2 +Warnings: +Warning 1105 This is an example warning to show we can push a warning from a worker thread to its manager +Warning 1105 This is an example warning to show we can push a warning from a worker thread to its manager +Warning 1105 This is an example warning to show we can push a warning from a worker thread to its manager +set session parallel_worker_threads=10; +# the 10th workers throws out an error +select seq from seq_1_to_2; +ERROR HY000: Argument to the worker_busted_function() function does not belong to the range [0,1] +set session parallel_worker_threads=0; +connect killee, localhost, root, , ; +# check that kill query on a parallel worker is passed to the manager +connection killee; +set session parallel_worker_threads=3; +SET @save_dbug= @@global.debug_dbug; +SET GLOBAL debug_dbug = "+d,pwt_worker_pause_before_signal"; +select seq from seq_1_to_2;; +# now use the default connection to view/kill the thread group executing +# the parallel work +connection default; +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +# wait for all 3 workers to hit our debug sync point +# then have a look at what is in our process list without thread ID +select USER, DB, STATE, SUBSTRING(INFO, 1, 32) +from information_schema.processlist +where (INFO regexp '.*parallel worker.*' or +STATE regexp '.*parallel worker.*') and +NOT INFO regexp '.*information_schema\.processlist.*'; +USER DB STATE SUBSTRING(INFO, 1, 32) +root test debug sync point: now Parallel Worker 3 For Thread ID +root test debug sync point: now Parallel Worker 2 For Thread ID +root test debug sync point: now Parallel Worker 1 For Thread ID +root test Reading data from parallel workers select seq from seq_1_to_2 +kill query ID; +# signal our workers to continue execution +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +# then wait for the manager thread to clean up and go back to sleep +# review error message on --reap +connection killee; +ERROR 70100: Query execution was interrupted +SET DEBUG_SYNC = "RESET"; +# save as above, but kill a worker with a simple kill and see the +# connection drop +select seq from seq_1_to_2;; +connection default; +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +kill ID; +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +connection killee; +ERROR 70100: Query execution was interrupted +# killee connection now dead, confirmed below +connection default; +SET GLOBAL debug_dbug = @save_dbug; +SET DEBUG_SYNC = "RESET"; diff --git a/mysql-test/main/parallel_query.test b/mysql-test/main/parallel_query.test new file mode 100644 index 0000000000000..218d6176add8f --- /dev/null +++ b/mysql-test/main/parallel_query.test @@ -0,0 +1,107 @@ +# +# Test KILL and KILL QUERY statements. +# + +-- source include/count_sessions.inc +-- source include/not_embedded.inc +# this will be used in the future -- source include/have_innodb.inc +-- source include/have_sequence.inc +--source include/have_debug.inc +--source include/have_debug_sync.inc +SET DEBUG_SYNC='RESET'; + +--disable_service_connection + +--echo # +--echo # MDEV-39492 Parallel Query: Study how to create worker threads +--echo # + +set session parallel_worker_threads=3; +--echo # we should currently see 3 warnings; +select seq from seq_1_to_2; + +set session parallel_worker_threads=10; +--echo # the 10th workers throws out an error +--error ER_ARGUMENT_OUT_OF_RANGE +select seq from seq_1_to_2; + +set session parallel_worker_threads=0; +connect (killee, localhost, root, , ); + +--echo # check that kill query on a parallel worker is passed to the manager + +--connection killee +let $id= `select connection_id()`; +set session parallel_worker_threads=3; + +# worker THDs don't inherit session vars, we are using a debug_sync_set_action +# to pause execution in our worker threads just after they have done something + +SET @save_dbug= @@global.debug_dbug; +SET GLOBAL debug_dbug = "+d,pwt_worker_pause_before_signal"; + +--send select seq from seq_1_to_2; +--echo # now use the default connection to view/kill the thread group executing +--echo # the parallel work +--connection default +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +let $name= "Parallel Worker 1 For Thread ID $id"; +let $killID= `SELECT @kid:=ID from information_schema.processlist + where info like $name limit 1`; +--echo # wait for all 3 workers to hit our debug sync point +let $wait_condition= + select count(*) = 3 from information_schema.processlist + where state like 'debug sync%' and info like '%Thread ID $id'; +--source include/wait_condition.inc +--echo # then have a look at what is in our process list without thread ID +select USER, DB, STATE, SUBSTRING(INFO, 1, 32) + from information_schema.processlist + where (INFO regexp '.*parallel worker.*' or + STATE regexp '.*parallel worker.*') and + NOT INFO regexp '.*information_schema\.processlist.*'; +--replace_result $killID ID +eval kill query $killID; +--echo # signal our workers to continue execution +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +--echo # then wait for the manager thread to clean up and go back to sleep +let $wait_condition= + select count(*) = 1 from information_schema.processlist + where command = 'Sleep' and id = $id; +--source include/wait_condition.inc +#select * from information_schema.processlist; +--echo # review error message on --reap +--connection killee +--error ER_QUERY_INTERRUPTED +reap; + +SET DEBUG_SYNC = "RESET"; + +--echo # save as above, but kill a worker with a simple kill and see the +--echo # connection drop + +--send select seq from seq_1_to_2; +connection default; +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +let $name= "Parallel Worker 1 For Thread ID $id"; +let $killID= `SELECT @kid:=ID from information_schema.processlist + where info like $name limit 1`; +--replace_result $killID ID +eval kill $killID; +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +let $wait_condition= + select count(*) = 0 from information_schema.processlist + where command = 'Sleep' and id = $id; +--source include/wait_condition.inc +connection killee; +--disable_warnings +--error ER_QUERY_INTERRUPTED +reap; + +--echo # killee connection now dead, confirmed below + +connection default; + +SET GLOBAL debug_dbug = @save_dbug; +SET DEBUG_SYNC = "RESET"; + +--source include/wait_until_count_sessions.inc diff --git a/mysql-test/main/parallel_query_oom.result b/mysql-test/main/parallel_query_oom.result new file mode 100644 index 0000000000000..c42bf2cf2abf7 --- /dev/null +++ b/mysql-test/main/parallel_query_oom.result @@ -0,0 +1,21 @@ +# +# MDEV-39492 Parallel Query: OOM in worker error_to_queue surfaces a +# single ER_OUTOFMEMORY warning so worker diagnostics aren't silently +# lost. +# +set @save_dbug=@@global.debug_dbug; +set global debug_dbug="+d,pwt_error_to_queue_oom"; +set session parallel_worker_threads=1; +# The prototype worker emits either a warning or my_error() depending on the +# parity of its thread id. Either path runs through error_to_queue; the +# DBUG injection forces both into the OOM branch, so neither the original +# warning nor the original error reaches the user. We expect just the +# manager-surfaced ER_OUTOFMEMORY warning. +select count(*) from seq_1_to_2; +count(*) +2 +show warnings; +Level Code Message +Warning 1037 Parallel worker diagnostics were dropped due to memory allocation failure +set global debug_dbug=@save_dbug; +set session parallel_worker_threads=default; diff --git a/mysql-test/main/parallel_query_oom.test b/mysql-test/main/parallel_query_oom.test new file mode 100644 index 0000000000000..6a8afd3266182 --- /dev/null +++ b/mysql-test/main/parallel_query_oom.test @@ -0,0 +1,33 @@ +# +# Test OOM handling in the parallel worker error/warning queue. +# + +-- source include/count_sessions.inc +-- source include/not_embedded.inc +-- source include/have_sequence.inc +-- source include/have_debug.inc + +--disable_service_connection + +--echo # +--echo # MDEV-39492 Parallel Query: OOM in worker error_to_queue surfaces a +--echo # single ER_OUTOFMEMORY warning so worker diagnostics aren't silently +--echo # lost. +--echo # + +set @save_dbug=@@global.debug_dbug; +set global debug_dbug="+d,pwt_error_to_queue_oom"; +set session parallel_worker_threads=1; + +--echo # The prototype worker emits either a warning or my_error() depending on the +--echo # parity of its thread id. Either path runs through error_to_queue; the +--echo # DBUG injection forces both into the OOM branch, so neither the original +--echo # warning nor the original error reaches the user. We expect just the +--echo # manager-surfaced ER_OUTOFMEMORY warning. +select count(*) from seq_1_to_2; +show warnings; + +set global debug_dbug=@save_dbug; +set session parallel_worker_threads=default; + +--source include/wait_until_count_sessions.inc diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result index 268c06632b8b4..573cb113d9efe 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result +++ b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result @@ -2992,6 +2992,16 @@ NUMERIC_BLOCK_SIZE NULL ENUM_VALUE_LIST NULL READ_ONLY NO COMMAND_LINE_ARGUMENT REQUIRED +VARIABLE_NAME PARALLEL_WORKER_THREADS +VARIABLE_SCOPE SESSION +VARIABLE_TYPE BIGINT UNSIGNED +VARIABLE_COMMENT Number of worker threads available for parallel query execution. 0 means parallel execution is disabled +NUMERIC_MIN_VALUE 0 +NUMERIC_MAX_VALUE 100 +NUMERIC_BLOCK_SIZE 1 +ENUM_VALUE_LIST NULL +READ_ONLY NO +COMMAND_LINE_ARGUMENT REQUIRED VARIABLE_NAME PATH VARIABLE_SCOPE SESSION VARIABLE_TYPE VARCHAR diff --git a/mysys/my_thread_name.cc b/mysys/my_thread_name.cc index 9f32dae269ab0..90418c5cd8825 100644 --- a/mysys/my_thread_name.cc +++ b/mysys/my_thread_name.cc @@ -20,12 +20,7 @@ #include #ifdef _WIN32 -#define MAX_THREAD_NAME 256 typedef HRESULT (*func_SetThreadDescription)(HANDLE,PCWSTR); -#elif defined(__linux__) -#define MAX_THREAD_NAME 16 -#elif defined(__FreeBSD__) || defined(__OpenBSD__) -#include #endif #if defined(HAVE_PSI_THREAD_INTERFACE) && !defined DBUG_OFF diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index e54e894e1d0fc..5ede64143d08a 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -115,6 +115,7 @@ SET (SQL_SOURCE ../sql-common/client_plugin.c opt_range.cc vector_mhnsw.cc opt_group_by_cardinality.cc + sql_parallel_workers.cc opt_rewrite_date_cmp.cc opt_rewrite_remove_casefold.cc opt_sargable_left.cc diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 1aa5e0cfe7ddf..67293cd2553e4 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -9911,6 +9911,7 @@ PSI_stage_info stage_starting= { 0, "starting", 0}; PSI_stage_info stage_waiting_for_flush= { 0, "Waiting for non trans tables to be flushed", 0}; PSI_stage_info stage_waiting_for_ddl= { 0, "Waiting for DDLs", 0}; PSI_stage_info stage_waiting_for_reset_master= { 0, "Waiting for a running RESET MASTER to complete", 0}; +PSI_stage_info stage_reading_data_from_parallel_worker= { 0, "Reading data from parallel workers", 0}; #ifdef WITH_WSREP // Additional Galera thread states @@ -10003,6 +10004,10 @@ PSI_memory_key key_memory_user_var_entry_value; PSI_memory_key key_memory_String_value; PSI_memory_key key_memory_WSREP; PSI_memory_key key_memory_trace_ddl_info; +PSI_memory_key key_memory_pwt_queued_event; +PSI_memory_key key_memory_pwt_error_message; +PSI_memory_key key_memory_pwt_workers; +PSI_memory_key key_memory_pwt_db; #ifdef HAVE_PSI_INTERFACE @@ -10144,7 +10149,8 @@ PSI_stage_info *all_server_stages[]= & stage_reading_semi_sync_ack, & stage_waiting_for_deadlock_kill, & stage_starting, - & stage_waiting_for_reset_master + & stage_waiting_for_reset_master, + & stage_reading_data_from_parallel_worker #ifdef WITH_WSREP , & stage_waiting_isolation, @@ -10256,6 +10262,9 @@ static PSI_memory_info all_server_memory[]= { &key_memory_trace_ddl_info, "TRACE_DDL_INFO", 0} }; + +extern void pwt_init_psi_keys(void); + /** Initialise all the performance schema instrumentation points used by the server. @@ -10342,6 +10351,7 @@ void init_server_psi_keys(void) stmt_info_rpl.m_flags= PSI_FLAG_MUTABLE; mysql_statement_register(category, &stmt_info_rpl, 1); #endif + pwt_init_psi_keys(); } #endif /* HAVE_PSI_INTERFACE */ diff --git a/sql/mysqld.h b/sql/mysqld.h index 9ee5bc33740e2..1264b0d4904a2 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -499,6 +499,10 @@ extern PSI_memory_key key_memory_Table_trigger_dispatcher; extern PSI_memory_key key_memory_native_functions; extern PSI_memory_key key_memory_WSREP; extern PSI_memory_key key_memory_trace_ddl_info; +extern PSI_memory_key key_memory_pwt_queued_event; +extern PSI_memory_key key_memory_pwt_error_message; +extern PSI_memory_key key_memory_pwt_workers; +extern PSI_memory_key key_memory_pwt_db; /* MAINTAINER: Please keep this list in order, to limit merge collisions. @@ -646,6 +650,7 @@ extern PSI_stage_info stage_slave_background_wait_request; extern PSI_stage_info stage_waiting_for_deadlock_kill; extern PSI_stage_info stage_starting; extern PSI_stage_info stage_waiting_for_reset_master; +extern PSI_stage_info stage_reading_data_from_parallel_worker; #ifdef WITH_WSREP // Additional Galera thread states extern PSI_stage_info stage_waiting_isolation; diff --git a/sql/privilege.h b/sql/privilege.h index b3791a8e3a2f5..80d9d3b80f254 100644 --- a/sql/privilege.h +++ b/sql/privilege.h @@ -452,6 +452,8 @@ constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_MAX_CONNECT_ERRORS= // Was SUPER_ACL prior to 10.5.2 constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_MAX_PASSWORD_ERRORS= CONNECTION_ADMIN_ACL; +constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_PARALLEL_WORKER_THREADS= + CONNECTION_ADMIN_ACL; // Was SUPER_ACL prior to 10.5.2 constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_PROXY_PROTOCOL_NETWORKS= CONNECTION_ADMIN_ACL; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 5423eb393573d..fa6068a382d09 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -714,7 +714,7 @@ const char *thd_where(THD *thd) THD::THD(my_thread_id id, bool is_wsrep_applier) :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, /* statement id */ 0), - rli_fake(0), rgi_fake(0), rgi_slave(NULL), + rli_fake(0), rgi_fake(0), rgi_slave(NULL), pwt_worker_info(NULL), protocol_text(this), protocol_binary(this), initial_status_var(0), m_current_stage_key(0), m_psi(0), start_time(0), start_time_sec_part(0), in_sub_stmt(0), log_all_errors(0), @@ -5462,8 +5462,7 @@ void destroy_thd(MYSQL_THD thd) /** Create a THD that only has auxiliary functions - It will never be added to the global connection list - server_threads. It does not represent any client connection. + It does not represent any client connection. It should never be counted, because it will stall the shutdown. It is solely for engine's internal use, diff --git a/sql/sql_class.h b/sql/sql_class.h index 9897df33766b9..d34ce97369622 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -872,6 +872,7 @@ typedef struct system_variables ulong server_id; ulong session_track_transaction_info; ulong threadpool_priority; + ulong parallel_worker_threads; ulong vers_alter_history; /* deadlock detection */ @@ -3154,7 +3155,7 @@ enum class THD_WHERE const char *thd_where(THD *thd); - +class pwt_worker; /** @class THD @@ -3199,6 +3200,8 @@ class THD: public THD_count, /* this must be first */ /* Slave applier execution context */ rpl_group_info* rgi_slave; + pwt_worker *pwt_worker_info; + union { rpl_io_thread_info *rpl_io_info; rpl_sql_thread_info *rpl_sql_info; diff --git a/sql/sql_parallel_workers.cc b/sql/sql_parallel_workers.cc new file mode 100644 index 0000000000000..26b6e68fa5d9d --- /dev/null +++ b/sql/sql_parallel_workers.cc @@ -0,0 +1,619 @@ +/* + Copyright (c) 2026, MariaDB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 + USA */ + +/** + @file + + Implementation of parallel worker threads (PWT) management and execution + logic. + + Contains + error_to_queue + push an error message onto our queue to send to the manager + PWT_error_handler + intercept error and warnings, queue them to the manager + parallel_worker_thread_func + Entry point for our worker threads + abort_worker + pwt_management::free_queue + helper for error conditions + pwt_management::init_parallel_workers + Initialise our parallel worker threads + pwt_init_psi_keys + initialize PSI keys + pwt_management::join_parallel_workers + process information from worker threads until they are finished +*/ + + +#include "sql_parallel_workers.h" +#include "debug_sync.h" + +#ifdef HAVE_PSI_INTERFACE +static PSI_thread_key key_thread_pwt; +static PSI_thread_info all_pwt_threads[]= +{ + { &key_thread_pwt, WORKER_NAME, PSI_FLAG_GLOBAL}, +}; + +static PSI_mutex_key key_mutex_pwt_LOCK_manager, + key_mutex_pwt_LOCK_thread, + key_mutex_pwt_LOCK_worker; +static PSI_mutex_info all_pwt_mutexes[]= +{ + { &key_mutex_pwt_LOCK_manager, "pwt_management::LOCK_pwt_manager", 0}, + { &key_mutex_pwt_LOCK_thread, "pwt_management::LOCK_pwt_thread", 0}, + { &key_mutex_pwt_LOCK_worker, "pwt_worker::LOCK_worker", 0}, +}; + +static PSI_cond_key key_COND_pwt_new_message, key_COND_pwt_worker; +static PSI_cond_info all_pwt_conds[]= +{ + { &key_COND_pwt_new_message, "pwt_management::COND_pwt_new_message", 0}, + { &key_COND_pwt_worker, "pwt_worker::COND_worker", 0}, +}; + +static PSI_memory_info all_pwt_memory[]= +{ + { &key_memory_pwt_queued_event, "pwt_queued_event", 0}, + { &key_memory_pwt_error_message, "pwt_error_message", 0}, + { &key_memory_pwt_workers, "pwt_management::workers", 0}, + { &key_memory_pwt_db, "pwt_worker::db", 0}, +}; +#endif /* HAVE_PSI_INTERFACE */ + + +/** + @brief + push an error message onto our queue to send to the manager + + @return + true an error occurred + false error or warning is queued +*/ + +bool error_to_queue(THD *thd, pwt_queued_event **event, uint error, + Sql_condition::enum_warning_level level, const char *msg) +{ + DBUG_EXECUTE_IF("pwt_error_to_queue_oom", + { *event= nullptr; return true; }); + *event= (pwt_queued_event*) my_malloc(key_memory_pwt_queued_event, + sizeof(pwt_queued_event), + MYF(0)); + if (!*event) + return true; + (*event)->error= (pwt_error_message*) my_malloc(key_memory_pwt_error_message, + sizeof(pwt_error_message), + MYF(0)); + if (!(*event)->error) + { + my_free(*event); + *event= nullptr; + return true; + } + (*event)->data= nullptr; + (*event)->error->level= level; + if (level == Sql_condition::enum_warning_level::WARN_LEVEL_ERROR) + (*event)->error->worker_errno= thd->killed_errno(); + else + (*event)->error->worker_errno= 0; + (*event)->error->code= error; + (*event)->error->message= (char *) my_malloc(key_memory_pwt_error_message, + strlen(msg)+1, + MYF(0)); + if (!(*event)->error->message) + { + my_free((*event)->error); + my_free(*event); + *event= nullptr; + return true; + } + strmake((*event)->error->message, msg, strlen(msg)); + return false; +} + + +class PWT_error_handler : public Internal_error_handler +{ +public: + bool handle_condition(THD *thd, + uint sql_errno, + const char* sql_state, + Sql_condition::enum_warning_level *level, + const char* msg, + Sql_condition ** cond_hdl) override + { + if (pwt_worker *worker= thd->pwt_worker_info) + { + pwt_queued_event *event; + if (error_to_queue(thd, &event, sql_errno, *level, msg)) + { + /* + Couldn't allocate the queued event. The worker THD's diagnostics + area is discarded when the worker exits, so flag the manager so it + can surface a single ER_OUTOFMEMORY warning to the user instead of + letting this condition vanish. + */ + mysql_mutex_lock(&worker->manager->LOCK_pwt_thread); + worker->manager->messages_dropped= true; + mysql_mutex_unlock(&worker->manager->LOCK_pwt_thread); + mysql_mutex_lock(&worker->manager->LOCK_pwt_manager); + mysql_cond_signal(&worker->manager->COND_pwt_new_message); + mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager); + return true; + } + mysql_mutex_lock(&worker->manager->LOCK_pwt_thread); + worker->manager->parallel_messages.push_back(event); + mysql_mutex_unlock(&worker->manager->LOCK_pwt_thread); + } + return true; // no further processing in worker thread + } + +}; + + +/** + @brief + Entry point for our worker threads, arg supplied by manager details what + needs to be run +*/ + +static void *parallel_worker_thread_func(void *arg) +{ + pwt_worker *worker= (pwt_worker*) arg; + struct timespec abs_timeout; + PSI_stage_info old_stage; + + PWT_error_handler error_handler; + abs_timeout.tv_nsec= 0; + /* + Set current_thd and thread local storage (my_thread_var) for our new THD + to ensure they have their own local objects/errors/warnings etc + */ + void *save= thd_attach_thd(worker->thd); + my_thread_set_name(worker->thd->connection_name.str); + THD_STAGE_INFO(worker->thd, stage_sending_data); + worker->thd->push_internal_handler(&error_handler); + + /* + START: in lieu of work, wait 1 seconds, push out an error or a warning, + wait another 1 seconds then exit + */ + abs_timeout.tv_sec= time(0)+1; + mysql_mutex_lock(&worker->LOCK_worker); + worker->thd->ENTER_COND(&worker->COND_worker, &worker->LOCK_worker, + &stage_sending_data, &old_stage); + mysql_cond_timedwait(&worker->COND_worker, &worker->LOCK_worker, + &abs_timeout); + worker->thd->EXIT_COND(&old_stage); + +#ifdef ENABLED_DEBUG_SYNC + /* + we can't sync on the managers or our THD, spin the whole thing about + and use the global signal pool, NO_CLEAR_EVENT is needed because we have + multiple workers and the wrong one will likely consume the signal. + */ + DBUG_EXECUTE_IF("pwt_worker_pause_before_signal", + DBUG_ASSERT(!debug_sync_set_action(worker->thd, STRING_WITH_LEN( + "now SIGNAL pwt_worker_paused WAIT_FOR pwt_worker_continue NO_CLEAR_EVENT" + )));); +#endif + + mysql_mutex_lock(&worker->thd->LOCK_thd_kill); + if (worker->thd->killed) + { + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + my_error(ER_QUERY_INTERRUPTED, MYF(0)); + goto worker_thread_exit; + } + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + + if (worker->parallel_scan_job) + push_warning(current_thd, Sql_condition::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR, + "This is an example warning to show we can push a " + "warning from a worker thread to its manager "); + else + my_error(ER_ARGUMENT_OUT_OF_RANGE, MYF(0), "worker_busted_function()"); + + // signal manager there is something in the queue, + mysql_mutex_lock(&worker->manager->LOCK_pwt_manager); + mysql_cond_signal(&worker->manager->COND_pwt_new_message); + mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager); + + abs_timeout.tv_sec= time(0)+5; + mysql_mutex_lock(&worker->LOCK_worker); + worker->thd->ENTER_COND(&worker->COND_worker, &worker->LOCK_worker, + &stage_sending_data, &old_stage); + mysql_cond_timedwait(&worker->COND_worker, &worker->LOCK_worker, + &abs_timeout); + worker->thd->EXIT_COND(&old_stage); + + mysql_mutex_lock(&worker->thd->LOCK_thd_kill); + if (worker->thd->killed) + { + my_error(ER_QUERY_INTERRUPTED, MYF(0)); + } + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + + // END: in lieu of work + +worker_thread_exit: + + // manager needs to see this as atomic + mysql_mutex_lock(&worker->LOCK_worker); + /* + LOCK_thd_kill is the canonical guard for thd->killed; a user-issued + KILL on this worker's thread_id goes through THD::awake() which holds + LOCK_thd_kill but not LOCK_worker, so we must nest both to get a + race-free snapshot for the manager. + Lock order matches join_parallel_workers(). + */ + mysql_mutex_lock(&worker->thd->LOCK_thd_kill); + worker->killed= worker->thd->killed; // save this flag, THD is destroyed + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + worker->thd->pop_internal_handler(); // maybe not needed + worker->finished= true; + THD *thd= worker->thd; + worker->thd= nullptr; + mysql_mutex_unlock(&worker->LOCK_worker); + + // signal manager again to wake up and end this thread + mysql_mutex_lock(&worker->manager->LOCK_pwt_manager); + mysql_cond_signal(&worker->manager->COND_pwt_new_message); + mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager); + + /* + executing this sets my_thread_var to null, stopping our ability use + the normal mutex mechanisms, so we operate this outside the locked region + on a copy of our THD pointer + */ + thd_detach_thd(save); + server_threads.erase(thd); + destroy_background_thd(thd); + + return nullptr; +} + +/** + @brief + Abort this worker, called as part of an error condition + + The worker may already be tearing itself down: parallel_worker_thread_func + nulls worker->thd and destroys the THD under LOCK_worker. Take that lock + and only awake() if the worker hasn't yet entered its exit section; if + it has, the worker is on its way out and pthread_join will reap it. +*/ + +void abort_worker(pwt_worker *worker) +{ + mysql_mutex_lock(&worker->LOCK_worker); + if (worker->thd) + worker->thd->awake(ABORT_QUERY); + mysql_mutex_unlock(&worker->LOCK_worker); + pthread_join(worker->pthread, nullptr); + mysql_mutex_destroy(&worker->LOCK_worker); + mysql_cond_destroy(&worker->COND_worker); +} + + +/** + @brief + Free our message queue, discard the messages +*/ + +void pwt_management::free_queue() +{ + // process queue + if (!parallel_messages.head()) + return; + + mysql_mutex_lock(&LOCK_pwt_thread); + pwt_queued_event *event; + while ((event= parallel_messages.get())) + { + if (pwt_error_message *err= event->error) + { + my_free(err->message); + my_free(err); + } + if (event->data) + { + // TODO: free associated + } + my_free(event); + } + mysql_mutex_unlock(&LOCK_pwt_thread); +} + + +/** + @brief + Initialise our parallel worker threads, setting their own new THD objects. + Set up our mutexs for synchronization. + Register our new threads in server_threads. + + Called from the management thread for applicable queries at the top level. +*/ + +bool pwt_management::init_parallel_workers(THD *thd) +{ + bool result= false; + uint i= 0; + + if (const uint n= thd->variables.parallel_worker_threads) + { + workers= (pwt_worker *) my_malloc(key_memory_pwt_workers, + n * sizeof(pwt_worker), + MYF(MY_WME | MY_ZEROFILL)); + if (!workers) + return true; + + nworkers= n; + + mysql_mutex_init(key_mutex_pwt_LOCK_manager, &LOCK_pwt_manager, + MY_MUTEX_INIT_SLOW); + mysql_mutex_init(key_mutex_pwt_LOCK_thread, &LOCK_pwt_thread, + MY_MUTEX_INIT_SLOW); + mysql_cond_init(key_COND_pwt_new_message, &COND_pwt_new_message, NULL); + for (i= 0; i < n; i++) + { + workers[i].thd= create_background_thd(); + if (!workers[i].thd) + { + result= true; + goto cleanup_old_workers; + } + + workers[i].manager= this; + mysql_mutex_init(key_mutex_pwt_LOCK_worker, &workers[i].LOCK_worker, + MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_pwt_worker, &workers[i].COND_worker, nullptr); + workers[i].thd->system_thread= SYSTEM_THREAD_GENERIC; + size_t len= my_snprintf(workers[i].conn_name, MAX_THREAD_NAME, + WORKER_NAME); + workers[i].thd->connection_name.str= workers[i].conn_name; + workers[i].thd->connection_name.length= len; + workers[i].thd->security_ctx= thd->security_ctx; + workers[i].thd->set_command(thd->get_command()); + if (thd->db.str) + { + // explicit call in ~THD/THD::free_connection()/my_free, so we do this + workers[i].thd->db.str= (char*)my_malloc(key_memory_pwt_db, + thd->db.length+1, + MYF(0)); + if (!workers[i].thd->db.str) + { + result= true; + goto cleanup_db_string; + } + + strmake(const_cast(workers[i].thd->db.str), thd->db.str, + thd->db.length); + workers[i].thd->db.length= thd->db.length; + } + else + { + workers[i].thd->db.str= nullptr; + workers[i].thd->db.length= 0; + } + workers[i].thd->start_utime= thd->start_utime; + workers[i].thd->thread_id= next_thread_id(); + my_snprintf(workers[i].info, sizeof(workers[i].info), + WORKER_NAME " %u " CONNECTION_NAME_THREAD " %llu", + i+1, thd->thread_id); + workers[i].thd->query_string= CSET_STRING(workers[i].info, + strlen(workers[i].info), + workers[i].thd->query_charset()); + workers[i].thd->pwt_worker_info= workers+i; + workers[i].finished= workers[i].joined= false; + workers[i].killed= NOT_KILLED; + if ((i+1)%10) // determines error or warning in a deterministic way + workers[i].parallel_scan_job= (void*)0x1; + server_threads.insert(workers[i].thd); // +information_schema.processlist + + if (mysql_thread_create(key_thread_pwt, &workers[i].pthread, nullptr, + parallel_worker_thread_func, &workers[i])) + { + result= true; + goto cleanup_thread_create; + } + } + this->thd= thd; + return result; + } + else + return false; + +cleanup_thread_create: + server_threads.erase(workers[i].thd); + +cleanup_db_string: + /* + destroy_background_thd() requires current_thd to be NULL because it + re-attaches the background THD to this thread's TLS. We are running on + the user's query thread (current_thd == manager thd), so save/null/ + restore around the call. Mirrors the create_background_thd() pattern. + */ + { + THD *save_thd= current_thd; + set_current_thd(nullptr); + destroy_background_thd(workers[i].thd); + set_current_thd(save_thd); + } + mysql_mutex_destroy(&workers[i].LOCK_worker); + mysql_cond_destroy(&workers[i].COND_worker); + +cleanup_old_workers: + for (uint j= 0; j < i; j++) + abort_worker(workers+j); + free_queue(); + my_free(workers); + workers= nullptr; + nworkers= 0; + mysql_cond_destroy(&COND_pwt_new_message); + mysql_mutex_destroy(&LOCK_pwt_manager); + mysql_mutex_destroy(&LOCK_pwt_thread); + + return result; +} + +#ifdef HAVE_PSI_INTERFACE +void pwt_init_psi_keys(void) +{ + const char *category= "sql"; + int count; + count= array_elements(all_pwt_threads); + PSI_server->register_thread(category, all_pwt_threads, count); + count= array_elements(all_pwt_mutexes); + mysql_mutex_register(category, all_pwt_mutexes, count); + count= array_elements(all_pwt_conds); + mysql_cond_register(category, all_pwt_conds, count); + count= array_elements(all_pwt_memory); + mysql_memory_register(category, all_pwt_memory, count); +} +#endif + + +/** + @brief + Process data {errors, warnings, data, signals} from the workers. + + Currently this is called in the main thread after JOIN::exec_inner, but + this will need to be disassembled and integrated into the above (or vice + versa). +*/ + +void pwt_management::join_parallel_workers(THD *thd) +{ + bool all_done= false, workers_killed= false; + PSI_stage_info old_stage; + struct timespec wait_max; + wait_max.tv_nsec= 0; + int killed_from= -1; + + while (!all_done) + { + wait_max.tv_sec= time(0)+1; // wait 1s + mysql_mutex_lock(&LOCK_pwt_manager); + thd->ENTER_COND(&COND_pwt_new_message, &LOCK_pwt_manager, + &stage_reading_data_from_parallel_worker, &old_stage); + mysql_cond_timedwait(&COND_pwt_new_message, &LOCK_pwt_manager, &wait_max); + thd->EXIT_COND(&old_stage); + + all_done= true; + + // delete worker threads that are finished + for (uint i= 0; i < nworkers; i++) + { + if (workers[i].joined) // already done + continue; + mysql_mutex_lock(&workers[i].LOCK_worker); + if (workers[i].finished) + { + mysql_mutex_unlock(&workers[i].LOCK_worker); + if (workers[i].killed) + { + killed_from= i; + thd->awake(workers[i].killed); + } + pthread_join(workers[i].pthread, nullptr); + mysql_mutex_destroy(&workers[i].LOCK_worker); + mysql_cond_destroy(&workers[i].COND_worker); + workers[i].joined= true; + } + else + { + mysql_mutex_unlock(&workers[i].LOCK_worker); + all_done= false; + } + } + + if (thd->killed && !workers_killed) + { + // inform our workers that they are killed + for (uint i= 0; i < nworkers; i++) + { + if (workers[i].joined) + continue; + mysql_mutex_lock(&workers[i].LOCK_worker); + if (workers[i].finished) + { + mysql_mutex_unlock(&workers[i].LOCK_worker); + continue; + } + + if ((int)i != killed_from) + { + mysql_mutex_lock(&workers[i].thd->LOCK_thd_kill); + workers[i].thd->killed= thd->killed; + mysql_mutex_unlock(&workers[i].thd->LOCK_thd_kill); + mysql_cond_signal(&workers[i].COND_worker); + } + mysql_mutex_unlock(&workers[i].LOCK_worker); + } + workers_killed= true; + } + else + { + // process queue + bool surface_drop; + mysql_mutex_lock(&LOCK_pwt_thread); + surface_drop= messages_dropped; + messages_dropped= false; + pwt_queued_event *event; + while ((event= parallel_messages.get())) + { + if (pwt_error_message *err= event->error) + { + /* + set_overwrite_status to capture a message in our worker THD + TODO, look at getting rid of this if we can + */ + // thd->get_stmt_da()->set_overwrite_status(true); + if (err->level == Sql_condition::enum_warning_level::WARN_LEVEL_ERROR) + my_message_sql(err->code, err->message, MYF(0)); + else + push_warning(thd, err->level, err->code, err->message); + + my_free(err->message); + my_free(err); + } + if (event->data) + { + // process data from our worker thread + } + my_free(event); + } + mysql_mutex_unlock(&LOCK_pwt_thread); + + if (surface_drop) + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, + ER_OUTOFMEMORY, + "Parallel worker diagnostics were dropped due " + "to memory allocation failure"); + } + } + + if (nworkers) + { + mysql_cond_destroy(&COND_pwt_new_message); + mysql_mutex_destroy(&LOCK_pwt_manager); + mysql_mutex_destroy(&LOCK_pwt_thread); + my_free(workers); + workers= nullptr; + } +} diff --git a/sql/sql_parallel_workers.h b/sql/sql_parallel_workers.h new file mode 100644 index 0000000000000..e7117e8155fc6 --- /dev/null +++ b/sql/sql_parallel_workers.h @@ -0,0 +1,117 @@ +#ifndef SQL_PARALLEL_WORKERS_H +#define SQL_PARALLEL_WORKERS_H + +#include "mariadb.h" +#include "sql_class.h" +#include "mysqld.h" +#include "sql_error.h" + +extern MYSQL_THD create_background_thd(); +extern void destroy_background_thd(MYSQL_THD thd); +extern void *thd_attach_thd(MYSQL_THD thd); +extern void thd_detach_thd(void *save); + +// PWT Parallel Worker Thread + + +/* + Message Types +*/ +class pwt_error_message +{ +public: + uint worker_errno; + uint code; + Sql_condition::enum_warning_level level; + char *message; +}; + +class pwt_data_message +{ +public: + TABLE *tmp_table; +}; + + +/* + Event type. Inherits ilink so it can live in an I_List. +*/ +class pwt_queued_event : public ilink +{ +public: + pwt_error_message *error; + pwt_data_message *data; +}; + +class pwt_management; + + +#define WORKER_NAME "Parallel Worker" +#define WORKER_ID_LENGTH 3 +#define WORKER_NAME_LENGTH 15 +#define CONNECTION_NAME_THREAD "For Thread ID" +#define CONNECTION_NAME_THREAD_LENGTH 13 +#define THREAD_ID_LENGTH 20 // ull can occupy 20 chars + +/* + Parallel Worker Thread specific attributes +*/ +class pwt_worker +{ +public: + THD *thd; + pwt_management *manager; + pthread_t pthread; + mysql_mutex_t LOCK_worker; + mysql_cond_t COND_worker; + char conn_name[MAX_THREAD_NAME+1]; + /* + This is displayed in information_schema.processlist.info + Currently "Parallel Worker {1..N} For Thread M" + */ + char info[WORKER_NAME_LENGTH+ + 1+WORKER_ID_LENGTH+1+ + CONNECTION_NAME_THREAD_LENGTH+ + 1+THREAD_ID_LENGTH+1]; + bool joined; + bool finished; + killed_state killed; + void *parallel_scan_job; +}; + + +/* + Class to create, manage and eventually destroy a "team" of worker threads. +*/ +class pwt_management : public Sql_alloc +{ +public: + pwt_worker *workers; + uint nworkers; + I_List parallel_messages; + mysql_cond_t COND_pwt_new_message; + mysql_mutex_t LOCK_pwt_manager; + mysql_mutex_t LOCK_pwt_thread; + THD *thd; + /* + Set under LOCK_pwt_thread when a worker fails to allocate a queued event. + The manager surfaces a single ER_OUTOFMEMORY warning so the user sees + that worker diagnostics were dropped instead of silently disappearing. + */ + bool messages_dropped; + pwt_management(): + workers(nullptr), + nworkers(0), + messages_dropped(false) + {} + ~pwt_management() + { + if (workers) + join_parallel_workers(current_thd); + } + bool init_parallel_workers(THD *thd); + void join_parallel_workers(THD *thd); + void free_queue(); +}; + +#endif diff --git a/sql/sql_select.cc b/sql/sql_select.cc index dbd3283c45c5c..376fde7b339b8 100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -26,6 +26,7 @@ */ #include "mariadb.h" +#include "mysqld_error.h" #include "sql_priv.h" #include "unireg.h" #include "sql_select.h" @@ -70,6 +71,7 @@ #include "derived_handler.h" #include "opt_hints.h" #include "opt_group_by_cardinality.h" +#include "sql_parallel_workers.h" /* A key part number that means we're using a fulltext scan. @@ -4909,6 +4911,21 @@ int JOIN::exec() select_lex->select_number)) dbug_serve_apcs(thd, 1); ); + + // If we are a top level select statement + // TEMPORARY-PARALLEL-WORK-TEST: + if (!select_lex->outer_select() && thd->lex->sql_command == SQLCOM_SELECT && + !parallel_work_manager) + { + if (!(parallel_work_manager= new (thd->mem_root) pwt_management) || + parallel_work_manager->init_parallel_workers(thd)) + { + my_error(ER_INTERNAL_ERROR, MYF(0), + "Failed to initialize parallel work mgr"); + return 1; + } + } + ANALYZE_START_TRACKING(thd, &explain->time_tracker); res= exec_inner(); ANALYZE_STOP_TRACKING(thd, &explain->time_tracker); @@ -17316,6 +17333,16 @@ void JOIN::cleanup(bool full) if (full) have_query_plan= QEP_DELETED; + // TEMPORARY-PARALLEL-WORK-TEST: + if (parallel_work_manager) + { + // Finish work and delete the manager + if (parallel_work_manager->workers) + parallel_work_manager->join_parallel_workers(thd); + delete parallel_work_manager; + parallel_work_manager= NULL; + } + if (original_join_tab) { /* Free the original optimized join created for the group_by_handler */ diff --git a/sql/sql_select.h b/sql/sql_select.h index 2507a871a6005..0b6b0cfdb8156 100644 --- a/sql/sql_select.h +++ b/sql/sql_select.h @@ -33,6 +33,7 @@ #include "sql_update.h" #include "cset_narrowing.h" +#include "sql_parallel_workers.h" typedef struct st_join_table JOIN_TAB; /* Values in optimize */ @@ -1753,6 +1754,8 @@ class JOIN :public Sql_alloc */ Sql_cmd_dml *sql_cmd_dml; + pwt_management *parallel_work_manager{0}; + JOIN(THD *thd_arg, List &fields_arg, ulonglong select_options_arg, select_result *result_arg) :fields_list(fields_arg) diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index d6becc62d8632..c2580ad215f55 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -2496,6 +2496,16 @@ Sys_slave_parallel_workers( ON_UPDATE(fix_slave_parallel_threads)); +static Sys_var_on_access_global +Sys_parallel_worker_threads( + "parallel_worker_threads", + "Number of worker threads available for parallel query execution. " + "0 means parallel execution is disabled", + SESSION_VAR(parallel_worker_threads), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0,100), DEFAULT(0), BLOCK_SIZE(1)); + + static bool check_slave_domain_parallel_threads(sys_var *self, THD *thd, set_var *var) { From 2482c68018f10149a9bcc07216c18fb9d0151480 Mon Sep 17 00:00:00 2001 From: Rex Johnston Date: Wed, 27 May 2026 15:13:46 +1200 Subject: [PATCH 2/2] MDEV-39495 Parallel Query: Use temporary work tables to ship results: basic test Built on top of MDEV-39492, the parallel worker manager creates N temporary tables and gives them to each parallel worker to populate. At the conclusion of all worker threads, each row from the above tables is added to the join_tab->table representing the query result. To use MariaDB [test]> set session parallel_worker_threads=10; Query OK, 0 rows affected (0.001 sec) MariaDB [test]> select SQL_BUFFER_RESULT * from t1 join seq_1_to_5 on t1.a = seq; +------+------+-----+ | a | b | seq | +------+------+-----+ | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | | 1 | one | 1 | | 2 | two | 2 | | 1 | un | 1 | +------+------+-----+ 30 rows in set (0.010 sec) Each worker has read the first table with a JT_ALL access method and replicated the results back to the manager thread, which executes the rest of the join. In the above example there are 10 worker threads, so 10 copies of the join result. --- mysql-test/main/parallel_query.result | 43 +- mysql-test/main/parallel_query.test | 13 +- .../main/parallel_query_excluded.result | 54 ++ mysql-test/main/parallel_query_excluded.test | 80 ++ mysql-test/main/parallel_query_oom.result | 7 +- mysql-test/main/parallel_query_oom.test | 4 +- sql/sql_explain.cc | 12 +- sql/sql_explain.h | 1 + sql/sql_parallel_workers.cc | 885 ++++++++++++++---- sql/sql_parallel_workers.h | 108 ++- sql/sql_select.cc | 236 ++++- sql/sql_select.h | 25 +- sql/sys_vars.cc | 3 +- 13 files changed, 1217 insertions(+), 254 deletions(-) create mode 100644 mysql-test/main/parallel_query_excluded.result create mode 100644 mysql-test/main/parallel_query_excluded.test diff --git a/mysql-test/main/parallel_query.result b/mysql-test/main/parallel_query.result index 3224f1fc85439..a074957f5d199 100644 --- a/mysql-test/main/parallel_query.result +++ b/mysql-test/main/parallel_query.result @@ -2,20 +2,40 @@ SET DEBUG_SYNC='RESET'; # # MDEV-39492 Parallel Query: Study how to create worker threads # +create table t1 select seq from seq_1_to_2; set session parallel_worker_threads=3; # we should currently see 3 warnings; -select seq from seq_1_to_2; +select SQL_BUFFER_RESULT seq from t1; seq 1 2 -Warnings: -Warning 1105 This is an example warning to show we can push a warning from a worker thread to its manager -Warning 1105 This is an example warning to show we can push a warning from a worker thread to its manager -Warning 1105 This is an example warning to show we can push a warning from a worker thread to its manager +1 +2 +1 +2 set session parallel_worker_threads=10; -# the 10th workers throws out an error -select seq from seq_1_to_2; -ERROR HY000: Argument to the worker_busted_function() function does not belong to the range [0,1] +select SQL_BUFFER_RESULT seq from t1; +seq +1 +2 +1 +2 +1 +2 +1 +2 +1 +2 +1 +2 +1 +2 +1 +2 +1 +2 +1 +2 set session parallel_worker_threads=0; connect killee, localhost, root, , ; # check that kill query on a parallel worker is passed to the manager @@ -23,7 +43,7 @@ connection killee; set session parallel_worker_threads=3; SET @save_dbug= @@global.debug_dbug; SET GLOBAL debug_dbug = "+d,pwt_worker_pause_before_signal"; -select seq from seq_1_to_2;; +select SQL_BUFFER_RESULT seq from t1;; # now use the default connection to view/kill the thread group executing # the parallel work connection default; @@ -39,7 +59,7 @@ USER DB STATE SUBSTRING(INFO, 1, 32) root test debug sync point: now Parallel Worker 3 For Thread ID root test debug sync point: now Parallel Worker 2 For Thread ID root test debug sync point: now Parallel Worker 1 For Thread ID -root test Reading data from parallel workers select seq from seq_1_to_2 +root test Reading data from parallel workers select SQL_BUFFER_RESULT seq fro kill query ID; # signal our workers to continue execution SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; @@ -50,7 +70,7 @@ ERROR 70100: Query execution was interrupted SET DEBUG_SYNC = "RESET"; # save as above, but kill a worker with a simple kill and see the # connection drop -select seq from seq_1_to_2;; +select SQL_BUFFER_RESULT seq from t1;; connection default; SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; kill ID; @@ -59,5 +79,6 @@ connection killee; ERROR 70100: Query execution was interrupted # killee connection now dead, confirmed below connection default; +drop table t1; SET GLOBAL debug_dbug = @save_dbug; SET DEBUG_SYNC = "RESET"; diff --git a/mysql-test/main/parallel_query.test b/mysql-test/main/parallel_query.test index 218d6176add8f..54a7b1b354bdd 100644 --- a/mysql-test/main/parallel_query.test +++ b/mysql-test/main/parallel_query.test @@ -16,14 +16,14 @@ SET DEBUG_SYNC='RESET'; --echo # MDEV-39492 Parallel Query: Study how to create worker threads --echo # +create table t1 select seq from seq_1_to_2; + set session parallel_worker_threads=3; --echo # we should currently see 3 warnings; -select seq from seq_1_to_2; +select SQL_BUFFER_RESULT seq from t1; set session parallel_worker_threads=10; ---echo # the 10th workers throws out an error ---error ER_ARGUMENT_OUT_OF_RANGE -select seq from seq_1_to_2; +select SQL_BUFFER_RESULT seq from t1; set session parallel_worker_threads=0; connect (killee, localhost, root, , ); @@ -40,7 +40,7 @@ set session parallel_worker_threads=3; SET @save_dbug= @@global.debug_dbug; SET GLOBAL debug_dbug = "+d,pwt_worker_pause_before_signal"; ---send select seq from seq_1_to_2; +--send select SQL_BUFFER_RESULT seq from t1; --echo # now use the default connection to view/kill the thread group executing --echo # the parallel work --connection default @@ -79,7 +79,7 @@ SET DEBUG_SYNC = "RESET"; --echo # save as above, but kill a worker with a simple kill and see the --echo # connection drop ---send select seq from seq_1_to_2; +--send select SQL_BUFFER_RESULT seq from t1; connection default; SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; let $name= "Parallel Worker 1 For Thread ID $id"; @@ -101,6 +101,7 @@ reap; connection default; +drop table t1; SET GLOBAL debug_dbug = @save_dbug; SET DEBUG_SYNC = "RESET"; diff --git a/mysql-test/main/parallel_query_excluded.result b/mysql-test/main/parallel_query_excluded.result new file mode 100644 index 0000000000000..f0a85a7c27424 --- /dev/null +++ b/mysql-test/main/parallel_query_excluded.result @@ -0,0 +1,54 @@ +SET @save_optimizer_trace= @@optimizer_trace; +SET optimizer_trace='enabled=on'; +CREATE TABLE t_plain (a INT); +INSERT INTO t_plain VALUES (1),(2),(3); +CREATE TABLE t_blob (a INT, b BLOB); +INSERT INTO t_blob VALUES (1,'x'),(2,'y'),(3,'z'); +CREATE TABLE t_text (a INT, b TEXT); +INSERT INTO t_text VALUES (1,'x'),(2,'y'),(3,'z'); +CREATE TABLE t_m1 (a INT) ENGINE=MyISAM; +INSERT INTO t_m1 VALUES (1),(2),(3); +CREATE TABLE t_m2 (a INT) ENGINE=MyISAM; +INSERT INTO t_m2 VALUES (4),(5),(6); +CREATE TABLE t_merge (a INT) ENGINE=MERGE UNION=(t_m1,t_m2) INSERT_METHOD=LAST; +CREATE TABLE t_ft (a INT, b VARCHAR(64), FULLTEXT(b)) ENGINE=MyISAM; +INSERT INTO t_ft VALUES (1,'alpha beta'),(2,'gamma delta'),(3,'epsilon zeta'); +set session parallel_worker_threads=2; +# plain table: chosen for parallel scan (expect 1) +SELECT a FROM t_plain; +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL +AS chosen_for_parallel_scan +FROM information_schema.optimizer_trace; +chosen_for_parallel_scan +1 +# BLOB column present: excluded, not chosen (expect 0) +SELECT a FROM t_blob; +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL +AS chosen_for_parallel_scan +FROM information_schema.optimizer_trace; +chosen_for_parallel_scan +0 +# TEXT column present: excluded, not chosen (expect 0) +SELECT a FROM t_text; +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL +AS chosen_for_parallel_scan +FROM information_schema.optimizer_trace; +chosen_for_parallel_scan +0 +# MERGE engine: excluded, not chosen (expect 0) +SELECT a FROM t_merge; +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL +AS chosen_for_parallel_scan +FROM information_schema.optimizer_trace; +chosen_for_parallel_scan +0 +# fulltext-searched (MATCH in select list): excluded, not chosen (expect 0) +SELECT a, MATCH(b) AGAINST('alpha') FROM t_ft; +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL +AS chosen_for_parallel_scan +FROM information_schema.optimizer_trace; +chosen_for_parallel_scan +0 +DROP TABLE t_plain, t_blob, t_text, t_merge, t_m1, t_m2, t_ft; +SET optimizer_trace= @save_optimizer_trace; +set session parallel_worker_threads=default; diff --git a/mysql-test/main/parallel_query_excluded.test b/mysql-test/main/parallel_query_excluded.test new file mode 100644 index 0000000000000..e90c29d2e0003 --- /dev/null +++ b/mysql-test/main/parallel_query_excluded.test @@ -0,0 +1,80 @@ +# +# MDEV-39492 Parallel Query: tables that cannot be shipped row-by-row through a +# worker batch table are excluded from parallel worker scan. +# +# JOIN::choose_parallel_scan() records the table picked for the parallel scan +# in the optimizer trace as "chosen_for_parallel_scan". An ordinary table is +# chosen; a table is rejected by the use_parallel_scan gate when it has a +# BLOB/TEXT (blob-backed) column, uses the MERGE engine, or is fulltext-searched +# (a MATCH ... AGAINST refers to it). We assert presence/absence of that trace +# key (1/0) rather than the temporary row-multiplication behaviour. +# +--source include/not_embedded.inc + +SET @save_optimizer_trace= @@optimizer_trace; +SET optimizer_trace='enabled=on'; + +CREATE TABLE t_plain (a INT); +INSERT INTO t_plain VALUES (1),(2),(3); +CREATE TABLE t_blob (a INT, b BLOB); +INSERT INTO t_blob VALUES (1,'x'),(2,'y'),(3,'z'); +CREATE TABLE t_text (a INT, b TEXT); +INSERT INTO t_text VALUES (1,'x'),(2,'y'),(3,'z'); + +# MERGE table over two MyISAM children +CREATE TABLE t_m1 (a INT) ENGINE=MyISAM; +INSERT INTO t_m1 VALUES (1),(2),(3); +CREATE TABLE t_m2 (a INT) ENGINE=MyISAM; +INSERT INTO t_m2 VALUES (4),(5),(6); +CREATE TABLE t_merge (a INT) ENGINE=MERGE UNION=(t_m1,t_m2) INSERT_METHOD=LAST; + +# fulltext: MATCH in the select list keeps a full table scan and marks the +# table fulltext-searched +CREATE TABLE t_ft (a INT, b VARCHAR(64), FULLTEXT(b)) ENGINE=MyISAM; +INSERT INTO t_ft VALUES (1,'alpha beta'),(2,'gamma delta'),(3,'epsilon zeta'); + +set session parallel_worker_threads=2; + +--echo # plain table: chosen for parallel scan (expect 1) +--disable_result_log +SELECT a FROM t_plain; +--enable_result_log +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL + AS chosen_for_parallel_scan + FROM information_schema.optimizer_trace; + +--echo # BLOB column present: excluded, not chosen (expect 0) +--disable_result_log +SELECT a FROM t_blob; +--enable_result_log +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL + AS chosen_for_parallel_scan + FROM information_schema.optimizer_trace; + +--echo # TEXT column present: excluded, not chosen (expect 0) +--disable_result_log +SELECT a FROM t_text; +--enable_result_log +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL + AS chosen_for_parallel_scan + FROM information_schema.optimizer_trace; + +--echo # MERGE engine: excluded, not chosen (expect 0) +--disable_result_log +SELECT a FROM t_merge; +--enable_result_log +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL + AS chosen_for_parallel_scan + FROM information_schema.optimizer_trace; + +--echo # fulltext-searched (MATCH in select list): excluded, not chosen (expect 0) +--disable_result_log +SELECT a, MATCH(b) AGAINST('alpha') FROM t_ft; +--enable_result_log +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL + AS chosen_for_parallel_scan + FROM information_schema.optimizer_trace; + +DROP TABLE t_plain, t_blob, t_text, t_merge, t_m1, t_m2, t_ft; +SET optimizer_trace= @save_optimizer_trace; +set session parallel_worker_threads=default; diff --git a/mysql-test/main/parallel_query_oom.result b/mysql-test/main/parallel_query_oom.result index c42bf2cf2abf7..573bbe3df23bc 100644 --- a/mysql-test/main/parallel_query_oom.result +++ b/mysql-test/main/parallel_query_oom.result @@ -11,11 +11,14 @@ set session parallel_worker_threads=1; # DBUG injection forces both into the OOM branch, so neither the original # warning nor the original error reaches the user. We expect just the # manager-surfaced ER_OUTOFMEMORY warning. -select count(*) from seq_1_to_2; -count(*) +create table t1 select seq from seq_1_to_2; +select SQL_BUFFER_RESULT * from t1; +seq +1 2 show warnings; Level Code Message Warning 1037 Parallel worker diagnostics were dropped due to memory allocation failure +drop table t1; set global debug_dbug=@save_dbug; set session parallel_worker_threads=default; diff --git a/mysql-test/main/parallel_query_oom.test b/mysql-test/main/parallel_query_oom.test index 6a8afd3266182..739accefd40d8 100644 --- a/mysql-test/main/parallel_query_oom.test +++ b/mysql-test/main/parallel_query_oom.test @@ -24,8 +24,10 @@ set session parallel_worker_threads=1; --echo # DBUG injection forces both into the OOM branch, so neither the original --echo # warning nor the original error reaches the user. We expect just the --echo # manager-surfaced ER_OUTOFMEMORY warning. -select count(*) from seq_1_to_2; +create table t1 select seq from seq_1_to_2; +select SQL_BUFFER_RESULT * from t1; show warnings; +drop table t1; set global debug_dbug=@save_dbug; set session parallel_worker_threads=default; diff --git a/sql/sql_explain.cc b/sql/sql_explain.cc index ba79a2b78fa6d..0d37f2f13fcda 100644 --- a/sql/sql_explain.cc +++ b/sql/sql_explain.cc @@ -1560,7 +1560,12 @@ int Explain_table_access::print_explain(select_result_sink *output, /* `type` column */ StringBuffer<64> join_type_buf; if (rowid_filter == NULL) - push_str(thd, &item_list, join_type_str[type]); + { + if (type == JT_ALL && use_parallel_scan) + push_str(thd, &item_list, "PARALLEL"); + else + push_str(thd, &item_list, join_type_str[type]); + } else { join_type_buf.append(join_type_str[type], strlen(join_type_str[type])); @@ -2034,7 +2039,10 @@ void Explain_table_access::print_explain_json(Explain_query *query, } } - writer->add_member("access_type").add_str(join_type_str[type]); + if (type == JT_ALL && use_parallel_scan) + writer->add_member("access_type").add_str("PARALLEL"); + else + writer->add_member("access_type").add_str(join_type_str[type]); add_json_keyset(writer, "possible_keys", &possible_keys); diff --git a/sql/sql_explain.h b/sql/sql_explain.h index 7a1ee3911611e..cdade594ed11b 100644 --- a/sql/sql_explain.h +++ b/sql/sql_explain.h @@ -809,6 +809,7 @@ class Explain_table_access : public Sql_alloc enum join_type type; bool used_partitions_set; + bool use_parallel_scan; /* Empty means "NULL" will be printed */ String_list possible_keys; diff --git a/sql/sql_parallel_workers.cc b/sql/sql_parallel_workers.cc index 26b6e68fa5d9d..95d76aee50ee1 100644 --- a/sql/sql_parallel_workers.cc +++ b/sql/sql_parallel_workers.cc @@ -35,13 +35,24 @@ Initialise our parallel worker threads pwt_init_psi_keys initialize PSI keys - pwt_management::join_parallel_workers - process information from worker threads until they are finished + worker_produce_chunks / worker_scan_table_to_manager + producer: scan the source table and stream rows to the manager + a batch at a time through the worker's reused batch table + pwt_management::handoff_batch + producer: hand the filled batch table to the manager and block + until it is drained + parallel_scan_read_first / parallel_scan_read_next + consumer: feed the first join_tab from the worker batch tables + pwt_management::finalize_parallel_workers + stop the workers, reap them, surface diagnostics, tear down */ #include "sql_parallel_workers.h" +#include "sql_select.h" #include "debug_sync.h" +#include "transaction.h" +#include "my_json_writer.h" #ifdef HAVE_PSI_INTERFACE static PSI_thread_key key_thread_pwt; @@ -50,33 +61,39 @@ static PSI_thread_info all_pwt_threads[]= { &key_thread_pwt, WORKER_NAME, PSI_FLAG_GLOBAL}, }; -static PSI_mutex_key key_mutex_pwt_LOCK_manager, - key_mutex_pwt_LOCK_thread, - key_mutex_pwt_LOCK_worker; +static PSI_mutex_key key_mutex_pwt_LOCK_thread, + key_mutex_pwt_LOCK_worker, + key_mutex_pwt_LOCK_data; static PSI_mutex_info all_pwt_mutexes[]= { - { &key_mutex_pwt_LOCK_manager, "pwt_management::LOCK_pwt_manager", 0}, - { &key_mutex_pwt_LOCK_thread, "pwt_management::LOCK_pwt_thread", 0}, - { &key_mutex_pwt_LOCK_worker, "pwt_worker::LOCK_worker", 0}, + { &key_mutex_pwt_LOCK_thread, "pwt_management::LOCK_pwt_thread", 0}, + { &key_mutex_pwt_LOCK_worker, "pwt_worker::LOCK_worker", 0}, + { &key_mutex_pwt_LOCK_data, "pwt_management::LOCK_data", 0}, }; -static PSI_cond_key key_COND_pwt_new_message, key_COND_pwt_worker; +static PSI_cond_key key_COND_pwt_worker, + key_COND_pwt_data_avail, key_COND_pwt_data_space; static PSI_cond_info all_pwt_conds[]= { - { &key_COND_pwt_new_message, "pwt_management::COND_pwt_new_message", 0}, { &key_COND_pwt_worker, "pwt_worker::COND_worker", 0}, + { &key_COND_pwt_data_avail, "pwt_management::COND_data_avail", 0}, + { &key_COND_pwt_data_space, "pwt_management::COND_data_space", 0}, }; static PSI_memory_info all_pwt_memory[]= { - { &key_memory_pwt_queued_event, "pwt_queued_event", 0}, - { &key_memory_pwt_error_message, "pwt_error_message", 0}, - { &key_memory_pwt_workers, "pwt_management::workers", 0}, - { &key_memory_pwt_db, "pwt_worker::db", 0}, + { &key_memory_pwt_queued_event, "pwt_queued_event", 0}, + { &key_memory_pwt_error_message, "pwt_error_message", 0}, + { &key_memory_pwt_workers, "pwt_management::workers", 0}, + { &key_memory_pwt_db, "pwt_worker::db", 0}, }; #endif /* HAVE_PSI_INTERFACE */ +// consumer read function, installed on the first join_tab by init below +static int parallel_scan_read_first(JOIN_TAB *tab); + + /** @brief push an error message onto our queue to send to the manager @@ -105,7 +122,6 @@ bool error_to_queue(THD *thd, pwt_queued_event **event, uint error, *event= nullptr; return true; } - (*event)->data= nullptr; (*event)->error->level= level; if (level == Sql_condition::enum_warning_level::WARN_LEVEL_ERROR) (*event)->error->worker_errno= thd->killed_errno(); @@ -151,9 +167,6 @@ class PWT_error_handler : public Internal_error_handler mysql_mutex_lock(&worker->manager->LOCK_pwt_thread); worker->manager->messages_dropped= true; mysql_mutex_unlock(&worker->manager->LOCK_pwt_thread); - mysql_mutex_lock(&worker->manager->LOCK_pwt_manager); - mysql_cond_signal(&worker->manager->COND_pwt_new_message); - mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager); return true; } mysql_mutex_lock(&worker->manager->LOCK_pwt_thread); @@ -166,6 +179,217 @@ class PWT_error_handler : public Internal_error_handler }; +/* + Copy one field's value from one row buffer to another, propagating the null + state. field_conv() copies only the value bytes, so the null bit has to be + transferred separately -- otherwise a NULL source would leave the + destination non-null (showing stale data) and a non-null source would not + clear a null bit left over from a previous row. set_null/set_notnull are + safe no-ops when the field is declared NOT NULL. +*/ +static inline void copy_field_with_null(Field *to, Field *from) +{ + if (from->is_null()) + to->set_null(); + else + { + to->set_notnull(); + field_conv(to, from); + } +} + + +/** + @brief + Hand this worker's filled batch table to the manager (producer side). + + Marks batch_table ready and blocks until the manager has drained it + (clears batch_full) or asks the producers to stop. On return the table is + the worker's again: either empty and ready to refill, or to be abandoned. + + @return + true the consumer asked us to stop (stop scanning) + false the table was drained; refill it +*/ + +bool pwt_management::handoff_batch(pwt_worker *worker) +{ + mysql_mutex_lock(&LOCK_data); + if (stop) + { + mysql_mutex_unlock(&LOCK_data); + return true; + } + worker->batch_full= true; + mysql_cond_signal(&COND_data_avail); // wake the consumer + while (worker->batch_full && !stop) + mysql_cond_wait(&COND_data_space, &LOCK_data); + bool stopped= stop; + mysql_mutex_unlock(&LOCK_data); + return stopped; +} + + +void inline close_worker_scan_table(pwt_worker *worker) +{ + if (worker->our_scan_table) + { + closefrm(worker->our_scan_table); + my_free(worker->our_scan_table); + worker->our_scan_table= nullptr; + } +} + +/** + @brief + Scan the worker's private source table and stream the rows to the manager, + a batch at a time, through the worker's reused batch table. + + Each worker scans its own private copy of the first non-const join table + (worker->our_scan_table, opened with in_use == this worker's thd) so the + workers scan truly concurrently -- no shared-scan lock is needed. It copies + up to PWT_CHUNK_ROWS rows field-by-field into batch_table (built in + the source-table column format, so field_conv is needed rather than a raw + record copy), hands that table to the manager, and blocks until the manager + has drained it (handoff_batch). It then truncates the table and refills it + for the next batch, until the source scan is exhausted. + + The worker only reproduces the source scan; it does not apply WHERE/JOIN + conditions or run the rest of the join. The manager consumes these rows and + drives the join itself as they arrive (see parallel_scan_read_next). + + Returns the handler error code (0 on success); HA_ERR_END_OF_FILE is mapped + to success. A clean stop requested by the manager (handoff_batch -> stop) + also returns success: the manager is done, not in error. +*/ + +static int worker_produce_chunks(pwt_worker *worker) +{ + TABLE *src= worker->our_scan_table; + TABLE *dst= worker->batch_table; + const uint nfields= src->s->fields; + int err; + + src->use_all_columns(); // read every column into record[0] + dst->use_all_columns(); // we write every column of the batch row + + /* + Our handle bypassed lock_tables(), so take the engine-level read lock + ourselves; InnoDB needs this to register the table with its trx before + a scan. Paired with the F_UNLCK below. + */ + if ((err= src->file->ha_external_lock(worker->thd, F_RDLCK))) + return err; + + src->file->ha_index_or_rnd_end(); // in case a prior scan was open + if ((err= src->file->ha_rnd_init_with_error(1))) + { + src->file->ha_external_lock(worker->thd, F_UNLCK); + return err; + } + + bool eof= false, killed= false; + while (!eof && !killed) + { + /* + The batch table is ours again (freshly created on the first pass, or + drained by the manager on later passes). Take ownership and empty it. + */ + dst->in_use= worker->thd; + if ((err= dst->file->ha_delete_all_rows())) + break; + + uint rows= 0; + while (rows < PWT_CHUNK_ROWS) + { + // honour a direct KILL of this worker's thread + mysql_mutex_lock(&worker->thd->LOCK_thd_kill); + killed= worker->thd->killed; + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + if (killed) + { + my_error(ER_QUERY_INTERRUPTED, MYF(0)); + break; // stop now; do not hand off + } + + if ((err= src->file->ha_rnd_next(src->record[0]))) + { + if (err == HA_ERR_END_OF_FILE) + { + err= 0; + eof= true; + } + break; + } + for (uint k= 0; k < nfields; k++) + copy_field_with_null(dst->field[k], src->field[k]); + if ((err= dst->file->ha_write_tmp_row(dst->record[0]))) + break; + rows++; + } + + if (err && err != HA_ERR_END_OF_FILE) + break; // real error; do not hand off + err= 0; + + if (rows && !killed) // hand the filled batch to the manager + { + if (worker->manager->handoff_batch(worker)) // manager asked us to stop + break; + } + } + + src->file->ha_rnd_end(); + src->file->ha_external_lock(worker->thd, F_UNLCK); + return err; +} + + +bool worker_scan_table_to_manager(pwt_worker *worker) +{ + pwt_management *mgr= worker->manager; + int err= worker_produce_chunks(worker); + + /* + End the worker's read transaction now, while we are still on the worker + thread. destroy_background_thd() -> THD::cleanup() rolls the transaction + back and asserts (trans_check) that the statement transaction is already + empty, so we must close it out here. Any commit failure is captured by the + installed PWT_error_handler. + */ + trans_commit_stmt(worker->thd); + trans_commit(worker->thd); + + /* + Mark this producer done so the consumer can detect EOF, and wake it in + case it is blocked waiting for data. A real engine error trips fatal_error + so the consumer aborts the join instead of returning a truncated result. + If this worker was killed (e.g. a user KILL aimed at it), record the kill + so the consumer can propagate it to the manager's THD and abort the join + with ER_QUERY_INTERRUPTED before any result is sent. + */ + mysql_mutex_lock(&worker->thd->LOCK_thd_kill); + killed_state killed= worker->thd->killed; + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + + mysql_mutex_lock(&mgr->LOCK_data); + if (err) + mgr->fatal_error= true; + if (killed && mgr->kill_signal == NOT_KILLED) + mgr->kill_signal= killed; + mgr->active_workers--; + mysql_cond_broadcast(&mgr->COND_data_avail); + mysql_mutex_unlock(&mgr->LOCK_data); + + if (err) + { + worker->our_scan_table->file->print_error(err, MYF(0)); + return true; + } + return false; +} + + /** @brief Entry point for our worker threads, arg supplied by manager details what @@ -175,11 +399,8 @@ class PWT_error_handler : public Internal_error_handler static void *parallel_worker_thread_func(void *arg) { pwt_worker *worker= (pwt_worker*) arg; - struct timespec abs_timeout; - PSI_stage_info old_stage; - PWT_error_handler error_handler; - abs_timeout.tv_nsec= 0; + /* Set current_thd and thread local storage (my_thread_var) for our new THD to ensure they have their own local objects/errors/warnings etc @@ -189,18 +410,12 @@ static void *parallel_worker_thread_func(void *arg) THD_STAGE_INFO(worker->thd, stage_sending_data); worker->thd->push_internal_handler(&error_handler); - /* - START: in lieu of work, wait 1 seconds, push out an error or a warning, - wait another 1 seconds then exit - */ - abs_timeout.tv_sec= time(0)+1; - mysql_mutex_lock(&worker->LOCK_worker); - worker->thd->ENTER_COND(&worker->COND_worker, &worker->LOCK_worker, - &stage_sending_data, &old_stage); - mysql_cond_timedwait(&worker->COND_worker, &worker->LOCK_worker, - &abs_timeout); - worker->thd->EXIT_COND(&old_stage); - + DBUG_EXECUTE_IF("pwt_error_to_queue_oom", + { + push_warning(current_thd, Sql_condition::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR, + "This is an example warning to show we can push a " + "warning from a worker thread to its manager "); + }); #ifdef ENABLED_DEBUG_SYNC /* we can't sync on the managers or our THD, spin the whole thing about @@ -216,42 +431,11 @@ static void *parallel_worker_thread_func(void *arg) mysql_mutex_lock(&worker->thd->LOCK_thd_kill); if (worker->thd->killed) { - mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); my_error(ER_QUERY_INTERRUPTED, MYF(0)); - goto worker_thread_exit; } mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); - if (worker->parallel_scan_job) - push_warning(current_thd, Sql_condition::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR, - "This is an example warning to show we can push a " - "warning from a worker thread to its manager "); - else - my_error(ER_ARGUMENT_OUT_OF_RANGE, MYF(0), "worker_busted_function()"); - - // signal manager there is something in the queue, - mysql_mutex_lock(&worker->manager->LOCK_pwt_manager); - mysql_cond_signal(&worker->manager->COND_pwt_new_message); - mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager); - - abs_timeout.tv_sec= time(0)+5; - mysql_mutex_lock(&worker->LOCK_worker); - worker->thd->ENTER_COND(&worker->COND_worker, &worker->LOCK_worker, - &stage_sending_data, &old_stage); - mysql_cond_timedwait(&worker->COND_worker, &worker->LOCK_worker, - &abs_timeout); - worker->thd->EXIT_COND(&old_stage); - - mysql_mutex_lock(&worker->thd->LOCK_thd_kill); - if (worker->thd->killed) - { - my_error(ER_QUERY_INTERRUPTED, MYF(0)); - } - mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); - - // END: in lieu of work - -worker_thread_exit: + worker_scan_table_to_manager(worker); // manager needs to see this as atomic mysql_mutex_lock(&worker->LOCK_worker); @@ -260,7 +444,6 @@ static void *parallel_worker_thread_func(void *arg) KILL on this worker's thread_id goes through THD::awake() which holds LOCK_thd_kill but not LOCK_worker, so we must nest both to get a race-free snapshot for the manager. - Lock order matches join_parallel_workers(). */ mysql_mutex_lock(&worker->thd->LOCK_thd_kill); worker->killed= worker->thd->killed; // save this flag, THD is destroyed @@ -271,15 +454,19 @@ static void *parallel_worker_thread_func(void *arg) worker->thd= nullptr; mysql_mutex_unlock(&worker->LOCK_worker); - // signal manager again to wake up and end this thread - mysql_mutex_lock(&worker->manager->LOCK_pwt_manager); - mysql_cond_signal(&worker->manager->COND_pwt_new_message); - mysql_mutex_unlock(&worker->manager->LOCK_pwt_manager); + /* + Close our private source table while we are still attached to our THD + (current_thd == thd) and, crucially, before destroy_background_thd() + tears down the THD's transaction: the engine handle's close frees state + that references that transaction (InnoDB's prebuilt). The manager never + touches a started worker's our_scan_table, so no lock is needed here. + */ + close_worker_scan_table(worker); /* - executing this sets my_thread_var to null, stopping our ability use - the normal mutex mechanisms, so we operate this outside the locked region - on a copy of our THD pointer + executing thd_detach_thd sets my_thread_var to null, stopping our ability + use the normal mutex mechanisms, so we operate this outside the locked + region on a copy of our THD pointer */ thd_detach_thd(save); server_threads.erase(thd); @@ -288,6 +475,7 @@ static void *parallel_worker_thread_func(void *arg) return nullptr; } + /** @brief Abort this worker, called as part of an error condition @@ -330,16 +518,13 @@ void pwt_management::free_queue() my_free(err->message); my_free(err); } - if (event->data) - { - // TODO: free associated - } my_free(event); } mysql_mutex_unlock(&LOCK_pwt_thread); } + /** @brief Initialise our parallel worker threads, setting their own new THD objects. @@ -349,13 +534,74 @@ void pwt_management::free_queue() Called from the management thread for applicable queries at the top level. */ -bool pwt_management::init_parallel_workers(THD *thd) +bool pwt_management::init_parallel_workers(THD *thd, JOIN *join) { bool result= false; uint i= 0; if (const uint n= thd->variables.parallel_worker_threads) { + this->join= join; + this->thd= thd; + JOIN_TAB *tab= join->parallel_scan_join_tab; + if (!tab) // nothing to do + { + nworkers= 0; + return false; + } + + /* + Locate the join_tab actually flagged for parallel scan -- it is not + necessarily the first non-const table in the array. The workers + reproduce a plain full table scan (ha_rnd_*) of it and feed the manager + row images, but never position the real handler on a row. Fall back to a + serial scan when that is unsafe: + + - select->quick: a range/quick access method was chosen (possibly after + use_parallel_scan was set, e.g. a range built from the WHERE clause); + a full scan would not honour it. + + - filesort: a sort tied to this table drives its own read of it, which + the injected read functions do not feed. + + - keep_current_rowid: the plan needs this table's engine rowid via + handler::position() -- e.g. DuplicateWeedout semijoin elimination or + a rowid-ordered sort. Injected rows carry no valid handler position, + so position() returns a stale/duplicate rowid for engines whose rowid + is positional (MyISAM), and weedout then drops rows. (InnoDB derives + the rowid from the PK in record[0], so it happens to survive, but we + must not rely on that.) + + The batch tables already created for this query are left unused (freed at + cleanup). + */ + JOIN_TAB *scan_tab= nullptr; + for (JOIN_TAB *jt= join->join_tab + join->const_tables; + jt < join->join_tab + join->top_join_tab_count; jt++) + { + if (jt->use_parallel_scan) + { + scan_tab= jt; + break; + } + } + if (scan_tab && + ((scan_tab->select && scan_tab->select->quick) || + scan_tab->filesort || + scan_tab->keep_current_rowid)) + { + nworkers= 0; + return false; + } + + /* + One batch table per worker was created up front by + create_parallel_workers_tmp_tables; bail if that count is out of step + with the worker count we are about to spawn. + */ + if (n != tab->parallel_tmp_tables.elements()) + return true; + workers= (pwt_worker *) my_malloc(key_memory_pwt_workers, n * sizeof(pwt_worker), MYF(MY_WME | MY_ZEROFILL)); @@ -364,11 +610,25 @@ bool pwt_management::init_parallel_workers(THD *thd) nworkers= n; - mysql_mutex_init(key_mutex_pwt_LOCK_manager, &LOCK_pwt_manager, - MY_MUTEX_INIT_SLOW); mysql_mutex_init(key_mutex_pwt_LOCK_thread, &LOCK_pwt_thread, MY_MUTEX_INIT_SLOW); - mysql_cond_init(key_COND_pwt_new_message, &COND_pwt_new_message, NULL); + + /* + Set up the streaming channel before any worker starts: a worker's first + action is to hand off a batch through handoff_batch(), which needs + LOCK_data and the conds live. active_workers must already equal n so the + consumer does not mistake "not started yet" for EOF. + */ + mysql_mutex_init(key_mutex_pwt_LOCK_data, &LOCK_data, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_pwt_data_avail, &COND_data_avail, nullptr); + mysql_cond_init(key_COND_pwt_data_space, &COND_data_space, nullptr); + active_workers= n; + fatal_error= false; + stop= false; + reaped= false; + cur_worker= nullptr; + kill_signal= NOT_KILLED; + for (i= 0; i < n; i++) { workers[i].thd= create_background_thd(); @@ -421,8 +681,66 @@ bool pwt_management::init_parallel_workers(THD *thd) workers[i].thd->pwt_worker_info= workers+i; workers[i].finished= workers[i].joined= false; workers[i].killed= NOT_KILLED; - if ((i+1)%10) // determines error or warning in a deterministic way - workers[i].parallel_scan_job= (void*)0x1; + workers[i].batch_full= false; + workers[i].batch_table= tab->parallel_tmp_tables.at(i); + + /* + Give this worker its own TABLE+handler for the manager's first + non-const source table, opened from the shared TABLE_SHARE. + + open_table_from_share() runs here on the manager thread, so the open + must happen with in_use == current_thd: handler::ha_thd() asserts + table->in_use == current_thd, and ha_innobase::open() calls it. So we + open under the manager's thd, then repoint in_use at the worker. + + That is enough for engines that cache the THD: InnoDB sets m_user_thd + lazily on the first operation (update_thd()), not at open time, so it + binds to the worker when the worker scans on its own thread. Each + worker thus gets a private handler and they scan concurrently without + a shared-scan lock. + + Arguments to open_table_from_share() below: + thd open under the manager's THD (the in_use rule above); + in_use is repointed to the worker afterwards. + src->s the shared TABLE_SHARE of the first non-const source + table -- the worker's TABLE is built from it. + &src->s->table_name + alias (name) for the opened TABLE. + HA_OPEN_KEYFILE | HA_TRY_READ_ONLY + db_stat (handler open mode): open the index/key file, + read-only -- the worker only scans the table. + EXTRA_RECORD + prgflag: allocate a second record buffer (record[1]) + in addition to record[0], as for a normal table open. + thd->open_options + ha_open_flags, the handler open options from the THD. + st outparam: the TABLE we just allocated, initialised here + as this worker's private table. + false is_create_table: this open is not part of CREATE TABLE. + nullptr partitions_to_open: open all partitions (no subset). + */ + { + TABLE *src= (join->join_tab + join->const_tables)->table; + TABLE *st= (TABLE*) my_malloc(key_memory_TABLE, sizeof(TABLE), + MYF(MY_WME | MY_ZEROFILL)); + if (!st) + { + result= true; + goto cleanup_db_string; + } + if (open_table_from_share(thd, src->s, &src->s->table_name, + HA_OPEN_KEYFILE | HA_TRY_READ_ONLY, + EXTRA_RECORD, thd->open_options, st, + false, nullptr)) + { + my_free(st); + result= true; + goto cleanup_db_string; + } + st->in_use= workers[i].thd; + workers[i].our_scan_table= st; + } + server_threads.insert(workers[i].thd); // +information_schema.processlist if (mysql_thread_create(key_thread_pwt, &workers[i].pthread, nullptr, @@ -432,7 +750,17 @@ bool pwt_management::init_parallel_workers(THD *thd) goto cleanup_thread_create; } } - this->thd= thd; + + /* + Feed the first join_tab from the channel instead of scanning the real + first table. exec_inner() runs after this returns and drives the join + through these read functions, consuming worker rows as they arrive. + */ + { + JOIN_TAB *first= join->join_tab + join->const_tables; + first->table->reginfo.join_tab= first; + first->read_first_record= parallel_scan_read_first; + } return result; } else @@ -440,6 +768,7 @@ bool pwt_management::init_parallel_workers(THD *thd) cleanup_thread_create: server_threads.erase(workers[i].thd); + close_worker_scan_table(workers+i); cleanup_db_string: /* @@ -458,15 +787,25 @@ bool pwt_management::init_parallel_workers(THD *thd) mysql_cond_destroy(&workers[i].COND_worker); cleanup_old_workers: + /* + A worker spawned before the failure may be blocked in handoff_batch() + waiting for the manager to drain its batch. Release them (stop + broadcast) + so abort_worker()'s join can complete. + */ + mysql_mutex_lock(&LOCK_data); + stop= true; + mysql_cond_broadcast(&COND_data_space); + mysql_mutex_unlock(&LOCK_data); for (uint j= 0; j < i; j++) abort_worker(workers+j); free_queue(); my_free(workers); workers= nullptr; nworkers= 0; - mysql_cond_destroy(&COND_pwt_new_message); - mysql_mutex_destroy(&LOCK_pwt_manager); mysql_mutex_destroy(&LOCK_pwt_thread); + mysql_cond_destroy(&COND_data_avail); + mysql_cond_destroy(&COND_data_space); + mysql_mutex_destroy(&LOCK_data); return result; } @@ -488,132 +827,290 @@ void pwt_init_psi_keys(void) #endif -/** - @brief - Process data {errors, warnings, data, signals} from the workers. - - Currently this is called in the main thread after JOIN::exec_inner, but - this will need to be disassembled and integrated into the above (or vice - versa). +/* + Consumer side of the streaming channel. These pluggable read functions feed + the first join_tab from the worker batch tables instead of scanning the real + first table: each row a worker wrote into its batch table is copied + field-by-field into the first table's record[0], so the manager's + nested-loop join evaluates exactly as if it had scanned the table itself, but + driven by the worker rows as they arrive. parallel_scan_read_first installs + the read function and returns the first row; parallel_scan_read_next returns + each subsequent row, blocking when no worker batch is momentarily ready. + + The manager drains one worker's batch table at a time (mgr->cur_worker): it + rnd-scans that table, and on EOF releases the worker to refill (clears + batch_full, signals COND_data_space) and picks the next ready worker. + + Return convention matches the engine read functions: 0 = row produced, + -1 = end of data, 1 = error (matching report_error()). */ - -void pwt_management::join_parallel_workers(THD *thd) +static int parallel_scan_read_next(READ_RECORD *info) { - bool all_done= false, workers_killed= false; - PSI_stage_info old_stage; - struct timespec wait_max; - wait_max.tv_nsec= 0; - int killed_from= -1; + TABLE *dst= info->table; // real first table + pwt_management *mgr= dst->reginfo.join_tab->join->parallel_work_manager; + const uint nfields= dst->s->fields; + struct timespec wait; + wait.tv_nsec= 0; - while (!all_done) + for (;;) { - wait_max.tv_sec= time(0)+1; // wait 1s - mysql_mutex_lock(&LOCK_pwt_manager); - thd->ENTER_COND(&COND_pwt_new_message, &LOCK_pwt_manager, - &stage_reading_data_from_parallel_worker, &old_stage); - mysql_cond_timedwait(&COND_pwt_new_message, &LOCK_pwt_manager, &wait_max); - thd->EXIT_COND(&old_stage); - - all_done= true; - - // delete worker threads that are finished - for (uint i= 0; i < nworkers; i++) + if (mgr->cur_worker) // draining a worker's batch { - if (workers[i].joined) // already done - continue; - mysql_mutex_lock(&workers[i].LOCK_worker); - if (workers[i].finished) - { - mysql_mutex_unlock(&workers[i].LOCK_worker); - if (workers[i].killed) - { - killed_from= i; - thd->awake(workers[i].killed); - } - pthread_join(workers[i].pthread, nullptr); - mysql_mutex_destroy(&workers[i].LOCK_worker); - mysql_cond_destroy(&workers[i].COND_worker); - workers[i].joined= true; - } - else + TABLE *bt= mgr->cur_worker->batch_table; + int err= bt->file->ha_rnd_next(bt->record[0]); + if (!err) { - mysql_mutex_unlock(&workers[i].LOCK_worker); - all_done= false; + /* + We are injecting a scanned row into the real first table's record[0]. + Some Field::store() paths (e.g. Field_bit_as_char) assert the field + is marked in write_set, but a plain scanned table has an empty + write_set. Mark all columns writable for the duration of the copy + (debug-only; a no-op in release). + */ + MY_BITMAP *old_map= dbug_tmp_use_all_columns(dst, &dst->write_set); + for (uint k= 0; k < nfields; k++) + copy_field_with_null(dst->field[k], bt->field[k]); + dbug_tmp_restore_column_map(&dst->write_set, old_map); + return 0; } + // batch drained (EOF) or a read error; end the scan and release worker + bt->file->ha_rnd_end(); + pwt_worker *w= mgr->cur_worker; + mysql_mutex_lock(&mgr->LOCK_data); + mgr->cur_worker= nullptr; + w->batch_full= false; // table is the worker's again + mysql_cond_broadcast(&mgr->COND_data_space); // wake it to refill + mysql_mutex_unlock(&mgr->LOCK_data); + if (err != HA_ERR_END_OF_FILE) + return 1; + // fall through and look for the next ready worker } - if (thd->killed && !workers_killed) + // find the next worker whose batch table is filled and ready + pwt_worker *next= nullptr; + PSI_stage_info old_stage; + mysql_mutex_lock(&mgr->LOCK_data); + for (;;) { - // inform our workers that they are killed - for (uint i= 0; i < nworkers; i++) - { - if (workers[i].joined) - continue; - mysql_mutex_lock(&workers[i].LOCK_worker); - if (workers[i].finished) - { - mysql_mutex_unlock(&workers[i].LOCK_worker); - continue; - } - - if ((int)i != killed_from) + for (uint i= 0; i < mgr->nworkers; i++) + if (mgr->workers[i].batch_full) { - mysql_mutex_lock(&workers[i].thd->LOCK_thd_kill); - workers[i].thd->killed= thd->killed; - mysql_mutex_unlock(&workers[i].thd->LOCK_thd_kill); - mysql_cond_signal(&workers[i].COND_worker); + next= &mgr->workers[i]; + break; } - mysql_mutex_unlock(&workers[i].LOCK_worker); + if (next) + break; + /* + A worker exited because it was killed: propagate the kill to the + manager's own THD so the join aborts now with ER_QUERY_INTERRUPTED, + before any result is sent. (The join's sub_select kill checks turn + thd->killed into the error message.) + */ + if (mgr->kill_signal != NOT_KILLED && !mgr->thd->killed) + { + killed_state ks= mgr->kill_signal; + mysql_mutex_unlock(&mgr->LOCK_data); + mysql_mutex_lock(&mgr->thd->LOCK_thd_kill); + mgr->thd->killed= ks; + mysql_mutex_unlock(&mgr->thd->LOCK_thd_kill); + return 1; } - workers_killed= true; - } - else - { - // process queue - bool surface_drop; - mysql_mutex_lock(&LOCK_pwt_thread); - surface_drop= messages_dropped; - messages_dropped= false; - pwt_queued_event *event; - while ((event= parallel_messages.get())) + if (mgr->fatal_error) // a worker failed { - if (pwt_error_message *err= event->error) - { - /* - set_overwrite_status to capture a message in our worker THD - TODO, look at getting rid of this if we can - */ - // thd->get_stmt_da()->set_overwrite_status(true); - if (err->level == Sql_condition::enum_warning_level::WARN_LEVEL_ERROR) - my_message_sql(err->code, err->message, MYF(0)); - else - push_warning(thd, err->level, err->code, err->message); - - my_free(err->message); - my_free(err); - } - if (event->data) - { - // process data from our worker thread - } - my_free(event); + mysql_mutex_unlock(&mgr->LOCK_data); + return 1; + } + if (!mgr->active_workers) // all producers done, drained + { + mysql_mutex_unlock(&mgr->LOCK_data); + return -1; } - mysql_mutex_unlock(&LOCK_pwt_thread); + if (mgr->thd->killed) + { + mysql_mutex_unlock(&mgr->LOCK_data); + return 1; + } + // wait for a batch, a finishing worker, or a 1s tick to re-check killed. + // ENTER_COND/EXIT_COND publish the "Reading data from parallel workers" + // stage and register the cond so a KILL of the manager wakes it. + wait.tv_sec= time(0) + 1; + mgr->thd->ENTER_COND(&mgr->COND_data_avail, &mgr->LOCK_data, + &stage_reading_data_from_parallel_worker, &old_stage); + mysql_cond_timedwait(&mgr->COND_data_avail, &mgr->LOCK_data, &wait); + mgr->thd->EXIT_COND(&old_stage); // unlocks LOCK_data + mysql_mutex_lock(&mgr->LOCK_data); // re-lock for the next pass + } + mgr->cur_worker= next; + mysql_mutex_unlock(&mgr->LOCK_data); + + // open a scan on the chosen worker's batch table, then loop to read it + TABLE *bt= next->batch_table; + bt->in_use= mgr->thd; + bt->file->ha_index_or_rnd_end(); // in case a scan was open + if (bt->file->ha_rnd_init_with_error(1)) + return 1; + } +} + + +static int parallel_scan_read_first(JOIN_TAB *tab) +{ + tab->table->status= 0; + tab->read_record.table= tab->table; + tab->read_record.read_record_func= parallel_scan_read_next; + return parallel_scan_read_next(&tab->read_record); +} + + +/** + @brief look through the JOIN_TABs, ensure only one use_parallel_scan flag + is enabled on the scan table with the largest on disk size. Reset + any other use_parallel_scan flag to false. + + @description + get_best_combination() may flag more than one full-scan table with + use_parallel_scan. Only the first non-const table is actually scanned in + parallel, so keep the flag on a single table -- the one with the largest + on-disk data size, where parallelising the scan pays off most -- and clear + it on the rest. (data_file_length was populated by make_join_statistics.) +*/ + +void JOIN::choose_parallel_scan() +{ + if (!join_tab) + return; - if (surface_drop) - push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, - ER_OUTOFMEMORY, - "Parallel worker diagnostics were dropped due " - "to memory allocation failure"); + JOIN_TAB *chosen= nullptr; + for (JOIN_TAB *tab= join_tab + const_tables; + tab < join_tab + top_join_tab_count; tab++) + { + if (!tab->use_parallel_scan) + continue; + if (chosen && + tab->table->file->stats.data_file_length <= + chosen->table->file->stats.data_file_length) + { + tab->use_parallel_scan= false; // smaller candidate: drop it + continue; } + if (chosen) + chosen->use_parallel_scan= false; // a larger table supersedes + chosen= tab; } - if (nworkers) + if (unlikely(thd->trace_started()) && chosen) { - mysql_cond_destroy(&COND_pwt_new_message); - mysql_mutex_destroy(&LOCK_pwt_manager); - mysql_mutex_destroy(&LOCK_pwt_thread); - my_free(workers); - workers= nullptr; + Json_writer_object trace_one_table(thd); + trace_one_table.add("chosen_for_parallel_scan", + chosen->table->alias.c_ptr()); } } + + +/** + @brief + Stop the producers and pthread_join them. + + The workers read this join's source and batch tables, so they must be + reaped before JOIN::join_free()->cleanup() frees those tables -- otherwise a + worker that has not yet observed the stop request dereferences a freed + table->file. That is why this is called from join_free(), ahead of the table + teardown, and again (idempotently, guarded by 'reaped') from finalize. + + On a normal completion the workers have already finished; on an early-out + (LIMIT) or an abort (KILL/error) we ask them to stop here. A worker + re-checks 'stop' at each batch hand-off, so it exits within at most one + batch -- without raising ER_QUERY_INTERRUPTED, which matters for a normal + early-out. +*/ + +void pwt_management::quiesce_workers() +{ + if (!workers || reaped) + return; + + /* + On an early-out/error the manager may have stopped mid-batch with a scan + still open on cur_worker's table; end it before that table is freed. + */ + if (cur_worker) + { + cur_worker->batch_table->file->ha_rnd_end(); + cur_worker= nullptr; + } + + mysql_mutex_lock(&LOCK_data); + stop= true; + mysql_cond_broadcast(&COND_data_space); + mysql_mutex_unlock(&LOCK_data); + + for (uint i= 0; i < nworkers; i++) + { + pthread_join(workers[i].pthread, nullptr); + mysql_mutex_destroy(&workers[i].LOCK_worker); + mysql_cond_destroy(&workers[i].COND_worker); + } + reaped= true; +} + + +/** + @brief + Reap the workers (if not already) and tear the channel down. + + Called from JOIN::exec() once exec_inner() has finished. Worker errors and + warnings collected by PWT_error_handler are surfaced here, after the join's + own result has been produced. +*/ + +void pwt_management::finalize_parallel_workers(THD *thd, JOIN *join) +{ + if (!workers) + return; + + quiesce_workers(); // stop + join (no-op if already reaped) + + /* + Surface errors/warnings the workers queued via PWT_error_handler. A worker + error that mattered to the result has already aborted the join during + execution (fatal_error or a propagated kill), so thd is already in error by + the time we get here; raising another error would trip the "can't overwrite + status" assertion in the diagnostics area. So only raise a queued ERROR + when thd is not already in error -- otherwise keep it as a warning. Plain + warnings are always safe to add. + */ + bool surface_drop; + mysql_mutex_lock(&LOCK_pwt_thread); + surface_drop= messages_dropped; + messages_dropped= false; + pwt_queued_event *event; + while ((event= parallel_messages.get())) + { + if (pwt_error_message *err= event->error) + { + if (err->level == Sql_condition::enum_warning_level::WARN_LEVEL_ERROR && + !thd->is_error()) + my_message_sql(err->code, err->message, MYF(0)); + else + push_warning(thd, Sql_condition::WARN_LEVEL_WARN, err->code, + err->message); + my_free(err->message); + my_free(err); + } + my_free(event); + } + mysql_mutex_unlock(&LOCK_pwt_thread); + + if (surface_drop) + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_OUTOFMEMORY, + "Parallel worker diagnostics were dropped due to " + "memory allocation failure"); + + mysql_cond_destroy(&COND_data_avail); + mysql_cond_destroy(&COND_data_space); + mysql_mutex_destroy(&LOCK_data); + mysql_mutex_destroy(&LOCK_pwt_thread); + my_free(workers); + workers= nullptr; + nworkers= 0; +} diff --git a/sql/sql_parallel_workers.h b/sql/sql_parallel_workers.h index e7117e8155fc6..4d88834579153 100644 --- a/sql/sql_parallel_workers.h +++ b/sql/sql_parallel_workers.h @@ -26,13 +26,6 @@ class pwt_error_message char *message; }; -class pwt_data_message -{ -public: - TABLE *tmp_table; -}; - - /* Event type. Inherits ilink so it can live in an I_List. */ @@ -40,9 +33,20 @@ class pwt_queued_event : public ilink { public: pwt_error_message *error; - pwt_data_message *data; }; + +/* + Number of rows a worker packs into its batch temporary table before handing + it to the manager. The worker hands rows to the manager a batch at a time + rather than one at a time so the channel mutex is touched once per + PWT_CHUNK_ROWS rows instead of once per row. Each worker reuses a single + batch table (see pwt_worker::batch_table): it fills the table, hands + it to the manager and blocks until the manager has drained it, then truncates + and refills it for the next batch. +*/ +#define PWT_CHUNK_ROWS 64 + class pwt_management; @@ -76,7 +80,33 @@ class pwt_worker bool joined; bool finished; killed_state killed; - void *parallel_scan_job; + /* + Per-worker copy of the manager's first non-const source table, opened + from the same TABLE_SHARE with in_use == this worker's thd. Gives the + worker a private handler so it can scan concurrently with the other + workers and the manager. Engines like InnoDB cache the THD pointer + (m_user_thd) at open time, so a shared handler with a swapped in_use is + not enough; each worker needs its own. Opened in init_parallel_workers, + closed in the worker thread before its THD is destroyed. + */ + TABLE *our_scan_table; + /* + This worker's single batch temporary table, built in the first non-const + source table's column format and created up front by + JOIN::create_parallel_workers_tmp_tables (one per worker, stored in + JOIN_TAB::parallel_tmp_tables). The worker reuses it for every chunk: fill + it with up to PWT_CHUNK_ROWS rows, hand it to the manager, block until the + manager has drained it, then truncate and refill. The worker and the + manager never touch it at the same time, so it needs no per-row locking. + */ + TABLE *batch_table; + /* + Hand-off flag for batch_table, guarded by pwt_management::LOCK_data. + The worker sets it true once the table is filled and ready for the manager; + the manager clears it once the table is drained, releasing the worker to + refill. See pwt_management::handoff_batch / the consumer read functions. + */ + bool batch_full; }; @@ -89,28 +119,74 @@ class pwt_management : public Sql_alloc pwt_worker *workers; uint nworkers; I_List parallel_messages; - mysql_cond_t COND_pwt_new_message; - mysql_mutex_t LOCK_pwt_manager; mysql_mutex_t LOCK_pwt_thread; THD *thd; + JOIN *join; /* Set under LOCK_pwt_thread when a worker fails to allocate a queued event. The manager surfaces a single ER_OUTOFMEMORY warning so the user sees that worker diagnostics were dropped instead of silently disappearing. */ bool messages_dropped; + + /* + Streaming channel. Each worker (producer) fills its single reused batch + table (batch_table) and hands it to the manager (single consumer) + by setting its batch_full flag; the manager drains the table from its first + join_tab read function and runs the rest of the join as the batches arrive, + instead of waiting for every worker to finish first. + + LOCK_data guards cur_worker, the workers' batch_full flags, active_workers + and the flags below. COND_data_avail wakes the consumer when a worker + fills its table or finishes; COND_data_space wakes a worker when the + manager has drained its table so it may refill. Because each worker owns + one table and blocks until it is drained, at most one batch per worker is + ever outstanding -- the single table is the natural backpressure bound. + EOF for the consumer is the state (no worker has batch_full set && + active_workers == 0). + */ + mysql_mutex_t LOCK_data; + mysql_cond_t COND_data_avail; + mysql_cond_t COND_data_space; + pwt_worker *cur_worker; // worker whose table the consumer drains + uint active_workers; // producers still running + bool fatal_error; // a producer hit a real engine error + bool stop; // consumer wants producers to stop + /* + Set once the workers have been stopped and pthread_join'd (quiesce_workers). + Workers read this join's source and batch tables, so they must be reaped + before JOIN::join_free()->cleanup() frees those tables; quiesce_workers is + therefore called from join_free, and again (idempotently) from finalize. + */ + bool reaped; + /* + Set (under LOCK_data) to a worker's killed_state when that worker exits + because it was killed -- e.g. a user KILL [QUERY] aimed at a parallel + worker. The consumer propagates it to the manager's own THD so the join + aborts with the right error (ER_QUERY_INTERRUPTED) before any result is + sent, rather than completing and trying to raise the error too late. + */ + killed_state kill_signal; + pwt_management(): workers(nullptr), nworkers(0), - messages_dropped(false) + messages_dropped(false), + cur_worker(nullptr), + active_workers(0), + fatal_error(false), + stop(false), + reaped(false), + kill_signal(NOT_KILLED) {} ~pwt_management() { - if (workers) - join_parallel_workers(current_thd); + finalize_parallel_workers(current_thd, join); } - bool init_parallel_workers(THD *thd); - void join_parallel_workers(THD *thd); + bool init_parallel_workers(THD *thd, JOIN *join); + void quiesce_workers(); + void finalize_parallel_workers(THD *thd, JOIN *join); + bool handoff_batch(pwt_worker *worker); void free_queue(); }; diff --git a/sql/sql_select.cc b/sql/sql_select.cc index 376fde7b339b8..d29d20e73c80f 100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -525,6 +525,7 @@ void JOIN::init(THD *thd_arg, List &fields_arg, spl_opt_info= 0; need_tmp= 0; hidden_group_fields= 0; /*safety*/ + parallel_scan_join_tab= nullptr; error= 0; select= 0; return_tab= 0; @@ -2825,6 +2826,14 @@ int JOIN::optimize_stage2() if (get_best_combination()) DBUG_RETURN(1); + /* + The access methods are now fixed and use_parallel_scan has been set on the + eligible full-scan tables: prune it to the single table we will actually + scan in parallel. + */ + if (thd->variables.parallel_worker_threads) + choose_parallel_scan(); + if (make_range_rowid_filters()) DBUG_RETURN(1); @@ -4374,6 +4383,116 @@ bool JOIN::make_aggr_tables_info() } +/** + @brief + Create one temporary scan table per parallel worker. + + Per-worker temporary tables for the parallel scan of the first non-const + join table. Each worker reuses its table to ship that table's rows to the + manager a batch at a time; the manager reads those rows back into the first + join_tab and drives the rest of the join itself. + + These tmp tables are therefore built in the *source table's* format -- one + column per field of the first table -- not in the result-projection format + of the post-join aggregation table. Only meaningful when that first table is + read by a full table scan (JT_ALL). +*/ + +bool JOIN::create_parallel_workers_tmp_tables(JOIN_TAB *tab) +{ + const uint n= thd->variables.parallel_worker_threads; + TABLE *src= (join_tab + const_tables)->table; + + List scan_fields; + for (Field **f= src->field; *f; f++) + { + Item *item= new (thd->mem_root) Item_field(thd, *f); + if (!item || scan_fields.push_back(item, thd->mem_root)) + return true; + } + + TMP_TABLE_PARAM *sparam= new (thd->mem_root) TMP_TABLE_PARAM; + if (!sparam) + return true; + sparam->init(); + count_field_types(select_lex, sparam, scan_fields, false); + sparam->skip_create_table= true; + + /* + Force the batch tables into the in-memory HEAP engine. With + tmp_memory_table_size == 0 Create_tmp_table::choose_engine would otherwise + pick the on-disk Aria engine; but a batch table is created and freed by the + manager thread while its rows are written by a worker thread, and the + temp-file size callback charges Aria file growth to whichever thread is + current (the worker), releasing it only when the file is deleted (on the + manager). That cross-thread split leaves the worker's + status_var.tmp_space_used non-zero at thread exit, tripping + THD::free_connection()'s assertion. A HEAP table creates no temp file, so + the callback never fires and the accounting stays balanced. BLOB/TEXT + tables are excluded from parallel scan, so batch rows are fixed-width and a + PWT_CHUNK_ROWS-sized batch stays comfortably in memory. + */ + ulonglong saved_tmp_memory_table_size= thd->variables.tmp_memory_table_size; + if (!saved_tmp_memory_table_size) + thd->variables.tmp_memory_table_size= thd->variables.max_heap_table_size; + + bool error= false; + tab->parallel_tmp_tables.init(PSI_INSTRUMENT_MEM); + for (uint i= 0; i < n; i++) + { + TABLE* tmp_table= create_tmp_table(thd, sparam, scan_fields, + nullptr, false, false, + select_options, HA_POS_ERROR, + &empty_clex_str, true, false); + if (!tmp_table) + { + error= true; + break; + } + tmp_table->reginfo.join_tab= tab; + tab->parallel_tmp_tables.append(tmp_table); + // instantiate this now so the parallel workers can write to it + if (instantiate_tmp_table(tmp_table, sparam->keyinfo, + sparam->start_recinfo, + &sparam->recinfo, + select_options, + true /*cross_thread*/)) + { + error= true; + break; + } + } + + thd->variables.tmp_memory_table_size= saved_tmp_memory_table_size; + if (error) + { + /* + Our caller just returns on failure without reaching create_postjoin_ + aggr_table's err: cleanup, so free the batch tables we already created + here rather than leak them. + */ + free_parallel_tmp_tables(tab); + return true; + } + parallel_scan_join_tab= tab; + + return false; +} + + +/* + Free the per-worker batch tables created by create_parallel_workers_tmp_tables + and reset the array. Safe to call when none were created (no-op) and safe to + call more than once. +*/ + +void JOIN::free_parallel_tmp_tables(JOIN_TAB *tab) +{ + while (tab->parallel_tmp_tables.elements()) + free_tmp_table(thd, tab->parallel_tmp_tables.pop()); + tab->parallel_tmp_tables.free_memory(); +} + bool JOIN::create_postjoin_aggr_table(JOIN_TAB *tab, List *table_fields, @@ -4407,13 +4526,37 @@ JOIN::create_postjoin_aggr_table(JOIN_TAB *tab, List *table_fields, if (tmp_table_keep_current_rowid) add_fields_for_current_rowid(tab, table_fields); tab->tmp_table_param->skip_create_table= true; + + /* + When the first non-const table is read by a full scan (JT_ALL) and parallel + workers are enabled, create one per-worker batch table now. At execution + JOIN::exec spins up workers that stream that table's rows through those + batch tables into the first join_tab instead of the manager scanning it + itself. (create_parallel_workers_tmp_tables sets parallel_scan_join_tab.) + */ + if (!parallel_scan_join_tab && + thd->variables.parallel_worker_threads && + top_join_tab_count > const_tables && + (join_tab + const_tables)->use_parallel_scan && + (join_tab + const_tables)->type == JT_ALL) + { + if (create_parallel_workers_tmp_tables(tab)) + DBUG_RETURN(true); + } + TABLE* table= create_tmp_table(thd, tab->tmp_table_param, *table_fields, table_group, distinct, save_sum_fields, select_options, table_rows_limit, &empty_clex_str, true, keep_row_order); if (!table) - DBUG_RETURN(true); + goto err; + // instantiate this now so the parallel worker manager can write to it + if (instantiate_tmp_table(table, tab->tmp_table_param->keyinfo, + tab->tmp_table_param->start_recinfo, + &tab->tmp_table_param->recinfo, + select_options)) + goto err; tmp_table_param.using_outer_summary_function= tab->tmp_table_param->using_outer_summary_function; tab->join= this; @@ -4479,6 +4622,12 @@ JOIN::create_postjoin_aggr_table(JOIN_TAB *tab, List *table_fields, if (table != NULL) free_tmp_table(thd, table); tab->table= NULL; + /* + On every path that reaches here tab->aggr is not set yet, so JOIN::cleanup + will not free the per-worker batch tables -- release them here. No-op when + parallel scan was not used for this query. + */ + free_parallel_tmp_tables(tab); DBUG_RETURN(true); } @@ -4912,14 +5061,20 @@ int JOIN::exec() dbug_serve_apcs(thd, 1); ); - // If we are a top level select statement - // TEMPORARY-PARALLEL-WORK-TEST: - if (!select_lex->outer_select() && thd->lex->sql_command == SQLCOM_SELECT && + /* + If we need worker threads, are a top level select statement and + we haven't initialized our parallel work manager + */ + if (thd->variables.parallel_worker_threads && + !select_lex->outer_select() && + thd->lex->sql_command == SQLCOM_SELECT && !parallel_work_manager) { if (!(parallel_work_manager= new (thd->mem_root) pwt_management) || - parallel_work_manager->init_parallel_workers(thd)) + parallel_work_manager->init_parallel_workers(thd, this)) { + delete parallel_work_manager; + parallel_work_manager= NULL; my_error(ER_INTERNAL_ERROR, MYF(0), "Failed to initialize parallel work mgr"); return 1; @@ -4930,6 +5085,18 @@ int JOIN::exec() res= exec_inner(); ANALYZE_STOP_TRACKING(thd, &explain->time_tracker); + /* + exec_inner() drove the join by consuming the worker channel. Stop the + workers, reap them and surface any worker diagnostics, then delete the + manager. + */ + if (parallel_work_manager) + { + parallel_work_manager->finalize_parallel_workers(thd, this); + delete parallel_work_manager; + parallel_work_manager= NULL; + } + DBUG_EXECUTE_IF("show_explain_probe_join_exec_end", if (dbug_user_var_equals_int(thd, "show_explain_probe_select_id", @@ -13471,7 +13638,33 @@ bool JOIN::get_best_combination() j->index= cur_pos->forced_index; } else + { j->type= JT_ALL; + /* + Parallel workers ship each scanned row to the manager by value + through a per-worker batch table. BLOB/TEXT columns (and the other + blob-backed types -- GEOMETRY, JSON) keep their payload in memory off + the record buffer, so they are not safely reproduced on the manager + side. Exclude any table that has such a column, alongside internal/ + temporary tables, from parallel scan. (s->blob_fields counts every + blob-backed column.) + + Also exclude the MERGE engine (ha_myisammrg): a worker opens its own + handler straight from the TABLE_SHARE, which does not attach the + MERGE children, so ha_myisammrg::rnd_init() asserts. + + Also exclude fulltext-searched tables: a MATCH ... AGAINST relevance + is not a stored column but is derived from the handler's fulltext + state for the current row, which the injected row images do not carry, + so the relevance would be computed incorrectly. + */ + if (thd->variables.parallel_worker_threads && + j->table->s->tmp_table == NO_TMP_TABLE && + j->table->s->blob_fields == 0 && + j->table->file->ht->db_type != DB_TYPE_MRG_MYISAM && + !j->table->fulltext_searched) + j->use_parallel_scan= true; + } if (cur_pos->use_join_buffer && tablenr != const_tables) full_join= 1; @@ -16593,6 +16786,7 @@ make_join_readinfo(JOIN *join, ulonglong options, uint no_jbuf_after) tab->read_first_record= join_read_first; /* Read with index_first / index_next */ tab->type= tab->type == JT_ALL ? JT_NEXT : JT_HASH_NEXT; + tab->use_parallel_scan= false; } } if (have_quick_select && @@ -17262,6 +17456,16 @@ void JOIN::join_free() bool can_unlock= full; DBUG_ENTER("JOIN::join_free"); + /* + Parallel-scan workers read this join's source and batch tables. cleanup() + below frees those tables, so the workers must be stopped and reaped first + -- otherwise a worker that has not yet seen the stop request dereferences a + freed table->file. On normal completion the workers have already finished, + so this is a cheap no-op. + */ + if (parallel_work_manager) + parallel_work_manager->quiesce_workers(); + cleanup(full); for (tmp_unit= select_lex->first_inner_unit(); @@ -17333,16 +17537,6 @@ void JOIN::cleanup(bool full) if (full) have_query_plan= QEP_DELETED; - // TEMPORARY-PARALLEL-WORK-TEST: - if (parallel_work_manager) - { - // Finish work and delete the manager - if (parallel_work_manager->workers) - parallel_work_manager->join_parallel_workers(thd); - delete parallel_work_manager; - parallel_work_manager= NULL; - } - if (original_join_tab) { /* Free the original optimized join created for the group_by_handler */ @@ -17387,6 +17581,7 @@ void JOIN::cleanup(bool full) { free_tmp_table(thd, curr_tab->table); curr_tab->table= NULL; + free_parallel_tmp_tables(curr_tab); delete curr_tab->tmp_table_param; curr_tab->tmp_table_param= NULL; curr_tab->aggr= NULL; @@ -23337,12 +23532,13 @@ bool Virtual_tmp_table::check_assignability_from(const TABLE &table, } -bool open_tmp_table(TABLE *table) +bool open_tmp_table(TABLE *table, bool cross_thread) { int error; if (unlikely((error= table->file->ha_open(table, table->s->path.str, O_RDWR, HA_OPEN_TMP_TABLE | - HA_OPEN_INTERNAL_TABLE | + (cross_thread? 0 : + HA_OPEN_INTERNAL_TABLE) | HA_OPEN_SIZE_TRACKING)))) { table->file->print_error(error, MYF(0)); /* purecov: inspected */ @@ -24349,7 +24545,8 @@ do_select(JOIN *join, Procedure *procedure) bool instantiate_tmp_table(TABLE *table, KEY *keyinfo, TMP_ENGINE_COLUMNDEF *start_recinfo, TMP_ENGINE_COLUMNDEF **recinfo, - ulonglong options) + ulonglong options, + bool cross_thread) { DBUG_ASSERT(table->s->keys == 0 || table->key_info == keyinfo); DBUG_ASSERT(table->s->keys <= 1); @@ -24367,7 +24564,7 @@ bool instantiate_tmp_table(TABLE *table, KEY *keyinfo, empty_record(table); table->status= STATUS_NO_RECORD; } - if (open_tmp_table(table)) + if (open_tmp_table(table, cross_thread)) return TRUE; return FALSE; @@ -31220,6 +31417,7 @@ bool JOIN_TAB::save_explain_data(Explain_table_access *eta, tab_type= type == JT_HASH ? JT_HASH_RANGE : JT_RANGE; } eta->type= tab_type; + eta->use_parallel_scan= use_parallel_scan; /* Build "possible_keys" value */ // psergey-todo: why does this use thd MEM_ROOT??? Doesn't this diff --git a/sql/sql_select.h b/sql/sql_select.h index 0b6b0cfdb8156..7d7522955d5ab 100644 --- a/sql/sql_select.h +++ b/sql/sql_select.h @@ -253,6 +253,14 @@ class SplM_opt_info; typedef struct st_join_table { TABLE *table; + /* + One batch temporary table per parallel worker, built in this (first + non-const) table's column format and created up front by + JOIN::create_parallel_workers_tmp_tables. Each worker reuses its table to + ship scanned source rows to the manager a batch at a time. Empty unless + this is the parallel-scan source table. + */ + Dynamic_array parallel_tmp_tables; TABLE_LIST *tab_list; KEYUSE *keyuse; /**< pointer to first used key */ KEY *hj_key; /**< descriptor of the used best hash join key @@ -429,6 +437,7 @@ typedef struct st_join_table { bool shortcut_for_distinct; bool sorted; bool cached_pfs_batch_update; + bool use_parallel_scan; /* If it's not 0 the number stored this field indicates that the index @@ -1617,6 +1626,13 @@ class JOIN :public Sql_alloc bool need_tmp; bool hidden_group_fields; + /* + First non-const join_tab whose table is read by a full scan (JT_ALL) and + is therefore eligible for parallel scanning. When set (and + parallel_worker_threads > 0) JOIN::exec spins up workers that stream that + table's rows to the manager through the pwt_management channel. + */ + JOIN_TAB *parallel_scan_join_tab; /* TRUE if there was full cleanup of the JOIN */ bool cleaned; DYNAMIC_ARRAY keyuse; @@ -1788,6 +1804,7 @@ class JOIN :public Sql_alloc int optimize_inner(); int optimize_stage2(); int optimize_stage2_and_finish(); + void choose_parallel_scan(); bool build_explain(); int reinit(); int init_execution(); @@ -1985,6 +2002,9 @@ class JOIN :public Sql_alloc */ void optimize_distinct(); + bool create_parallel_workers_tmp_tables(JOIN_TAB *join_tab); + void free_parallel_tmp_tables(JOIN_TAB *join_tab); + void cleanup_item_list(List &items) const; bool add_having_as_table_cond(JOIN_TAB *tab); bool make_aggr_tables_info(); @@ -2693,8 +2713,9 @@ bool create_internal_tmp_table(TABLE *table, KEY *keyinfo, bool instantiate_tmp_table(TABLE *table, KEY *keyinfo, TMP_ENGINE_COLUMNDEF *start_recinfo, TMP_ENGINE_COLUMNDEF **recinfo, - ulonglong options); -bool open_tmp_table(TABLE *table); + ulonglong options, + bool cross_thread= false); +bool open_tmp_table(TABLE *table, bool cross_thread= false); void fix_list_after_tbl_changes(SELECT_LEX *new_parent, List *tlist); void optimize_keyuse(JOIN *join, DYNAMIC_ARRAY *keyuse_array); bool sort_and_filter_keyuse(JOIN *join, DYNAMIC_ARRAY *keyuse, diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index c2580ad215f55..76975c9e7c1bb 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -2497,13 +2497,14 @@ Sys_slave_parallel_workers( static Sys_var_on_access_global + PRIV_SET_SYSTEM_GLOBAL_VAR_PARALLEL_WORKER_THREADS> Sys_parallel_worker_threads( "parallel_worker_threads", "Number of worker threads available for parallel query execution. " "0 means parallel execution is disabled", SESSION_VAR(parallel_worker_threads), CMD_LINE(REQUIRED_ARG), VALID_RANGE(0,100), DEFAULT(0), BLOCK_SIZE(1)); +/* VALID_RANGE(0,100), DEFAULT(1), BLOCK_SIZE(1)); // testing 123 */ static bool