Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/lib/libpthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ const CMD_CLEANUP_THREAD = 6;
const CMD_MARK_AS_FINISHED = 7;
const CMD_UNCAUGHT_EXN = 8;
const CMD_CALL_HANDLER = 9;
#if PTHREAD_MANAGER
const CMD_MAKE_MANAGER = 10;
const CMD_TERMINATE = 11;
#endif

#if WASM_ESM_INTEGRATION
const pthreadWorkerScript = TARGET_BASENAME + '.pthread.mjs';
Expand Down Expand Up @@ -124,6 +128,21 @@ var LibraryPThread = {
}
},
initMainThread() {
#if PTHREAD_MANAGER
#if ASSERTIONS
dbg('PThread: initializing manager worker');
assert({{{ ENVIRONMENT_IS_MAIN_THREAD() }}});
#endif
PThread.allocateUnusedWorker();
PThread.managerWorker = PThread.unusedWorkers.pop();
addOnPreRun(async () => {
var managerReady = PThread.loadWasmModuleToWorker(PThread.managerWorker);
addRunDependency('manager-worker');
await managerReady;
PThread.managerWorker.postMessage({ cmd: {{{ CMD_MAKE_MANAGER }}} });
removeRunDependency('manager-worker');
});
#endif // PTHREAD_MANAGER
#if PTHREAD_POOL_SIZE
var pthreadPoolSize = {{{ PTHREAD_POOL_SIZE }}};
// Start loading up the Worker pool, if requested.
Expand Down Expand Up @@ -701,6 +720,27 @@ var LibraryPThread = {
assert(threadParams.pthread_ptr, 'spawnThread called with null pthread ptr');
#endif

#if PTHREAD_MANAGER
if ({{{ ENVIRONMENT_IS_MAIN_THREAD() }}}) {
#if RUNTIME_DEBUG
dbg('creating pthread using managerWorker');
#endif
var workerStub = {
pthread_ptr: threadParams.pthread_ptr,
postMessage: (msg, transfer) => {
msg.targetThread = threadParams.pthread_ptr;
PThread.managerWorker.postMessage(msg, transfer);
},
terminate: () => {
PThread.managerWorker.postMessage({ cmd: {{{ CMD_TERMINATE }}}, thread: threadParams.pthread_ptr });
}
};
PThread.pthreads[threadParams.pthread_ptr] = workerStub;
PThread.managerWorker.postMessage({ cmd: {{{ CMD_SPAWN_THREAD }}}, threadParams }, threadParams.transferList);
return 0;
}
#endif

var worker = PThread.getNewWorker();
if (!worker) {
// No available workers in the PThread pool.
Expand Down
74 changes: 71 additions & 3 deletions src/runtime_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,63 @@ if (ENVIRONMENT_IS_PTHREAD) {
// notified about them.
self.onunhandledrejection = (e) => { throw e.reason || e; };

#if PTHREAD_MANAGER
function handleManagerMessage(e) {
#if PTHREADS_DEBUG
dbg('handleManagerMessage', e);
#endif
var d = e.data;
if (d.targetThread) {
var worker = PThread.pthreads[d.targetThread];
if (worker) {
worker.postMessage(d, d.transferList);
}
return;
}
var cmd = d.cmd;
if (cmd === {{{ CMD_SPAWN_THREAD }}}) {
var threadParams = d.threadParams;
var worker = PThread.getNewWorker();
PThread.runningWorkers.push(worker);
PThread.pthreads[threadParams.pthread_ptr] = worker;
worker.pthread_ptr = threadParams.pthread_ptr;
worker.onmessage = (e) => {
var d = e.data;
if (d.targetThread && d.targetThread != _pthread_self()) {
var targetWorker = PThread.pthreads[d.targetThread];
if (targetWorker) {
targetWorker.postMessage(d, d.transferList);
return;
}
}
postMessage(d, d.transferList);
};
var msg = {
cmd: {{{ CMD_RUN }}},
start_routine: threadParams.startRoutine,
arg: threadParams.arg,
pthread_ptr: threadParams.pthread_ptr,
};
worker.postMessage(msg, threadParams.transferList);
} else if (cmd === {{{ CMD_TERMINATE }}}) {
var worker = PThread.pthreads[d.thread];
if (worker) {
worker.terminate();
delete PThread.pthreads[d.thread];
}
} else if (cmd === {{{ CMD_CHECK_MAILBOX }}}) {
if (initializedJS) {
checkMailbox();
}
}
}
#endif

{{{ asyncIf(ASYNCIFY == 2) }}}function handleMessage(e) {
try {
var msgData = e['data'];
//dbg('msgData: ' + Object.keys(msgData));
var cmd = msgData.cmd;

if (cmd == {{{ CMD_LOAD }}}) { // Preload command that is called once per worker to parse and load the Emscripten code.
#if ASSERTIONS
workerID = msgData.workerID;
Expand All @@ -57,12 +109,21 @@ if (ENVIRONMENT_IS_PTHREAD) {

// Until we initialize the runtime, queue up any further incoming messages.
let messageQueue = [];
self.onmessage = (e) => messageQueue.push(e);
self.onmessage = (e) => {
#if PTHREADS_DEBUG
dbg('worker: queueing message');
#endif
messageQueue.push(e);
}

// And add a callback for when the runtime is initialized.
startWorker = () => {
#if PTHREADS_DEBUG
dbg('worker: startWorker');
#endif
// Notify the main thread that this thread has loaded.
postMessage({ cmd: {{{ CMD_LOADED }}} });

// Process any messages that were queued before the thread was ready.
for (let msg of messageQueue) {
handleMessage(msg);
Expand Down Expand Up @@ -178,6 +239,13 @@ if (ENVIRONMENT_IS_PTHREAD) {
if (initializedJS) {
checkMailbox();
}
#if PTHREAD_MANAGER
} else if (cmd == {{{ CMD_MAKE_MANAGER }}} ) {
#if PTHREADS_DEBUG
dbg('worker: CMD_MAKE_MANAGER');
#endif
self.onmessage = handleManagerMessage;
#endif
} else if (cmd) {
// The received message looks like something that should be handled by this message
// handler, (since there is a cmd field present), but is not one of the
Expand All @@ -190,7 +258,7 @@ if (ENVIRONMENT_IS_PTHREAD) {
err(`worker: onmessage() captured an uncaught exception: ${ex}`);
if (ex?.stack) err(ex.stack);
#endif
__emscripten_thread_crashed();
if (typeof __emscripten_thread_crashed !== 'undefined') __emscripten_thread_crashed();
throw ex;
}
};
Expand Down
5 changes: 5 additions & 0 deletions src/settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -1617,6 +1617,11 @@ var WEBAUDIO_DEBUG = 0;
// repeatedly yield back to the JS event loop in order for the thread to
// actually start.
// If your application needs to be able to synchronously create new threads,
// If true, a dedicated worker is used to manage pthread lifecycles.
// This allows synchronous thread creation even when the main thread is
// blocked.
var PTHREAD_MANAGER = false;

// you can pre-create a pthread pool by specifying -sPTHREAD_POOL_SIZE=x,
// in which case the specified number of Workers will be preloaded into a pool
// before the application starts, and that many threads can then be available
Expand Down
16 changes: 16 additions & 0 deletions test/pthread/test_pthread_manager.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#include <pthread.h>
#include <stdio.h>

void *thread_main(void* arg) {
printf("thread_main\n");
return NULL;
}

int main() {
printf("main\n");
pthread_t t;
pthread_create(&t, NULL, thread_main, NULL);
pthread_join(t, NULL);
printf("done\n");
return 0;
}
11 changes: 9 additions & 2 deletions test/test_browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3672,13 +3672,20 @@ def test_pthread_in_pthread_pool_size_strict(self):
# Check that it fails when there's a pthread creating another pthread.
self.btest_exit('pthread/test_pthread_create_pthread.c', cflags=['-g2', '-pthread', '-sPTHREAD_POOL_SIZE=1', '-sPTHREAD_POOL_SIZE_STRICT=2', '-DSMALL_POOL'])

def test_pthread_manager(self):
self.btest_exit('pthread/test_pthread_manager.c', cflags=['-pthread', '-sPTHREAD_MANAGER'])

# Test that the emscripten_ atomics api functions work.
@parameterized({
'': (['-sPTHREAD_POOL_SIZE=8'],),
'manager': (['-sPTHREAD_MANAGER'],),
})
@parameterized({
'': ([],),
'closure': (['--closure=1'],),
})
def test_pthread_atomics(self, args):
self.btest_exit('pthread/test_pthread_atomics.c', cflags=['-O3', '-pthread', '-sPTHREAD_POOL_SIZE=8', '-g1'] + args)
def test_pthread_atomics(self, args1, args2):
self.btest_exit('pthread/test_pthread_atomics.c', cflags=['-O3', '-pthread', '-g1'] + args1 + args2)

# Test 64-bit atomics.
def test_pthread_64bit_atomics(self):
Expand Down
1 change: 1 addition & 0 deletions tools/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
# List of incompatible settings, of the form (SETTINGS_A, SETTING_B, OPTIONAL_REASON_FOR_INCOMPAT)
INCOMPATIBLE_SETTINGS = [
('MINIMAL_RUNTIME', 'MAIN_MODULE', None),
('PTHREAD_POOL_SIZE', 'PTHREAD_MANAGER', None),
('WASM_WORKERS', 'MAIN_MODULE', 'dynamic linking is not supported with -sWASM_WORKERS'),
('WASM2JS', 'MAIN_MODULE', 'wasm2js does not support dynamic linking'),
('WASM2JS', 'SIDE_MODULE', 'wasm2js does not support dynamic linking'),
Expand Down
Loading