From 83e1d3a0be070a16e8dae28d2abd4ee4c3844433 Mon Sep 17 00:00:00 2001 From: Sam Clegg Date: Fri, 22 May 2026 15:04:52 -0700 Subject: [PATCH] Implement Pthread Manager Worker for synchronous thread creation Fixes: #18633 --- src/lib/libpthread.js | 40 ++++++++++++++++ src/runtime_pthread.js | 74 +++++++++++++++++++++++++++-- src/settings.js | 5 ++ test/pthread/test_pthread_manager.c | 16 +++++++ test/test_browser.py | 11 ++++- tools/settings.py | 1 + 6 files changed, 142 insertions(+), 5 deletions(-) create mode 100644 test/pthread/test_pthread_manager.c diff --git a/src/lib/libpthread.js b/src/lib/libpthread.js index 67a7b173a0f70..8db138bd4501b 100644 --- a/src/lib/libpthread.js +++ b/src/lib/libpthread.js @@ -40,6 +40,10 @@ const CMD_CLEANUP_THREAD = 6; const CMD_MARK_AS_FINISHED = 7; const CMD_UNCAUGHT_EXN = 8; const CMD_CALL_HANDLER = 9; +#if PTHREAD_MANAGER +const CMD_MAKE_MANAGER = 10; +const CMD_TERMINATE = 11; +#endif #if WASM_ESM_INTEGRATION const pthreadWorkerScript = TARGET_BASENAME + '.pthread.mjs'; @@ -124,6 +128,21 @@ var LibraryPThread = { } }, initMainThread() { +#if PTHREAD_MANAGER +#if ASSERTIONS + dbg('PThread: initializing manager worker'); + assert({{{ ENVIRONMENT_IS_MAIN_THREAD() }}}); +#endif + PThread.allocateUnusedWorker(); + PThread.managerWorker = PThread.unusedWorkers.pop(); + addOnPreRun(async () => { + var managerReady = PThread.loadWasmModuleToWorker(PThread.managerWorker); + addRunDependency('manager-worker'); + await managerReady; + PThread.managerWorker.postMessage({ cmd: {{{ CMD_MAKE_MANAGER }}} }); + removeRunDependency('manager-worker'); + }); +#endif // PTHREAD_MANAGER #if PTHREAD_POOL_SIZE var pthreadPoolSize = {{{ PTHREAD_POOL_SIZE }}}; // Start loading up the Worker pool, if requested. @@ -701,6 +720,27 @@ var LibraryPThread = { assert(threadParams.pthread_ptr, 'spawnThread called with null pthread ptr'); #endif +#if PTHREAD_MANAGER + if ({{{ ENVIRONMENT_IS_MAIN_THREAD() }}}) { +#if RUNTIME_DEBUG + dbg('creating pthread using managerWorker'); +#endif + var workerStub = { + pthread_ptr: threadParams.pthread_ptr, + postMessage: (msg, transfer) => { + msg.targetThread = threadParams.pthread_ptr; + PThread.managerWorker.postMessage(msg, transfer); + }, + terminate: () => { + PThread.managerWorker.postMessage({ cmd: {{{ CMD_TERMINATE }}}, thread: threadParams.pthread_ptr }); + } + }; + PThread.pthreads[threadParams.pthread_ptr] = workerStub; + PThread.managerWorker.postMessage({ cmd: {{{ CMD_SPAWN_THREAD }}}, threadParams }, threadParams.transferList); + return 0; + } +#endif + var worker = PThread.getNewWorker(); if (!worker) { // No available workers in the PThread pool. diff --git a/src/runtime_pthread.js b/src/runtime_pthread.js index dbcb641344523..4df8606228a54 100644 --- a/src/runtime_pthread.js +++ b/src/runtime_pthread.js @@ -42,11 +42,63 @@ if (ENVIRONMENT_IS_PTHREAD) { // notified about them. self.onunhandledrejection = (e) => { throw e.reason || e; }; +#if PTHREAD_MANAGER + function handleManagerMessage(e) { +#if PTHREADS_DEBUG + dbg('handleManagerMessage', e); +#endif + var d = e.data; + if (d.targetThread) { + var worker = PThread.pthreads[d.targetThread]; + if (worker) { + worker.postMessage(d, d.transferList); + } + return; + } + var cmd = d.cmd; + if (cmd === {{{ CMD_SPAWN_THREAD }}}) { + var threadParams = d.threadParams; + var worker = PThread.getNewWorker(); + PThread.runningWorkers.push(worker); + PThread.pthreads[threadParams.pthread_ptr] = worker; + worker.pthread_ptr = threadParams.pthread_ptr; + worker.onmessage = (e) => { + var d = e.data; + if (d.targetThread && d.targetThread != _pthread_self()) { + var targetWorker = PThread.pthreads[d.targetThread]; + if (targetWorker) { + targetWorker.postMessage(d, d.transferList); + return; + } + } + postMessage(d, d.transferList); + }; + var msg = { + cmd: {{{ CMD_RUN }}}, + start_routine: threadParams.startRoutine, + arg: threadParams.arg, + pthread_ptr: threadParams.pthread_ptr, + }; + worker.postMessage(msg, threadParams.transferList); + } else if (cmd === {{{ CMD_TERMINATE }}}) { + var worker = PThread.pthreads[d.thread]; + if (worker) { + worker.terminate(); + delete PThread.pthreads[d.thread]; + } + } else if (cmd === {{{ CMD_CHECK_MAILBOX }}}) { + if (initializedJS) { + checkMailbox(); + } + } + } +#endif + {{{ asyncIf(ASYNCIFY == 2) }}}function handleMessage(e) { try { var msgData = e['data']; - //dbg('msgData: ' + Object.keys(msgData)); var cmd = msgData.cmd; + if (cmd == {{{ CMD_LOAD }}}) { // Preload command that is called once per worker to parse and load the Emscripten code. #if ASSERTIONS workerID = msgData.workerID; @@ -57,12 +109,21 @@ if (ENVIRONMENT_IS_PTHREAD) { // Until we initialize the runtime, queue up any further incoming messages. let messageQueue = []; - self.onmessage = (e) => messageQueue.push(e); + self.onmessage = (e) => { +#if PTHREADS_DEBUG + dbg('worker: queueing message'); +#endif + messageQueue.push(e); + } // And add a callback for when the runtime is initialized. startWorker = () => { +#if PTHREADS_DEBUG + dbg('worker: startWorker'); +#endif // Notify the main thread that this thread has loaded. postMessage({ cmd: {{{ CMD_LOADED }}} }); + // Process any messages that were queued before the thread was ready. for (let msg of messageQueue) { handleMessage(msg); @@ -178,6 +239,13 @@ if (ENVIRONMENT_IS_PTHREAD) { if (initializedJS) { checkMailbox(); } +#if PTHREAD_MANAGER + } else if (cmd == {{{ CMD_MAKE_MANAGER }}} ) { +#if PTHREADS_DEBUG + dbg('worker: CMD_MAKE_MANAGER'); +#endif + self.onmessage = handleManagerMessage; +#endif } else if (cmd) { // The received message looks like something that should be handled by this message // handler, (since there is a cmd field present), but is not one of the @@ -190,7 +258,7 @@ if (ENVIRONMENT_IS_PTHREAD) { err(`worker: onmessage() captured an uncaught exception: ${ex}`); if (ex?.stack) err(ex.stack); #endif - __emscripten_thread_crashed(); + if (typeof __emscripten_thread_crashed !== 'undefined') __emscripten_thread_crashed(); throw ex; } }; diff --git a/src/settings.js b/src/settings.js index 6ab6bdbbcd5de..5b39f1ca9ea67 100644 --- a/src/settings.js +++ b/src/settings.js @@ -1617,6 +1617,11 @@ var WEBAUDIO_DEBUG = 0; // repeatedly yield back to the JS event loop in order for the thread to // actually start. // If your application needs to be able to synchronously create new threads, +// If true, a dedicated worker is used to manage pthread lifecycles. +// This allows synchronous thread creation even when the main thread is +// blocked. +var PTHREAD_MANAGER = false; + // you can pre-create a pthread pool by specifying -sPTHREAD_POOL_SIZE=x, // in which case the specified number of Workers will be preloaded into a pool // before the application starts, and that many threads can then be available diff --git a/test/pthread/test_pthread_manager.c b/test/pthread/test_pthread_manager.c new file mode 100644 index 0000000000000..bf4e97fe0df0b --- /dev/null +++ b/test/pthread/test_pthread_manager.c @@ -0,0 +1,16 @@ +#include +#include + +void *thread_main(void* arg) { + printf("thread_main\n"); + return NULL; +} + +int main() { + printf("main\n"); + pthread_t t; + pthread_create(&t, NULL, thread_main, NULL); + pthread_join(t, NULL); + printf("done\n"); + return 0; +} diff --git a/test/test_browser.py b/test/test_browser.py index c3fe8de49d17a..99eab63cd9192 100644 --- a/test/test_browser.py +++ b/test/test_browser.py @@ -3672,13 +3672,20 @@ def test_pthread_in_pthread_pool_size_strict(self): # Check that it fails when there's a pthread creating another pthread. self.btest_exit('pthread/test_pthread_create_pthread.c', cflags=['-g2', '-pthread', '-sPTHREAD_POOL_SIZE=1', '-sPTHREAD_POOL_SIZE_STRICT=2', '-DSMALL_POOL']) + def test_pthread_manager(self): + self.btest_exit('pthread/test_pthread_manager.c', cflags=['-pthread', '-sPTHREAD_MANAGER']) + # Test that the emscripten_ atomics api functions work. + @parameterized({ + '': (['-sPTHREAD_POOL_SIZE=8'],), + 'manager': (['-sPTHREAD_MANAGER'],), + }) @parameterized({ '': ([],), 'closure': (['--closure=1'],), }) - def test_pthread_atomics(self, args): - self.btest_exit('pthread/test_pthread_atomics.c', cflags=['-O3', '-pthread', '-sPTHREAD_POOL_SIZE=8', '-g1'] + args) + def test_pthread_atomics(self, args1, args2): + self.btest_exit('pthread/test_pthread_atomics.c', cflags=['-O3', '-pthread', '-g1'] + args1 + args2) # Test 64-bit atomics. def test_pthread_64bit_atomics(self): diff --git a/tools/settings.py b/tools/settings.py index 169d2719638dd..70b3b7d53736b 100644 --- a/tools/settings.py +++ b/tools/settings.py @@ -126,6 +126,7 @@ # List of incompatible settings, of the form (SETTINGS_A, SETTING_B, OPTIONAL_REASON_FOR_INCOMPAT) INCOMPATIBLE_SETTINGS = [ ('MINIMAL_RUNTIME', 'MAIN_MODULE', None), + ('PTHREAD_POOL_SIZE', 'PTHREAD_MANAGER', None), ('WASM_WORKERS', 'MAIN_MODULE', 'dynamic linking is not supported with -sWASM_WORKERS'), ('WASM2JS', 'MAIN_MODULE', 'wasm2js does not support dynamic linking'), ('WASM2JS', 'SIDE_MODULE', 'wasm2js does not support dynamic linking'),