diff --git a/include/my_pthread.h b/include/my_pthread.h index 6fe76c40cba54..0d64090701a05 100644 --- a/include/my_pthread.h +++ b/include/my_pthread.h @@ -624,6 +624,20 @@ typedef uint64 my_thread_id; */ #define MY_THREAD_ID_MAX UINT32_MAX +#ifdef _WIN32 +#define MAX_THREAD_NAME 256 +#elif defined(__linux__) +#define MAX_THREAD_NAME 16 +#elif defined(__FreeBSD__) || defined(__OpenBSD__) +#define MAX_THREAD_NAME 19 +#include +#elif defined(__apple_build_version__) +#include +#define MAX_THREAD_NAME MAXTHREADNAMESIZE +#else +#define MAX_THREAD_NAME 16 +#endif + extern void my_threadattr_global_init(void); extern my_bool my_thread_global_init(void); extern void my_thread_set_name(const char *); diff --git a/libmysqld/CMakeLists.txt b/libmysqld/CMakeLists.txt index f59354625b345..14f7a12381db9 100644 --- a/libmysqld/CMakeLists.txt +++ b/libmysqld/CMakeLists.txt @@ -72,6 +72,7 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc ../sql/mf_iocache.cc ../sql/my_decimal.cc ../sql/net_serv.cc ../sql/opt_range.cc ../sql/opt_group_by_cardinality.cc + ../sql/sql_parallel_workers.cc ../sql/opt_rewrite_date_cmp.cc ../sql/opt_rewrite_remove_casefold.cc ../sql/opt_sargable_left.cc diff --git a/mysql-test/main/mysqld--help.result b/mysql-test/main/mysqld--help.result index 17458f8cf56ba..c5314ead01525 100644 --- a/mysql-test/main/mysqld--help.result +++ b/mysql-test/main/mysqld--help.result @@ -1012,6 +1012,9 @@ The following specify which files/extra groups are read (specified before remain Cost of checking the row against the WHERE clause. Increasing this will have the optimizer to prefer plans with less row combinations + --parallel-worker-threads=# + Number of worker threads available for parallel query + execution. 0 means parallel execution is disabled --path=name Comma-separated list of schema names that defines the search order for stored routines --performance-schema @@ -2004,6 +2007,7 @@ optimizer-trace optimizer-trace-max-mem-size 1048576 optimizer-use-condition-selectivity 4 optimizer-where-cost 0.032 +parallel-worker-threads 0 path CURRENT_SCHEMA performance-schema FALSE performance-schema-accounts-size -1 diff --git a/mysql-test/main/parallel_query.result b/mysql-test/main/parallel_query.result new file mode 100644 index 0000000000000..a074957f5d199 --- /dev/null +++ b/mysql-test/main/parallel_query.result @@ -0,0 +1,84 @@ +SET DEBUG_SYNC='RESET'; +# +# MDEV-39492 Parallel Query: Study how to create worker threads +# +create table t1 select seq from seq_1_to_2; +set session parallel_worker_threads=3; +# we should currently see 3 warnings; +select SQL_BUFFER_RESULT seq from t1; +seq +1 +2 +1 +2 +1 +2 +set session parallel_worker_threads=10; +select SQL_BUFFER_RESULT seq from t1; +seq +1 +2 +1 +2 +1 +2 +1 +2 +1 +2 +1 +2 +1 +2 +1 +2 +1 +2 +1 +2 +set session parallel_worker_threads=0; +connect killee, localhost, root, , ; +# check that kill query on a parallel worker is passed to the manager +connection killee; +set session parallel_worker_threads=3; +SET @save_dbug= @@global.debug_dbug; +SET GLOBAL debug_dbug = "+d,pwt_worker_pause_before_signal"; +select SQL_BUFFER_RESULT seq from t1;; +# now use the default connection to view/kill the thread group executing +# the parallel work +connection default; +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +# wait for all 3 workers to hit our debug sync point +# then have a look at what is in our process list without thread ID +select USER, DB, STATE, SUBSTRING(INFO, 1, 32) +from information_schema.processlist +where (INFO regexp '.*parallel worker.*' or +STATE regexp '.*parallel worker.*') and +NOT INFO regexp '.*information_schema\.processlist.*'; +USER DB STATE SUBSTRING(INFO, 1, 32) +root test debug sync point: now Parallel Worker 3 For Thread ID +root test debug sync point: now Parallel Worker 2 For Thread ID +root test debug sync point: now Parallel Worker 1 For Thread ID +root test Reading data from parallel workers select SQL_BUFFER_RESULT seq fro +kill query ID; +# signal our workers to continue execution +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +# then wait for the manager thread to clean up and go back to sleep +# review error message on --reap +connection killee; +ERROR 70100: Query execution was interrupted +SET DEBUG_SYNC = "RESET"; +# save as above, but kill a worker with a simple kill and see the +# connection drop +select SQL_BUFFER_RESULT seq from t1;; +connection default; +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +kill ID; +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +connection killee; +ERROR 70100: Query execution was interrupted +# killee connection now dead, confirmed below +connection default; +drop table t1; +SET GLOBAL debug_dbug = @save_dbug; +SET DEBUG_SYNC = "RESET"; diff --git a/mysql-test/main/parallel_query.test b/mysql-test/main/parallel_query.test new file mode 100644 index 0000000000000..54a7b1b354bdd --- /dev/null +++ b/mysql-test/main/parallel_query.test @@ -0,0 +1,108 @@ +# +# Test KILL and KILL QUERY statements. +# + +-- source include/count_sessions.inc +-- source include/not_embedded.inc +# this will be used in the future -- source include/have_innodb.inc +-- source include/have_sequence.inc +--source include/have_debug.inc +--source include/have_debug_sync.inc +SET DEBUG_SYNC='RESET'; + +--disable_service_connection + +--echo # +--echo # MDEV-39492 Parallel Query: Study how to create worker threads +--echo # + +create table t1 select seq from seq_1_to_2; + +set session parallel_worker_threads=3; +--echo # we should currently see 3 warnings; +select SQL_BUFFER_RESULT seq from t1; + +set session parallel_worker_threads=10; +select SQL_BUFFER_RESULT seq from t1; + +set session parallel_worker_threads=0; +connect (killee, localhost, root, , ); + +--echo # check that kill query on a parallel worker is passed to the manager + +--connection killee +let $id= `select connection_id()`; +set session parallel_worker_threads=3; + +# worker THDs don't inherit session vars, we are using a debug_sync_set_action +# to pause execution in our worker threads just after they have done something + +SET @save_dbug= @@global.debug_dbug; +SET GLOBAL debug_dbug = "+d,pwt_worker_pause_before_signal"; + +--send select SQL_BUFFER_RESULT seq from t1; +--echo # now use the default connection to view/kill the thread group executing +--echo # the parallel work +--connection default +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +let $name= "Parallel Worker 1 For Thread ID $id"; +let $killID= `SELECT @kid:=ID from information_schema.processlist + where info like $name limit 1`; +--echo # wait for all 3 workers to hit our debug sync point +let $wait_condition= + select count(*) = 3 from information_schema.processlist + where state like 'debug sync%' and info like '%Thread ID $id'; +--source include/wait_condition.inc +--echo # then have a look at what is in our process list without thread ID +select USER, DB, STATE, SUBSTRING(INFO, 1, 32) + from information_schema.processlist + where (INFO regexp '.*parallel worker.*' or + STATE regexp '.*parallel worker.*') and + NOT INFO regexp '.*information_schema\.processlist.*'; +--replace_result $killID ID +eval kill query $killID; +--echo # signal our workers to continue execution +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +--echo # then wait for the manager thread to clean up and go back to sleep +let $wait_condition= + select count(*) = 1 from information_schema.processlist + where command = 'Sleep' and id = $id; +--source include/wait_condition.inc +#select * from information_schema.processlist; +--echo # review error message on --reap +--connection killee +--error ER_QUERY_INTERRUPTED +reap; + +SET DEBUG_SYNC = "RESET"; + +--echo # save as above, but kill a worker with a simple kill and see the +--echo # connection drop + +--send select SQL_BUFFER_RESULT seq from t1; +connection default; +SET DEBUG_SYNC='now WAIT_FOR pwt_worker_paused'; +let $name= "Parallel Worker 1 For Thread ID $id"; +let $killID= `SELECT @kid:=ID from information_schema.processlist + where info like $name limit 1`; +--replace_result $killID ID +eval kill $killID; +SET DEBUG_SYNC = "now SIGNAL pwt_worker_continue"; +let $wait_condition= + select count(*) = 0 from information_schema.processlist + where command = 'Sleep' and id = $id; +--source include/wait_condition.inc +connection killee; +--disable_warnings +--error ER_QUERY_INTERRUPTED +reap; + +--echo # killee connection now dead, confirmed below + +connection default; + +drop table t1; +SET GLOBAL debug_dbug = @save_dbug; +SET DEBUG_SYNC = "RESET"; + +--source include/wait_until_count_sessions.inc diff --git a/mysql-test/main/parallel_query_excluded.result b/mysql-test/main/parallel_query_excluded.result new file mode 100644 index 0000000000000..f0a85a7c27424 --- /dev/null +++ b/mysql-test/main/parallel_query_excluded.result @@ -0,0 +1,54 @@ +SET @save_optimizer_trace= @@optimizer_trace; +SET optimizer_trace='enabled=on'; +CREATE TABLE t_plain (a INT); +INSERT INTO t_plain VALUES (1),(2),(3); +CREATE TABLE t_blob (a INT, b BLOB); +INSERT INTO t_blob VALUES (1,'x'),(2,'y'),(3,'z'); +CREATE TABLE t_text (a INT, b TEXT); +INSERT INTO t_text VALUES (1,'x'),(2,'y'),(3,'z'); +CREATE TABLE t_m1 (a INT) ENGINE=MyISAM; +INSERT INTO t_m1 VALUES (1),(2),(3); +CREATE TABLE t_m2 (a INT) ENGINE=MyISAM; +INSERT INTO t_m2 VALUES (4),(5),(6); +CREATE TABLE t_merge (a INT) ENGINE=MERGE UNION=(t_m1,t_m2) INSERT_METHOD=LAST; +CREATE TABLE t_ft (a INT, b VARCHAR(64), FULLTEXT(b)) ENGINE=MyISAM; +INSERT INTO t_ft VALUES (1,'alpha beta'),(2,'gamma delta'),(3,'epsilon zeta'); +set session parallel_worker_threads=2; +# plain table: chosen for parallel scan (expect 1) +SELECT a FROM t_plain; +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL +AS chosen_for_parallel_scan +FROM information_schema.optimizer_trace; +chosen_for_parallel_scan +1 +# BLOB column present: excluded, not chosen (expect 0) +SELECT a FROM t_blob; +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL +AS chosen_for_parallel_scan +FROM information_schema.optimizer_trace; +chosen_for_parallel_scan +0 +# TEXT column present: excluded, not chosen (expect 0) +SELECT a FROM t_text; +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL +AS chosen_for_parallel_scan +FROM information_schema.optimizer_trace; +chosen_for_parallel_scan +0 +# MERGE engine: excluded, not chosen (expect 0) +SELECT a FROM t_merge; +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL +AS chosen_for_parallel_scan +FROM information_schema.optimizer_trace; +chosen_for_parallel_scan +0 +# fulltext-searched (MATCH in select list): excluded, not chosen (expect 0) +SELECT a, MATCH(b) AGAINST('alpha') FROM t_ft; +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL +AS chosen_for_parallel_scan +FROM information_schema.optimizer_trace; +chosen_for_parallel_scan +0 +DROP TABLE t_plain, t_blob, t_text, t_merge, t_m1, t_m2, t_ft; +SET optimizer_trace= @save_optimizer_trace; +set session parallel_worker_threads=default; diff --git a/mysql-test/main/parallel_query_excluded.test b/mysql-test/main/parallel_query_excluded.test new file mode 100644 index 0000000000000..e90c29d2e0003 --- /dev/null +++ b/mysql-test/main/parallel_query_excluded.test @@ -0,0 +1,80 @@ +# +# MDEV-39492 Parallel Query: tables that cannot be shipped row-by-row through a +# worker batch table are excluded from parallel worker scan. +# +# JOIN::choose_parallel_scan() records the table picked for the parallel scan +# in the optimizer trace as "chosen_for_parallel_scan". An ordinary table is +# chosen; a table is rejected by the use_parallel_scan gate when it has a +# BLOB/TEXT (blob-backed) column, uses the MERGE engine, or is fulltext-searched +# (a MATCH ... AGAINST refers to it). We assert presence/absence of that trace +# key (1/0) rather than the temporary row-multiplication behaviour. +# +--source include/not_embedded.inc + +SET @save_optimizer_trace= @@optimizer_trace; +SET optimizer_trace='enabled=on'; + +CREATE TABLE t_plain (a INT); +INSERT INTO t_plain VALUES (1),(2),(3); +CREATE TABLE t_blob (a INT, b BLOB); +INSERT INTO t_blob VALUES (1,'x'),(2,'y'),(3,'z'); +CREATE TABLE t_text (a INT, b TEXT); +INSERT INTO t_text VALUES (1,'x'),(2,'y'),(3,'z'); + +# MERGE table over two MyISAM children +CREATE TABLE t_m1 (a INT) ENGINE=MyISAM; +INSERT INTO t_m1 VALUES (1),(2),(3); +CREATE TABLE t_m2 (a INT) ENGINE=MyISAM; +INSERT INTO t_m2 VALUES (4),(5),(6); +CREATE TABLE t_merge (a INT) ENGINE=MERGE UNION=(t_m1,t_m2) INSERT_METHOD=LAST; + +# fulltext: MATCH in the select list keeps a full table scan and marks the +# table fulltext-searched +CREATE TABLE t_ft (a INT, b VARCHAR(64), FULLTEXT(b)) ENGINE=MyISAM; +INSERT INTO t_ft VALUES (1,'alpha beta'),(2,'gamma delta'),(3,'epsilon zeta'); + +set session parallel_worker_threads=2; + +--echo # plain table: chosen for parallel scan (expect 1) +--disable_result_log +SELECT a FROM t_plain; +--enable_result_log +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL + AS chosen_for_parallel_scan + FROM information_schema.optimizer_trace; + +--echo # BLOB column present: excluded, not chosen (expect 0) +--disable_result_log +SELECT a FROM t_blob; +--enable_result_log +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL + AS chosen_for_parallel_scan + FROM information_schema.optimizer_trace; + +--echo # TEXT column present: excluded, not chosen (expect 0) +--disable_result_log +SELECT a FROM t_text; +--enable_result_log +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL + AS chosen_for_parallel_scan + FROM information_schema.optimizer_trace; + +--echo # MERGE engine: excluded, not chosen (expect 0) +--disable_result_log +SELECT a FROM t_merge; +--enable_result_log +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL + AS chosen_for_parallel_scan + FROM information_schema.optimizer_trace; + +--echo # fulltext-searched (MATCH in select list): excluded, not chosen (expect 0) +--disable_result_log +SELECT a, MATCH(b) AGAINST('alpha') FROM t_ft; +--enable_result_log +SELECT JSON_EXTRACT(trace, '$**.chosen_for_parallel_scan') IS NOT NULL + AS chosen_for_parallel_scan + FROM information_schema.optimizer_trace; + +DROP TABLE t_plain, t_blob, t_text, t_merge, t_m1, t_m2, t_ft; +SET optimizer_trace= @save_optimizer_trace; +set session parallel_worker_threads=default; diff --git a/mysql-test/main/parallel_query_oom.result b/mysql-test/main/parallel_query_oom.result new file mode 100644 index 0000000000000..573bbe3df23bc --- /dev/null +++ b/mysql-test/main/parallel_query_oom.result @@ -0,0 +1,24 @@ +# +# MDEV-39492 Parallel Query: OOM in worker error_to_queue surfaces a +# single ER_OUTOFMEMORY warning so worker diagnostics aren't silently +# lost. +# +set @save_dbug=@@global.debug_dbug; +set global debug_dbug="+d,pwt_error_to_queue_oom"; +set session parallel_worker_threads=1; +# The prototype worker emits either a warning or my_error() depending on the +# parity of its thread id. Either path runs through error_to_queue; the +# DBUG injection forces both into the OOM branch, so neither the original +# warning nor the original error reaches the user. We expect just the +# manager-surfaced ER_OUTOFMEMORY warning. +create table t1 select seq from seq_1_to_2; +select SQL_BUFFER_RESULT * from t1; +seq +1 +2 +show warnings; +Level Code Message +Warning 1037 Parallel worker diagnostics were dropped due to memory allocation failure +drop table t1; +set global debug_dbug=@save_dbug; +set session parallel_worker_threads=default; diff --git a/mysql-test/main/parallel_query_oom.test b/mysql-test/main/parallel_query_oom.test new file mode 100644 index 0000000000000..739accefd40d8 --- /dev/null +++ b/mysql-test/main/parallel_query_oom.test @@ -0,0 +1,35 @@ +# +# Test OOM handling in the parallel worker error/warning queue. +# + +-- source include/count_sessions.inc +-- source include/not_embedded.inc +-- source include/have_sequence.inc +-- source include/have_debug.inc + +--disable_service_connection + +--echo # +--echo # MDEV-39492 Parallel Query: OOM in worker error_to_queue surfaces a +--echo # single ER_OUTOFMEMORY warning so worker diagnostics aren't silently +--echo # lost. +--echo # + +set @save_dbug=@@global.debug_dbug; +set global debug_dbug="+d,pwt_error_to_queue_oom"; +set session parallel_worker_threads=1; + +--echo # The prototype worker emits either a warning or my_error() depending on the +--echo # parity of its thread id. Either path runs through error_to_queue; the +--echo # DBUG injection forces both into the OOM branch, so neither the original +--echo # warning nor the original error reaches the user. We expect just the +--echo # manager-surfaced ER_OUTOFMEMORY warning. +create table t1 select seq from seq_1_to_2; +select SQL_BUFFER_RESULT * from t1; +show warnings; +drop table t1; + +set global debug_dbug=@save_dbug; +set session parallel_worker_threads=default; + +--source include/wait_until_count_sessions.inc diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result index 268c06632b8b4..573cb113d9efe 100644 --- a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result +++ b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result @@ -2992,6 +2992,16 @@ NUMERIC_BLOCK_SIZE NULL ENUM_VALUE_LIST NULL READ_ONLY NO COMMAND_LINE_ARGUMENT REQUIRED +VARIABLE_NAME PARALLEL_WORKER_THREADS +VARIABLE_SCOPE SESSION +VARIABLE_TYPE BIGINT UNSIGNED +VARIABLE_COMMENT Number of worker threads available for parallel query execution. 0 means parallel execution is disabled +NUMERIC_MIN_VALUE 0 +NUMERIC_MAX_VALUE 100 +NUMERIC_BLOCK_SIZE 1 +ENUM_VALUE_LIST NULL +READ_ONLY NO +COMMAND_LINE_ARGUMENT REQUIRED VARIABLE_NAME PATH VARIABLE_SCOPE SESSION VARIABLE_TYPE VARCHAR diff --git a/mysys/my_thread_name.cc b/mysys/my_thread_name.cc index 9f32dae269ab0..90418c5cd8825 100644 --- a/mysys/my_thread_name.cc +++ b/mysys/my_thread_name.cc @@ -20,12 +20,7 @@ #include #ifdef _WIN32 -#define MAX_THREAD_NAME 256 typedef HRESULT (*func_SetThreadDescription)(HANDLE,PCWSTR); -#elif defined(__linux__) -#define MAX_THREAD_NAME 16 -#elif defined(__FreeBSD__) || defined(__OpenBSD__) -#include #endif #if defined(HAVE_PSI_THREAD_INTERFACE) && !defined DBUG_OFF diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index e54e894e1d0fc..5ede64143d08a 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -115,6 +115,7 @@ SET (SQL_SOURCE ../sql-common/client_plugin.c opt_range.cc vector_mhnsw.cc opt_group_by_cardinality.cc + sql_parallel_workers.cc opt_rewrite_date_cmp.cc opt_rewrite_remove_casefold.cc opt_sargable_left.cc diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 1aa5e0cfe7ddf..67293cd2553e4 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -9911,6 +9911,7 @@ PSI_stage_info stage_starting= { 0, "starting", 0}; PSI_stage_info stage_waiting_for_flush= { 0, "Waiting for non trans tables to be flushed", 0}; PSI_stage_info stage_waiting_for_ddl= { 0, "Waiting for DDLs", 0}; PSI_stage_info stage_waiting_for_reset_master= { 0, "Waiting for a running RESET MASTER to complete", 0}; +PSI_stage_info stage_reading_data_from_parallel_worker= { 0, "Reading data from parallel workers", 0}; #ifdef WITH_WSREP // Additional Galera thread states @@ -10003,6 +10004,10 @@ PSI_memory_key key_memory_user_var_entry_value; PSI_memory_key key_memory_String_value; PSI_memory_key key_memory_WSREP; PSI_memory_key key_memory_trace_ddl_info; +PSI_memory_key key_memory_pwt_queued_event; +PSI_memory_key key_memory_pwt_error_message; +PSI_memory_key key_memory_pwt_workers; +PSI_memory_key key_memory_pwt_db; #ifdef HAVE_PSI_INTERFACE @@ -10144,7 +10149,8 @@ PSI_stage_info *all_server_stages[]= & stage_reading_semi_sync_ack, & stage_waiting_for_deadlock_kill, & stage_starting, - & stage_waiting_for_reset_master + & stage_waiting_for_reset_master, + & stage_reading_data_from_parallel_worker #ifdef WITH_WSREP , & stage_waiting_isolation, @@ -10256,6 +10262,9 @@ static PSI_memory_info all_server_memory[]= { &key_memory_trace_ddl_info, "TRACE_DDL_INFO", 0} }; + +extern void pwt_init_psi_keys(void); + /** Initialise all the performance schema instrumentation points used by the server. @@ -10342,6 +10351,7 @@ void init_server_psi_keys(void) stmt_info_rpl.m_flags= PSI_FLAG_MUTABLE; mysql_statement_register(category, &stmt_info_rpl, 1); #endif + pwt_init_psi_keys(); } #endif /* HAVE_PSI_INTERFACE */ diff --git a/sql/mysqld.h b/sql/mysqld.h index 9ee5bc33740e2..1264b0d4904a2 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -499,6 +499,10 @@ extern PSI_memory_key key_memory_Table_trigger_dispatcher; extern PSI_memory_key key_memory_native_functions; extern PSI_memory_key key_memory_WSREP; extern PSI_memory_key key_memory_trace_ddl_info; +extern PSI_memory_key key_memory_pwt_queued_event; +extern PSI_memory_key key_memory_pwt_error_message; +extern PSI_memory_key key_memory_pwt_workers; +extern PSI_memory_key key_memory_pwt_db; /* MAINTAINER: Please keep this list in order, to limit merge collisions. @@ -646,6 +650,7 @@ extern PSI_stage_info stage_slave_background_wait_request; extern PSI_stage_info stage_waiting_for_deadlock_kill; extern PSI_stage_info stage_starting; extern PSI_stage_info stage_waiting_for_reset_master; +extern PSI_stage_info stage_reading_data_from_parallel_worker; #ifdef WITH_WSREP // Additional Galera thread states extern PSI_stage_info stage_waiting_isolation; diff --git a/sql/privilege.h b/sql/privilege.h index b3791a8e3a2f5..80d9d3b80f254 100644 --- a/sql/privilege.h +++ b/sql/privilege.h @@ -452,6 +452,8 @@ constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_MAX_CONNECT_ERRORS= // Was SUPER_ACL prior to 10.5.2 constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_MAX_PASSWORD_ERRORS= CONNECTION_ADMIN_ACL; +constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_PARALLEL_WORKER_THREADS= + CONNECTION_ADMIN_ACL; // Was SUPER_ACL prior to 10.5.2 constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_PROXY_PROTOCOL_NETWORKS= CONNECTION_ADMIN_ACL; diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 5423eb393573d..fa6068a382d09 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -714,7 +714,7 @@ const char *thd_where(THD *thd) THD::THD(my_thread_id id, bool is_wsrep_applier) :Statement(&main_lex, &main_mem_root, STMT_CONVENTIONAL_EXECUTION, /* statement id */ 0), - rli_fake(0), rgi_fake(0), rgi_slave(NULL), + rli_fake(0), rgi_fake(0), rgi_slave(NULL), pwt_worker_info(NULL), protocol_text(this), protocol_binary(this), initial_status_var(0), m_current_stage_key(0), m_psi(0), start_time(0), start_time_sec_part(0), in_sub_stmt(0), log_all_errors(0), @@ -5462,8 +5462,7 @@ void destroy_thd(MYSQL_THD thd) /** Create a THD that only has auxiliary functions - It will never be added to the global connection list - server_threads. It does not represent any client connection. + It does not represent any client connection. It should never be counted, because it will stall the shutdown. It is solely for engine's internal use, diff --git a/sql/sql_class.h b/sql/sql_class.h index 9897df33766b9..d34ce97369622 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -872,6 +872,7 @@ typedef struct system_variables ulong server_id; ulong session_track_transaction_info; ulong threadpool_priority; + ulong parallel_worker_threads; ulong vers_alter_history; /* deadlock detection */ @@ -3154,7 +3155,7 @@ enum class THD_WHERE const char *thd_where(THD *thd); - +class pwt_worker; /** @class THD @@ -3199,6 +3200,8 @@ class THD: public THD_count, /* this must be first */ /* Slave applier execution context */ rpl_group_info* rgi_slave; + pwt_worker *pwt_worker_info; + union { rpl_io_thread_info *rpl_io_info; rpl_sql_thread_info *rpl_sql_info; diff --git a/sql/sql_explain.cc b/sql/sql_explain.cc index ba79a2b78fa6d..0d37f2f13fcda 100644 --- a/sql/sql_explain.cc +++ b/sql/sql_explain.cc @@ -1560,7 +1560,12 @@ int Explain_table_access::print_explain(select_result_sink *output, /* `type` column */ StringBuffer<64> join_type_buf; if (rowid_filter == NULL) - push_str(thd, &item_list, join_type_str[type]); + { + if (type == JT_ALL && use_parallel_scan) + push_str(thd, &item_list, "PARALLEL"); + else + push_str(thd, &item_list, join_type_str[type]); + } else { join_type_buf.append(join_type_str[type], strlen(join_type_str[type])); @@ -2034,7 +2039,10 @@ void Explain_table_access::print_explain_json(Explain_query *query, } } - writer->add_member("access_type").add_str(join_type_str[type]); + if (type == JT_ALL && use_parallel_scan) + writer->add_member("access_type").add_str("PARALLEL"); + else + writer->add_member("access_type").add_str(join_type_str[type]); add_json_keyset(writer, "possible_keys", &possible_keys); diff --git a/sql/sql_explain.h b/sql/sql_explain.h index 7a1ee3911611e..cdade594ed11b 100644 --- a/sql/sql_explain.h +++ b/sql/sql_explain.h @@ -809,6 +809,7 @@ class Explain_table_access : public Sql_alloc enum join_type type; bool used_partitions_set; + bool use_parallel_scan; /* Empty means "NULL" will be printed */ String_list possible_keys; diff --git a/sql/sql_parallel_workers.cc b/sql/sql_parallel_workers.cc new file mode 100644 index 0000000000000..d3186e330c809 --- /dev/null +++ b/sql/sql_parallel_workers.cc @@ -0,0 +1,1072 @@ +/* + Copyright (c) 2026, MariaDB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 + USA */ + +/** + @file + + Implementation of parallel worker threads (PWT) management and execution + logic. + + Contains + error_to_queue + push an error message onto our queue to send to the manager + PWT_error_handler + intercept error and warnings, queue them to the manager + parallel_worker_thread_func + Entry point for our worker threads + abort_worker + pwt_management::free_queue + helper for error conditions + pwt_management::init_parallel_workers + Initialise our parallel worker threads + pwt_init_psi_keys + initialize PSI keys + worker_produce_chunks / worker_scan_table_to_manager + producer: scan the source table and stream rows to the manager + a batch at a time through the worker's reused batch table + pwt_management::handoff_batch + producer: hand the filled batch table to the manager and block + until it is drained + parallel_scan_read_first / parallel_scan_read_next + consumer: feed the first join_tab from the worker batch tables + pwt_management::finalize_parallel_workers + stop the workers, reap them, surface diagnostics, tear down +*/ + + +#include "sql_parallel_workers.h" +#include "sql_select.h" +#include "debug_sync.h" +#include "transaction.h" + +#ifdef HAVE_PSI_INTERFACE +static PSI_thread_key key_thread_pwt; +static PSI_thread_info all_pwt_threads[]= +{ + { &key_thread_pwt, WORKER_NAME, PSI_FLAG_GLOBAL}, +}; + +static PSI_mutex_key key_mutex_pwt_LOCK_thread, + key_mutex_pwt_LOCK_worker, + key_mutex_pwt_LOCK_data; +static PSI_mutex_info all_pwt_mutexes[]= +{ + { &key_mutex_pwt_LOCK_thread, "pwt_management::LOCK_pwt_thread", 0}, + { &key_mutex_pwt_LOCK_worker, "pwt_worker::LOCK_worker", 0}, + { &key_mutex_pwt_LOCK_data, "pwt_management::LOCK_data", 0}, +}; + +static PSI_cond_key key_COND_pwt_data_avail, key_COND_pwt_data_space; +static PSI_cond_info all_pwt_conds[]= +{ + { &key_COND_pwt_data_avail, "pwt_management::COND_data_avail", 0}, + { &key_COND_pwt_data_space, "pwt_management::COND_data_space", 0}, +}; + +static PSI_memory_info all_pwt_memory[]= +{ + { &key_memory_pwt_queued_event, "pwt_queued_event", 0}, + { &key_memory_pwt_error_message, "pwt_error_message", 0}, + { &key_memory_pwt_workers, "pwt_management::workers", 0}, + { &key_memory_pwt_db, "pwt_worker::db", 0}, +}; +#endif /* HAVE_PSI_INTERFACE */ + + +// consumer read function, installed on the first join_tab by init below +static int parallel_scan_read_first(JOIN_TAB *tab); + + +/** + @brief + push an error message onto our queue to send to the manager + + @return + true an error occurred + false error or warning is queued +*/ + +bool error_to_queue(pwt_queued_event **event, uint error, + Sql_condition::enum_warning_level level, const char *msg) +{ + DBUG_EXECUTE_IF("pwt_error_to_queue_oom", + { *event= nullptr; return true; }); + *event= (pwt_queued_event*) my_malloc(key_memory_pwt_queued_event, + sizeof(pwt_queued_event), + MYF(0)); + if (!*event) + return true; + (*event)->error= (pwt_error_message*) my_malloc(key_memory_pwt_error_message, + sizeof(pwt_error_message), + MYF(0)); + if (!(*event)->error) + { + my_free(*event); + *event= nullptr; + return true; + } + (*event)->error->level= level; + (*event)->error->code= error; + (*event)->error->message= (char *) my_malloc(key_memory_pwt_error_message, + strlen(msg)+1, + MYF(0)); + if (!(*event)->error->message) + { + my_free((*event)->error); + my_free(*event); + *event= nullptr; + return true; + } + strmake((*event)->error->message, msg, strlen(msg)); + return false; +} + + +/** + @brief + An instance of this class is used by our worker threads to capture and + relay to the manager +*/ + +class PWT_error_handler : public Internal_error_handler +{ +public: + bool handle_condition(THD *thd, + uint sql_errno, + const char* sql_state, + Sql_condition::enum_warning_level *level, + const char* msg, + Sql_condition ** cond_hdl) override + { + if (pwt_worker *worker= thd->pwt_worker_info) + { + pwt_queued_event *event; + if (error_to_queue(&event, sql_errno, *level, msg)) + { + /* + Couldn't allocate the queued event. The worker THD's diagnostics + area is discarded when the worker exits, so flag the manager so it + can surface a single ER_OUTOFMEMORY warning to the user instead of + letting this condition vanish. + */ + mysql_mutex_lock(&worker->manager->LOCK_pwt_thread); + worker->manager->messages_dropped= true; + mysql_mutex_unlock(&worker->manager->LOCK_pwt_thread); + return true; + } + mysql_mutex_lock(&worker->manager->LOCK_pwt_thread); + worker->manager->parallel_messages.push_back(event); + mysql_mutex_unlock(&worker->manager->LOCK_pwt_thread); + } + return true; // no further processing in worker thread + } + +}; + + +/* + @description + Copy one field's value from one row buffer to another, propagating the null + state. field_conv() copies only the value bytes, so the null bit has to be + transferred separately -- otherwise a NULL source would leave the + destination non-null (showing stale data) and a non-null source would not + clear a null bit left over from a previous row. set_null/set_notnull are + safe no-ops when the field is declared NOT NULL. +*/ +static inline void copy_field_with_null(Field *to, Field *from) +{ + if (from->is_null()) + to->set_null(); + else + { + to->set_notnull(); + field_conv(to, from); + } +} + + +/** + @brief + Hand this worker's filled batch table to the manager (producer side). + + @description + Marks batch_table ready and blocks until the manager has drained it + (clears batch_full) or asks the producers to stop. On return the table is + the worker's again: either empty and ready to refill, or to be abandoned. + + @return + true the consumer asked us to stop (stop scanning) + false the table was drained; refill it +*/ + +bool pwt_management::handoff_batch(pwt_worker *worker) +{ + mysql_mutex_lock(&LOCK_data); + if (stop) + { + mysql_mutex_unlock(&LOCK_data); + return true; + } + worker->batch_full= true; + mysql_cond_signal(&COND_data_avail); // wake the consumer + while (worker->batch_full && !stop) + mysql_cond_wait(&COND_data_space, &LOCK_data); + bool stopped= stop; + mysql_mutex_unlock(&LOCK_data); + return stopped; +} + + +void inline close_worker_scan_table(pwt_worker *worker) +{ + if (worker->our_scan_table) + { + closefrm(worker->our_scan_table); + my_free(worker->our_scan_table); + worker->our_scan_table= nullptr; + } +} + +/** + @brief + Scan the worker's private source table and stream the rows to the manager, + a batch at a time, through the worker's reused batch table. + + @description + Each worker scans its own private copy of the first non-const join table + (worker->our_scan_table, opened with in_use == this worker's thd) so the + workers scan truly concurrently -- no shared-scan lock is needed. It copies + up to PWT_CHUNK_ROWS rows field-by-field into batch_table (built in + the source-table column format, so field_conv is needed rather than a raw + record copy), hands that table to the manager, and blocks until the manager + has drained it (handoff_batch). It then truncates the table and refills it + for the next batch, until the source scan is exhausted. + + The worker only reproduces the source scan; it does not apply WHERE/JOIN + conditions or run the rest of the join. The manager consumes these rows and + drives the join itself as they arrive (see parallel_scan_read_next). + + @return + 0 on success, or the handler error code; HA_ERR_END_OF_FILE is mapped + to success. A clean stop requested by the manager (handoff_batch -> stop) + also returns success: the manager is done, not in error. +*/ + +static int worker_produce_chunks(pwt_worker *worker) +{ + TABLE *src= worker->our_scan_table; + TABLE *dst= worker->batch_table; + const uint nfields= src->s->fields; + int err; + + src->use_all_columns(); // read every column into record[0] + dst->use_all_columns(); // we write every column of the batch row + + /* + Our handle bypassed lock_tables(), so take the engine-level read lock + ourselves; InnoDB needs this to register the table with its trx before + a scan. Paired with the F_UNLCK below. + */ + if ((err= src->file->ha_external_lock(worker->thd, F_RDLCK))) + return err; + + src->file->ha_index_or_rnd_end(); // in case a prior scan was open + if ((err= src->file->ha_rnd_init_with_error(1))) + { + src->file->ha_external_lock(worker->thd, F_UNLCK); + return err; + } + + bool eof= false, killed= false; + while (!eof && !killed) + { + /* + The batch table is ours again (freshly created on the first pass, or + drained by the manager on later passes). Take ownership and empty it. + */ + dst->in_use= worker->thd; + if ((err= dst->file->ha_delete_all_rows())) + break; + + uint rows= 0; + while (rows < PWT_CHUNK_ROWS) + { + // honour a direct KILL of this worker's thread + mysql_mutex_lock(&worker->thd->LOCK_thd_kill); + killed= worker->thd->killed; + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + if (killed) + { + my_error(ER_QUERY_INTERRUPTED, MYF(0)); + break; // stop now; do not hand off + } + + if ((err= src->file->ha_rnd_next(src->record[0]))) + { + if (err == HA_ERR_END_OF_FILE) + { + err= 0; + eof= true; + } + break; + } + for (uint k= 0; k < nfields; k++) + copy_field_with_null(dst->field[k], src->field[k]); + if ((err= dst->file->ha_write_tmp_row(dst->record[0]))) + break; + rows++; + } + + if (err && err != HA_ERR_END_OF_FILE) + break; // real error; do not hand off + err= 0; + + if (rows && !killed) // hand the filled batch to the manager + { + if (worker->manager->handoff_batch(worker)) // manager asked us to stop + break; + } + } + + src->file->ha_rnd_end(); + src->file->ha_external_lock(worker->thd, F_UNLCK); + return err; +} + + +/** + @brief Write rows to our manager, when done, tidy up. Entry point for + worker_produce_chunks. + + @return true on error, false on success +*/ + +bool worker_scan_table_to_manager(pwt_worker *worker) +{ + pwt_management *mgr= worker->manager; + int err= worker_produce_chunks(worker); + + /* + End the worker's read transaction now, while we are still on the worker + thread. destroy_background_thd() -> THD::cleanup() rolls the transaction + back and asserts (trans_check) that the statement transaction is already + empty, so we must close it out here. Any commit failure is captured by the + installed PWT_error_handler. + */ + trans_commit_stmt(worker->thd); + trans_commit(worker->thd); + + /* + Mark this producer done so the consumer can detect EOF, and wake it in + case it is blocked waiting for data. A real engine error trips fatal_error + so the consumer aborts the join instead of returning a truncated result. + If this worker was killed (e.g. a user KILL aimed at it), record the kill + so the consumer can propagate it to the manager's THD and abort the join + with ER_QUERY_INTERRUPTED before any result is sent. + */ + mysql_mutex_lock(&worker->thd->LOCK_thd_kill); + killed_state killed= worker->thd->killed; + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + + mysql_mutex_lock(&mgr->LOCK_data); + if (err) + mgr->fatal_error= true; + if (killed && mgr->kill_signal == NOT_KILLED) + mgr->kill_signal= killed; + mgr->active_workers--; + mysql_cond_broadcast(&mgr->COND_data_avail); + mysql_mutex_unlock(&mgr->LOCK_data); + + if (err) + { + worker->our_scan_table->file->print_error(err, MYF(0)); + return true; + } + return false; +} + + +/** + @brief + Entry point for our worker threads, arg supplied by manager details what + needs to be run +*/ + +static void *parallel_worker_thread_func(void *arg) +{ + pwt_worker *worker= (pwt_worker*) arg; + PWT_error_handler error_handler; + + /* + Set current_thd and thread local storage (my_thread_var) for our new THD + to ensure they have their own local objects/errors/warnings etc + */ + void *save= thd_attach_thd(worker->thd); + my_thread_set_name(worker->thd->connection_name.str); + THD_STAGE_INFO(worker->thd, stage_sending_data); + worker->thd->push_internal_handler(&error_handler); + + DBUG_EXECUTE_IF("pwt_error_to_queue_oom", + { + push_warning(current_thd, Sql_condition::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR, + "This is an example warning to show we can push a " + "warning from a worker thread to its manager "); + }); +#ifdef ENABLED_DEBUG_SYNC + /* + we can't sync on the managers or our THD, spin the whole thing about + and use the global signal pool, NO_CLEAR_EVENT is needed because we have + multiple workers and the wrong one will likely consume the signal. + */ + DBUG_EXECUTE_IF("pwt_worker_pause_before_signal", + DBUG_ASSERT(!debug_sync_set_action(worker->thd, STRING_WITH_LEN( + "now SIGNAL pwt_worker_paused WAIT_FOR pwt_worker_continue NO_CLEAR_EVENT" + )));); +#endif + + mysql_mutex_lock(&worker->thd->LOCK_thd_kill); + if (worker->thd->killed) + { + my_error(ER_QUERY_INTERRUPTED, MYF(0)); + } + mysql_mutex_unlock(&worker->thd->LOCK_thd_kill); + + worker_scan_table_to_manager(worker); + + /* + Null worker->thd under LOCK_worker so abort_worker() -- which takes + LOCK_worker before deciding whether to awake() -- sees either a live THD or + nullptr, never a THD mid-teardown. + */ + mysql_mutex_lock(&worker->LOCK_worker); + worker->thd->pop_internal_handler(); // maybe not needed + THD *thd= worker->thd; + worker->thd= nullptr; + mysql_mutex_unlock(&worker->LOCK_worker); + + /* + Close our private source table while we are still attached to our THD + (current_thd == thd) and, crucially, before destroy_background_thd() + tears down the THD's transaction: the engine handle's close frees state + that references that transaction (InnoDB's prebuilt). The manager never + touches a started worker's our_scan_table, so no lock is needed here. + */ + close_worker_scan_table(worker); + + /* + executing thd_detach_thd sets my_thread_var to null, stopping our ability + use the normal mutex mechanisms, so we operate this outside the locked + region on a copy of our THD pointer + */ + thd_detach_thd(save); + server_threads.erase(thd); + destroy_background_thd(thd); + + return nullptr; +} + + +/** + @brief + Abort this worker, called as part of an error condition + + The worker may already be tearing itself down: parallel_worker_thread_func + nulls worker->thd and destroys the THD under LOCK_worker. Take that lock + and only awake() if the worker hasn't yet entered its exit section; if + it has, the worker is on its way out and pthread_join will reap it. +*/ + +void abort_worker(pwt_worker *worker) +{ + mysql_mutex_lock(&worker->LOCK_worker); + if (worker->thd) + worker->thd->awake(ABORT_QUERY); + mysql_mutex_unlock(&worker->LOCK_worker); + pthread_join(worker->pthread, nullptr); + mysql_mutex_destroy(&worker->LOCK_worker); +} + + +/** + @brief + Free our message queue, discard the messages +*/ + +void pwt_management::free_queue() +{ + // process queue + if (!parallel_messages.head()) + return; + + mysql_mutex_lock(&LOCK_pwt_thread); + pwt_queued_event *event; + while ((event= parallel_messages.get())) + { + if (pwt_error_message *err= event->error) + { + my_free(err->message); + my_free(err); + } + my_free(event); + } + mysql_mutex_unlock(&LOCK_pwt_thread); +} + + + +/** + @brief + Initialise our parallel worker threads, setting their own new THD objects. + Set up our mutexs for synchronization. + Register our new threads in server_threads. + + Called from the management thread for applicable queries at the top level. + + @return + false on success + true on error +*/ + +bool pwt_management::init_parallel_workers(THD *thd, JOIN *join) +{ + bool result= false; + uint i= 0; + + if (const uint n= thd->variables.parallel_worker_threads) + { + this->join= join; + this->thd= thd; + JOIN_TAB *tab= join->parallel_scan_join_tab; + if (!tab) // nothing to do + { + nworkers= 0; + return false; + } + + /* + The parallel-scanned table is always the first non-const table: the gate + in get_best_combination() only ever sets use_parallel_scan there. The + workers reproduce a plain full table scan (ha_rnd_*) of it and feed the + manager row images, but never position the real handler on a row. Fall + back to a serial scan when that is unsafe: + + - select->quick: a range/quick access method was chosen (possibly after + use_parallel_scan was set, e.g. a range built from the WHERE clause); + a full scan would not honour it. + + - filesort: a sort tied to this table drives its own read of it, which + the injected read functions do not feed. + + - keep_current_rowid: the plan needs this table's engine rowid via + handler::position() -- e.g. DuplicateWeedout semijoin elimination or + a rowid-ordered sort. Injected rows carry no valid handler position, + so position() returns a stale/duplicate rowid for engines whose rowid + is positional (MyISAM), and weedout then drops rows. (InnoDB derives + the rowid from the PK in record[0], so it happens to survive, but we + must not rely on that.) + + The batch tables already created for this query are left unused (freed at + cleanup). + */ + JOIN_TAB *scan_tab= join->join_tab + join->const_tables; + if ((scan_tab->select && scan_tab->select->quick) || + scan_tab->filesort || + scan_tab->keep_current_rowid) + { + nworkers= 0; + return false; + } + + /* + One batch table per worker was created up front by + create_parallel_workers_tmp_tables; bail if that count is out of step + with the worker count we are about to spawn. + */ + if (n != tab->parallel_tmp_tables.elements()) + return true; + + workers= (pwt_worker *) my_malloc(key_memory_pwt_workers, + n * sizeof(pwt_worker), + MYF(MY_WME | MY_ZEROFILL)); + if (!workers) + return true; + + nworkers= n; + + mysql_mutex_init(key_mutex_pwt_LOCK_thread, &LOCK_pwt_thread, + MY_MUTEX_INIT_SLOW); + + /* + Set up the streaming channel before any worker starts: a worker's first + action is to hand off a batch through handoff_batch(), which needs + LOCK_data and the conds live. active_workers must already equal n so the + consumer does not mistake "not started yet" for EOF. + */ + mysql_mutex_init(key_mutex_pwt_LOCK_data, &LOCK_data, MY_MUTEX_INIT_FAST); + mysql_cond_init(key_COND_pwt_data_avail, &COND_data_avail, nullptr); + mysql_cond_init(key_COND_pwt_data_space, &COND_data_space, nullptr); + active_workers= n; + fatal_error= false; + stop= false; + reaped= false; + cur_worker= nullptr; + kill_signal= NOT_KILLED; + + for (i= 0; i < n; i++) + { + workers[i].thd= create_background_thd(); + if (!workers[i].thd) + { + result= true; + goto cleanup_old_workers; + } + + workers[i].manager= this; + mysql_mutex_init(key_mutex_pwt_LOCK_worker, &workers[i].LOCK_worker, + MY_MUTEX_INIT_FAST); + workers[i].thd->system_thread= SYSTEM_THREAD_GENERIC; + size_t len= my_snprintf(workers[i].conn_name, MAX_THREAD_NAME, + WORKER_NAME); + workers[i].thd->connection_name.str= workers[i].conn_name; + workers[i].thd->connection_name.length= len; + workers[i].thd->security_ctx= thd->security_ctx; + workers[i].thd->set_command(thd->get_command()); + if (thd->db.str) + { + // explicit call in ~THD/THD::free_connection()/my_free, so we do this + workers[i].thd->db.str= (char*)my_malloc(key_memory_pwt_db, + thd->db.length+1, + MYF(0)); + if (!workers[i].thd->db.str) + { + result= true; + goto cleanup_db_string; + } + + strmake(const_cast(workers[i].thd->db.str), thd->db.str, + thd->db.length); + workers[i].thd->db.length= thd->db.length; + } + else + { + workers[i].thd->db.str= nullptr; + workers[i].thd->db.length= 0; + } + workers[i].thd->start_utime= thd->start_utime; + workers[i].thd->thread_id= next_thread_id(); + my_snprintf(workers[i].info, sizeof(workers[i].info), + WORKER_NAME " %u " CONNECTION_NAME_THREAD " %llu", + i+1, thd->thread_id); + workers[i].thd->query_string= CSET_STRING(workers[i].info, + strlen(workers[i].info), + workers[i].thd->query_charset()); + workers[i].thd->pwt_worker_info= workers+i; + workers[i].batch_full= false; + workers[i].batch_table= tab->parallel_tmp_tables.at(i); + + /* + Give this worker its own TABLE+handler for the manager's first + non-const source table, opened from the shared TABLE_SHARE. + + open_table_from_share() runs here on the manager thread, so the open + must happen with in_use == current_thd: handler::ha_thd() asserts + table->in_use == current_thd, and ha_innobase::open() calls it. So we + open under the manager's thd, then repoint in_use at the worker. + + That is enough for engines that cache the THD: InnoDB sets m_user_thd + lazily on the first operation (update_thd()), not at open time, so it + binds to the worker when the worker scans on its own thread. Each + worker thus gets a private handler and they scan concurrently without + a shared-scan lock. + + Arguments to open_table_from_share() below: + thd open under the manager's THD (the in_use rule above); + in_use is repointed to the worker afterwards. + src->s the shared TABLE_SHARE of the first non-const source + table -- the worker's TABLE is built from it. + &src->s->table_name + alias (name) for the opened TABLE. + HA_OPEN_KEYFILE | HA_TRY_READ_ONLY + db_stat (handler open mode): open the index/key file, + read-only -- the worker only scans the table. + EXTRA_RECORD + prgflag: allocate a second record buffer (record[1]) + in addition to record[0], as for a normal table open. + thd->open_options + ha_open_flags, the handler open options from the THD. + st outparam: the TABLE we just allocated, initialised here + as this worker's private table. + false is_create_table: this open is not part of CREATE TABLE. + nullptr partitions_to_open: open all partitions (no subset). + */ + { + TABLE *src= (join->join_tab + join->const_tables)->table; + TABLE *st= (TABLE*) my_malloc(key_memory_TABLE, sizeof(TABLE), + MYF(MY_WME | MY_ZEROFILL)); + if (!st) + { + result= true; + goto cleanup_db_string; + } + if (open_table_from_share(thd, src->s, &src->s->table_name, + HA_OPEN_KEYFILE | HA_TRY_READ_ONLY, + EXTRA_RECORD, thd->open_options, st, + false, nullptr)) + { + my_free(st); + result= true; + goto cleanup_db_string; + } + st->in_use= workers[i].thd; + workers[i].our_scan_table= st; + } + + server_threads.insert(workers[i].thd); // +information_schema.processlist + + if (mysql_thread_create(key_thread_pwt, &workers[i].pthread, nullptr, + parallel_worker_thread_func, &workers[i])) + { + result= true; + goto cleanup_thread_create; + } + } + + /* + Feed the first join_tab from the channel instead of scanning the real + first table. exec_inner() runs after this returns and drives the join + through these read functions, consuming worker rows as they arrive. + */ + { + JOIN_TAB *first= join->join_tab + join->const_tables; + first->table->reginfo.join_tab= first; + first->read_first_record= parallel_scan_read_first; + } + return result; + } + else + return false; + +cleanup_thread_create: + server_threads.erase(workers[i].thd); + close_worker_scan_table(workers+i); + +cleanup_db_string: + /* + destroy_background_thd() requires current_thd to be NULL because it + re-attaches the background THD to this thread's TLS. We are running on + the user's query thread (current_thd == manager thd), so save/null/ + restore around the call. Mirrors the create_background_thd() pattern. + */ + { + THD *save_thd= current_thd; + set_current_thd(nullptr); + destroy_background_thd(workers[i].thd); + set_current_thd(save_thd); + } + mysql_mutex_destroy(&workers[i].LOCK_worker); + +cleanup_old_workers: + /* + A worker spawned before the failure may be blocked in handoff_batch() + waiting for the manager to drain its batch. Release them (stop + broadcast) + so abort_worker()'s join can complete. + */ + mysql_mutex_lock(&LOCK_data); + stop= true; + mysql_cond_broadcast(&COND_data_space); + mysql_mutex_unlock(&LOCK_data); + for (uint j= 0; j < i; j++) + abort_worker(workers+j); + free_queue(); + my_free(workers); + workers= nullptr; + nworkers= 0; + mysql_mutex_destroy(&LOCK_pwt_thread); + mysql_cond_destroy(&COND_data_avail); + mysql_cond_destroy(&COND_data_space); + mysql_mutex_destroy(&LOCK_data); + + return result; +} + +#ifdef HAVE_PSI_INTERFACE +void pwt_init_psi_keys(void) +{ + const char *category= "sql"; + int count; + count= array_elements(all_pwt_threads); + PSI_server->register_thread(category, all_pwt_threads, count); + count= array_elements(all_pwt_mutexes); + mysql_mutex_register(category, all_pwt_mutexes, count); + count= array_elements(all_pwt_conds); + mysql_cond_register(category, all_pwt_conds, count); + count= array_elements(all_pwt_memory); + mysql_memory_register(category, all_pwt_memory, count); +} +#endif + + +/* + @brief + function installed into the manager join execution to extract rows from + the worker threads. + + @description + Consumer side of the streaming channel. These pluggable read functions feed + the first join_tab from the worker batch tables instead of scanning the real + first table: each row a worker wrote into its batch table is copied + field-by-field into the first table's record[0], so the manager's + nested-loop join evaluates exactly as if it had scanned the table itself, but + driven by the worker rows as they arrive. parallel_scan_read_first installs + the read function and returns the first row; parallel_scan_read_next returns + each subsequent row, blocking when no worker batch is momentarily ready. + + The manager drains one worker's batch table at a time (mgr->cur_worker): it + rnd-scans that table, and on EOF releases the worker to refill (clears + batch_full, signals COND_data_space) and picks the next ready worker. + + @returns + 0 = row produced, + -1 = end of data + 1 = error (matching report_error()). +*/ +static int parallel_scan_read_next(READ_RECORD *info) +{ + TABLE *dst= info->table; // real first table + pwt_management *mgr= dst->reginfo.join_tab->join->parallel_work_manager; + const uint nfields= dst->s->fields; + struct timespec wait; + wait.tv_nsec= 0; + + for (;;) + { + if (mgr->cur_worker) // draining a worker's batch + { + TABLE *bt= mgr->cur_worker->batch_table; + int err= bt->file->ha_rnd_next(bt->record[0]); + if (!err) + { + /* + We are injecting a scanned row into the real first table's record[0]. + Some Field::store() paths (e.g. Field_bit_as_char) assert the field + is marked in write_set, but a plain scanned table has an empty + write_set. Mark all columns writable for the duration of the copy + (debug-only; a no-op in release). + */ + MY_BITMAP *old_map= dbug_tmp_use_all_columns(dst, &dst->write_set); + for (uint k= 0; k < nfields; k++) + copy_field_with_null(dst->field[k], bt->field[k]); + dbug_tmp_restore_column_map(&dst->write_set, old_map); + return 0; + } + // batch drained (EOF) or a read error; end the scan and release worker + bt->file->ha_rnd_end(); + pwt_worker *w= mgr->cur_worker; + mysql_mutex_lock(&mgr->LOCK_data); + mgr->cur_worker= nullptr; + w->batch_full= false; // table is the worker's again + mysql_cond_broadcast(&mgr->COND_data_space); // wake it to refill + mysql_mutex_unlock(&mgr->LOCK_data); + if (err != HA_ERR_END_OF_FILE) + return 1; + // fall through and look for the next ready worker + } + + // find the next worker whose batch table is filled and ready + pwt_worker *next= nullptr; + PSI_stage_info old_stage; + mysql_mutex_lock(&mgr->LOCK_data); + for (;;) + { + for (uint i= 0; i < mgr->nworkers; i++) + if (mgr->workers[i].batch_full) + { + next= &mgr->workers[i]; + break; + } + if (next) + break; + /* + A worker exited because it was killed: propagate the kill to the + manager's own THD so the join aborts now with ER_QUERY_INTERRUPTED, + before any result is sent. (The join's sub_select kill checks turn + thd->killed into the error message.) + */ + if (mgr->kill_signal != NOT_KILLED && !mgr->thd->killed) + { + killed_state ks= mgr->kill_signal; + mysql_mutex_unlock(&mgr->LOCK_data); + mysql_mutex_lock(&mgr->thd->LOCK_thd_kill); + mgr->thd->killed= ks; + mysql_mutex_unlock(&mgr->thd->LOCK_thd_kill); + return 1; + } + if (mgr->fatal_error) // a worker failed + { + mysql_mutex_unlock(&mgr->LOCK_data); + return 1; + } + if (!mgr->active_workers) // all producers done, drained + { + mysql_mutex_unlock(&mgr->LOCK_data); + return -1; + } + if (mgr->thd->killed) + { + mysql_mutex_unlock(&mgr->LOCK_data); + return 1; + } + // wait for a batch, a finishing worker, or a 1s tick to re-check killed. + // ENTER_COND/EXIT_COND publish the "Reading data from parallel workers" + // stage and register the cond so a KILL of the manager wakes it. + wait.tv_sec= time(0) + 1; + mgr->thd->ENTER_COND(&mgr->COND_data_avail, &mgr->LOCK_data, + &stage_reading_data_from_parallel_worker, &old_stage); + mysql_cond_timedwait(&mgr->COND_data_avail, &mgr->LOCK_data, &wait); + mgr->thd->EXIT_COND(&old_stage); // unlocks LOCK_data + mysql_mutex_lock(&mgr->LOCK_data); // re-lock for the next pass + } + mgr->cur_worker= next; + mysql_mutex_unlock(&mgr->LOCK_data); + + // open a scan on the chosen worker's batch table, then loop to read it + TABLE *bt= next->batch_table; + bt->in_use= mgr->thd; + bt->file->ha_index_or_rnd_end(); // in case a scan was open + if (bt->file->ha_rnd_init_with_error(1)) + return 1; + } +} + + +static int parallel_scan_read_first(JOIN_TAB *tab) +{ + tab->table->status= 0; + tab->read_record.table= tab->table; + tab->read_record.read_record_func= parallel_scan_read_next; + return parallel_scan_read_next(&tab->read_record); +} + + +/** + @brief + Stop the producers and pthread_join them. + + @description + The workers read this join's source and batch tables, so they must be + reaped before JOIN::join_free()->cleanup() frees those tables -- otherwise a + worker that has not yet observed the stop request dereferences a freed + table->file. That is why this is called from join_free(), ahead of the table + teardown, and again (idempotently, guarded by 'reaped') from finalize. + + On a normal completion the workers have already finished; on an early-out + (LIMIT) or an abort (KILL/error) we ask them to stop here. A worker + re-checks 'stop' at each batch hand-off, so it exits within at most one + batch -- without raising ER_QUERY_INTERRUPTED, which matters for a normal + early-out. +*/ + +void pwt_management::quiesce_workers() +{ + if (!workers || reaped) + return; + + /* + On an early-out/error the manager may have stopped mid-batch with a scan + still open on cur_worker's table; end it before that table is freed. + */ + if (cur_worker) + { + cur_worker->batch_table->file->ha_rnd_end(); + cur_worker= nullptr; + } + + mysql_mutex_lock(&LOCK_data); + stop= true; + mysql_cond_broadcast(&COND_data_space); + mysql_mutex_unlock(&LOCK_data); + + for (uint i= 0; i < nworkers; i++) + { + pthread_join(workers[i].pthread, nullptr); + mysql_mutex_destroy(&workers[i].LOCK_worker); + } + reaped= true; +} + + +/** + @brief + Reap the workers (if not already) and tear the channel down. + + @description + Called from JOIN::exec() once exec_inner() has finished. Worker errors and + warnings collected by PWT_error_handler are surfaced here, after the join's + own result has been produced. +*/ + +void pwt_management::finalize_parallel_workers(THD *thd, JOIN *join) +{ + if (!workers) + return; + + quiesce_workers(); // stop + join (no-op if already reaped) + + /* + Surface errors/warnings the workers queued via PWT_error_handler. A worker + error that mattered to the result has already aborted the join during + execution (fatal_error or a propagated kill), so thd is already in error by + the time we get here; raising another error would trip the "can't overwrite + status" assertion in the diagnostics area. So only raise a queued ERROR + when thd is not already in error -- otherwise keep it as a warning. Plain + warnings are always safe to add. + */ + bool surface_drop; + mysql_mutex_lock(&LOCK_pwt_thread); + surface_drop= messages_dropped; + messages_dropped= false; + pwt_queued_event *event; + while ((event= parallel_messages.get())) + { + if (pwt_error_message *err= event->error) + { + if (err->level == Sql_condition::enum_warning_level::WARN_LEVEL_ERROR && + !thd->is_error()) + my_message_sql(err->code, err->message, MYF(0)); + else + push_warning(thd, Sql_condition::WARN_LEVEL_WARN, err->code, + err->message); + my_free(err->message); + my_free(err); + } + my_free(event); + } + mysql_mutex_unlock(&LOCK_pwt_thread); + + if (surface_drop) + push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN, ER_OUTOFMEMORY, + "Parallel worker diagnostics were dropped due to " + "memory allocation failure"); + + mysql_cond_destroy(&COND_data_avail); + mysql_cond_destroy(&COND_data_space); + mysql_mutex_destroy(&LOCK_data); + mysql_mutex_destroy(&LOCK_pwt_thread); + my_free(workers); + workers= nullptr; + nworkers= 0; +} diff --git a/sql/sql_parallel_workers.h b/sql/sql_parallel_workers.h new file mode 100644 index 0000000000000..126030e0d575c --- /dev/null +++ b/sql/sql_parallel_workers.h @@ -0,0 +1,194 @@ +#ifndef SQL_PARALLEL_WORKERS_H +#define SQL_PARALLEL_WORKERS_H + +#include "mariadb.h" +#include "sql_class.h" +#include "mysqld.h" +#include "sql_error.h" + +extern MYSQL_THD create_background_thd(); +extern void destroy_background_thd(MYSQL_THD thd); +extern void *thd_attach_thd(MYSQL_THD thd); +extern void thd_detach_thd(void *save); + +// PWT Parallel Worker Thread + + +/* + Message Types +*/ +class pwt_error_message +{ +public: + uint code; + Sql_condition::enum_warning_level level; + char *message; +}; + +/* + Event type. Inherits ilink so it can live in an I_List. +*/ +class pwt_queued_event : public ilink +{ +public: + pwt_error_message *error; +}; + + +/* + Number of rows a worker packs into its batch temporary table before handing + it to the manager. The worker hands rows to the manager a batch at a time + rather than one at a time so the channel mutex is touched once per + PWT_CHUNK_ROWS rows instead of once per row. Each worker reuses a single + batch table (see pwt_worker::batch_table): it fills the table, hands + it to the manager and blocks until the manager has drained it, then truncates + and refills it for the next batch. +*/ +#define PWT_CHUNK_ROWS 64 + +class pwt_management; + + +#define WORKER_NAME "Parallel Worker" +#define WORKER_ID_LENGTH 3 +#define WORKER_NAME_LENGTH 15 +#define CONNECTION_NAME_THREAD "For Thread ID" +#define CONNECTION_NAME_THREAD_LENGTH 13 +#define THREAD_ID_LENGTH 20 // ull can occupy 20 chars + +/* + Parallel Worker Thread specific attributes +*/ +class pwt_worker +{ +public: + THD *thd; + pwt_management *manager; + pthread_t pthread; + /* + Guards worker->thd while the worker nulls it on exit, so abort_worker() + sees either a live THD to awake() or nullptr. See parallel_worker_thread_func. + */ + mysql_mutex_t LOCK_worker; + char conn_name[MAX_THREAD_NAME+1]; + /* + This is displayed in information_schema.processlist.info + Currently "Parallel Worker {1..N} For Thread M" + */ + char info[WORKER_NAME_LENGTH+ + 1+WORKER_ID_LENGTH+1+ + CONNECTION_NAME_THREAD_LENGTH+ + 1+THREAD_ID_LENGTH+1]; + /* + Per-worker copy of the our scan table, opened from the same TABLE_SHARE + with in_use == this worker's thd. Gives the worker a private handler so + it can scan concurrently with the other workers and the manager. + Engines like InnoDB cache the THD pointer (m_user_thd) at open time, + so a shared handler with a swapped in_use is not enough; each worker needs + its own. Opened in init_parallel_workers, closed in the worker thread + before its THD is destroyed. + */ + TABLE *our_scan_table; + /* + This worker's single batch temporary table, built in our_scan_table column + format and created up front by JOIN::create_parallel_workers_tmp_tables + (one per worker, stored in JOIN_TAB::parallel_tmp_tables). + The worker reuses it for every chunk: fill it with up to PWT_CHUNK_ROWS + rows, hand it to the manager, block until the manager has drained it, + then truncate and refill. The worker and the manager never touch it at + the same time, so it needs no per-row locking. + */ + TABLE *batch_table; + /* + Hand-off flag for batch_table, guarded by pwt_management::LOCK_data. + The worker sets it true once the table is filled and ready for the manager; + the manager clears it once the table is drained, releasing the worker to + refill. See pwt_management::handoff_batch / the consumer read functions. + */ + bool batch_full; +}; + + +/* + Class to create, manage and eventually destroy a "team" of worker threads. +*/ +class pwt_management : public Sql_alloc +{ +public: + pwt_worker *workers; + uint nworkers; + I_List parallel_messages; + mysql_mutex_t LOCK_pwt_thread; + THD *thd; + /* + Set under LOCK_pwt_thread when a worker fails to allocate a queued event. + The manager surfaces a single ER_OUTOFMEMORY warning so the user sees + that worker diagnostics were dropped instead of silently disappearing. + */ + bool messages_dropped; + + /* + Streaming channel. Each worker (producer) fills its single reused batch + table (batch_table) and hands it to the manager (single consumer) + by setting its batch_full flag; the manager drains the table from its first + join_tab read function and runs the rest of the join as the batches arrive, + instead of waiting for every worker to finish first. + + LOCK_data guards cur_worker, the workers' batch_full flags, active_workers + and the flags below. COND_data_avail wakes the consumer when a worker + fills its table or finishes; COND_data_space wakes a worker when the + manager has drained its table so it may refill. Because each worker owns + one table and blocks until it is drained, at most one batch per worker is + ever outstanding -- the single table is the natural backpressure bound. + EOF for the consumer is the state (no worker has batch_full set && + active_workers == 0). + */ + mysql_mutex_t LOCK_data; + mysql_cond_t COND_data_avail; + mysql_cond_t COND_data_space; + pwt_worker *cur_worker; // worker whose table the consumer drains + uint active_workers; // producers still running + bool fatal_error; // a producer hit a real engine error + /* + Set (under LOCK_data) to a worker's killed_state when that worker exits + because it was killed -- e.g. a user KILL [QUERY] aimed at a parallel + worker. The consumer propagates it to the manager's own THD so the join + aborts with the right error (ER_QUERY_INTERRUPTED) before any result is + sent, rather than completing and trying to raise the error too late. + */ + killed_state kill_signal; + + pwt_management(): + workers(nullptr), + nworkers(0), + messages_dropped(false), + cur_worker(nullptr), + active_workers(0), + fatal_error(false), + kill_signal(NOT_KILLED), + stop(false), + reaped(false) + {} + ~pwt_management() + { + finalize_parallel_workers(current_thd, join); + } + bool init_parallel_workers(THD *thd, JOIN *join); + void quiesce_workers(); + void finalize_parallel_workers(THD *thd, JOIN *join); + bool handoff_batch(pwt_worker *worker); + void free_queue(); + +private: + JOIN *join; // the join these workers serve + bool stop; // consumer wants producers to stop + /* + Set once the workers have been stopped and pthread_join'd (quiesce_workers). + Workers read this join's source and batch tables, so they must be reaped + before JOIN::join_free()->cleanup() frees those tables; quiesce_workers is + therefore called from join_free, and again (idempotently) from finalize. + */ + bool reaped; +}; + +#endif diff --git a/sql/sql_select.cc b/sql/sql_select.cc index dbd3283c45c5c..3296ccd0359e1 100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -26,6 +26,7 @@ */ #include "mariadb.h" +#include "mysqld_error.h" #include "sql_priv.h" #include "unireg.h" #include "sql_select.h" @@ -70,6 +71,7 @@ #include "derived_handler.h" #include "opt_hints.h" #include "opt_group_by_cardinality.h" +#include "sql_parallel_workers.h" /* A key part number that means we're using a fulltext scan. @@ -523,6 +525,7 @@ void JOIN::init(THD *thd_arg, List &fields_arg, spl_opt_info= 0; need_tmp= 0; hidden_group_fields= 0; /*safety*/ + parallel_scan_join_tab= nullptr; error= 0; select= 0; return_tab= 0; @@ -4372,6 +4375,116 @@ bool JOIN::make_aggr_tables_info() } +/** + @brief + Create one temporary scan table per parallel worker. + + Per-worker temporary tables for the parallel scan of the first non-const + join table. Each worker reuses its table to ship that table's rows to the + manager a batch at a time; the manager reads those rows back into the first + join_tab and drives the rest of the join itself. + + These tmp tables are therefore built in the *source table's* format -- one + column per field of the first table -- not in the result-projection format + of the post-join aggregation table. Only meaningful when that first table is + read by a full table scan (JT_ALL). +*/ + +bool JOIN::create_parallel_workers_tmp_tables(JOIN_TAB *tab) +{ + const uint n= thd->variables.parallel_worker_threads; + TABLE *src= (join_tab + const_tables)->table; + + List scan_fields; + for (Field **f= src->field; *f; f++) + { + Item *item= new (thd->mem_root) Item_field(thd, *f); + if (!item || scan_fields.push_back(item, thd->mem_root)) + return true; + } + + TMP_TABLE_PARAM *sparam= new (thd->mem_root) TMP_TABLE_PARAM; + if (!sparam) + return true; + sparam->init(); + count_field_types(select_lex, sparam, scan_fields, false); + sparam->skip_create_table= true; + + /* + Force the batch tables into the in-memory HEAP engine. With + tmp_memory_table_size == 0 Create_tmp_table::choose_engine would otherwise + pick the on-disk Aria engine; but a batch table is created and freed by the + manager thread while its rows are written by a worker thread, and the + temp-file size callback charges Aria file growth to whichever thread is + current (the worker), releasing it only when the file is deleted (on the + manager). That cross-thread split leaves the worker's + status_var.tmp_space_used non-zero at thread exit, tripping + THD::free_connection()'s assertion. A HEAP table creates no temp file, so + the callback never fires and the accounting stays balanced. BLOB/TEXT + tables are excluded from parallel scan, so batch rows are fixed-width and a + PWT_CHUNK_ROWS-sized batch stays comfortably in memory. + */ + ulonglong saved_tmp_memory_table_size= thd->variables.tmp_memory_table_size; + if (!saved_tmp_memory_table_size) + thd->variables.tmp_memory_table_size= thd->variables.max_heap_table_size; + + bool error= false; + tab->parallel_tmp_tables.init(PSI_INSTRUMENT_MEM); + for (uint i= 0; i < n; i++) + { + TABLE* tmp_table= create_tmp_table(thd, sparam, scan_fields, + nullptr, false, false, + select_options, HA_POS_ERROR, + &empty_clex_str, true, false); + if (!tmp_table) + { + error= true; + break; + } + tmp_table->reginfo.join_tab= tab; + tab->parallel_tmp_tables.append(tmp_table); + // instantiate this now so the parallel workers can write to it + if (instantiate_tmp_table(tmp_table, sparam->keyinfo, + sparam->start_recinfo, + &sparam->recinfo, + select_options, + true /*cross_thread*/)) + { + error= true; + break; + } + } + + thd->variables.tmp_memory_table_size= saved_tmp_memory_table_size; + if (error) + { + /* + Our caller just returns on failure without reaching create_postjoin_ + aggr_table's err: cleanup, so free the batch tables we already created + here rather than leak them. + */ + free_parallel_tmp_tables(tab); + return true; + } + parallel_scan_join_tab= tab; + + return false; +} + + +/* + Free the per-worker batch tables created by create_parallel_workers_tmp_tables + and reset the array. Safe to call when none were created (no-op) and safe to + call more than once. +*/ + +void JOIN::free_parallel_tmp_tables(JOIN_TAB *tab) +{ + while (tab->parallel_tmp_tables.elements()) + free_tmp_table(thd, tab->parallel_tmp_tables.pop()); + tab->parallel_tmp_tables.free_memory(); +} + bool JOIN::create_postjoin_aggr_table(JOIN_TAB *tab, List *table_fields, @@ -4405,13 +4518,37 @@ JOIN::create_postjoin_aggr_table(JOIN_TAB *tab, List *table_fields, if (tmp_table_keep_current_rowid) add_fields_for_current_rowid(tab, table_fields); tab->tmp_table_param->skip_create_table= true; + + /* + When the first non-const table is read by a full scan (JT_ALL) and parallel + workers are enabled, create one per-worker batch table now. At execution + JOIN::exec spins up workers that stream that table's rows through those + batch tables into the first join_tab instead of the manager scanning it + itself. (create_parallel_workers_tmp_tables sets parallel_scan_join_tab.) + */ + if (!parallel_scan_join_tab && + thd->variables.parallel_worker_threads && + top_join_tab_count > const_tables && + (join_tab + const_tables)->use_parallel_scan && + (join_tab + const_tables)->type == JT_ALL) + { + if (create_parallel_workers_tmp_tables(tab)) + DBUG_RETURN(true); + } + TABLE* table= create_tmp_table(thd, tab->tmp_table_param, *table_fields, table_group, distinct, save_sum_fields, select_options, table_rows_limit, &empty_clex_str, true, keep_row_order); if (!table) - DBUG_RETURN(true); + goto err; + // instantiate this now so the parallel worker manager can write to it + if (instantiate_tmp_table(table, tab->tmp_table_param->keyinfo, + tab->tmp_table_param->start_recinfo, + &tab->tmp_table_param->recinfo, + select_options)) + goto err; tmp_table_param.using_outer_summary_function= tab->tmp_table_param->using_outer_summary_function; tab->join= this; @@ -4477,6 +4614,12 @@ JOIN::create_postjoin_aggr_table(JOIN_TAB *tab, List *table_fields, if (table != NULL) free_tmp_table(thd, table); tab->table= NULL; + /* + On every path that reaches here tab->aggr is not set yet, so JOIN::cleanup + will not free the per-worker batch tables -- release them here. No-op when + parallel scan was not used for this query. + */ + free_parallel_tmp_tables(tab); DBUG_RETURN(true); } @@ -4909,10 +5052,43 @@ int JOIN::exec() select_lex->select_number)) dbug_serve_apcs(thd, 1); ); + + /* + If we need worker threads, are a top level select statement and + we haven't initialized our parallel work manager + */ + if (thd->variables.parallel_worker_threads && + !select_lex->outer_select() && + thd->lex->sql_command == SQLCOM_SELECT && + !parallel_work_manager) + { + if (!(parallel_work_manager= new (thd->mem_root) pwt_management) || + parallel_work_manager->init_parallel_workers(thd, this)) + { + delete parallel_work_manager; + parallel_work_manager= NULL; + my_error(ER_INTERNAL_ERROR, MYF(0), + "Failed to initialize parallel work mgr"); + return 1; + } + } + ANALYZE_START_TRACKING(thd, &explain->time_tracker); res= exec_inner(); ANALYZE_STOP_TRACKING(thd, &explain->time_tracker); + /* + exec_inner() drove the join by consuming the worker channel. Stop the + workers, reap them and surface any worker diagnostics, then delete the + manager. + */ + if (parallel_work_manager) + { + parallel_work_manager->finalize_parallel_workers(thd, this); + delete parallel_work_manager; + parallel_work_manager= NULL; + } + DBUG_EXECUTE_IF("show_explain_probe_join_exec_end", if (dbug_user_var_equals_int(thd, "show_explain_probe_select_id", @@ -13454,7 +13630,45 @@ bool JOIN::get_best_combination() j->index= cur_pos->forced_index; } else + { j->type= JT_ALL; + /* + Only the first non-const table (tablenr == const_tables), when read by + a full table scan, is considered for parallel scan: it is the driving + table of the nested-loop join and is therefore scanned exactly once, + which is what the worker streaming model reproduces. Tables deeper in + the join order are re-scanned once per outer row and are never + parallelised. + + Even the first table is excluded when its rows cannot be shipped to + the manager by value through a per-worker batch table: + - BLOB/TEXT and other blob-backed columns (GEOMETRY, JSON) keep + their payload off the record buffer (s->blob_fields > 0), so they + are not reproduced on the manager side; + - internal/temporary tables (tmp_table != NO_TMP_TABLE); + - the MERGE engine (ha_myisammrg): a worker opens its own handler + straight from the TABLE_SHARE, which does not attach the MERGE + children, so ha_myisammrg::rnd_init() asserts; + - fulltext-searched tables: a MATCH ... AGAINST relevance is derived + from the handler's fulltext state for the current row, not a + stored column, so the injected row images would compute it wrongly. + */ + if (tablenr == const_tables && + thd->variables.parallel_worker_threads && + j->table->s->tmp_table == NO_TMP_TABLE && + j->table->s->blob_fields == 0 && + j->table->file->ht->db_type != DB_TYPE_MRG_MYISAM && + !j->table->fulltext_searched) + { + j->use_parallel_scan= true; + if (unlikely(thd->trace_started())) + { + Json_writer_object trace_pscan(thd); + trace_pscan.add("chosen_for_parallel_scan", + j->table->alias.c_ptr()); + } + } + } if (cur_pos->use_join_buffer && tablenr != const_tables) full_join= 1; @@ -16576,6 +16790,7 @@ make_join_readinfo(JOIN *join, ulonglong options, uint no_jbuf_after) tab->read_first_record= join_read_first; /* Read with index_first / index_next */ tab->type= tab->type == JT_ALL ? JT_NEXT : JT_HASH_NEXT; + tab->use_parallel_scan= false; } } if (have_quick_select && @@ -17245,6 +17460,16 @@ void JOIN::join_free() bool can_unlock= full; DBUG_ENTER("JOIN::join_free"); + /* + Parallel-scan workers read this join's source and batch tables. cleanup() + below frees those tables, so the workers must be stopped and reaped first + -- otherwise a worker that has not yet seen the stop request dereferences a + freed table->file. On normal completion the workers have already finished, + so this is a cheap no-op. + */ + if (parallel_work_manager) + parallel_work_manager->quiesce_workers(); + cleanup(full); for (tmp_unit= select_lex->first_inner_unit(); @@ -17360,6 +17585,7 @@ void JOIN::cleanup(bool full) { free_tmp_table(thd, curr_tab->table); curr_tab->table= NULL; + free_parallel_tmp_tables(curr_tab); delete curr_tab->tmp_table_param; curr_tab->tmp_table_param= NULL; curr_tab->aggr= NULL; @@ -23310,12 +23536,13 @@ bool Virtual_tmp_table::check_assignability_from(const TABLE &table, } -bool open_tmp_table(TABLE *table) +bool open_tmp_table(TABLE *table, bool cross_thread) { int error; if (unlikely((error= table->file->ha_open(table, table->s->path.str, O_RDWR, HA_OPEN_TMP_TABLE | - HA_OPEN_INTERNAL_TABLE | + (cross_thread? 0 : + HA_OPEN_INTERNAL_TABLE) | HA_OPEN_SIZE_TRACKING)))) { table->file->print_error(error, MYF(0)); /* purecov: inspected */ @@ -24322,7 +24549,8 @@ do_select(JOIN *join, Procedure *procedure) bool instantiate_tmp_table(TABLE *table, KEY *keyinfo, TMP_ENGINE_COLUMNDEF *start_recinfo, TMP_ENGINE_COLUMNDEF **recinfo, - ulonglong options) + ulonglong options, + bool cross_thread) { DBUG_ASSERT(table->s->keys == 0 || table->key_info == keyinfo); DBUG_ASSERT(table->s->keys <= 1); @@ -24340,7 +24568,7 @@ bool instantiate_tmp_table(TABLE *table, KEY *keyinfo, empty_record(table); table->status= STATUS_NO_RECORD; } - if (open_tmp_table(table)) + if (open_tmp_table(table, cross_thread)) return TRUE; return FALSE; @@ -31193,6 +31421,7 @@ bool JOIN_TAB::save_explain_data(Explain_table_access *eta, tab_type= type == JT_HASH ? JT_HASH_RANGE : JT_RANGE; } eta->type= tab_type; + eta->use_parallel_scan= use_parallel_scan; /* Build "possible_keys" value */ // psergey-todo: why does this use thd MEM_ROOT??? Doesn't this diff --git a/sql/sql_select.h b/sql/sql_select.h index 2507a871a6005..c20b619dce25d 100644 --- a/sql/sql_select.h +++ b/sql/sql_select.h @@ -33,6 +33,7 @@ #include "sql_update.h" #include "cset_narrowing.h" +#include "sql_parallel_workers.h" typedef struct st_join_table JOIN_TAB; /* Values in optimize */ @@ -252,6 +253,14 @@ class SplM_opt_info; typedef struct st_join_table { TABLE *table; + /* + One batch temporary table per parallel worker, built in this (first + non-const) table's column format and created up front by + JOIN::create_parallel_workers_tmp_tables. Each worker reuses its table to + ship scanned source rows to the manager a batch at a time. Empty unless + this is the parallel-scan source table. + */ + Dynamic_array parallel_tmp_tables; TABLE_LIST *tab_list; KEYUSE *keyuse; /**< pointer to first used key */ KEY *hj_key; /**< descriptor of the used best hash join key @@ -428,6 +437,7 @@ typedef struct st_join_table { bool shortcut_for_distinct; bool sorted; bool cached_pfs_batch_update; + bool use_parallel_scan; /* If it's not 0 the number stored this field indicates that the index @@ -1616,6 +1626,13 @@ class JOIN :public Sql_alloc bool need_tmp; bool hidden_group_fields; + /* + First non-const join_tab whose table is read by a full scan (JT_ALL) and + is therefore eligible for parallel scanning. When set (and + parallel_worker_threads > 0) JOIN::exec spins up workers that stream that + table's rows to the manager through the pwt_management channel. + */ + JOIN_TAB *parallel_scan_join_tab; /* TRUE if there was full cleanup of the JOIN */ bool cleaned; DYNAMIC_ARRAY keyuse; @@ -1753,6 +1770,8 @@ class JOIN :public Sql_alloc */ Sql_cmd_dml *sql_cmd_dml; + pwt_management *parallel_work_manager{0}; + JOIN(THD *thd_arg, List &fields_arg, ulonglong select_options_arg, select_result *result_arg) :fields_list(fields_arg) @@ -1982,6 +2001,9 @@ class JOIN :public Sql_alloc */ void optimize_distinct(); + bool create_parallel_workers_tmp_tables(JOIN_TAB *join_tab); + void free_parallel_tmp_tables(JOIN_TAB *join_tab); + void cleanup_item_list(List &items) const; bool add_having_as_table_cond(JOIN_TAB *tab); bool make_aggr_tables_info(); @@ -2690,8 +2712,9 @@ bool create_internal_tmp_table(TABLE *table, KEY *keyinfo, bool instantiate_tmp_table(TABLE *table, KEY *keyinfo, TMP_ENGINE_COLUMNDEF *start_recinfo, TMP_ENGINE_COLUMNDEF **recinfo, - ulonglong options); -bool open_tmp_table(TABLE *table); + ulonglong options, + bool cross_thread= false); +bool open_tmp_table(TABLE *table, bool cross_thread= false); void fix_list_after_tbl_changes(SELECT_LEX *new_parent, List *tlist); void optimize_keyuse(JOIN *join, DYNAMIC_ARRAY *keyuse_array); bool sort_and_filter_keyuse(JOIN *join, DYNAMIC_ARRAY *keyuse, diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index d6becc62d8632..76975c9e7c1bb 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -2496,6 +2496,17 @@ Sys_slave_parallel_workers( ON_UPDATE(fix_slave_parallel_threads)); +static Sys_var_on_access_global +Sys_parallel_worker_threads( + "parallel_worker_threads", + "Number of worker threads available for parallel query execution. " + "0 means parallel execution is disabled", + SESSION_VAR(parallel_worker_threads), CMD_LINE(REQUIRED_ARG), + VALID_RANGE(0,100), DEFAULT(0), BLOCK_SIZE(1)); +/* VALID_RANGE(0,100), DEFAULT(1), BLOCK_SIZE(1)); // testing 123 */ + + static bool check_slave_domain_parallel_threads(sys_var *self, THD *thd, set_var *var) {